0

我目前正在尝试将消息读取为类似于以下示例KafkaIO 的ByteDeserializer。我的测试设置如下:

选项 1:配置为使用 --runner=PortableRunner

方案二:启动本地的 flink 作业服务器,

docker run --net=host apache/beam_flink1.10_job_server:latest

发布测试 kafka avro 消息

管道参数定义为,

pipeline_args = ['--runner', 'FlinkRunner',
                 '--job_endpoint', 'localhost:8099',
                 '--environment_type', 'LOOPBACK',
                 '--flink_version', '1.10',
                 '--flink_master', 'localhost:8081']
pipeline_options = PipelineOptions(pipeline_args, save_main_session=True,streaming=True)   

管道设置,

_ = (pipeline | ReadFromKafka(
                consumer_config= {'bootstrap.servers':'localhost:9092'},
                topic = ['beam-test-topic'])
              | beam.Flatmap(lambda kv: log_topic_contents(kv[1])))

当我执行管道时,使用默认的扩展服务 SDK 映像(apache/beam_python3.7_sdk:2.29.0)并将作业提交到 flink 作业服务器。flink 作业服务器失败,并显示消息“无法提交 JobGraph”和“Rest 端点关闭”。

我会错过管道的任何运行时配置吗?

4

1 回答 1

0

“LOOPBACK”环境类型当前不支持跨语言转换。你能用'DOCKER'类型重试吗?

于 2021-07-14T23:20:58.350 回答