问题标签 [event-stream-processing]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
112 浏览

wso2 - 如何处理传入事件并根据事件中的字段,使用 wso2 写入不同的流?

我正在尝试使用单个流,处理传入的 json 格式并根据事件中的属性写入不同的流。例如,如果输入流包含如下内容:

另一个事件如下所示:

这两个事件都被推送到一个 http 端点(这是我面临的一个限制)

如何使用单个 http 源流、处理这些事件以及是否event_type插入temperaturetemperature_collectionmongo db 中以及是否event_type插入location到 mongo db 中的 location_collection?

  1. 是否可以使用单个流来执行此操作?

  2. 如果不是,我怎样才能避免编写多个端点,每个事件类型一个?

0 投票
2 回答
686 浏览

error-handling - Apache Flink - “keyBy”中的异常处理

由于代码中的错误或缺乏验证,进入 Flink 作业的数据可能会触发异常。我的目标是提供一致的异常处理方式,我们的团队可以在 Flink 作业中使用,不会导致生产中的任何停机。

  1. 重启策略似乎不适用于这里:

    • 简单的重启不会解决问题,我们会陷入重启循环
    • 我们不能简单地跳过事件
    • 它们可能对 OOME 或一些暂时性问题有好处
    • 我们不能添加自定义的
  2. “keyBy”函数中的 try/catch 块并不能完全帮助:

    • 处理异常后无法跳过“keyBy”中的事件

示例代码:

我希望能够跳过导致“keyBy”问题的事件的处理以及应该只返回一个结果的类似方法。

0 投票
1 回答
97 浏览

wso2 - WSO2 SP 中的“Streamingml 不存在扩展”错误

当我尝试#streamingml在 WSO2 SP 中使用扩展时,我收到扩展不可用的错误。
但是,我已确保将siddhi-execution-streamingml-1.0.15其下载并放置在/lib目录中。

我在 Ubuntu 16.04 上将其作为 docker 服务运行。我已经下载了扩展并将其放在 /lib 目录下。

0 投票
1 回答
104 浏览

wso2 - 当尝试使用 JMeter 生成的事件流执行 Siddhi APP 时,RAM 使用失控

当尝试使用 JMeter 模拟事件流并将其用作 siddhi 上的源时,它会工作一段时间,但最终会导致 RAM 被过度使用并且程序的执行停止。

我尝试在没有数据库的情况下使用数据库执行代码,使用分区来一一获取事件。

这是流代码:

这是接收器流:

这是从一个流复制到另一个流所执行的查询:

预期的结果是 wso2worker 将显示所有事件都已处理并插入到接收器流中。在 JMeter 上,我模拟 1 个用户在 1 小时内引入 6000 个事件,看起来内存最终过度使用并且模拟停止。尝试使用分区和内存使用率提高了很多,但仍然以失败告终。我能想到的是这是一个编码问题,但似乎找不到任何可能导致这种情况的东西。

//对不起英语不好,不是我的第一语言//

0 投票
1 回答
305 浏览

javascript - 如何在没有缓冲区的 javascript 中编写异步流生成器

我们希望将来自生成器的流数据事件消耗到转换/转换管道中。

生成器的一种形式如下

假设有一个正在产生值的数据流。有一个工厂函数可以使用处理函数

句柄函数将提供一个表示从流中生成的值的有效负载。

对于流,我们需要提供回调。对于生成器,我们需要在调用处理程序时解析承诺。我们希望能够编写如下代码:

这是一个工厂函数,它产生

  • 发电机
  • 回调

需要将回调传递到流中。生成器需要传递到转换管道。

但是这个声明是不对的。我们不想在 Promise 的每次迭代中定义函数,而是希望在每次迭代中使用该函数来解决 Promise。

基本问题是如何在 promise 中定义回调,以便流可以耦合到生成器。

一种解决方法是在流和生成器之间使用缓冲区。

有没有更简单的方法可以在没有缓冲区的情况下实现这一点?

0 投票
1 回答
53 浏览

java - 使用 Esper java 的订购活动

我正在开发物联网平台,我需要做的是保留事件顺序。例如,有时网络速度慢可能会导致以前的事件在较新的事件之后到达的问题。在这种情况下,我需要在执行某些逻辑之前使用时间戳对事件进行重新排序。

我想我可以在我的项目中使用 Esper,但还不确定。我对埃斯珀真的很陌生。

所以我想知道 Esper 是否适合我的项目。有没有使用 java 使用 esper 订购事件的示例?

谢谢你。

0 投票
1 回答
410 浏览

haskell - 在haskell中处理事件流

我想处理通过 MQTT 接收到的事件流。我正在使用的库使用回调来提供结果。我正在做的处理取决于先前的状态,而不仅仅是最新的事件。此外,将来可能会从其他来源收集事件。

起初,我决定将其编入列表中,这听起来是个好主意。我有一个小问题,因为 IO 阻止了延迟评估并且等待无限流可能很长,但我通过交错 IO 解决了它。

stream :: IO [Event]允许我做一些不错的事情,比如foldl, foldM map, mapM, 等等......不幸的是,使用这种方法我宁愿不能组合两个流,因为那里没有更多的锁定功能。

