问题标签 [flink-streaming]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - 用于 CEP 示例的简单 Scala API 不显示任何输出
我正在编写一个简单的示例,用于在 Flink 中测试 CEP 的新 Scala API,使用 1.1-SNAPSHOT 的最新 Github 版本。
Pattern 只是对值的检查,并为每个匹配的模式输出一个字符串作为结果。代码如下:
它在 1.1-SNAPSHOT 下编译和运行没有问题,但 jobmanager 输出没有显示 print() 的迹象。即使放宽模式条件,仅设置“开始”(接受所有事件)也绝对不会返回任何内容。
此外,在尝试添加阶段时,代码无法编译。如果我将模式更改为(查找第三个字段小于 4 的两个连续事件):
然后编译器抛出:
显示错误是在“开始”阶段之后的第一个 where() 上。我尝试使用以下方法显式设置参数类型:
这样它会再次编译,但是当它在 Flink 上运行时,不会显示任何输出。StreamingAlert 是一个 Scala DataStream[(String, Long, Int)],在代码的其他部分,我可以_._ < 4
毫无问题地进行过滤,并且输出看起来是正确的。
scala - 如何在 Flink 中建立一个 1 小时的回放流缓冲区?
我想动态保留最后 1 小时事件的缓冲区。该缓冲区应该给我一个重播功能,以便可以对最后一小时的数据执行查询。Flink 中是否已经实现了一些东西?还是我需要自己构建它?
我尝试使用 Window API,但似乎 Flink 没有给我一个向前移动的固定宽度时间窗口。
batch-file - Flink CEP Streaming:批处理模式,连续还是微批处理?
我正在尝试理解这个 Flink CEP 示例
https://github.com/tillrohrmann/cep-monitoring ..我在分布式模式(1 个主设备和 3 个核心)上执行了 Flink CEP 的这个示例。现在我正在将输出写入文件,因此我的输出被写入3 个文件,因为使用 3 个核心。在其中一个文件中,我看到记录如下:
现在,如果我们从第 5 行开始查看。我们可以检查从第 5 行打印的所有TemperatureAlerts(即温度:113.65291115679136),我们可以在 TemperatureWarning 中识别以下温度(温度:113.65291115679136 出现在第 15 行)意思是我们可以识别哪些温度,警报已被打印....但是从第 1 行到第 4 行生成的警报呢?您甚至可以在行号中找到相同的记录。11 ..我的意思是我们如何识别已生成警报的温度警告?它是以批处理模式、连续模式还是微批处理模式执行流式传输?
apache-flink - 运行训练示例“写入 Apache Kafka”时遇到错误
我从 Apache Flink 的动手示例中复制了示例代码,并尝试让它运行。代码如下:
我在本地安装并运行了 Apache Kafka。我收到以下错误!
任何帮助表示赞赏!
我为 Kafka 连接器使用了以下两个依赖项之一。
或者
apache-flink - Flink 中两个 DataStream 的高效 zip(以位置作为隐式键连接)
我有兴趣使用 Apache Flink 有效地将两个数据流压缩在一起(但同样的问题也可能适用于数据集)。
作为一个例子(使用Scala符号)我有
我想获得
连接是使用流内数据的位置隐式进行的(非正式地:)combined(i) = (names(i), ages(i))
。我可以通过向每个流添加一个“位置”字段然后使用位置作为键将它们连接在一起来获得这一点,但这非常低效。
有没有更好的方法来做到这一点?谢谢!
apache-kafka - Flink Kafka Consumer 在使用 DataStream 键时抛出空指针异常
我正在使用这个Flink CEP示例,我在其中分离出数据,因为我创建了一个应用程序,该应用程序正在将应用程序发送到 Kafka 和另一个从 Kafka 读取的应用程序......我为类TemperatureWarning生成了生产者,即在 Kafka 中,我正在发送数据与TemperatureWarning相关 以下是我的代码,它正在使用来自 Kafka 的数据......
但是当我执行这个应用程序时,它会抛出以下异常:
以下是我的课TemperatureWarning:
}
以下是我的课程MonitoringEventSchema:
{
}
现在执行keyBy操作需要什么,因为我已经提到流分区所需的密钥?这里需要做什么来解决这个错误??
twitter - Apache Flink vs Twitter Heron?
比较 Flink vs Spark Streaming、Flink vs Storm 和 Storm vs Heron 有很多问题。
这个问题的起源是因为 Apache Flink 和 Twitter Heron 都是真正的流处理框架(不是微批处理,如 Spark Streaming)。Storm 已于去年被 Twitter 停用,他们改用 Heron(这基本上是 Storm 的重新设计)。
Slim Baltagi 有关于 Flink 和 Flink vs Spark 的精彩演讲: https ://www.youtube.com/watch?v=G77m6Ou_kFA
Ilya Ganelin 对各种流媒体框架进行了很好的研究: https ://www.youtube.com/watch?v=KkjhyBLupvs
关于 Flink 与 Storm 的非常有趣的想法: Flink 和 Storm 之间的主要区别是什么?
但我还没有看到任何新的 Storm/Heron 与 Apache Flink 的比较。
这两个项目都很年轻,都支持使用以前编写的 Storm 应用程序和许多其他东西。Flink 更适合 Hadoop 生态系统,Heron 更适合基于 Twitter 的生态系统堆栈。
有什么想法吗?
java - Logstash vs Apache Flink 用于实时数据记录
我正在构建一个仪表板,其中提要由 Java 应用程序实时提供。我想利用 Elasticsearch 和 Kibana 来分析这些提要。但是我很困惑是应该使用 Logstash 还是 Apache Flink 来记录数据,还是应该直接将数据记录到 Elasticsearch 中?
java - Apache Flink,键入具有相似字段字符串值但不相同的两个数据流
我完全绝望了!
我正在将 apache flink 与 java 一起使用,我想知道是否可以修改 keyby 方法以便通过相似性而不是确切名称来键入?
我有两个不同的数据流,我正在做一个联合。在第一个流中,我想要 KeyBy 的字段名称是“John Locke”,而在第二个数据流中,字段值是“John L”。
我有一个算法,可以在一些不同的字符串之间给我一个分数。我的想法是:例如,如果两个字符串之间的分数高于 0'80,那么这两个字符串将被认为是相同的,当我应用 keyby("name") 时,那些相似的字符串将被键入,因为它们具有精确的一样的名字。
视觉示例:
datastream1----- John Locke、Mickey Micke、Will Williams
satastream2----- 米奇 M.、约翰 L.、安东尼布朗
数据流 d3=datastream1.union(datastream2)
d3.key 按分数/相似,而不是确切的名称。
希望大家理解,谢谢!
java - 使用基于计数的窗口加入两个流
我是 Flink Streaming API 的新手,我想完成以下简单 (IMO) 任务。我有两个流,我想使用基于计数的窗口加入它们。我到目前为止的代码如下:
我的代码可以正常工作,但不会产生任何结果。事实上,对apply
方法的调用永远不会被调用(通过在调试模式下添加断点来验证)。我想,前面的主要原因是我的数据没有时间属性。因此,窗口(通过 实现window
)没有正确完成。因此,我的问题是如何表明我希望我的加入基于计数窗口进行。例如,我希望连接从每个流中实现每 100 个元组。前面的在 Flink 中可行吗?如果是,我应该在我的代码中更改什么来实现它。
在这一点上,我必须通知您,我尝试调用该countWindow()
方法,但由于某种原因,Flink 的JoinedStreams
.
谢谢