问题标签 [event-stream-processing]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
22 浏览

complex-event-processing - 领域数据/参考数据实现?

相当简单的问题,但我无法通过文档找到解决方法。

我对可以比较所有事件的一组核心参考数据感兴趣。在这个简单的例子中,有一些分段上下文讨论了 nodeID——但这意味着 nodeID 的不同值,因为它们进入分叉分区,然后可以在以后使用(例如聚合)。

我想知道是否有任何方法可以通过提要预填充或预注册整个分区过程 - 并避免冗长的启动过程为我的域中的每个不同 ID 提供事件

0 投票
1 回答
312 浏览

iot - 从云网关摄取物联网数据的方法

我想听听您对 IoT 数据摄取案例的见解。在 AWS IoT 中心,事物影子是物理影子的虚拟表示。我从下图中了解到,每当事物通过消息代理向平台发送数据时,事物影子和规则引擎部分都会同时获取相同的传感器数据并进行处理。

我的结论正确吗?

  • 事物影子系统订阅消息代理并获取传感器数据,更新其影子演员。影子端还负责存储传感器数据,例如事件溯源机制。
  • 事物影子系统不执行任何规则,它仅用于执行事件溯源并在虚拟事物参与者中保持最后已知状态。
  • 同样的传感器数据也是规则引擎的入站数据。规则引擎只是处理流数据并决定如何处理流数据的 ECA(事件条件操作)类型系统。这意味着每个传入的数据最终都将在规则引擎部分进行处理。

https://paolopatierno.wordpress.com/2015/10/13/an-iot-platforms-match-microsoft-azure-iot-vs-amazon-aws-iot/

0 投票
1 回答
3040 浏览

error-handling - Apache Flink 错误处理和条件处理

我是 Flink 的新手,并且已经浏览了站点/示例/博客以开始使用。我正在努力正确使用运算符。基本上我有2个问题

问题一:Flink 是否支持声明式异常处理,我需要处理 parse/validate/... 错误?

  • 我可以使用 org.apache.flink.runtime.operators.sort.ExceptionHandler 或类似的东西来处理错误吗?
  • 还是 Rich/FlatMap 功能我最好的选择?如果 Rich/FlatMap 是唯一的选择,那么有没有办法在 Rich/FlatMap 函数中处理 Stream 以便可以附加 Sink 以进行错误处理?

问题 2:我可以有条件地附加不同的接收器吗?

  • 基于键控拆分流中的某些字段,我需要选择不同的接收器,我是再次拆分流还是使用 Rich/FlatMap 来处理?

我正在使用 Flink 1.3.2。这是我工作的相关部分

我在这里使用正确的运算符吗?

更新1:

尝试使用@alpinegizmo 建议的 ProcessFunction 但这不起作用,因为它取决于我在解析/验证输入之前没有的键控流。我得到“InvalidProgramException:对于非复合类型,字段表达式必须等于'*'或'_'。”。

这是一个非常常见的用例,您的第一个解析/验证输入并且还没有键控流,那么您如何解决它?

感谢您的耐心和帮助。

0 投票
2 回答
981 浏览

python - Docker 映像中的 nginx 缓冲烧瓶事件流

