问题标签 [flink-cep]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
3 回答
1525 浏览

apache-flink - 如何确保 flink 作业已完成执行,然后执行一些任务

我想在 flink 作业完成后执行一些任务,我在 Intellij 中运行代码时没有任何问题,但是当我在 shell 文件中运行 Flink jar 时出现问题。我正在使用下面的行来确保 flink 程序的执行完成

我不确定上述检查是否正确?

然后我在下面的方法中使用上面的变量来执行一些任务

有什么建议么 ?

0 投票
1 回答
1488 浏览

java - Apache Flink:如何计算 DataStream 中的事件总数

我有两个原始流,我正在加入这些流,然后我想计算已加入的事件总数和未加入的事件总数。我通过使用地图来做到这一点,joinedEventDataStream如下所示

问题#1:这是计算流中事件数量的适当方法吗?

问题#2:我注意到一种有线行为,你们中的一些人可能不相信。问题是当我在 IntelliJ IDE 中运行我的 Flink 程序时,它显示了正确的值,number_of_joined_events0在我将此程序提交为jar. number_of_joined_events因此,当我将程序作为jar文件而不是实际计数运行时,我得到了初始值。为什么这种情况只发生在jar文件提交的情况下而不是在 IDE 中?

0 投票
1 回答
2493 浏览

apache-flink - 在 Flink 或任何其他系统中合并两种不同类型的数据流

我想将 Flink 用于远程患者监测案例场景,其中包括陀螺仪、加速度计、ECG 流、HR 速率流、RR 速率等各种传感器。因此在这种情况下,我们不可能拥有相同的数据类型或输入率等,但我仍然想检测心律失常或其他涉及在这些多个传感器上进行 CEP 的医疗状况

我所知道的是,如果我想对这些传感器执行一些复杂的事件处理,那么我有 2 个选项需要在 CEP 之前完成

  1. 加入差异流
  2. 合并差异流

之前我是根据传感器的时间戳执行连接,但它不会导致连接所有事件,因为差异流可以在微秒内具有差异速率和不同的时间戳,因此时间戳完全相等的情况很少见。

所以我想使用选项#2,即在执行 CEP 之前执行合并。为此,我在 Flink 文档中发现,我可以合并两个流,但它们应该具有相同的数据类型,我尝试做同样的事情,但我没有成功,因为我得到了以下错误

现在让我们看看我是如何尝试执行合并的。所以基本上我有两个流类,它们的属性如下

RRIntervalStream 事件流

qrsIntervalStream 事件流

这两个流都有生成器类,它们也以指定的速率以相同的数据类型发送事件。下面是我尝试合并它们的代码。

我必须使用new DataStream[],因为刚刚使用qrs_stream_raw导致错误,如下所示。

错误快照

有人可以给我一个想法吗

  1. 我应该如何合并这两个流?
  2. 我应该如何合并两个以上的流?
  3. 是否有一些引擎可以合并两个以上具有不同结构的流,如果是,我应该使用哪个引擎
0 投票
1 回答
361 浏览

streaming - 用于计算流处理中事件发生的设计?

以下讨论是在Apache Flink的上下文中进行的:

想象一下,如果我们要计算每个事件在 10 分钟内keyedStream到达的事件数量,假设我们有一个其键是它的键,事件时间是它的时间戳。id

需要解决的问题是:

  1. 如何设计窗户
    • 我们可以在每个事件到达后创建一个 10 分钟的窗口,但这意味着对于每个事件,都会有 10 分钟的延迟,因为等待 10 分钟的窗口。
    • 我们可以创建一个 10 分钟的窗口,将每个事件的时间戳作为这个窗口的最大时间戳,也就是说我们不需要等待 10 分钟,因为我们取的是元素到达前最后 10 分钟的元素. 但据我所知,这种窗口并不容易定义。
  2. 如何处理内存或其他资源问题?即使我们成功创建了一个窗口,可能事件的id类型是多种多样的,这么多窗口,系统如何将它们的状态保存在内存中?内存溢出的可能性很大。

可能有一些问题我在这里没有提到,或者除了window(即Patterns)之外还有一些好的解决方案。如果你有好的解决方法,请给我一个线索,谢谢。

0 投票
1 回答
117 浏览

apache-flink - 一次存在单个 FLINK CEP 模式

是否可以一次为一个键设置一个模式。例如,我想检查是否在 1 分钟窗口内收到 5 个与 where 条件匹配的事件,如果我在一分钟内收到 10 个此类事件,我应该收到 2 个警报,但我收到 6 个警报。这是因为每个事件都会触发一个新模式。我们可以一次为一个键创建一个模式实例吗?

0 投票
1 回答
360 浏览

apache-flink - Flink CEP 状态存储

Flink CEP 如何管理间歇性状态?它在哪里存储它们?它只是在内存中还是有支持状态的快速持久存储?

该文档在任何地方都没有提到这一点。

0 投票
1 回答
91 浏览

apache-flink - Apache Flink:如何将模式从源应用到另一个数据流?

我有一个事件数据流和另一个模式数据流。这些模式由用户在运行时提供,它们需要通过 Kafka 主题来。我需要使用 Flink-CEP 在事件流上应用每个模式。当我事先不知道模式时,有没有办法从 DataStream 中获取 PatternStream?

0 投票
2 回答
5612 浏览

apache-flink - 如何在 Flink 中调试可序列化的异常?

我遇到了几个可序列化的异常,我在 Flink 的互联网和文档上做了一些搜索;有一些著名的解决方案,如瞬态、扩展可序列化等。每次异常的起源都很清楚,但就我而言,我无法找到它没有序列化的确切位置。

问:我应该如何调试这种异常?

斯卡拉:

B.scala:

例外是:

SinkFunction的实现是不可序列化的。该对象可能包含或引用不可序列化的字段。

我发现了两个可疑点:

  1. 的实例StringSink被传递到另一个文件中。
  2. 在 类中StringSink,它使用了stringList 它的 compagin 对象的静态变量。
0 投票
1 回答
75 浏览

apache-flink - 1.4 Apache Flink DataStream 上是否有等效的 foldLeft 运算符

我有一个这样的流设置,使用 Apache Flink 1.4 从 DataStream 然后 keyBy 然后 window 然后聚合

聚合操作的输出是一个 AggregateResult 对象,因此它现在结束的流是 SingleOutputStreamOperator 类型

我接下来想做的是相当于一个 scala foldLeft。是否有提供该功能的运营商。

谢谢

0 投票
2 回答
491 浏览

apache-flink - Flink CEP 贪心匹配

我正在和 Flink CEP 贪婪的算子打架。

鉴于以下java代码:

我想看看:只发出“5 5 5 5 6”

但是,它匹配“5 5 5 5 6”、“5 5 5 6”、“5 5 6”、“5 6”

如果我做:

但是,(因此提供不同的起始匹配)Greedy 运算符通过发出“3 5 5 5 5 6”按预期工作。

是否有可能让一个贪婪的匹配器在没有不同起始模式的情况下抓取所有匹配项?

还是我错过了什么?

斯蒂芬