1

我正在编写一个简单的示例,用于在 Flink 中测试 CEP 的新 Scala API,使用 1.1-SNAPSHOT 的最新 Github 版本。

Pattern 只是对值的检查,并为每个匹配的模式输出一个字符串作为结果。代码如下:

val pattern : Pattern[(String, Long, Int), _] = Pattern.begin("start").where(_._3 < 4)

val cepEventAlert = CEP.pattern(streamingAlert, pattern)

def selectFn(pattern : mutable.Map[String, (String, Long, Int)]): String = {
    val startEvent = pattern.get("start").get
    "Alerta:"+startEvent._1+": Pattern"
}

val patternStreamSelected = cepEventAlert.select(selectFn(_))

patternStreamSelected.print()

它在 1.1-SNAPSHOT 下编译和运行没有问题,但 jobmanager 输出没有显示 print() 的迹象。即使放宽模式条件,仅设置“开始”(接受所有事件)也绝对不会返回任何内容。

此外,在尝试添加阶段时,代码无法编译。如果我将模式更改为(查找第三个字段小于 4 的两个连续事件):

Pattern.begin("start").where(_._3 < 4).next("end").where(_._3 < 4).within(Time.seconds(30))

然后编译器抛出:

error: missing parameter type for expanded function ((x$4) => x$4._3.$less(4))

显示错误是在“开始”阶段之后的第一个 where() 上。我尝试使用以下方法显式设置参数类型:

(x: (String, Long, Int)) => x._3 < 4

这样它会再次编译,但是当它在 Flink 上运行时,不会显示任何输出。StreamingAlert 是一个 Scala DataStream[(String, Long, Int)],在代码的其他部分,我可以_._ < 4毫无问题地进行过滤,并且输出看起来是正确的。

4

1 回答 1

1

流式 API 中的print()API 调用不会触发急切执行。您仍然需要env.execute()在程序结束时调用。

当您定义您的模式时,您应该在某处提供事件类型。要么你按照你已经完成的那样做,要么你通过一个类型参数来做begin

Pattern.begin[(String, Long, Int)]("start").where(_._3 < 4).next("end").where(_._3 < 4).within(Time.seconds(30))
于 2016-05-25T13:43:20.133 回答