3

我正在尝试从 Google MemoryStore 读取数据并转换数据并使用 RedisIO API 写回 MemoryStore。我能够读取数据但无法写回 MemoryStore。当我使用 Dataflow DirectRunner 尝试本地 Redis 服务器时,它正在工作。当我使用 DirectRunner 尝试 Google MemoryStore 时,它​​正在工作。但是使用 DataFlowRunner 和 MemoryStore 它不起作用。(程序以 0 状态退出)。MemoryStore 和 DataFlow 作业在同一区域 (us-central1-a) 内运行。

这是我正在使用的代码:

package com.example;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.redis.RedisIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;




public class RedisReadRedisWrite {

/** A SimpleFunction that converts a Word and Count into a printable string. */
public static class FormatAsTextFn extends SimpleFunction<KV<String, String>, KV<String, String>> {

    private static final long serialVersionUID = 1L;

    @Override
    public KV<String, String> apply(KV<String, String> input) {
        System.out.println("Original: " + input.getKey() + " " + input.getValue());
        System.out.println("Modified: " + "X_" + input.getKey() + "X_" + input.getValue());
        return KV.of("X_" + input.getKey(), "X_" + input.getValue().substring(3));
    }
}

public interface WordCountOptions extends PipelineOptions {

}

static void runWordCount(WordCountOptions options) {
    Pipeline p = Pipeline.create(options);

    p.apply(RedisIO.read().withEndpoint("10.0.0.12", 6379).withKeyPattern("FOO*"))
            .apply(MapElements.via(new FormatAsTextFn())).apply(RedisIO.write().withEndpoint("10.0.0.12", 6379));

    p.run().waitUntilFinish();
}

public static void main(String[] args) {
    WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(WordCountOptions.class);

    runWordCount(options);
}
}
4

0 回答 0