问题标签 [stream-processing]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
863 浏览

java - java中的任何集合或数据结构都是非阻塞的

在事件处理中,一个函数将值放入一个集合中,另一个函数从同一个集合中删除。这些项目应该按照它们从源(套接字)接收到的顺序放置在集合中,并以相同的方式读取,否则结果会改变。

队列是大多数人推荐的集合,但同时,队列在添加项目时被阻塞,因此其他功能必须等到添加完成,这使其效率低下,并且操作延迟会随着时间的推移而增加。

例如,一个线程从一个队列中读取,而另一个线程写入同一个队列。任何一个操作都在队列上一次执行,直到它释放一个锁。是否有任何数据结构可以避免这种情况。

0 投票
1 回答
881 浏览

twitter - backtype.storm & org.apache.storm & com.twitter.heron 包中的类之间的差异

我想为 apache heron 编写一些自定义调度程序,并且我正在深入研究源代码。我注意到在苍鹭源代码中有几个具有相似类的包。例如backtype.storm&中的大多数类org.apache.storm都是相似的(完全相似,因此内部代码相同)。这两个包之间也有一些相似的类com.twitter.heron(例如com.twitter.heron.api.tuple.Fields),但其中一些在内部有不同的代码(例如 Fields 类)。我知道在编写拓扑时,我们可以导入我们想要的每个包,我们可以在其中任何一个之间进行选择,但我很好奇它们之间的差异以及它们为什么将所有这些包放在一起。并没有合并它们?如果风暴类是编写拓扑的唯一选择,那么什么是类com.twitter.heron包好用吗?

我知道 heron 被设计为与storm完全向后兼容,这可能是因为向后兼容问题,但我不得不承认这让我很困惑,因为我需要在这些类中编写自己的代码,而我不知道如何选择哪一个,哪个是开发人员不断开发和维护的,我应该选择它们作为修改的候选者。

提前致谢。

0 投票
0 回答
67 浏览

opencl - 如何在 OpenCL 中使用最少的 LOC 使 AMD GPU 达到接近 100% 的工作负载?

出于好奇,我想使用 OpenCL 将我的 HD5750 GPU 施加到几乎 100% 的工作负载,我将如何以最少的努力/代码来做到这一点?显然HD5750有720个流处理器,所以我想我必须在所有内核上做一些并行计算循环?

0 投票
2 回答
2264 浏览

apache-kafka - kappa-architecture 和 lambda-architecture 有什么区别

如果 Kappa-Architecture 直接对流进行分析,而不是将数据分成两个流,那么在 Kafka 这样的消息系统中,数据存储在哪里?还是可以在数据库中进行重新计算?

单独的批处理层是否比使用流处理引擎重新计算进行批处理分析更快?

0 投票
0 回答
39 浏览

apache-kafka - Apache Storm 在 Kafka-topic 中按顺序写入数字

我想用kafka和storm做一些性能测试。在我的 WordCount 测试中,“输出”主题中出现了一个顺序数字:

它应该看起来像:

我的风暴拓扑如下所示:

我究竟做错了什么?有人有线索吗?谢谢!

0 投票
2 回答
1785 浏览

google-cloud-dataflow - 如何在 Apache Beam 中“限制”PCollection?

我有一个似乎很常见的问题,但我无法弄清楚 Beam 推荐的解决方案是什么。

我有一个原始事件流,我正在寻找两个单独的事件来满足滑动窗口(60 分钟)内的条件,以便它“触发”警报。

这很容易做到SlidingWindows,但是问题在于它的滑动性质,我有效地在多个窗口中获得了该警报。我如何最终获得仅输出一次此类警报的 PCollection(在特定时间范围/冷却持续时间内)?

我首先认为最近的状态处理功能将是我的解决方案,但后来意识到它只能在窗口内工作。侧面输入也是如此。所以在我看来,我需要一种打破窗户并在一个(可能的会话)窗口中处理警报“触发”的方法。但是文档没有提到任何有效地将元素重新分配给新窗口的方法

0 投票
1 回答
91 浏览

elasticsearch - 如何在 Elasticsearch 中合并旧数据以节省空间

我试图找到有关此的信息,但我没有找到我要找的东西。

我每分钟将指标存储在 Elasticsearch 数据库中。我的想法是频率只在短时间内很重要。

例如,我想在过去一周的每一分钟都有我的指标,但是我想合并这些指标,以便在过去的每个星期只有一个指标文档。

因此,我有一个想法可以使用诸如 Spark 流或 Flink 之类的流处理框架来实现这一点,但我的问题是:在 Elasticsearch 中是否有一种本地方式/工具/技巧来实现它?

谢谢,希望我的问题足够清楚,否则请发表评论以获取更多详细信息。

0 投票
1 回答
102 浏览

scala - 如何使用 Apache Flink 会话化流?

我想会话这个流:1,1,1,2,2,2,2,2,3,3,3,3,3,3,3,0,3,3,3,5, ...这些会议:

我编写了 CustomTrigger 来检测流元素何时从 1 变为 2(2 变为 3、3 变为 0 等等),然后触发触发器。但这不是解决方案,因为当我处理 2 的第一个元素并触发触发器时,窗口将为 [1,1,1,2] 但我需要在 1 的最后一个元素上触发触发器。

这是我的自定义触发器类中我的 onElement 函数的伪代码:

我怎么解决这个问题?

0 投票
1 回答
38 浏览

scala - 在源到达其边界后,到达服务器实现流处理的消息会发生什么?

我正在学习 akka 流,但显然它与任何流框架相关 :)

引用akka文档:

Reactive Streams 只是定义了一种通用机制,用于在不丢失、缓冲或资源耗尽的情况下跨异步边界移动数据

现在,据我了解,如果直到流之前,让我们以 http 服务器为例,请求将会到来,并且当接收者完成一个请求时,因此即将到来的新请求将被收集在一个缓冲区中将保留等待的请求,然后存在这个缓冲区大小未知的问题,如果服务器过载,我们可能会丢失正在等待的请求。

因此,流处理开始发挥作用,他们将此缓冲区限制为可控......所以我们可以预定义我们想要排队的消息(在我的示例中为请求)的数量,并且我们可以一次处理每个消息。

我的问题,如果我们在我们的服务器中实现一个源最多可以有 3 条消息,那么如果第 4 个 id 来了会发生什么?

我的意思是当另一台服务器打电话给我们并且我们已经处理了 3 个请求时……他的请求会发生什么?

0 投票
1 回答
312 浏览

iot - 从云网关摄取物联网数据的方法

我想听听您对 IoT 数据摄取案例的见解。在 AWS IoT 中心,事物影子是物理影子的虚拟表示。我从下图中了解到,每当事物通过消息代理向平台发送数据时,事物影子和规则引擎部分都会同时获取相同的传感器数据并进行处理。

我的结论正确吗?

  • 事物影子系统订阅消息代理并获取传感器数据,更新其影子演员。影子端还负责存储传感器数据,例如事件溯源机制。
  • 事物影子系统不执行任何规则,它仅用于执行事件溯源并在虚拟事物参与者中保持最后已知状态。
  • 同样的传感器数据也是规则引擎的入站数据。规则引擎只是处理流数据并决定如何处理流数据的 ECA(事件条件操作)类型系统。这意味着每个传入的数据最终都将在规则引擎部分进行处理。

https://paolopatierno.wordpress.com/2015/10/13/an-iot-platforms-match-microsoft-azure-iot-vs-amazon-aws-iot/