0

希望你们一切都好。

我正在尝试部署一个从 Kafka 摄取数据的 Dataflow 管道。为此,我使用了 confluent-kafka 库,但出现了以下错误。

NameError: name 'confluent_kafka' is not defined(名称“confluent_kafka”未定义)

我已经用 apt 安装了 librdkafka-dev,并在 setup.py 中添加了 confluent-kafka 依赖,但错误仍然存在。

这是我的 setup.py 文件:

from __future__ import absolute_import
from __future__ import print_function

import subprocess

from distutils.command.build import build

import setuptools
# Configure the required packages and scripts to install.
# Note that the Python Dataflow containers come with numpy already installed
# so this dependency will not trigger anything to be installed unless a version
# restriction is specified.
# The requirement strings are grouped by purpose and concatenated at the end
# into REQUIRED_PACKAGES, preserving the original order.

# General-purpose runtime libraries.
_CORE_PACKAGES = [
    'crcmod>=1.7,<2.0',
    'dill>=0.3.1.1,<0.3.2',
    'fastavro==1.3.0',
    'future>=0.18.2,<1.0.0',
    'grpcio>=1.29.0,<2',
    'hdfs>=2.1.0,<3.0.0',
    'httplib2>=0.8,<0.18.0',
    'mock>=1.0.1,<3.0.0',
    'numpy>=1.14.3,<2',
    'pymongo>=3.8.0,<4.0.0',
    'oauth2client>=2.0.1,<5',
    'protobuf>=3.12.2,<4',
    'pyarrow>=0.15.1,<3.0.0',
    'pydot>=1.2.0,<2',
    'python-dateutil>=2.8.0,<3',
    'pytz>=2018.3',
    'requests>=2.24.0,<3.0.0',
    'typing-extensions>=3.7.0,<3.8.0',
    'SQLAlchemy==1.4.7',
    'SQLAlchemy-JSONField==1.0.0',
    'SQLAlchemy-Utils==0.36.8',
]

# Apache Beam itself, plus its GCP and Avro extras.
_BEAM_PACKAGES = [
    'apache-beam==2.27.0',
    'apache-beam[gcp]==2.27.0',
    'apache-beam[avro]',
]

# Packages used by the Beam test suites.
_BEAM_TEST_PACKAGES = [
    'freezegun>=0.3.12',
    'nose>=1.3.7',
    'nose_xunitmp>=0.4.1',
    # TODO(BEAM-11531): Address test breakages in pandas 1.2
    # 'pandas>=1.0,<2',
    'pandas>=1.0,<1.2.0',
    'parameterized>=0.7.1,<0.8.0',
    'pyhamcrest>=1.9,!=1.10.0,<2.0.0',
    'pyyaml>=3.12,<6.0.0',
    'requests_mock>=1.7,<2.0',
    'tenacity>=5.0.2,<6.0',
    'pytest>=4.4.0,<5.0',
    'pytest-xdist>=1.29.0,<2',
    'pytest-timeout>=1.3.3,<2',
    # NOTE(review): overlaps with the 'SQLAlchemy==1.4.7' pin in
    # _CORE_PACKAGES; the exact pin is the effective constraint.
    'sqlalchemy>=1.3,<2.0',
    'psycopg2-binary>=2.8.5,<3.0.0',
    'testcontainers>=3.0.3,<4.0.0',
]

# Google Cloud client libraries needed at runtime.
_GCP_PACKAGES = [
    'cachetools>=3.1.0,<5',
    'google-apitools>=0.5.31,<0.5.32',
    'google-auth>=1.18.0,<2',
    'google-cloud-datastore>=1.7.1,<2',
    'google-cloud-pubsub>=0.39.0,<2',
]

# GCP packages required by tests.
_GCP_TEST_PACKAGES = [
    'google-cloud-bigquery>=1.6.0,<2',
    'google-cloud-core>=0.28.1,<2',
    'google-cloud-bigtable>=0.31.1,<2',
    'google-cloud-spanner>=1.13.0,<2',
    'grpcio-gcp>=0.2.2,<1',
]

# GCP packages required by prebuild sdk container functionality.
_GCP_PREBUILD_PACKAGES = [
    'google-cloud-build>=2.0.0,<3',
    'google-cloud-error-reporting==0.30.0',
]

# Kafka clients and schema-registry support.
# NOTE(review): 'kafka' and 'kafka-python' are both listed with different
# versions; presumably they provide the same importable package -- confirm
# whether both are really needed.
_KAFKA_PACKAGES = [
    'confluent-kafka==1.6.0',
    'confluent-kafka[avro]',
    'kafka==1.3.5',
    'kafka-python==2.0.2',
    'python-schema-registry-client==1.8.1',
]

