0

我正在尝试设置一个 Apache Beam Java 管道:

  • 从 Kafka 读取消息
  • 调用外部 Python 转换
  • 将输出写入 Kafka

在此之前,我尝试了没有 Kafka 的简单管道:例如,使用 `Create` 转换在 Java 中生成一些测试值,然后将它们传递给虚拟 Python 转换。到目前为止有效。

这是管道代码的摘录:

    // Entry point: reads String key/value records from Kafka, routes them
    // through an external (Python) transform resolved via the expansion
    // service, and writes the results back to Kafka.
    public static void main(String[] args) {
        // ... (pipeline / options / servers / topic setup omitted in this excerpt)
        pipeline
            // Source: Kafka consumer yielding KV<String, String>.
            // withoutMetadata() drops the KafkaRecord wrapper, leaving plain KVs.
            .apply(KafkaIO.<String, String>read()
                .withBootstrapServers(servers)
                .withTopic(readTopic)
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata()
            )
            // Cross-language step: expanded remotely by the Python expansion service.
            .apply(new CrossLanguageTransform(options.getExpansionServiceURL()))
            // Sink: serialize the KV<String, String> output back to Kafka.
            .apply(KafkaIO.<String, String>write()
                .withBootstrapServers(servers)
                .withTopic(writeTopic)
                .withKeySerializer(StringSerializer.class)
                .withValueSerializer(StringSerializer.class)
            );

        // Blocks until the (possibly remote) job terminates.
        pipeline.run().waitUntilFinish();
    }

这是外部转换包装器:

/**
 * Wraps an external (Python) transform registered with the expansion service
 * under {@link #URN}. Input and output are both {@code KV<String, String>}.
 */
public class CrossLanguageTransform extends PTransform<PCollection<KV<String, String>>, PCollection<KV<String, String>>> {
    /** URN under which the Python-side transform is registered. */
    private static final String URN = "beam:transforms:xlang:pythonwordcount";

    // BUG FIX: this field was declared static but assigned from the instance
    // constructor, so every instance shared (and the last-constructed one
    // overwrote) the address. It is per-instance state; make it a final
    // instance field.
    private final String expansionAddress;

    /**
     * @param expansionAddress host:port of the Python expansion service
     *     (e.g. {@code "localhost:9097"}).
     */
    public CrossLanguageTransform(String expansionAddress) {
        this.expansionAddress = expansionAddress;
    }

    @Override
    public PCollection<KV<String, String>> expand(PCollection<KV<String, String>> pcoll) {
        // Empty payload: the Python transform takes no configuration.
        return pcoll.apply(
            "PythonWordCount",
            External.of(URN, new byte [] {}, expansionAddress)
        );
    }
}

这是我的 Python 转换的摘录:

# URN shared between the Java pipeline and this Python transform; the
# expansion service uses it to look up the registered transform class.
URN = 'beam:transforms:xlang:pythonwordcount'

@PTransform.register_urn(URN, None)
class PythonWordCount(PTransform):
    """Cross-language word count.

    Consumes (key, text) pairs, tokenizes the text values, and emits
    (word, count) results formatted by ``format_result``, aggregated over
    10-second fixed windows (windowing is required so CombinePerKey can
    fire on unbounded input such as Kafka).
    """

    def expand(self, pcoll: PCollection[Tuple[str, str]]) -> PCollection[Tuple[str, str]]:
        return (
            pcoll |
            'GetValues' >> Values() |
            # Split each value into words (letters and apostrophes only).
            'Split' >> FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)).with_output_types(str) |
            'PairWithOne' >> Map(lambda x: (x, 1)) |
            # Fixed step-label typo: was 'GruoupAndSumWindow'.
            'GroupAndSumWindow' >> WindowInto(FixedWindows(10)) |
            'GroupAndSum' >> CombinePerKey(sum) |
            'Format' >> Map(format_result)
        )

    def to_runner_api_parameter(self, unused_context):
        # No configuration payload accompanies this transform.
        return URN, None

    @staticmethod
    def from_runner_api_parameter(unused_ptransform, unused_parameter, unused_context):
        # (Fixed typo in parameter name: 'unused_paramter'.)
        # Reconstructs the transform from its (empty) payload on expansion.
        return PythonWordCount()

