我目前正在尝试将消息读取为类似于以下示例KafkaIO 的ByteDeserializer。我的测试设置如下:
选项 1:配置为使用 --runner=PortableRunner
方案二:启动本地的 flink 作业服务器,
docker run --net=host apache/beam_flink1.10_job_server:latest
发布测试 kafka avro 消息
管道参数定义为,
pipeline_args = ['--runner', 'FlinkRunner',
'--job_endpoint', 'localhost:8099',
'--environment_type', 'LOOPBACK',
'--flink_version', '1.10',
'--flink_master', 'localhost:8081']
pipeline_options = PipelineOptions(pipeline_args, save_main_session=True,streaming=True)
管道设置,
_ = (pipeline | ReadFromKafka(
consumer_config= {'bootstrap.servers':'localhost:9092'},
topic = ['beam-test-topic'])
| beam.Flatmap(lambda kv: log_topic_contents(kv[1])))
当我执行管道时,使用默认的扩展服务 SDK 映像(apache/beam_python3.7_sdk:2.29.0)并将作业提交到 flink 作业服务器。flink 作业服务器失败,并显示消息“无法提交 JobGraph”和“Rest 端点关闭”。
我会错过管道的任何运行时配置吗?