1

我有两个原始流,我正在加入这些流,然后我想计算已加入的事件总数和未加入的事件总数。我通过使用地图来做到这一点,joinedEventDataStream如下所示

joinedEventDataStream.map(new RichMapFunction<JoinedEvent, Object>() {

            @Override
            public Object map(JoinedEvent joinedEvent) throws Exception {

                number_of_joined_events += 1;

                return null;
            }
        });

问题#1:这是计算流中事件数量的适当方法吗?

问题#2:我注意到一种有线行为,你们中的一些人可能不相信。问题是当我在 IntelliJ IDE 中运行我的 Flink 程序时,它显示了正确的值,number_of_joined_events0在我将此程序提交为jar. number_of_joined_events因此,当我将程序作为jar文件而不是实际计数运行时,我得到了初始值。为什么这种情况只发生在jar文件提交的情况下而不是在 IDE 中?

4

1 回答 1

2

你的方法不起作用。您在通过 JAR 文件执行程序时注意到的行为是预期的。

我不知道如何number_of_joined_events定义,但我假设它是您程序中的静态变量。当您在 IDE 中运行程序时,它会在单个 JVM 中运行。因此,所有操作员都可以访问静态变量。当您将 JAR 文件提交到远程进程时,程序会在不同的 JVM(可能是多个 JVM)中执行,并且客户端进程中的静态变量永远不会更新。

您可以使用 Flink 的 metrics 或ReduceFunction求和1s 的 a 来统计处理的记录数。

于 2017-11-13T09:12:40.193 回答