问题标签 [stream-processing]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
284 浏览

java - 用于增量数据的 Flink 或 Spark

我没有使用Flinkor的经验Spark,我想将其中一个用于我的用例。我想介绍我的用例,并希望了解这是否可以用其中任何一个来完成,如果他们都可以做到,那么哪一个效果最好。

我有一堆实体A存储在数据存储中(准确地说是 Mongo,但这并不重要)。我有一个 Java 应用程序,它可以加载这些实体并在它们上运行一些逻辑以生成某种数据类型的 Stream E(100% 清楚我没有Es任何数据集中,我需要在我之后用 Java 生成它们As从数据库加载)

所以我有这样的东西

数据类型E有点像 Excel 中的长行,它有一堆列。我需要Es像在 Excel 中那样收集所有数据并运行某种数据透视聚合。我可以看到我如何在Spark或中轻松做到这一点Flink

现在是我无法弄清楚的部分。

想象一下,其中一个实体A1被更改(由用户或进程),这意味着所有的Esfor都A1需要更新。当然,我可以重新加载我的所有As,重新计算所有Es,然后重新运行整个聚合。我想知道这里是否可以更聪明一点。

是否有可能只重新计算EsforA1并进行最少的处理。

因为Spark是否有可能在RDD需要时保留并仅更新其中的一部分(这里是Esfor A1)?

对于Flink,在流式传输的情况下,是否可以更新已经处理的数据点?能处理这种情况吗?或者我是否可以为's old生成负面事件(即从结果中删除它们)然后添加新事件?A1Es

这是一个常见的用例吗?这甚至是FlinkSpark旨在做的事情吗?我会这么认为,但我也没有使用过,所以我的理解非常有限。

0 投票
4 回答
17940 浏览

scala - Akka Stream Kafka vs Kafka Streams

I am currently working with Akka Stream Kafka to interact with kafka and I was wonderings what were the differences with Kafka Streams.

I know that the Akka based approach implements the reactive specifications and handles back-pressure, functionality that kafka streams seems to be lacking.

What would be the advantage of using kafka streams over akka streams kafka?

0 投票
1 回答
2443 浏览

apache-flink - Apache Apex 与 Apache Flink

由于两者都是一次处理事件的流式框架,这两种技术/流式框架之间的核心架构差异是什么?

此外,有哪些特定用例比另一种更合适?

0 投票
2 回答
4222 浏览

java - Camel-Kafka 组件不工作错误:“因为必须配置代理”

使用 Apache Camel(版本 2.19.1)的 kafka 组件时出错,我只是想打印主题中的传入消息,我的管道是如此组合:

尝试在端点中使用和不使用“//”。

我得到的是:

我正在尝试解决这个问题,但我真的不明白问题出在哪里,我的 kafka 集群是一个单独的代理,并且一切都已启动并正在运行(动物园管理员和服务器),请寻求帮助

0 投票
0 回答
463 浏览

java - JDK8+ java.util.Base64 wrap() 补充方法

从JDK8开始,增加了成熟的java.util.Base64 API。

流式处理 InputStream(s) 和 OutputStream(s) 的唯一方法分别是:

  • Base64.get*Decoder().wrap(is)
  • Base64.get*Encoder().wrap(os)。

如何以有效的方式使用Java API对 InputStream 进行编码或解码和 OuputStream ?

这些方法没有添加到 Java API 中是否有原因?

例如,此功能存在于net.iharder.base64中,但我正在迁移到 Java API 以消除这种依赖关系。

0 投票
1 回答
26 浏览

distributed-computing - 现实生活中的分布式流应用(图和事务)

我目前正在研究分布式流处理系统,例如 Storm、Flink 和 Spark Streaming。我想在这些系统中实现一些应用程序并简要比较它们。不知道有没有公司使用这些系统来处理以下情况,数据流的规模是多少。

  1. 一个大图可能分布到多台机器的图,我们处理一些更新(添加或删除顶点或边)和图上的查询。到目前为止,我只能在单机上找到一些流图算法。

  2. 必须进行一次性消息传递的事务。S-Store 中存在排行榜维护基准(Meehan, John 等人。“S-store:流式处理满足事务处理。”VLD​​B Endowment 8.13 (2015): 2134-2145 的会议记录)但我找不到它们是如何生成的输入数据。

0 投票
0 回答
53 浏览