我有一个带有 python/flask 的 REST API 后端,并希望在事件流中流式传输响应。一切都在带有 nginx/uwsgi ( https://hub.docker.com/r/tiangolo/uwsgi-nginx-flask/ )的 docker 容器中运行。

API 工作正常,直到涉及事件流。似乎某些东西(可能是 nginx)正在缓冲“收益”,因为在服务器完成计算并将所有内容一起发送之前,任何类型的客户端都没有收到任何内容。

我尝试使用附加配置(nginx_streaming.conf)文件来调整 nginx 设置(根据 docker 映像说明):

码头文件:

但是我不太熟悉 nginx 设置,并且确定我在这里做什么^^这至少不起作用..有什么建议吗?

我的服务器端实现:

0 投票
3 回答
301 浏览

wso2 - esbAnalytics 不存在扩展名:解压缩

我正在尝试使用 WSO2 流处理器上的 siddhi 应用程序解压缩传入事件。在早期版本(例如 DAS)上,有 siddhi 扩展 esbAnalytics:decompress,但如果我在 SP 中使用此扩展,则会出现错误“esbAnalytics:decompress 不存在扩展”

有什么方法可以在 SP 上使用 esbAnalytics:decompress 或如何解压缩传入的 flowEntry 事件?提前致谢。

0 投票
1 回答
440 浏览

java - 数据流上的 Flink sql 查询(Apache Flink Java)

我对 Apache flink 完全是菜鸟。只是想弄脏我的手。我有以下情况。

  1. 事件数据流
  2. 事件数据流
  3. 规则的数据流
  4. 根据 ruleID 组合这两个数据流

现在我有一个 tuple3 的数据流,看起来像<ruleId, Rule, Event>. 这些规则是我想在事件上运行的 SQL 查询。

我正在研究动态表和 Flink SQL 的概念。我不确定如何进一步处理。有人可以帮我解决这个问题吗?

0 投票
1 回答
154 浏览

analytics - 使用 Kinesis Analytics 分析事件和相关的缺失事件,及时分开?

我有各种设备的事件流,可以“连接”或“断开”。

即一个事件具有以下结构:

  • 时间戳
  • 设备ID
  • 事件(“已连接”或“已断开”)

当设备在(特定于设备的可配置)时间段(例如 1 小时)内断开连接且未连接时,我想立即触发操作。我只想在每个“断开连接”事件中触发一次。

这是否可以使用 AWS Kinesis Analytics 完成?如果可以,查询会是什么样子?如果没有,可以使用其他工具完成还是我必须自定义构建它?

0 投票
1 回答
1284 浏览

java - 生成“假”流数据。卡夫卡-Flink

我正在尝试生成流数据,以模拟我在不同的时间范围内接收到两个值(整数类型)、时间戳和 Kafka 作为连接器的情况。

我正在使用 Flink 环境作为消费者,但我不知道哪个是生产者的最佳解决方案。(如果可能,Java 语法比 Scala 更好)

我应该直接从 Kafka 生成数据吗?如果是,最好的方法是什么?或者,如果我作为生产者从 Flink 生成数据,将其发送到 Kafka 并在最后由 Flink 再次使用它,也许会更好?我怎么能从 flink 做到这一点?或者也许还有另一种简单的方法来生成流数据并将其传递给 Kafka。

如果是,请让我走上实现它的轨道。

0 投票
2 回答
239 浏览

java - 流处理架构

我正在设计一个系统,其中有一个主要的对象流,并且有多个从该对象产生一些结果的工作人员。最后,有一些特殊/独特的工作人员(就图论而言,有点像“接收器”),它获取所有结果,并将它们处理为写入某个数据库的最终对象。

一个工人可能依赖于其他一些工人的结果(因此,等待他们的结果)

现在,我面临几个问题:

  1. 可能是一名工人比另一名工人慢得多。你怎么处理那件事呢?添加更多较慢类型的工人(=缩放)?(也许是动态的)
  2. 假设 W_B 依赖于 W_A。如果 W_B 由于某种原因关闭,则流程将停止,系统将停止工作。所以我希望系统以某种方式绕过这个工人。
  3. 此外,最终工作人员如何决定何时对结果集进行操作?假设它有 A 和 B 的结果,但缺少 C 的结果。可能是 C 已关闭,或者目前非常慢。它如何做出决定?

值得一提的是,它不是一个实时应用程序,而是一个离线处理系统(即您可以访问数据库并更改记录),但同时它必须以“高速度”处理相对大量的对象”。

关于技术,
我正在使用 Java 开发系统,但我不受特定技术的限制。

如果您能帮助我进行系统的总体设计,我会很高兴。

非常感谢!

0 投票
1 回答
458 浏览

apache-beam - 如何使用 Apache Beam 创建滚动窗口?不是滑动的或固定的,而是一个滚动的窗口

假设我想计算每分钟后过去 10 分钟内某个指标的平均值,并将其与每分钟后过去 20 分钟内相同指标的平均值进行比较。我需要 2 个窗口(不是 10 个滑动窗口与 20 个滑动窗口)或 2 个固定持续时间的窗口,并提前触发。我需要 2 个窗口,它们应该每分钟向前滚动一分钟(每个持续时间为 10 分钟和 20 分钟)。或者,如果我可以丢弃除最新的滑动窗口之外的所有内容,我的问题就可以解决。否则多个滑动窗口非常昂贵。

你能帮忙吗?自定义 WindowFn() 函数将非常有帮助