问题标签 [flink-cep]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-flink - warnings.print() 以相反的顺序打印事件(最后一个事件在前),Apache Flink CEP 中的第一个事件除外
我正在尝试使用以下模式过滤Flink中> 10的所有临时事件,
输入是一个文本文件,由输入函数解析为流,输入文件的内容是:-
这里第一个值是 Rack_id,第二个是温度
我在 input-stream 和 WarnigsStream 上都发出了 print() ,如下所示
现在,问题来了,Flink CEP 的输出如下所示
正如我们所看到的,第一个复杂事件(机架 id = 1 和温度 = 98.0))以相同的顺序打印,但在此之后,所有其他温度 > 50 的复杂事件都以与输入相反的顺序打印溪流。
提前致谢
maven - Flink-CEP 中缺少“Map”的泛型类型参数
在 Flink-CEP 中检测模式的代码如下所示
如果在 Mac 中使用 command + F9 构建,则会显示以下错误
但是构建 usignmvn clean install
然后通过 Control + R 运行会显示输出,
我想知道为什么这一直在发生?
有什么办法可以做到吗?
PS:但是我正在使用 eclipse JDT 插件,即使那样它在日志中也显示错误。POM.XML 的内容是
非常欢迎提出建议。在此先感谢
sockets - 将数据流从 VM 套接字/远程套接字发送到在 Host OS 上运行的 Flink 程序
如 Flink 文档中所述,我可以通过使用打开本地套接字从文本服务器读取文本输入
然后在 Flink 程序上使用
但是,由于我正在模拟一些场景,所以我想从 VM(然后是各种 VM)获取数据流并将其发送到在主机操作系统上运行的 CEP 程序。
所以,我已经安装了 VM,使用 Vagrant 和 SSH 到它使用vagrant ssh
来宾操作系统的主机名是精确 64
使用 ifconfig = 10.0.2.15 的 IP 地址
现在,我现在想做的是看看我是否可以从 VM 发送一些数据并在 Flink 程序中接收它,就像我在本地环境中所做的一样。
我通过使用在来宾操作系统上打开了 Netcat 套接字
我试图通过使用在主机程序上接收它,但出现错误
我也试过上面的precision64@10.0.2.15,但我认为我做错了。
任何想法,我应该如何将数据流从 VM 发送到主机 Flink 程序
欢迎提出建议,在此先感谢!
serialization - 在 Flink 中使用套接字发送 DataStream;序列化问题
我想将数据流从 VM 发送到主机,我正在使用writeToSocket()
如下所示的方法:
这里joinedStreamEventDataStream
是类型DataStream<Integer,Integer>
。
有人可以告诉我应该如何将序列化程序传递给上述方法。
提前致谢
apache-kafka - 将数据流从 VM 套接字发送到 Kafka 并在主机操作系统的 Flink 程序上接收:反序列化问题
我正在使用下面的代码将数据流从 VM 发送到 Kafka 的测试主题(在 192.168.0.12 IP 的主机操作系统上运行)
JoinedStreamEvent
是类型DataSream<Tuple3<Integer,Integer,Integer>>
它基本上加入了 2 个流respirationRateStream
和 heartRateStream
Host OS 上运行的另一个 Flink 程序试图从 kafka 读取数据流。我在这里使用 localhost,因为 kafka 和 zookeper 正在主机操作系统上运行。
我得到这样的输出
我想我需要实现类型的反序列化器JoinedStreamEvent
。有人可以给我一个想法我应该怎么写,类型的反序列JoinedStreamEvent
化器DataSream<Tuple3<Integer, Integer, Integer>>
请让我知道是否需要做其他事情。
PS - 我想写下面的反序列化器,但我认为它不正确
apache-kafka - 从 Apache Flink 中的 Kafka 代理读取最新数据
我想从 Kafka 接收到 Flink 程序的最新数据,但是 Flink 正在读取历史数据。我已经设置auto.offset.reset
为latest
如下所示,但它没有工作
Flink Programm 正在使用以下代码从 Kafka 接收数据
我正在关注 https://issues.apache.org/jira/browse/FLINK-4280上的讨论,这建议以下面提到的方式添加源
我做了同样的事情,但是我无法访问setStartFromLatest()
我应该怎么做才能接收发送到 Kafka 的最新值而不是从历史中接收值?
apache-kafka - 如何在检测到 x 个复杂事件后停止 Flink CEP 引擎并计算引擎处理时间
我想通过下面提到的代码找到 Flink 中的处理时间
但是这里的问题是,当产生一定数量的复杂事件时,我不知道如何停止 Flink 程序。我将详细阐述场景
场景: Flink 程序正在接收心率、呼吸率和其他流数据,并正在生成心脏病发作警告。传感器数据是使用一些分布生成的。
第一个任务是在连续产生 3 个警告时停止引擎并查找引擎的处理时间?
第二个任务是以一定的速率生成数据流?目前,Flink 程序正在从文件中读取各种传感器数据并从中生成流,并将 Steam 数据发送到 Kafka,然后再发送到 Flink 程序。有人能告诉我如何以x 事件/秒的速率生成原始流吗?
java-8 - 无法在自定义 DataSink 上初始化字段(Flink CEP)
我对 Apache Flink Streaming API 有疑问。
我可以设法使用自定义数据源设置整个 CEP 环境,并且在该源上使用标准接收器(如“print()”)时,一切正常。
这是我的水槽现在的样子:
我试图实现的目标是将方法引用传递给这个 SinkFunction,它将为我的 DataStream 中的每个元素执行。
这就是我初始化 SinkFunction 的方式:
我的问题是,当我在自定义接收器的“调用”方法中设置断点时,即使我显式调用构造函数来分配消费者,消费者也似乎为空。
java - 在没有所有 Flink 环境的情况下使用 flink-cep 模式匹配
如标题所示:是否可以仅使用 flink 模式匹配而无需整个其他 flink 环境?
log4j - 在 Apache Flink 中使用 SLF4J 记录器时出错
我想在 Flink 中输出日志,我已经创建了如下所述的记录器代码
我已经导入了 LoggerFactory
CEPMonitoring 类包含代码如下
但是,这并没有打印任何东西。另外,我没有https://ci.apache.org/projects/flink/flink-docs-release-0.7/internal_logging.htmllogback.xml
中提到的文件
PS - 我在使用时遇到错误
这最终导致我写
它给出了一个错误,然后我必须
最重要的是,我在运行程序时收到以下警告
我很困惑该怎么办?