希望你们一切都好。
我正在尝试部署一个从 Kafka 摄取数据的 Dataflow 管道。为此，我使用了 confluent-kafka 库，但运行时出现以下错误：
NameError: name 'confluent_kafka' is not defined
我已经通过 apt 安装了 librdkafka-dev，并在 setup.py 中声明了 confluent-kafka 依赖，但错误仍然存在。
这是我的 setup.py 文件:
from __future__ import absolute_import
from __future__ import print_function
import subprocess
from distutils.command.build import build
import setuptools
# Configure the required packages and scripts to install.
# Note that the Python Dataflow containers come with numpy already installed
# so this dependency will not trigger anything to be installed unless a version
# restriction is specified.
#
# Each distribution is listed exactly once. Where the list previously carried
# both a range and an exact pin for the same distribution (protobuf,
# SQLAlchemy, apache-beam, confluent-kafka), only the single entry that
# expresses the combined constraint is kept.
REQUIRED_PACKAGES = [
    # required packages
    'crcmod>=1.7,<2.0',
    'dill>=0.3.1.1,<0.3.2',
    'fastavro==1.3.0',
    'future>=0.18.2,<1.0.0',
    'grpcio>=1.29.0,<2',
    'hdfs>=2.1.0,<3.0.0',
    'httplib2>=0.8,<0.18.0',
    'mock>=1.0.1,<3.0.0',
    'numpy>=1.14.3,<2',
    'pymongo>=3.8.0,<4.0.0',
    'oauth2client>=2.0.1,<5',
    'pyarrow>=0.15.1,<3.0.0',
    'pydot>=1.2.0,<2',
    'python-dateutil>=2.8.0,<3',
    'pytz>=2018.3',
    'requests>=2.24.0,<3.0.0',
    'typing-extensions>=3.7.0,<3.8.0',
    'SQLAlchemy==1.4.7',
    'SQLAlchemy-JSONField==1.0.0',
    'SQLAlchemy-Utils==0.36.8',
    # beam package; the [gcp] extra already pulls in the base distribution,
    # so a separate bare 'apache-beam' entry is not needed.
    'apache-beam[gcp]==2.27.0',
    # beam test packages
    'freezegun>=0.3.12',
    'nose>=1.3.7',
    'nose_xunitmp>=0.4.1',
    # TODO(BEAM-11531): Address test breakages in pandas 1.2
    'pandas>=1.0,<1.2.0',
    'parameterized>=0.7.1,<0.8.0',
    'pyhamcrest>=1.9,!=1.10.0,<2.0.0',
    'pyyaml>=3.12,<6.0.0',
    'requests_mock>=1.7,<2.0',
    'tenacity>=5.0.2,<6.0',
    'pytest>=4.4.0,<5.0',
    'pytest-xdist>=1.29.0,<2',
    'pytest-timeout>=1.3.3,<2',
    'psycopg2-binary>=2.8.5,<3.0.0',
    'testcontainers>=3.0.3,<4.0.0',
    # gcp required packages
    'cachetools>=3.1.0,<5',
    'google-apitools>=0.5.31,<0.5.32',
    'google-auth>=1.18.0,<2',
    'google-cloud-datastore>=1.7.1,<2',
    'google-cloud-pubsub>=0.39.0,<2',
    # GCP packages required by tests
    'google-cloud-bigquery>=1.6.0,<2',
    'google-cloud-core>=0.28.1,<2',
    'google-cloud-bigtable>=0.31.1,<2',
    'google-cloud-spanner>=1.13.0,<2',
    'grpcio-gcp>=0.2.2,<1',
    # GCP packages required by prebuild sdk container functionality.
    'google-cloud-build>=2.0.0,<3',
    'google-cloud-error-reporting==0.30.0',
    # kafka related
    # The version pin and the [avro] extra are combined into one requirement.
    'confluent-kafka[avro]==1.6.0',
    # 'kafka' (1.3.5) is the legacy PyPI name of kafka-python and installs the
    # same top-level `kafka` package, so only kafka-python is kept to avoid the
    # two distributions overwriting each other.
    'kafka-python==2.0.2',
    'python-schema-registry-client==1.8.1',
    # protobuf and related
    'proto-plus==1.18.1',
    'protobuf==3.15.8',
    # workflow for dataflow
    # NOTE(review): this installs the PyPI distribution named 'workflow';
    # confirm the pipeline really imports it.
    'workflow',
]
# Refresh the apt package index before installing anything.
_APT_UPDATE_COMMAND = ['sudo', 'apt-get', 'update']

# Install the librdkafka headers and the Python development headers.
_APT_INSTALL_COMMAND = [
    'sudo', 'apt-get', 'install', '-y', 'librdkafka-dev', 'python-dev',
]

# argv-style commands, listed in the order they should be executed.
CUSTOM_COMMANDS = [_APT_UPDATE_COMMAND, _APT_INSTALL_COMMAND]
class CustomCommands(setuptools.Command):
    """A setuptools Command class able to run arbitrary commands.

    Running this command executes every entry of the module-level
    ``CUSTOM_COMMANDS`` list, in order, each as its own subprocess. The first
    command that exits with a non-zero status aborts the run.
    """

    def initialize_options(self):
        """Declare command options; this command has none."""
        pass

    def finalize_options(self):
        """Validate command options; this command has none."""
        pass

    def RunCustomCommand(self, command_list):
        """Run one command and raise if it exits with a non-zero status.

        Args:
          command_list: argv-style list handed to ``subprocess.Popen``.

        Raises:
          RuntimeError: if the command's exit code is non-zero. The message
            carries the command, its exit code and the captured output.
        """
        p = subprocess.Popen(
            command_list,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            # stderr is folded into stdout so one stream holds all output.
            stderr=subprocess.STDOUT)
        # Can use communicate(input='y\n'.encode()) if the command run requires
        # some confirmation.
        stdout_data, _ = p.communicate()
        if p.returncode != 0:
            # Include the captured output: without it a failing apt-get gives
            # no hint about what went wrong.
            raise RuntimeError(
                'Command %s failed: exit code: %s\n%s' % (
                    command_list,
                    p.returncode,
                    stdout_data.decode('utf-8', 'replace')))

    def run(self):
        """Execute every command in ``CUSTOM_COMMANDS`` sequentially."""
        for command in CUSTOM_COMMANDS:
            self.RunCustomCommand(command)
PACKAGE_NAME = 'dependency_package'
PACKAGE_VERSION = '0.0.1'


class BuildWithCustomCommands(build):
    """A build command that also runs the 'CustomCommands' command.

    Registering the stock distutils ``build`` class in ``cmdclass`` never
    triggers ``CustomCommands``. Appending it to ``sub_commands`` makes the
    build step (which installation goes through) run it as well.
    """
    # A None predicate means the sub-command is always run.
    sub_commands = build.sub_commands + [('CustomCommands', None)]


setuptools.setup(
    name=PACKAGE_NAME,
    version=PACKAGE_VERSION,
    description='blockEvents Kafka Consumer project packages',
    install_requires=REQUIRED_PACKAGES,
    # 'custom_commands' is not a recognised setup() option (it only produced
    # an "Unknown distribution option" warning); the commands are wired in
    # through cmdclass below instead.
    packages=setuptools.find_packages(),
    cmdclass={
        # Command class instantiated and run by setuptools.
        'build': BuildWithCustomCommands,
        'CustomCommands': CustomCommands,
    })
有人知道该如何解决这个问题吗？