问题标签 [event-stream-processing]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
siddhi - Keep only alphanumeric characters in string data
I'm getting string data in a stream and I would like to keep only the alphanumeric characters. I noticed that Siddhi offers a regexp function, as mentioned here. But the thing is it returns a Boolean instead of the modified string. Is there anyway to get the modified string directly? This is my code.
Is there a function that returns the modified regex string?
apache-flink - 事件聚合后未调用 AggregateFunction getResult()
试图实现一个 Flink 作业来读取 Kafka 流和聚合会话,由于某种原因 getResult() 没有被调用。我看到 createAccumulator() 和 add() 被调用,我期待 getResult() 也被调用,以便我可以在目的地接收聚合消息。
可能是什么问题?我配置错误了吗?感谢你的帮助!
apache-kafka - 未使用的 Kafka 主题/分区的成本
在设计流式处理管道时,如果我有许多主题,这些主题至少有一个分区但可能没有数据进入,那么可能会产生什么成本?
例如,对于一个消费者,我可以选择拥有一个包含所有数据和许多分区的“大型主题”,或者我可以选择将该数据(按租户、帐户或用户等)拆分为多个主题,默认情况下,单个分区。我对第二种情况的担心是会有很多主题/分区看不到数据。那么,这个未使用的分区是否会产生任何成本,或者未使用的主题是否不会产生任何成本。
spring-boot - 在 Kafka 流处理中聚合时使用提供的主题进行更改日志和重新分区
我正在使用 Kafka 流处理来使用 Springboot 聚合来自源对象的数据。
在运行此应用程序时,连同提供给processSourceObject的主题一起,它会自动创建另外两个主题
- processSourceObject-applicationId-data-snapshots-changelog
- processSourceObject-applicationId-data-snapshots-repartition
由于某些原因,我想使用现有主题而不是使用这两个主题。我在哪里进行更改以提供预定义主题的名称,以供我的应用程序用于更改日志和重新分区数据?
php - 太多时PHP关闭连接
最近,我决定在一些需要服务器发送事件(事件流)的页面上加载测试我的网站,这意味着 PHP 需要处理许多打开的连接(循环中的脚本)。当接近约 140 个打开的连接时,连接在 1 或 2 秒后开始关闭。在进行负载测试时,我监控了我的服务器,CPU 负载非常低,RAM 使用率低,网络不是瓶颈,所以我知道这可能不是硬件缺陷。Apache 日志显示没有 PHP 警告(也没有致命错误)。php ini 文件中是否有可能负责关闭连接的参数?
如果您需要一些日志或其他我会尽力提供它们。
谢谢。
wso2 - 使用 siddhi 的按需查询失败
我正在使用 wso2 流集成器来运行我的 siddhi 应用程序。运行环境为windows 10 pro。我想就使用 Siddhi 的 REST API 的方式向您寻求帮助。
- 以管理员身份打开命令提示符并转到 \wso2si-1.1.0\bin。然后,执行命令
server.bat --run
。 - 部署 wso2si-1.1.0\wso2\server\deployment\siddhi-files 中的任何 siddhi 文件,它工作正常。
- 打开一个新的命令提示符并执行命令
curl -X GET "http://localhost:9090/siddhi-apps" -H "accept: application/json" -u "admin:admin" -k
。 - 命令成功并显示应用程序列表。
- 接下来,我定义了
ABC
as named-windowFlowApp
并执行了命令curl -X POST "https://localhost:9443/query" -H "content-type: application/json" -u "admin:admin" -d "{"appName" : "FlowApp", "query" : "from ABC select *" }" -k
- 上述查询失败,命令提示符显示“Problem access: /query. Reason: Not Found”。
为什么我找不到“/查询”?
azure - 在带有分区和多个“事件处理器”客户端的 Azure 事件中心中按顺序处理
我计划利用 Azure 事件中心中的所有 32 个分区。要求:每个分区的“有序”处理至关重要。问题:如果我将 TU(吞吐量单位)增加到所有32 个分区的最大可用 20,我得到 40 MB 的出口。假设我计算出我需要 500 个并行客户端线程并行处理 (EventProcessorClient) 来满足我的吞吐量需求。如何在满足我的“订购”要求的同时使用 EventProcessorClient 实现这种级别的并行性?顺便说一句,在 Kafka 中,我可以在一个主题中创建 500 个分区,而 Kafka 每个分区只允许 1 个线程来保证事件顺序。
node.js - 在 AWS Lambda 中读取事件流时出现问题。Nodejs 代码根据需要在本地工作,但不在 AWS Lambda 中
这是工作流程:
获取 https 链接 --> 写入文件系统 --> 从文件系统读取 --> 获取 sha256 哈希。
它在我运行节点 10.15.3 的本地机器上运行良好但是当我在 AWS 上启动 lambda 函数时,输出为空。可读流可能存在一些问题。这是代码。您可以直接在本地计算机上运行它。它将根据需要输出一个 sha256 哈希。如果您希望在 AWS Lambda 上运行,请按照标记添加注释/取消注释。
apache-kafka - 如何用 Kafka 生成连续编号的消息?
上下文:开票系统,发送的发票必须有连续的编号。
每张发票都有一个唯一的发票编号,为简单起见,假设它们是I1
、I2
、I3
等。因此,系统中的第一张发票的编号为I1
,并且每下一张发票都会递增。然后,每张发票都在 Kafka 主题中生成。
所以,总能只根据这个话题的内容来计算下一张发票的数量吧?(主题中的发票数量 + 1 = 下一个数字)我们可以将这样的系统称为事件源。
但是你如何做到这一点?对我来说,这似乎是一个循环数据流:为了生成主题,我首先需要确保我在另一个地方消费了相同的整个主题。
我对事件流有什么误解还是卡夫卡不可能?
发票总是被分配编号并一张一张地发送,而不是并行发送。
scala - 在 Confluent Cloud 上将 Snowplow 与 Kafka 连接的问题
我已经成功地使用 Web Tracker、Scala Stream Kafka Collector、Stream Enrich、Druid 和 Metabase 在 Docker 中实现了端到端的本地实现。
在使用 Helm 在 Kubernetes 中构建工作暂存环境之前,我想尝试将 Scala Stream Kafka Collector 连接到我们的 Kafka Confluent Cloud 帐户。但是,我遇到了 SASL 身份验证问题。关于这个主题的文档非常稀少,只是将我们指向 Kafka 文档。这是我的config.hocon
配置 -
但是,当容器启动时,输出中的配置不匹配 -
此外,我在控制台中收到以下错误 -
在搜索常用渠道寻求帮助时,建议此错误是由于 JAAS 配置对生产者不可见造成的,因此我非常确信这只是我这边的配置问题。除非我遗漏了什么,否则我希望生产者可以使用 SASL 身份验证,因为配置表明配置选项可用。
有没有人有这个问题的经验?