我这样设置 Python 的扩展服务:

./expansion_service.py -p 9097

Flink 的作业服务器:

# From Apache Beam's repository
./gradlew :runners:flink:1.13:job-server:runShadow

当我像这样运行 Java 管道时:

mvn exec:java -Dexec.mainClass=org.apache.beam.examples.CrossLanguagePipeline -Pportable-runner -Dexec.args="--runner=PortableRunner --jobEndpoint=localhost:8099 --useExternal=true --expansionServiceURL=localhost:9097 --experiments=beam_fn_api"

我得到以下堆栈跟踪:

java.lang.RuntimeException: expansion service error: Traceback (most recent call last):
  File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/runners/portability/expansion_service.py", line 58, in Expand
    producers = {
  File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/runners/portability/expansion_service.py", line 59, in <dictcomp>
    pcoll_id: (context.transforms.get_by_id(t_id), pcoll_tag)
  File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 114, in get_by_id
    self._id_to_obj[id] = self._obj_type.from_runner_api(
  File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1350, in from_runner_api
    result.outputs = {
  File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1351, in <dictcomp>
    None if tag == 'None' else tag: context.pcollections.get_by_id(id)
  File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 114, in get_by_id
    self._id_to_obj[id] = self._obj_type.from_runner_api(
  File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/pvalue.py", line 207, in from_runner_api
    element_type=context.element_type_from_coder_id(proto.coder_id),
  File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 267, in element_type_from_coder_id
    self.coders[coder_id].to_type_hint())
  File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 163, in __getitem__
    return self.get_by_id(id)
  File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 114, in get_by_id
    self._id_to_obj[id] = self._obj_type.from_runner_api(
  File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/coders/coders.py", line 364, in from_runner_api
    [context.coders.get_by_id(c) for c in coder_proto.component_coder_ids],
  File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/coders/coders.py", line 364, in <listcomp>
    [context.coders.get_by_id(c) for c in coder_proto.component_coder_ids],
  File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 114, in get_by_id
    self._id_to_obj[id] = self._obj_type.from_runner_api(
  File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/coders/coders.py", line 361, in from_runner_api
    parameter_type, constructor = cls._known_urns[coder_proto.spec.urn]
KeyError: 'beam:coders:javasdk:0.1'

    at org.apache.beam.runners.core.construction.External$ExpandableTransform.expand (External.java:222)
    at org.apache.beam.sdk.Pipeline.applyInternal (Pipeline.java:548)
    at org.apache.beam.sdk.Pipeline.applyTransform (Pipeline.java:499)
    at org.apache.beam.sdk.values.PCollection.apply (PCollection.java:376)
    at org.apache.beam.examples.CrossLanguageTransform.expand (CrossLanguageTransform.java:19)
    at org.apache.beam.examples.CrossLanguageTransform.expand (CrossLanguageTransform.java:8)
    at org.apache.beam.sdk.Pipeline.applyInternal (Pipeline.java:548)
    at org.apache.beam.sdk.Pipeline.applyTransform (Pipeline.java:482)
    at org.apache.beam.sdk.values.PCollection.apply (PCollection.java:363)
    at org.apache.beam.examples.CrossLanguagePipeline.main (CrossLanguagePipeline.java:49)
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:566)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
    at java.lang.Thread.run (Thread.java:829)

我不知道为什么这不起作用,而其他示例则起作用。
类型似乎在 `KafkaIO.read` 的输出和 `PythonWordCount` 的 `expand` 方法之间是匹配的。

4

0 回答 0