1

我有一个Zookeeper写的配置信息。我正在使用 Apache Curator 通过 Curator Watcher 读取配置(如果有更好的读取解决方案,我很乐意使用它),因此如果 Zookeeper 中的配置发生更改,我将收到新的配置。我在 Spark 中使用这个配置。如何将它共享给同一应用程序的所有 Spark 执行器?

谢谢!

乐:

谢谢迪凯,

在以下代码中,您将在哪里执行观察程序?我是新来的火花,我不确定每个工人会发生什么。

谢谢!

final JavaDStream<ElementMessage> nodeMessageStream = mapWithStateDistinctAndFiltered.flatMap(pair -> pair._2.buildElementMessages())
            .filter(f -> f != null);

    nodeMessageStream.foreachRDD(rdd -> {
        rdd.foreachPartition(r -> {
            final ElementRecordRestClient rest = new ElementRecordRestClient(
                    startProps.getProperty(InputPropertyKey.WEPAPP_URL.toString()));
            r.forEachRemaining(message -> {
                rest.createObject(message.toElementRecord());
            });
        });
    });
4

1 回答 1

0

在这种情况下,我要做的是在主节点上运行 Curator Watcher,并使用 Spark 的广播变量将配置广播到所有执行程序。每当配置更改时,您都会停止当前的流式传输上下文,并使用新配置启动一个新的流式传输上下文。这将确保您的结果始终一致。

另一种方法是在foreachPartitionlambda 函数中读取 zookeeper 配置。但是由于配置是由每个分区独立读取的,所以同一个RDD的不同分区会得到不同的配置,这可能不是你所期望的。

于 2016-07-06T11:06:52.410 回答