我正在尝试设置一个 Apache Beam Java 管道:
- 从 Kafka 读取消息
- 调用外部 Python 转换
- 将输出写入 Kafka
在此之前,我尝试过一个不含 Kafka 的简单管道:例如,在 Java 中用 `Create` 转换生成一些测试值,然后把它们传给一个用于测试的(dummy)Python 转换。这种方式目前可以正常工作。
这是管道代码的摘录:
/**
 * Builds and runs the Kafka -> Python (cross-language) -> Kafka pipeline.
 *
 * <p>{@code pipeline}, {@code servers}, {@code readTopic}, {@code writeTopic} and
 * {@code options} are not defined in the visible code; presumably they are set up
 * in the elided section below — confirm.
 */
public static void main(String[] args) {
// ...
pipeline
// Read String key/value records from readTopic; withoutMetadata() drops the
// Kafka record metadata so the next transform receives KV<String, String>.
.apply(KafkaIO.<String, String>read()
.withBootstrapServers(servers)
.withTopic(readTopic)
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.withoutMetadata()
)
// Expand the external Python transform through the expansion service.
// NOTE(review): this expansion fails with KeyError: 'beam:coders:javasdk:0.1'.
// The Python expansion service appears to decode the transform(s) producing
// this input PCollection, and something produced inside KafkaIO.read() seems
// to use a Java-only coder the Python SDK does not recognize. Unverified
// workaround to try: insert a plain Java step (e.g. an identity MapElements
// with an explicit KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
// between the Kafka read and this apply.
.apply(new CrossLanguageTransform(options.getExpansionServiceURL()))
// Write the KV<String, String> results to writeTopic.
.apply(KafkaIO.<String, String>write()
.withBootstrapServers(servers)
.withTopic(writeTopic)
.withKeySerializer(StringSerializer.class)
.withValueSerializer(StringSerializer.class)
);
// Blocks until the pipeline finishes; with an unbounded Kafka source this
// presumably does not return on its own.
pipeline.run().waitUntilFinish();
}
这是外部转换包装器:
/**
 * Java wrapper around the Python transform registered under
 * {@code beam:transforms:xlang:pythonwordcount}. Expansion is delegated to the
 * external expansion service at the configured address.
 */
public class CrossLanguageTransform extends PTransform<PCollection<KV<String, String>>, PCollection<KV<String, String>>> {
    private static final String URN = "beam:transforms:xlang:pythonwordcount";

    // Per-instance and final. The original declared this field `static`, so
    // every constructor call overwrote one shared value. Two wrappers built with
    // different addresses would both have used the last one. Static fields are
    // also excluded from Java serialization.
    private final String expansionAddress;

    /**
     * @param expansionAddress host:port of the expansion service (for example
     *     {@code localhost:9097})
     */
    public CrossLanguageTransform(String expansionAddress) {
        this.expansionAddress = expansionAddress;
    }

    @Override
    public PCollection<KV<String, String>> expand(PCollection<KV<String, String>> pcoll) {
        // The Java side sends an empty payload; the Python transform is built
        // purely from its URN.
        return pcoll.apply(
            "PythonWordCount",
            External.of(URN, new byte[] {}, expansionAddress));
    }
}
这是我的 Python 转换的摘录:
URN = 'beam:transforms:xlang:pythonwordcount'


@PTransform.register_urn(URN, None)
class PythonWordCount(PTransform):
    """Windowed word count over the values of a keyed PCollection.

    Registered under ``URN`` so an expansion service can build it from a
    cross-language request. The URN must be the same string the Java wrapper
    passes to ``External.of``. The transform takes no configuration payload.
    """

    def expand(self, pcoll: PCollection[Tuple[str, str]]) -> PCollection[Tuple[str, str]]:
        """Count the words in the values of ``pcoll`` per fixed window.

        Input keys are dropped. Each value is split into tokens matching
        ``[A-Za-z']+``, tokens are counted per 10-second fixed window, and each
        ``(word, count)`` pair is passed through ``format_result``.
        """
        return (
            pcoll
            | 'GetValues' >> Values()
            | 'Split' >> FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)).with_output_types(str)
            | 'PairWithOne' >> Map(lambda x: (x, 1)).with_output_types(Tuple[str, int])
            | 'GroupAndSumWindow' >> WindowInto(FixedWindows(10))
            | 'GroupAndSum' >> CombinePerKey(sum)
            # Declare the output type explicitly. Otherwise the output coder is
            # whatever Beam infers from format_result, which may not be a coder
            # the Java consumer (expecting KV<String, String>) can decode.
            # Assumes format_result returns a (str, str) pair, as the annotated
            # return type states — TODO confirm.
            | 'Format' >> Map(format_result).with_output_types(Tuple[str, str])
        )

    def to_runner_api_parameter(self, unused_context):
        """Serialize as the bare URN with no payload."""
        return URN, None

    @staticmethod
    def from_runner_api_parameter(unused_ptransform, unused_paramter, unused_context):
        """Rebuild the transform from a runner-API proto; no payload is read."""
        return PythonWordCount()