amazon-web-services - 如何在 Kinesis 分析中填充没有数据的时间窗口

当流中没有数据时,有什么方法可以触发运动分析?尽管有数据还是没有数据,但还是像定期运行一样?

我有一个来自传感器的数据流,如下所示:

reading我的用例是在 1 分钟的桶中实时计算 a 为 1 的秒数。

拜托,我打算只使用 Kinesis Analytics SQL 或 AWS 工具,而不是像 drools 或任何其他初创公司这样的第三方服务。

0 投票
1 回答
1928 浏览

scala - akka 流 asyncBoundary 与 mapAsync

我试图理解 和 之间的asyncBoundary区别mapAsync。乍一看,我想它们应该是一样的。但是,当我运行代码时,它的性能看起来asyncBoundarymapAsync

这是代码

输出:异步边界总是比 mayAsync 更快地完成。

从关于 asyncBoundary 的文档描述(https://doc.akka.io/docs/akka-stream-and-http-experimental/current/scala/stream-flows-and-basics.html),我可以看到它正在运行在不同的 CPU 上,但 mapAsync 使用 Future 是多线程的。Future 也是异步的。

我可以要求更多关于这两个 API 的说明吗?

0 投票
0 回答
215 浏览

apache-kafka - 疑难解答 onyx-kafka 不写入主题。如何在 docker swarm 中运行 kafka。设置运行时卷大小 (/dev/shm) 时出错?

我正在尝试i)对一个简单的onyx-kafka作业进行故障排除,而不是写入主题。此处提供了更多详细信息。您可以在这个示例项目中尝试一下。我认为原因是因为只有一个 kafka 节点。

所以我尝试了ii)以 swarm mode启动运行 docker (17.09.0-ce, build afdb6d4)的kafka confluentinc/cp-kafka:3.3.1(with zookeeper ) 。但后来我得到了这个错误。confluentinc/cp-zookeeper:3.3.1

A)使用 docker compose,我可以通过配置选项(请参阅此处/dev/shm)将已安装的磁盘配置为具有更多容量。shm_size: 1G

docker swarm 是否有等价物?我只需要控制磁盘卷的大小。deploy但我在 docker compose 的配置中没有看到这样的选项(请参见此处)。而且该docker service --mount 选项似乎无法解决问题。这里有什么选择吗?

B)我不太了解这里描述Memory的卷安装。所以不知道这是否是一个可行的选择。

C)感谢来自devops-engineersSlack 频道的 Andrew Mulholland (@itwasntandy),我可以通过--mount type=tmpfs,dst=/dev/shm,tmpfs-size=768000000作为选项传入来伪造它docker service create ...(参见此处)。

但是官方文档说不支持tmpfs选项。此选项仅适用于 docker compose 的服务配置。它不适用于 docker 的 swarm 模式。docker stack deploy

总结。那么有没有办法在 swarm 配置中将该选项放入我现有的 docker-compose.yml文件中?

我很想能够跑步docker stack deploy --compose-file docker-compose.yml my-app。详情在这里。示例项目在这里。最终,我想:

  • i)运行一个 docker 堆栈,其服务具有 onyx-kafka 写入主题(请参见此处)。为此,我想我需要...
  • ii)以 swarm 模式启动 docker (17.09.0-ce, build afdb6d4),使用图像 kafka confluentinc/cp-kafka:3.3.1(with zookeeper confluentinc/cp-zookeeper:3.3.1)。但我遇到了这个内存错误问题。
0 投票
0 回答
90 浏览

cassandra - 我们是否应该在无状态流处理中将数据存储在中间数据库中?

我正在研究一个研究案例,该案例包括为实时流处理问题提出技术架构。问题是一家运输公司想要近乎实时地跟踪其公交车上的速度和乘客数量。我提出的初始架构是这样的:

  1. 总线将数据实时发送到 MQQT 服务器
  2. Apache Kafka 通过 MQQT 连接器从该服务器获取数据
  3. 使用 Kafka Streams API 或 Spark 流计算“速度”和“乘客数量”
  4. “速度”和“乘客数量”的可视化。

我的问题如下

  • 架构,对吗?
  • 这种情况下的流处理问题,是无状态的吗?
  • 最后,我想知道在进行可视化之前是否必须将数据存储在像 cassandra 这样的中间数据库中?
  • 如果没有,是否有可以直接与运动中的流交互的开源可视化工具?