问题标签 [flink-cep]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
801 浏览

apache-kafka - flink 流在时间窗口上创建文件( csv 或文本)

我是 flink 的新手,我有这样的转变假设

现在我想为每 20 秒的时间窗口创建文件

我提供了文件名+秒,这样每次创建文件时都会附加秒数。

但是这里只创建了一个文件,我想每 20 秒窗口创建一个新文件,我该怎么做?

0 投票
1 回答
274 浏览

scala - flink 表 SQL 接口

我想知道我们是否可以在 flink Table 和 SQL api 中使用两个表(join)来编写查询。

我是 flik 的新手,我想从两个不同的数据集创建两个表并查询它们并生成其他数据集。

我的查询是这样的select... from table1, table 2...,我们可以像这样查询两个或更多表的查询吗?

谢谢

0 投票
2 回答
1371 浏览

java - Flink Avro 1.8.1 NoSuchMethodError 在集群上运行时

我们运行 Flink 1.3.0 CEP 作业并依赖 Avro 1.8.1(LogicalType 在 Avro 1.7.7 中不存在)来序列化复杂事件(作为 POJO)。它在 IDE (IntelliJ) 中运行时可以工作,但是当我们打包 jar 文件并将其部署到集群时,我们得到:

但是,似乎我们构建的 jar 包含关于构建输出的正确 Avro 包版本(1.8.1)。

问题:我们如何确保我们的 Flink 集群使用正确的 Avro 版本(1.8.1)?

我们的 pom.xml:

更新:我从源代码构建 flink 1.3 并在项目的 pom.xml 中更新了 avro 版本(从 1.7.7 到 1.8.1),它现在似乎可以工作。仍然不确定为什么在构建胖罐时它不起作用。

0 投票
2 回答
628 浏览

apache-flink - Flink 复杂事件处理

我有一个从套接字读取并检测模式的 flink cep 代码。可以说模式(单词)是“警报”。如果警报一词出现五次或更多次,则应创建警报。但我收到输入不匹配错误。Flink 版本是 1.3.0。提前致谢 !!

在此处输入图像描述

0 投票
1 回答
671 浏览

apache-flink - Flink CEP:哪种方法可以为不同类型的事件加入数据流?

假设我有 2 种不同类型的数据流,一种提供天气数据,另一种提供车辆数据,我想使用 Flink 对数据进行复杂的事件处理。

Flink 1.3.x 中的哪种方法是正确的使用方法?我看到了不同的方法,例如 Union、Connect、Window Join。基本上我只想尝试这样一个简单的 CEP:

谢谢!

0 投票
1 回答
1612 浏览

amazon-s3 - Flink Streaming AWS S3 并行读取多个文件

我是 Flink 新手,我的理解是遵循 API 调用

将为给定的 S3 存储桶路径并行读取文件。

我们将日志文件存储在 S3 中。要求是为多个客户端请求提供服务,以从带有时间戳的不同文件夹中读取。

对于我的用例,为了服务多个客户端请求,我正在评估使用 Flink。所以我希望 Flink 为不同的 AWS S3 文件路径并行执行 AWS S3 读取。

是否有可能在单个 Flink Job 中实现这一点。有什么建议么?

0 投票
1 回答
733 浏览

apache-kafka - Apache Flink CEP 的动态流 SQL

我想将流 SQL 放入 Kafka 以供 Flink 用于 CEP。这是一个好方法吗?

我知道在 Flink 上不允许动态模式定义,我需要应用它们可以在无界事件流上更改的规则。

举个例子;

有一个 UI 供用户为其设备定义规则。想象一下,有一个东西影子服务(例如 AWS IoT 中心)保持物理设备的状态,而且我认为将每个设备的特定规则放入影子规则中以及当影子演员接收到传感器数据时,这将是一个好方法,它可以发出添加了规则的数据,以供 Flink 作为规则引擎(通过 kafka)使用。所以我希望 Flink 执行我传入的传感器数据(及其规则),这些数据可能因每个设备而异。

0 投票
2 回答
1544 浏览

apache-flink - 是否可以在 apache flink CEP 中处理多个流?

我的问题是,如果我们有两个原始事件流,即SmokeTemperature,并且我们想通过将运算符应用于原始流来确定是否发生了复杂事件,即Fire ,我们可以在 Flink 中执行此操作吗?

我问这个问题是因为到目前为止我看到的所有 Flink CEP 示例都只包含一个输入流。如果我错了,请纠正我。

0 投票
2 回答
5173 浏览

java - 以连续方式将数据从自定义源写入 flink

这是我第一次使用 Apache Flink (1.3.1) 并有一个问题。更详细地说,我正在使用 flink-core、flink-cep 和 flink-streaming 库。我的应用程序是一个 Akka ActorSystem,它使用来自 RabbitMQ 的消息,并且各种参与者处理这些消息。在某些演员中,我想StreamExecutionEnvironment从 Flink 实例化 a 并处理传入的消息。因此,我编写了一个自定义源类来扩展RichSourceFunction该类。一切正常,除了一件事:我不知道如何将数据发送到我的 Flink 扩展。这是我的设置:

这是我的自定义源函数:

所以我想从外部调用sendDataFlinkExtension班级中的方法,以连续的方式将数据写入我的FlinkExtension. 这是我的 JUnit 测试应该将数据发送到扩展,然后将数据写入SourceContext.

但是如果我运行测试,什么都没有发生,应用程序挂在CustomSourceFunction. 我还尝试在CustomSourceFunctionrun 方法中创建一个新的无限线程。

总结一下:有人知道如何以连续的方式将数据从应用程序写入 Flink 实例吗?

0 投票
2 回答
368 浏览

apache-flink - 为什么 PatternStream 的相同事件可以同时发送到 PatternSelectFunction 和 PatternTimeoutFunction?

我必须在给定时间内在 3 个具有相同相关 ID 的 kafka 源流中收集 3 个事件,并且如果它们迟到,则能够收集所有或部分这些事件。

我在 3 DataStream 和 CEP 模式上使用了联合。但我注意到,与模式匹配良好并因此在选择函数中收集的事件也会在超时函数中发送,一旦达到超时

我不知道我在示例中做错了什么,或者我不明白什么,但我期待正匹配的事件也不会超时。

我得到的印象是存储了不相交的时间快照。

我正在使用 1.3.0 Flink 版本。

谢谢您的帮助。

控制台输出,我们可以看到 3 个相关事件中的 2 个被选中并超时:

匹配事件:
Key---0b3c116e-0703-43cb-8b3e-54b0b5e93948
Key---f969dd4d-47ff-445c-9182-0f95a569febb
Key---2ecbb89d-1463-4669-a657-555f73b6fb1d

超时事件:

第一次调用超时函数:
Key---f969dd4d-47ff-445c-9182-0f95a569febb
Key---0b3c116e-0703-43cb-8b3e-54b0b5e93948

第二次调用:
Key---f969dd4d-47ff-445c-9182-0f95a569febb

我的测试代码: