问题标签 [flink-cep]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
52 浏览

apache-flink - warnings.print() 以相反的顺序打印事件(最后一个事件在前),Apache Flink CEP 中的第一个事件除外

我正在尝试使用以下模式过滤Flink中> 10的所有临时事件,

输入是一个文本文件,由输入函数解析为流,输入文件的内容是:-

这里第一个值是 Rack_id,第二个是温度

我在 input-stream 和 WarnigsStream 上都发出了 print() ,如下所示

现在,问题来了,Flink CEP 的输出如下所示

正如我们所看到的,第一个复杂事件(机架 id = 1 和温度 = 98.0))以相同的顺序打印,但在此之后,所有其他温度 > 50 的复杂事件都以与输入相反的顺序打印溪流。

提前致谢

0 投票
2 回答
1685 浏览

maven - Flink-CEP 中缺少“Map”的泛型类型参数

在 Flink-CEP 中检测模式的代码如下所示

如果在 Mac 中使用 command + F9 构建,则会显示以下错误

但是构建 usignmvn clean install然后通过 Control + R 运行会显示输出,

  • 我想知道为什么这一直在发生?

  • 有什么办法可以做到吗?

PS:但是我正在使用 eclipse JDT 插件,即使那样它在日志中也显示错误。POM.XML 的内容是

非常欢迎提出建议。在此先感谢

0 投票
1 回答
190 浏览

sockets - 将数据流从 VM 套接字/远程套接字发送到在 Host OS 上运行的 Flink 程序

如 Flink 文档中所述,我可以通过使用打开本地套接字从文本服务器读取文本输入

然后在 Flink 程序上使用

但是,由于我正在模拟一些场景,所以我想从 VM(然后是各种 VM)获取数据流并将其发送到在主机操作系统上运行的 CEP 程序。

所以,我已经安装了 VM,使用 Vagrant 和 SSH 到它使用vagrant ssh

  1. 来宾操作系统的主机名是精确 64

  2. 使用 ifconfig = 10.0.2.15 的 IP 地址

现在,我现在想做的是看看我是否可以从 VM 发送一些数据并在 Flink 程序中接收它,就像我在本地环境中所做的一样。

我通过使用在来宾操作系统上打开了 Netcat 套接字

我试图通过使用在主机程序上接收它,但出现错误

我也试过上面的precision64@10.0.2.15,但我认为我做错了。

任何想法,我应该如何将数据流从 VM 发送到主机 Flink 程序

欢迎提出建议,在此先感谢!

0 投票
2 回答
964 浏览

serialization - 在 Flink 中使用套接字发送 DataStream;序列化问题

我想将数据流从 VM 发送到主机,我正在使用writeToSocket()如下所示的方法:

这里joinedStreamEventDataStream是类型DataStream<Integer,Integer>

有人可以告诉我应该如何将序列化程序传递给上述方法。

提前致谢

0 投票
1 回答
79 浏览

apache-kafka - 将数据流从 VM 套接字发送到 Kafka 并在主机操作系统的 Flink 程序上接收:反序列化问题

我正在使用下面的代码将数据流从 VM 发送到 Kafka 的测试主题(在 192.168.0.12 IP 的主机操作系统上运行)

JoinedStreamEvent是类型DataSream<Tuple3<Integer,Integer,Integer>>它基本上加入了 2 个流respirationRateStreamheartRateStream

Host OS 上运行的另一个 Flink 程序试图从 kafka 读取数据流。我在这里使用 localhost,因为 kafka 和 zookeper 正在主机操作系统上运行。

我得到这样的输出

在此处输入图像描述

我想我需要实现类型的反序列化器JoinedStreamEvent。有人可以给我一个想法我应该怎么写,类型的反序列JoinedStreamEvent化器DataSream<Tuple3<Integer, Integer, Integer>>

请让我知道是否需要做其他事情。

PS - 我想写下面的反序列化器,但我认为它不正确

0 投票
2 回答
650 浏览

apache-kafka - 从 Apache Flink 中的 Kafka 代理读取最新数据

我想从 Kafka 接收到 Flink 程序的最新数据,但是 Flink 正在读取历史数据。我已经设置auto.offset.resetlatest如下所示,但它没有工作

Flink Programm 正在使用以下代码从 Kafka 接收数据

我正在关注 https://issues.apache.org/jira/browse/FLINK-4280上的讨论,这建议以下面提到的方式添加源

我做了同样的事情,但是我无法访问setStartFromLatest()

我应该怎么做才能接收发送到 Kafka 的最新值而不是从历史中接收值?

0 投票
0 回答
158 浏览

apache-kafka - 如何在检测到 x 个复杂事件后停止 Flink CEP 引擎并计算引擎处理时间

我想通过下面提到的代码找到 Flink 中的处理时间

但是这里的问题是,当产生一定数量的复杂事件时,我不知道如何停止 Flink 程序。我将详细阐述场景

场景: Flink 程序正在接收心率、呼吸率和其他流数据,并正在生成心脏病发作警告。传感器数据是使用一些分布生成的。

  1. 第一个任务是在连续产生 3 个警告时停止引擎并查找引擎的处理时间?

  2. 第二个任务是以一定的速率生成数据流?目前,Flink 程序正在从文件中读取各种传感器数据并从中生成流,并将 Steam 数据发送到 Kafka,然后再发送到 Flink 程序。有人能告诉我如何以x 事件/秒的速率生成原始流吗?

0 投票
1 回答
89 浏览

java-8 - 无法在自定义 DataSink 上初始化字段(Flink CEP)

我对 Apache Flink Streaming API 有疑问。

我可以设法使用自定义数据源设置整个 CEP 环境,并且在该源上使用标准接收器(如“print()”)时,一切正常。

这是我的水槽现在的样子:

我试图实现的目标是将方法引用传递给这个 SinkFunction,它将为我的 DataStream 中的每个元素执行。

这就是我初始化 SinkFunction 的方式:

我的问题是,当我在自定义接收器的“调用”方法中设置断点时,即使我显式调用构造函数来分配消费者,消费者也似乎为空。

0 投票
2 回答
212 浏览

java - 在没有所有 Flink 环境的情况下使用 flink-cep 模式匹配

如标题所示:是否可以仅使用 flink 模式匹配而无需整个其他 flink 环境?

0 投票
2 回答
1232 浏览

log4j - 在 Apache Flink 中使用 SLF4J 记录器时出错

我想在 Flink 中输出日志,我已经创建了如下所述的记录器代码

我已经导入了 LoggerFactory

CEPMonitoring 类包含代码如下

但是,这并没有打印任何东西。另外,我没有https://ci.apache.org/projects/flink/flink-docs-release-0.7/internal_logging.htmllogback.xml中提到的文件

PS - 我在使用时遇到错误

这最终导致我写

它给出了一个错误,然后我必须

最重要的是,我在运行程序时收到以下警告

我很困惑该怎么办?