2

我在没有集群的情况下在本地运行以下代码:

val count = new AtomicInteger()
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val text: DataStream[String] = env.readTextFile("file:///flink/data2")
val mapped: DataStream[Map[String, Any]] = text.map((x: String) => Map("user" -> x.split(",")(0), "val" -> x.split(",")(1)))
val pattern: ...
CEP.pattern(mapped, pattern).select(eventMap => {
  println("Found: " + (patternName, eventMap))
  count.incrementAndGet()
})

env.execute()
println(count)

我的数据是以下格式的 CSV 文件(用户,val):

1,1
1,2
1,3
2,1
2,2
2,3
...

我正在尝试检测模式 where 的事件event(val=1) -> event(val=2) -> event(val=3)。当我在大型输入流上运行此程序时,我知道流中存在一定数量的事件,我得到检测到的事件计数不一致,几乎总是少于系统中的事件数。如果我这样做env.setParallelism(1)(就像我在代码的第 3 行中所做的那样),则会检测到所有事件。

我假设问题是当并行度大于 1 时,多个线程正在处理来自流的事件,这意味着虽然一个线程具有event(val=1) -> event(val=2),event(val=3)可能会被发送到另一个线程,并且整个模式可能不会被检测到。

我在这里缺少什么吗?我不能丢失流中的任何模式,但是将并行度设置为 1 似乎违背了让像 Flink 这样的系统来检测事件的目的。

更新:

我尝试使用以下方法键入流:

val mapped: KeyedStream[Map[String, Any]] = text.map(...).keyBy((m) => m.get("user"))

虽然这可以防止不同用户的事件相互干扰:

1,1
2,2
1,3

这并不妨碍 Flink 将事件乱序发送到节点,这意味着非确定性仍然存在。

4

2 回答 2

1

最有可能的问题在于在 map 运算符之后应用 keyBy 运算符。

所以,而不是:

val mapped: KeyedStream[Map[String, Any]] = text.map(...).keyBy((m) => m.get("user"))

应该有:

val mapped: KeyedStream[Map[String, Any]] = text.keyBy((m) => m.get("user")).map(...)

我知道这是一个老问题,但也许它可以帮助某人。

于 2017-02-13T17:22:38.763 回答
0

您是否考虑过使用用户 ID(您的第一个值)来键入您的流?Flink 保证一个 key 的所有事件都到达同一个处理节点。当然,这只会有帮助,如果您想检测每个用户的 val=1->val=2->val=3 模式。

于 2016-08-11T06:49:33.097 回答