# Protobuf and related.
# NOTE(review): 'protobuf==3.15.8' overlaps with the 'protobuf>=3.12.2,<4'
# range in _CORE_PACKAGES; the exact pin is the effective constraint.
_PROTOBUF_PACKAGES = [
    'proto-plus==1.18.1',
    'protobuf==3.15.8',
]

# Workflow package for Dataflow.
_WORKFLOW_PACKAGES = [
    'workflow',
]

# Full install_requires list handed to setuptools.setup().
REQUIRED_PACKAGES = (
    _CORE_PACKAGES
    + _BEAM_PACKAGES
    + _BEAM_TEST_PACKAGES
    + _GCP_PACKAGES
    + _GCP_TEST_PACKAGES
    + _GCP_PREBUILD_PACKAGES
    + _KAFKA_PACKAGES
    + _PROTOBUF_PACKAGES
    + _WORKFLOW_PACKAGES
)

# Shared argv prefix for the apt-get invocations below.
# NOTE(review): `sudo` may be missing or unnecessary on the machines that
# build this package (e.g. Dataflow workers) -- confirm before relying on it.
_APT_GET = ['sudo', 'apt-get']

# System-level setup steps, each an argv list. CustomCommands.run() executes
# them in order.
# NOTE(review): 'python-dev' presumably targets Python 2 headers; verify the
# package name for the worker image in use.
CUSTOM_COMMANDS = [
    _APT_GET + ['update'],
    _APT_GET + ['install', '-y', 'librdkafka-dev', 'python-dev'],
]

class CustomCommands(setuptools.Command):
    """A setuptools Command class able to run arbitrary commands.

    Executes every entry of the module-level ``CUSTOM_COMMANDS`` list, in
    order, as a subprocess, and aborts on the first one that exits non-zero.
    """

    description = 'run the shell commands listed in CUSTOM_COMMANDS'
    # This command takes no options. distutils expects a list-valued
    # ``user_options`` on any command invoked directly from the command line.
    user_options = []

    def initialize_options(self):
        """Required by the Command interface; there are no options to set."""
        pass

    def finalize_options(self):
        """Required by the Command interface; there is nothing to validate."""
        pass

    def RunCustomCommand(self, command_list):
        """Run one command and raise if it fails.

        Args:
            command_list: the command and its arguments as an argv list,
                passed straight to ``subprocess.Popen`` (no shell involved).

        Raises:
            RuntimeError: if the command exits with a non-zero status. The
                message carries the exit code and the combined stdout/stderr
                so the failure can be diagnosed from the build log.
        """
        p = subprocess.Popen(
            command_list,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        # Can use communicate(input='y\n'.encode()) if the command run requires
        # some confirmation.
        stdout_data, _ = p.communicate()

        if p.returncode != 0:
            # stderr is merged into stdout above, so this is everything the
            # command printed; decode leniently so odd bytes cannot mask the
            # real error.
            output = stdout_data.decode('utf-8', 'replace') if stdout_data else ''
            raise RuntimeError(
                'Command %s failed: exit code: %s\n%s'
                % (command_list, p.returncode, output))

    def run(self):
        """Run all CUSTOM_COMMANDS in order, stopping at the first failure."""
        for command in CUSTOM_COMMANDS:
            self.RunCustomCommand(command)

PACKAGE_NAME = 'dependency_package'
PACKAGE_VERSION = '0.0.1'


class BuildWithCustomCommands(build):
    """The stock ``build`` command extended to also run ``CustomCommands``.

    Registering ``CustomCommands`` in ``cmdclass`` only makes the command
    available; nothing runs it unless it is scheduled. Appending it to
    ``sub_commands`` makes every ``build`` execute it as well. The name must
    match the ``cmdclass`` key below, and a ``None`` predicate means
    "always run".
    """
    sub_commands = build.sub_commands + [('CustomCommands', None)]


# Note: the command list is not passed to setup() -- it has no
# ``custom_commands`` keyword. The commands are reached through the
# CustomCommands entry in ``cmdclass`` instead.
setuptools.setup(
    name=PACKAGE_NAME,
    version=PACKAGE_VERSION,
    description='blockEvents Kafka Consumer project packages',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
    cmdclass={
        # Command class instantiated/run during pip install scenarios.
        'build': BuildWithCustomCommands,
        'CustomCommands': CustomCommands,
    })

关于如何解决这个问题的任何想法?

4

0 回答 0