问题标签 [flink-cep]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
java - Flink CEP 中的序列匹配语义
Apache JIRA 中有一个问题:https ://issues.apache.org/jira/browse/FLINK-3703 。它允许在找到匹配序列后定义匹配算法应该在哪里继续,并且存在三种类型的行为:
from first
- 继续为未来的比赛保留所有事件(即当前行为)after first
- 在第一个元素之后继续(删除第一个匹配事件并继续第二个事件)after last
- 在最后一个元素之后继续(有效地丢弃匹配序列的所有元素)
例如,如果我们有一系列事件:A
、、和模式B
,我们将得到下一个结果:C
first -> second
from first
-A -> B
,A -> C
,B -> C
after first
-A -> B
,B -> C
after last
-A -> B
现在问题已结束,并附有一条评论,所有这些都在其他任务中实现:
这被其他问题所涵盖,包括 Quantifier 实现、skip-till-next 以及循环模式内连续性的一些其他问题。
那么,有人能告诉我如何使用 Flink CEP 库的当前 API 来实现定义类型的行为吗?
apache-flink - 在 Flink 执行计划 UI 中显示的命名运算符、源、接收器和模式
我只想给运营商和消息来源命名。举个例子
这是我从 Flink Dashboard 获取的执行计划的屏幕截图。在这里,我有 2 个 DataStreams 来源,然后我加入了它们。我的问题是,我可以将这些来源命名为EcgStream和Sp02 Stream例如并加入为Join1吗?
我问这个问题的原因是因为它使可视化更容易。另外,当我浏览 Opsclarity页面时,在页面末尾,他们提到了以下内容
请注意,task_name 和 operator_name 已被压缩,因此当跨任务和运算符聚合延迟时,我们仍然可以正确区分任务和运算符。但这些压缩后的名称与在 Flink UI 中看到的不匹配,它将显示一段 Scala 代码作为操作符名称。如果您需要这些名称在公制中有意义,您应该在应用程序的 Flink 代码中提供名称。此压缩值仅适用于那些非常长的默认名称,否则它们不会是合法的度量值。
我还有另一个问题,那就是当我为 CEP 制作一个模式时,执行计划 UI 只是将它显示为一个模式。有什么方法可以显示这种模式是什么样的 A B+ C?D. _ 此外,如果我们有多个模式,我们应该能够将其命名为Patterns{1..n}
apache-flink - 将 Flink 与 Datadog 集成
我有几个关于 Flink 和 Datadog 集成的问题。首先,问题在于 Datadog 使用 dogstatsD 而不是 Flink 文档中未包含的 statsD
另一个问题是,如果您访问 Datadog 的集成页面,则缺少 Flink 集成。我曾尝试安装石墨,但由于 python 3.6,我也遇到了一些问题,我也尝试了 virtualenv,但考虑使用 datadog,这也让我很难过。
apache-flink - 由于 ClassNotFoundException 导致 Flink Dashboard 1.3.2 版本无法执行 CEP 模式
我写了一个这样的简单模式
它在 IntellijIdea 中运行良好。我在仪表板和 IntelliJ-Idea 中都使用 Flink 1.3.2。当我从源代码构建 Flink 时,我看到了很多警告消息,这些消息让我相信迭代条件类没有包含在 jar 中,因为错误也说ClassNotFoundException
。下面是错误
apache-flink - 在 Flink 中加入超过 2 个流并将 CEP 应用于超过 2 个流而不加入它们
问题#1:我正在处理一个案例场景,我们需要融合来自多个传感器[例如 8 个传感器]的数据并将它们以树的形式连接起来。例如,加入 [s1,s2,s3 s4] 形成流 A,然后加入 [s5,s6,s7 和 s8] 形成流 B,然后对流 A 和 B 执行 CEP。我该如何实现?
问题#2:是否可以在多个流上执行 CEP,意味着不止一个流?在 flink 1.3.2 API中明确提到模式将应用于一个流
如果 Pattern 不能应用于多个流,那么 Flink CEP 将如何与 CEP 的经典示例一起工作,该示例涉及烟雾流和温度流,以在发生火灾时创建警报。
根据时间戳等键加入烟雾和临时流是唯一的解决方案吗?
那么如何将 Flink 应用到涉及多个传感器的 IOT 的广泛用例中呢?
apache-flink - 如何将 Datadog 中的指标与 Flink 中的执行计划运算符相关联?
在我的情况下,Flink 将指标发送到 Datadog。Datadog 主机图如下所示{我不知道为什么在这里显示延迟}
Flink 指标被发送到 localhost。这里的问题是,当
flink-conf.yaml
文件配置如下
问题是 Datadog 显示了 163 个我不理解的指标,我稍后会解释
我不理解 datadog 中的指标格式,因为它向我显示了类似这样的指标
现在如上图所示
- 延迟以时间表示
- 每秒事件数是事件/秒
- 计数是一些值
所以我的问题是这是哪个指标?
另外,我工作的执行计划是这样的
如何将 Datadog 中的指标与 Flink 中的执行计划算子联系起来?
我在 Flink API 1.3.2中读到我可以使用标签,我尝试在 flink-conf.yaml 文件中使用它们,但我不知道它们在这里有什么意义。
在这种情况下,我的最终目标是在每个操作员处找到操作员延迟、输出和输入/秒的记录数
java - java - 如何使用睡眠时间在java中生成超过1000个事件/秒
我有一个为 Flink CEP 生成事件的生成器,代码如下。基本上,我正在使用Thread.sleep()
并且我在某处读到即使我们使用 java 的睡眠时间也不能少于 1 毫秒System.nanoTime()
。生成器的代码是
我将在这里用简单的话来说明我的要求。我希望生成器类生成事件,比如说 1200 Hz 的 ECG 流。该生成器将接受我们必须生成流的输入速率和总时间等参数。
到目前为止一切顺利,问题是我需要每秒发送超过 1000 个事件。如何通过使用生成值的生成器函数来做到这一点U[10,20]
?
另外,如果我在上面使用错误的方式生成 x 个事件/秒,请告诉我。
提前致谢
apache-flink - 是否可以在 KeyedStream (Apache Flink) 中为每个键生成水印?
我正在实现一个用例,其中不同的物理设备正在发送事件,并且由于网络/电源问题,在 flink 源接收事件可能会有延迟。flink 作业中的运算符之一是 Pattern 运算符,并且有某些模式是时间敏感的,所以我使用 Event time 特性。但是,当来自特定设备的事件发生不可预测的延迟时,就会出现问题,这会导致这些事件被丢弃(因为我无法真正定义允许延迟的静态绑定)。
由于我使用的是基于源设备 ID 的 KeyedStream,因此有一种方法可以允许每个 CEP 操作员实例(每个键一个)根据相应流分区中的事件时间来推进其时间。或者换句话说,有没有办法在 KeyedStream 中为每个分区生成水印?
java-8 - FLINK 中如何处理超大窗口?
现在我有一个要求汇总某人过去 3 个月的操作。然后将结果放入 ML 模型以获得异常。
考虑到系统的巨大流量,这是一个非常大的窗口。
我该如何处理这种情况?
apache-flink - 动态限制 flink kafka 源
我们正在使用多个 kafka 主题,但希望优先考虑其中一些主题(〜服务质量)。
根据我在网上找到的内容,共识是不限制操作符,而是限制源,更具体地说是反序列化器 [1]。
我们如何在源中访问有关流环境状态的信息(即主题落后于当前偏移量的程度)。
目前,我们计划将我们的整个设置转换为 CoFlatMaps [2],并有一个控制流为所有主题发出当前的偏移滞后 - 低优先级流操作符然后根据高优先级流的滞后休眠。
你将如何解决这个问题?Tl; dr:有没有办法在任务管理器的源/反序列化器之间共享信息?