I have the following code, which connects to Kafka using the Python Beam SDK. I understand that the ReadFromKafka transform runs in the Java SDK harness (a Docker container), but I cannot figure out how to make ssl.truststore.location and ssl.keystore.location available inside the SDK harness's Docker environment.
The job_endpoint argument points to java -jar beam-runners-flink-1.10-job-server-2.27.0.jar --flink-master localhost:8081.
pipeline_args.extend([
    '--job_name=paul_test',
    '--runner=PortableRunner',
    '--sdk_location=container',
    '--job_endpoint=localhost:8099',
    '--streaming',
    "--environment_type=DOCKER",
    f"--sdk_harness_container_image_overrides=.*java.*,{my_beam_sdk_docker_image}:{my_beam_docker_tag}",
])

with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline:
    kafka = pipeline | ReadFromKafka(
        consumer_config={
            "bootstrap.servers": "bootstrap-server:17032",
            "security.protocol": "SSL",
            "ssl.truststore.location": "/opt/keys/client.truststore.jks",  # how do I make this available to the Java SDK harness
            "ssl.truststore.password": "password",
            "ssl.keystore.type": "PKCS12",
            "ssl.keystore.location": "/opt/keys/client.keystore.p12",  # how do I make this available to the Java SDK harness
            "ssl.keystore.password": "password",
            "group.id": "group",
            "basic.auth.credentials.source": "USER_INFO",
            "schema.registry.basic.auth.user.info": "user:password"
        },
        topics=["topic"],
        max_num_records=2,
        # expansion_service="localhost:56938"
    )
    kafka | beam.Map(lambda x: print(x))
I tried specifying the image-override option as --sdk_harness_container_image_overrides='.*java.*,beam_java_sdk:latest', where beam_java_sdk:latest is a Docker image I built on top of apache/beam_java11_sdk:2.27.0 that fetches the credentials in its entrypoint.sh. But Beam does not seem to use it; I see
INFO org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory - Still waiting for startup of environment apache/beam_java11_sdk:2.27.0 for worker id 1-1
in the logs, soon and inevitably followed by
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /opt/keys/client.keystore.p12 of type PKCS12
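For context, my understanding is that the override flag takes a "&lt;regex&gt;,&lt;replacement image&gt;" pair, where the regex is matched against the default harness image name. As a quick self-check that my regex actually matches the default Java harness image, I use a small helper like the following (the helper name and the hard-coded default image string are my own, not part of the Beam API):

```python
import re

def image_override_flag(pattern: str, image: str, tag: str) -> str:
    """Build a --sdk_harness_container_image_overrides flag value.

    The flag value is "<regex>,<replacement image>": when the regex
    matches the default harness image name, the replacement is used.
    """
    # Sanity-check: the regex should match the default Java harness
    # image that Beam 2.27.0 requests (assumed string, for illustration).
    default_java_harness = "apache/beam_java11_sdk:2.27.0"
    assert re.search(pattern, default_java_harness), (
        f"pattern {pattern!r} does not match {default_java_harness!r}")
    return f"--sdk_harness_container_image_overrides={pattern},{image}:{tag}"

print(image_override_flag(".*java.*", "beam_java_sdk", "latest"))
```

This confirms the flag string itself is well formed, so the problem seems to be elsewhere (Beam still reports waiting on apache/beam_java11_sdk:2.27.0 rather than my override).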
In summary, my question is: in Apache Beam, is it possible to make files available inside the Java SDK harness Docker container from the Python Beam SDK? If so, how is it done?
Many thanks.