问题标签 [flink-cep]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
101 浏览

hadoop - 在 Hadoop 中使用流式处理

我正在尝试研究这个 Flink CEP示例。我确实看到在这个例子中,他们创建了一个应用程序(这是一种流应用程序),它正在生成和使用数据并对数据应用模式匹配。他们没有在两者之间放置流媒体层(如 Kafka)。到目前为止,单个应用程序足以满足此目的,这使得它非常优化。现在,我知道如果我使用 Kafka,那么我将需要 2 个应用程序;一个用于将数据摄取到 Kafka 主题中,另一个用于使用来自 Kafka 主题的数据..我有几个问题没有得到回答::

  1. 为什么他们在这个例子中没有使用任何流媒体层(比如 Kafka)?

  2. 何时何地需要流式传输?

  3. 参考 Flink CEP 示例,我想知道流层(如 Kafka/Kinesis)将在哪里以及如何发挥作用?

  4. 如果像 Kafka/Kinesis 这样的流媒体层介于两者之间,会有什么优点/缺点?

0 投票
2 回答
629 浏览

hadoop - Flink on YARN:错误使用 Amazon S3 而不是 HDFS

在 YARN 的设置文档上关注了 Flink。但是当我运行时./bin/yarn-session.sh -n 2 -jm 1024 -tm 2048,在通过 Kerberos 进行身份验证时,我收到以下错误:

我在./flink-1.0.3/conf/flink-conf.yaml中设置了以下属性

如何使用 HDFS 而不是 Amazon 的 S3?

谢谢。

0 投票
1 回答
3455 浏览

scala - scala - 无法解析符号映射

我正在编写一个简单的代码来CEP library在 Scala 中进行测试,maven 依赖版本1.1-SNAPSHOT。我的代码:

selecFn地图中以红色显示cannot resolve symbol Map 问题出在哪里?先感谢您。

0 投票
1 回答
728 浏览

apache-flink - 在 Flink 中动态创建新的数据流

我正在尝试根据数据库中可用的某些规则从一个原始数据流中创建新的数据流。一旦我开始 Flink 作业,之后,如果我需要根据数据库中更新的条目创建一个新的数据流,我该如何实现。所以这里我不想重启 Flink 作业,而是在运行时动态创建新的数据流并将其添加到执行 DAG 中。

0 投票
0 回答
258 浏览

scala - Flink:同步/连接两个流

我有两个流,第一个带有 CEP 库,它用它来检测模式开关。在第二个流中是我必须添加的值。我正在寻找连接两个流的方法,这样当我收到一个 OFF 帧时,我会返回设备状态更改,最终值是第二种帧的总和:

我的代码:

更新:我的结果:

第一个数字(24.2)来自另一个设备,它不应该出现......

知道什么是最好的方法吗?谢谢

0 投票
1 回答
254 浏览

scala - 我可以用 Flink CEP 做一个惰性匹配吗

我想使用 FlinkCEP 只对模式进行“惰性”匹配。我怎样才能做到这一点?例如,我有一个输入流 ACABCABCB,我想在 A followBy C 上进行匹配以仅获得 3 个匹配项而不是 6 个匹配项。

我创建了以下示例来说明我的问题。

这给了我以下输出:

MyAggregatedEvent(1,1=>1)

当我将模式更改为:

然后打印以下内容:

MyAggregatedEvent(1,1=>1)
MyAggregatedEvent(1,1=>2)
MyAggregatedEvent(1,2=>2)
MyAggregatedEvent(1,1=>3)
MyAggregatedEvent(1,2=>3)
MyAggregatedEvent(1,3 =>3)

如何创建一个只匹配每个事件一次的模式,这样我的输出将是:

我的聚合事件(1,1=>1)
我的聚合事件(1,2=>2)
我的聚合事件(1,3=>3)

0 投票
2 回答
3280 浏览

apache-flink - 如何为在同一独立集群上运行的不同 flink 作业指定不同的 log4j.properties 文件

我有多个在独立集群上运行的 flink 作业。我想要不同的日志文件用于不同的 flink 作业。那么如何在提交 flink 作业时传递不同的 log4j.properties 文件。

0 投票
1 回答
508 浏览

apache-flink - Apache Flink CEP 超时模式未由时间窗口定义

PatternTimeoutFunction当事件序列超出定义的时间窗口时,我正在使用它来丢弃它。

我将水印设置如下

当模式超时发生时,timeoutTimestamp应该等于第一个事件时间戳 + 时间窗口的值

但是在接收到下一个水印后会触发超时。收到下一个水印后是否触发超时或基于timeWindow到期?

0 投票
1 回答
341 浏览

apache-flink - #Apache-flink:数据管理用例

我正在尝试构建涉及大量数据摄取的数据管理(DM)解决方案,通过一些数据域规则,替换(丰富),在将错误数据发送到下游系统之前对其进行标记。规则检查和值替换可以是简单的东西,比如数据元素应该满足的允许阈值数值,也可以是更复杂的东西,比如使用域值池的主数据查找。

您认为 Apache Flink 可以成为此类处理的良好候选者吗?是否可以定义 flink 运算符来对流经它的每个元组进行查找(使用主数据)?我认为对于后一个问题使用 Apache Flink 有一些缺点 - 1)查找可能是一个阻塞操作,会降低吞吐量,2)如果操作员函数必须获取主数据,则无法执行检查点和持久化操作员状态从别处。

有什么想法?在上述用例中还有其他最好的工具吗?

谢谢

0 投票
2 回答
529 浏览

apache-flink - Flink CEP 不是确定性的

我在没有集群的情况下在本地运行以下代码:

我的数据是以下格式的 CSV 文件(用户,val):

我正在尝试检测模式 where 的事件event(val=1) -> event(val=2) -> event(val=3)。当我在大型输入流上运行此程序时,我知道流中存在一定数量的事件,我得到检测到的事件计数不一致,几乎总是少于系统中的事件数。如果我这样做env.setParallelism(1)(就像我在代码的第 3 行中所做的那样),则会检测到所有事件。

我假设问题是当并行度大于 1 时,多个线程正在处理来自流的事件,这意味着虽然一个线程具有event(val=1) -> event(val=2),event(val=3)可能会被发送到另一个线程,并且整个模式可能不会被检测到。

我在这里缺少什么吗?我不能丢失流中的任何模式,但是将并行度设置为 1 似乎违背了让像 Flink 这样的系统来检测事件的目的。

更新:

我尝试使用以下方法键入流:

虽然这可以防止不同用户的事件相互干扰:

这并不妨碍 Flink 将事件乱序发送到节点,这意味着非确定性仍然存在。