问题标签 [flink-cep]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
956 浏览

apache-flink - 在 Flink CEP 中并行处理多个模式

我有以下案例场景

在此处输入图像描述

有 2 台虚拟机正在向 Kafka 发送流,CEP 引擎正在接收这些流,当单个流满足特定条件时会生成警告。

目前,CEP 正在检查两个患者的两条数据流(当心率 > 65 和呼吸率 > 68 时)是否相同,并同时发出警报,如下所示

但我想对两个流使用不同的条件。例如,我想发出警报,如果

我该如何做到这一点?我是否需要在同一环境中创建多个流环境或多个模式。

0 投票
2 回答
666 浏览

cassandra - 在 Flink 错误中添加 Cassandra 作为接收器:所有主机尝试查询失败

我在https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/cassandra.html跟进了一个示例,将 Cassandra 连接为 Flink 中的接收器

我的代码如下所示

我收到以下错误

0 投票
1 回答
378 浏览

java - 使用 apache flink 处理有状态的复杂事件

我想根据两个具有相同标识符的事件来检测两个事件是否在定义的时间范围内发生。例如一个DoorEvent看起来像这样:

下面示例中的我的DoorEventjava 类具有相同的结构。

我想检测到 id 为 1 的门在打开后 5 分钟内关闭。为此,我尝试使用 Apache flink CEP 库。传入的流包含来自 20 个门的所有打开和关闭消息。

如何将门 1 的状态保存为打开状态,door_open以便在door_close步骤中我知道门 1 是关闭的门而不是其他门?

0 投票
1 回答
365 浏览

junit - 带有 flink spector 类型的 JunitTest DataStream

我正在创建一个测试,以查看我的 flink 模式的超时是否正常运行。我为此使用了 flink spector,并且我有以下测试用例:

我的代码确实status statusaaaaaapatternTimeoutFunction. 在定义的时间段内未检测到第二个状态,因此调用超时并将整数添加到模式流中。如何在我的 ExpectedRecords 中说我期望值为 123 的 Either Left?

编辑
我目前遇到的错误是:

0 投票
1 回答
721 浏览

java - 使用 Flink 窗口和折叠功能,缺少元素?

当我尝试使用 window 和 fold 函数聚合元素时,一些元素会因为聚合而丢失。使用来自 Kafka 的元素(value:0, value:1, value:2, value:3)并将它们聚合为奇数和偶数值。

输出是:

缺少 10-13 之间的数字,这发生在一组随机数字上。有人可以建议下面的代码中遗漏了什么吗?我怎样才能确保处理所有元素?

0 投票
1 回答
763 浏览

apache-flink - 在代码中访问 Flink 的系统指标到终端,而不是使用像 JMX 这样的任何指标报告器

我使用 JMX 作为指标报告器来获取 Flink 指标,但是有没有办法将它作为终端的输出?

我想numRecordsInPerSecond为每个运算符绘制性能分析,我该怎么做?

我看过一些累加器的例子,但它并没有让我正确地了解如何对 Flink 进行性能分析。我在这里给你一个例子

在此处输入图像描述

这是我的Flink程序的执行计划,我有多个问题,但我想问一个基本的

  1. 如何测量每个运算符的延迟,然后将其相加以计算复杂事件的总延迟。

  2. 如何测量输出吞吐量?目前,我已经在 select 函数中编写了一些代码,这些代码计算了看到的复杂事件的数量和 Flink 引擎启动的时间。这是最好的方法吗?

但基本问题仍然存在,即如何通过代码获取Flink 指标中提到的系统指标的输出以显示在终端输出中,因为我想绘制性能图表,而 JMX 的问题是它向我显示指标从某种意义上说,当我在 JMX 控制台中单击该特定指标时,我会看到这些值,这并不完全适合分析系统。

PS - 我在 StackOverflow 上发现了一个关于计算吞吐量和延迟的问题,答案是这样的

我也在我的代码中添加了上面的类,但没有看到任何输出,我也想知道这个代码将如何显示吞吐量或延迟,因为我们没有提到我们想要找到哪个运算符的延迟?例如,我想在执行计划中间而不是计划结束时为某个操作员找到吞吐量,上面的代码会为我做吗?

0 投票
1 回答
136 浏览

apache-flink - 如何使用 Flink 中提供的成本估算器类获取 Flink 中的运营成本

我想做 Flink CEP 引擎的性能分析,我遇到了这些类

但问题是我不知道如何使用这两个类。有人可以向我提供代码或暗示,我如何在 Flink 中找到运营商 { 例如加入} 的成本估算。

下面是我在 Flink 中执行的连接代码

如何计算此连接的成本?

0 投票
1 回答
50 浏览

counter - 无法在 Flink 1.3.2 中添加计数器

我正在尝试在 Flink 中添加一个计数器,如此所述,但问题是 counter.inc() 返回的是 void 而不是 Integer。我的指标代码如下

0 投票
2 回答
877 浏览

apache-flink - 在 Flink 中使用计数器获取 numOfRecordsIn

我想为 Flink 中的操作员显示 numRecordsIn,为此我一直在关注数据工匠的ppt。下面给出了计数器的代码

问题是如何指定要显示 number_of_Records_In 的运算符?

0 投票
1 回答
3377 浏览

apache-flink - 提交 jar 文件时无法运行 Flink 作业,但程序在 Intellij 中以其他方式运行

我的 Flink 程序在 IntellijIdea 中成功运行,但是当我将该程序的 jar 文件作为 jar 提交时,它显示以下错误

我已经处理了清单文件中的类路径,以前我也可以使用我的 jar 运行。

我的模式代码似乎存在一些问题,如下所示