问题标签 [flink-cep]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
873 浏览

apache-flink - 未遵循的 Apache Flink CEP 模式操作

我有一个场景,如果第二个事件在 x 秒内没有跟随第一个事件,我必须更改状态。例如,用户在 100 分钟内没有注销,则认为他处于无效状态。如何使用当前的模式操作来设计?

0 投票
0 回答
278 浏览

apache-flink - flink流 - cep找不到以前的共享缓冲区条目与密钥

我尝试在 Flink 上运行 cep,并从本地路径获取测试数据,一开始,我将文件大小设置为 1G 左右,它运行良好。但是当我将文件大小设置为 10G 时,出现了下面的问题.

这是我的代码。谢谢帮助

0 投票
2 回答
900 浏览

java - Flink Eclipse JDT 编译器问题

我正在尝试运行这个程序:[Flink CEP Monitoring][1]

[1]:https ://github.com/tillrohrmann/cep-monitoring在 Amazon EC2 上安装了 openjdk1.8 和 Flink 1.0.2。但是当我试图运行这个程序时,它会抛出以下异常:

缺少“地图”的泛型类型参数。您的编译器似乎没有将它们存储到 .class 文件中。目前,只有 Eclipse JDT 编译器保留了安全使用 lambdas 特性所需的类型信息。有关如何编译包含 lambda 表达式的作业的更多信息,请参阅文档。org.apache.flink.api.java.typeutils.TypeExtractor.validateLambdaGenericParameter(TypeExtractor.java:1316) org.apache.flink.api.java.typeutils.TypeExtractor.validateLambdaGenericParameters(TypeExtractor.java:1302) org.apache.flink。 api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:346) org.apache.flink.cep.PatternStream.select(PatternStream.java:64) org.stsffap.cep.monitoring.CEPMonitoring.main(CEPMonitoring.java: 95)

由于 Lambda 表达式,有什么方法可以用 Java 8 编译 Flink 程序吗?我该如何解决这个错误?

0 投票
2 回答
1351 浏览

apache-flink - Apache Flink 联合运算符给出错误响应

我在两个DataStream通用记录类型上应用联合运算符。

输出:

如您所见,合并后 dataMessageGenericRecordStream 中的记录不正确。所有字段值都将被第一个字段值替换。

0 投票
2 回答
2418 浏览

java - Flink Table API 无法将 DataSet 转换为 DataStream

我正在使用使用 Java 的 Flink Table API,我想将 DataSet 转换为 DataStream .... 以下是我的代码:

但是当我尝试执行这个程序时,它会抛出以下异常:

org.apache.flink.api.table.ExpressionException:JavaStreamingTranslator 的无效根:根(ArraySeq((related_value,Double),(ref_id,String)))。您是否尝试将基于 DataSet 的 Table 转换为 DataStream,反之亦然? 我想知道我们如何使用 Flink Table API 将 DataSet 转换为 DataStream ?

我想知道的另一件事是,对于模式匹配,有可用的 Flink CEP 库。但是使用 Flink Table API 进行模式匹配是否可行?

0 投票
1 回答
1033 浏览

mysql - 在 Apache Flink 的 Table API 中使用 CASE WHEN 进行查询

我正在使用 Flink 的 Table API。我想CASE WHEN在 Table API 查询中使用。我的查询使用三个字段:routeid, startlocation, distance并且我想使用一个CASE WHEN子句来识别基于distance值的因素,这些因素是基于routeid.

示例

Flink 的 Table API 可以做到这一点吗?如果是,该怎么做?

0 投票
1 回答
231 浏览

scala - Scala:每个范围内的对象都是相同的单例吗?

我有一个 flink 应用程序。我在 map 函数中使用了一个对象。像这样:

streamProcessor 是一个对象。该对象内部是数据库的另一个服务对象。每当应用程序有事件发生时,Flink 都会执行这个 map 函数。我想知道的是:每次对象都是相同的单例对象吗?

一个例子:

-> 事件到达应用程序 -> map 函数将执行并创建一个单例对象

-> 下一个事件到达应用程序 -> 地图函数将再次执行 -> 对象将再次调用

第二个对象是相同的实例吗?

0 投票
1 回答
815 浏览

scala - 用于 CEP 示例的简单 Scala API 不显示任何输出

我正在编写一个简单的示例,用于在 Flink 中测试 CEP 的新 Scala API,使用 1.1-SNAPSHOT 的最新 Github 版本。

Pattern 只是对值的检查,并为每个匹配的模式输出一个字符串作为结果。代码如下:

它在 1.1-SNAPSHOT 下编译和运行没有问题,但 jobmanager 输出没有显示 print() 的迹象。即使放宽模式条件,仅设置“开始”(接受所有事件)也绝对不会返回任何内容。

此外,在尝试添加阶段时,代码无法编译。如果我将模式更改为(查找第三个字段小于 4 的两个连续事件):

然后编译器抛出:

显示错误是在“开始”阶段之后的第一个 where() 上。我尝试使用以下方法显式设置参数类型:

这样它会再次编译,但是当它在 Flink 上运行时,不会显示任何输出。StreamingAlert 是一个 Scala DataStream[(String, Long, Int)],在代码的其他部分,我可以_._ < 4毫无问题地进行过滤,并且输出看起来是正确的。

0 投票
1 回答
272 浏览

batch-file - Flink CEP Streaming:批处理模式,连续还是微批处理?

我正在尝试理解这个 Flink CEP 示例

https://github.com/tillrohrmann/cep-monitoring ..我在分布式模式(1 个主设备和 3 个核心)上执行了 Flink CEP 的这个示例。现在我正在将输出写入文件,因此我的输出被写入3 个文件,因为使用 3 个核心。在其中一个文件中,我看到记录如下:

现在,如果我们从第 5 行开始查看。我们可以检查从第 5 行打印的所有TemperatureAlerts(即温度:113.65291115679136),我们可以在 TemperatureWarning 中识别以下温度(温度:113.65291115679136 出现在第 15 行意思是我们可以识别哪些温度,警报已被打印....但是从第 1 行到第 4 行生成的警报呢?您甚至可以在行号中找到相同的记录。11 ..我的意思是我们如何识别已生成警报的温度警告?它是以批处理模式、连续模式还是微批处理模式执行流式传输?

0 投票
1 回答
1968 浏览

apache-kafka - Flink Kafka Consumer 在使用 DataStream 键时抛出空指针异常

我正在使用这个Flink CEP示例,我在其中分离出数据,因为我创建了一个应用程序,该应用程序正在将应用程序发送到 Kafka 和另一个从 Kafka 读取的应用程序......我为类TemperatureWarning生成了生产者,即在 Kafka 中,我正在发送数据与TemperatureWarning相关 以下是我的代码,它正在使用来自 Kafka 的数据......

但是当我执行这个应用程序时,它会抛出以下异常:

以下是我的课TemperatureWarning

}

以下是我的课程MonitoringEventSchema

{

}

现在执行keyBy操作需要什么,因为我已经提到流分区所需的密钥?这里需要做什么来解决这个错误??