我这样启动 Python 的扩展服务(expansion service):
./expansion_service.py -p 9097
以及这样启动 Flink 的作业服务器(job server):
# From Apache Beam's repository
./gradlew :runners:flink:1.13:job-server:runShadow
当我像这样运行 Java 管道时:
mvn exec:java -Dexec.mainClass=org.apache.beam.examples.CrossLanguagePipeline -Pportable-runner -Dexec.args="--runner=PortableRunner --jobEndpoint=localhost:8099 --useExternal=true --expansionServiceURL=localhost:9097 --experiments=beam_fn_api"
我得到以下堆栈跟踪:
java.lang.RuntimeException: expansion service error: Traceback (most recent call last):
File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/runners/portability/expansion_service.py", line 58, in Expand
producers = {
File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/runners/portability/expansion_service.py", line 59, in <dictcomp>
pcoll_id: (context.transforms.get_by_id(t_id), pcoll_tag)
File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 114, in get_by_id
self._id_to_obj[id] = self._obj_type.from_runner_api(
File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1350, in from_runner_api
result.outputs = {
File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1351, in <dictcomp>
None if tag == 'None' else tag: context.pcollections.get_by_id(id)
File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 114, in get_by_id
self._id_to_obj[id] = self._obj_type.from_runner_api(
File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/pvalue.py", line 207, in from_runner_api
element_type=context.element_type_from_coder_id(proto.coder_id),
File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 267, in element_type_from_coder_id
self.coders[coder_id].to_type_hint())
File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 163, in __getitem__
return self.get_by_id(id)
File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 114, in get_by_id
self._id_to_obj[id] = self._obj_type.from_runner_api(
File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/coders/coders.py", line 364, in from_runner_api
[context.coders.get_by_id(c) for c in coder_proto.component_coder_ids],
File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/coders/coders.py", line 364, in <listcomp>
[context.coders.get_by_id(c) for c in coder_proto.component_coder_ids],
File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 114, in get_by_id
self._id_to_obj[id] = self._obj_type.from_runner_api(
File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/coders/coders.py", line 361, in from_runner_api
parameter_type, constructor = cls._known_urns[coder_proto.spec.urn]
KeyError: 'beam:coders:javasdk:0.1'
at org.apache.beam.runners.core.construction.External$ExpandableTransform.expand (External.java:222)
at org.apache.beam.sdk.Pipeline.applyInternal (Pipeline.java:548)
at org.apache.beam.sdk.Pipeline.applyTransform (Pipeline.java:499)
at org.apache.beam.sdk.values.PCollection.apply (PCollection.java:376)
at org.apache.beam.examples.CrossLanguageTransform.expand (CrossLanguageTransform.java:19)
at org.apache.beam.examples.CrossLanguageTransform.expand (CrossLanguageTransform.java:8)
at org.apache.beam.sdk.Pipeline.applyInternal (Pipeline.java:548)
at org.apache.beam.sdk.Pipeline.applyTransform (Pipeline.java:482)
at org.apache.beam.sdk.values.PCollection.apply (PCollection.java:363)
at org.apache.beam.examples.CrossLanguagePipeline.main (CrossLanguagePipeline.java:49)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke (Method.java:566)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
at java.lang.Thread.run (Thread.java:829)
我不明白为什么这个管道不起作用,而其他示例却可以。
从类型上看,`KafkaIO.read` 的输出与 `PythonWordCount` 的 `expand` 方法所期望的输入似乎是匹配的。