我一直在尝试使用 polyglot 并构建一个简单的 python 处理器。我遵循了多语言配方,但无法部署流。我最初部署了示例中使用的相同处理器,但出现以下错误:
请求的未知命令行参数:spring.cloud.stream.bindings.input.destination 请求的未知环境变量:SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS
Traceback (most recent call last):
File "/processor/python_processor.py", line 10, in
consumer = KafkaConsumer(get_input_channel(), bootstrap_servers=[get_kafka_binder_brokers()])
File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/group.py", line 353, in init
self._client = KafkaClient(metrics=self._metrics, **self.config)
File "/usr/local/lib/python2.7/dist-packages/kafka/client_async.py", line 203, in init
self.cluster = ClusterMetadata(**self.config)
File "/usr/local/lib/python2.7/dist-packages/kafka/cluster.py", line 67, in init
self._bootstrap_brokers = self._generate_bootstrap_brokers()
File "/usr/local/lib/python2.7/dist-packages/kafka/cluster.py", line 71, in _generate_bootstrap_brokers
bootstrap_hosts = collect_hosts(self.config['bootstrap_servers'])
File "/usr/local/lib/python2.7/dist-packages/kafka/conn.py", line 1336, in collect_hosts
host, port, afi = get_ip_port_afi(host_port)
File "/usr/local/lib/python2.7/dist-packages/kafka/conn.py", line 1289, in get_ip_port_afi
host_and_port_str = host_and_port_str.strip()
AttributeError: 'NoneType' object has no attribute 'strip'
Exception AttributeError: "'KafkaClient' object has no attribute '_closed'" in <bound method KafkaClient.del of <kafka.client_async.KafkaClient object at 0x7f8b7024cf10>> ignored
然后我尝试通过部署流传递环境和绑定参数,但这不起作用。当我手动将 SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS 和 spring.cloud.stream.bindings.input.destination 参数插入 Kafka 的使用者时,我能够将流部署为一种解决方法。我不完全确定是什么导致了这个问题,在 Kubernetes 上部署它会不会有什么不同,或者这是 Polyglot 和 Dataflow 的问题?对此的任何帮助将不胜感激。
重现步骤:尝试从本地数据流服务器上的多语言配方部署多语言处理器流。我也使用与示例中相同的流定义:http --server.port=32123 | python-处理器--reversestring=true | 日志。
附加上下文:我正在尝试在 SPDF 和 Kafka 的本地安装上部署流,因为我在使用 Docker 部署自定义 python 应用程序时遇到了一些问题。