我正在挖掘许多库,例如发现带有 TQueue 的 STM。不幸的是,这不是我真正想要的。

我决定创建自定义类型并制作它,Foldable这样我就可以折叠它。我因为IO失败了。

Whih 可以像这样使用main = foldStream (\x _ -> print x) () =<< events

是否可以像使用常规列表一样实现一些基类以使用此流?

0 投票
1 回答
220 浏览

wso2 - 在 siddhi 中合并来自两个流的属性

我的意图是在 siddhi 中合并来自两个流的属性。

我正在使用带有 window 属性的“join”来合并我的 siddhi 查询中的两个流,并将连接的结果输入到另一个流中以丰富它。

当传入事件以 1 秒或更长的时间间隔出现时,窗口属性(window.time(1 sec) 或 window.length(1))效果很好。

当(例如 10 或 100 个)事件同时(在一秒钟内)发送时。那么连接的结果不是预期的。

请让我知道是否有任何方法可以合并两个流,即使一次(在一秒钟内)发送的事件数量很大,也必须对传入事件进行唯一处理/处理。

0 投票
2 回答
114 浏览

wso2 - 如何使用接收器类型为“http-response”的另一个流的响应更新流

我正在尝试使用通过“http-response”响应接收器填充的附加属性来丰富我的输入流。

我尝试使用带有 window 属性的“join”和“every”关键字来合并两个流并将生成的合并流插入另一个流以丰富它。

当传入事件以 1 秒或更长的时间间隔出现时,窗口属性(window.time(1 sec) 或 window.length(1))和“every”关键字效果很好。

当(例如 10 或 100 个)事件同时(在一秒钟内)发送时。然后合并的结果不是预期的。

具有“window”属性的那个(join)

**

**

带有“every”关键字的那个

**

**

有没有更好的方法来合并两个流以更新第三个流?

0 投票
1 回答
455 浏览

cassandra - RethinkDB 是否适合通用实时聚合平台?

我需要你的帮助来验证 RethinkDB 是否适合我的用例。

用例

我的团队正在构建一个通用的实时聚合平台,它需要:

  • 加入来自许多 Kafka 主题的数据
  • 需要对原始数据进行连接
  • 主题具有相同的键
  • 主题中的数据有时是“快照”(可更新),有时是“事件”(不可更新)
  • 连接数据的目的地将是一些分析型 OLAP DB。Clickhouse、Druid 等。视情况而定。这些系统与“增量”(SCD)一起工作。因为“快照”,我需要有状态的处理。
  • 最多可在 7 天后更新快照
  • 主题接收大约 20k msg/s,峰值高达 200k msg/s
  • 主题中的数据是从 100 字节到 5kB 的 json
  • 主题中的数据可以有重复
  • 重复项使用“版本” json 字段进行重复数据删除,该字段是每个主题的一部分。仅当 new_version > old_version 时才应处理数据。或者如果 old_version 不存在。

我已经有了一个包含五个阶段的 Cassandra 的 POC:

  1. Cassandra Inserter - 使用来自所有 Kafka 主题。仅对同一个 Cassandra 表中的所有主题进行插入。分片是在作为所有 Kafka 主题的键的列上完成的。因此,具有相同密钥的所有消息最终都在同一个分片中。
  2. 对于每个 Cassandra 插入,都会向 Kafka 生成一个 InsertEvent
  3. Delta 计算器 - 使用 InsertEvents 并通过分片键查询 Cassandra。获取所有原始数据,然后删除重复数据并创建增量。状态保存在另一个 Cassandra 集群中。通过保存所有已处理的“版本”。下次有新的 InsertEvent 出现时,我们使用保存的状态“版本”只获取两个事件:previous 和 current,因此我们可以创建一个 DeltaEvent
  4. DeltaEvent 生成到 Kafka
  5. ClickHouse / Druid 摄取数据

所以它基本上是一个 50/50 的插入/读取工作负载,没有更新 Cassandra。

使用 14 个 Cassandra 数据节点和 8 个状态节点节点,它可以在高达 20k InsertEvent/s 的情况下正常工作。在 25k InsertEvent/s 时,系统开始滞后。节点有 16GB 内存,磁盘是由 SSD 支持的网络存储(我知道这并不理想,但现在无法更改)。网络 10 Gbit。

重新思考数据库的想法

我想做一个新的 POC 来尝试 RethinkDB 并使用 changefeeds 创建增量和重复数据删除。为此,我将使用一张桌子。主键/分片键将是 Kafka 键,来自具有相同键的所有主题的所有 Kafka 数据将在单个文档中加入/更新。

工作量可能是 10/90 插入/更新。我会使用 squash: true,以避免过度读取并减少 DeltaEvents 的数量。

  1. 你认为这是 RethinkDB 的一个很好的用例吗?
  2. 它会扩展到 200k msg/s,即 20k insert/s、180k update/s 和大约 150k/reads 通过 changefeeds?
  3. 我需要删除超过 7 天的数据,这将如何影响插入/更新/查询工作量?
  4. 你有一个更适合这个用例的系统的建议吗?

非常感谢,达沃尔

PS:如果你喜欢阅读文档,这里是:RethinkDB use case question