1

我一直在尝试使用 polyglot 并构建一个简单的 python 处理器。我遵循了多语言配方,但无法部署流。我最初部署了示例中使用的相同处理器,但出现以下错误:

请求的未知命令行参数:spring.cloud.stream.bindings.input.destination 请求的未知环境变量:SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS

Traceback (most recent call last):
File "/processor/python_processor.py", line 10, in
consumer = KafkaConsumer(get_input_channel(), bootstrap_servers=[get_kafka_binder_brokers()])
File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/group.py", line 353, in init
self._client = KafkaClient(metrics=self._metrics, **self.config)
File "/usr/local/lib/python2.7/dist-packages/kafka/client_async.py", line 203, in init
self.cluster = ClusterMetadata(**self.config)
File "/usr/local/lib/python2.7/dist-packages/kafka/cluster.py", line 67, in init
self._bootstrap_brokers = self._generate_bootstrap_brokers()
File "/usr/local/lib/python2.7/dist-packages/kafka/cluster.py", line 71, in _generate_bootstrap_brokers
bootstrap_hosts = collect_hosts(self.config['bootstrap_servers'])
File "/usr/local/lib/python2.7/dist-packages/kafka/conn.py", line 1336, in collect_hosts
host, port, afi = get_ip_port_afi(host_port)
File "/usr/local/lib/python2.7/dist-packages/kafka/conn.py", line 1289, in get_ip_port_afi
host_and_port_str = host_and_port_str.strip()
AttributeError: 'NoneType' object has no attribute 'strip'
Exception AttributeError: "'KafkaClient' object has no attribute '_closed'" in <bound method KafkaClient.del of <kafka.client_async.KafkaClient object at 0x7f8b7024cf10>> ignored

然后我尝试通过部署流传递环境和绑定参数,但这不起作用。当我手动将 SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS 和 spring.cloud.stream.bindings.input.destination 参数插入 Kafka 的使用者时,我能够将流部署为一种解决方法。我不完全确定是什么导致了这个问题,在 Kubernetes 上部署它会不会有什么不同,或者这是 Polyglot 和 Dataflow 的问题?对此的任何帮助将不胜感激。

重现步骤:尝试从本地数据流服务器上的多语言配方部署多语言处理器流。我也使用与示例中相同的流定义:http --server.port=32123 | python-处理器--reversestring=true | 日志。

附加上下文:我正在尝试在 SPDF 和 Kafka 的本地安装上部署流,因为我在使用 Docker 部署自定义 python 应用程序时遇到了一些问题。

4

1 回答 1

0

您在上面发布的配方需要SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS环境变量作为服务器配置的一部分存在(由于流是通过 Skipper 服务器管理的,您需要在 Skipper 服务器配置中设置此环境变量)。

您可以查看文档,了解如何SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS在 Skipper 服务器部署中设置为环境属性。

python-processor您还可以在部署流应用程序时将此属性作为部署者属性传递。您可以参考文档,了解如何在流部署时传递部署属性来设置 Spring Cloud Stream 属性(此处为 binder 配置属性)。

于 2020-07-17T03:56:21.637 回答