问题标签 [aggregator]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
1451 浏览

apache-camel - 骆驼聚合器不会聚合所有

我正在拆分一些 java 对象,然后进行聚合。我有点困惑这种完成策略如何与骆驼(2.15.2)一起使用。我正在使用完成大小和完成超时。如果我理解正确,完成超时对它没有太大影响。因为,这里没有太多的等待。

总共,我有 3000 多个对象。但是,似乎只有一部分是聚合的。但是,如果我改变完成大小值,情况就会改变。如果大小为100,则聚合在800左右,如果是200,则聚合到1600左右。但是,我事先不知道对象的大小,因此不能依赖假设的数字。

谁能向我解释我在这里做错了什么?如果我使用 eagerCheckCompletion,它会一次性聚合整个事情,这是我不想要的。下面是我的路线:

0 投票
0 回答
426 浏览

loops - WSo2 Esb 过滤消息到输出文件第 2 部分

好的,这是我原来问题的延续。(WSo2 Esb 将消息过滤到输出文件)在对迭代和聚合调解器进行了几天的研究之后,我已经到达了进度墙,并且非常感谢任何关于如何解决我的阻塞问题的建议。手头的任务很简单,读取 xml 文件,过滤某些记录,然后只生成一个输出 xml 文件,其中包含那些符合条件的记录。经过先前的讨论和研究,此任务解决方案如下:使用迭代中介访问每个 xml 记录并使用过滤中介来决定我要保留的内容。然后使用聚合器调解器将所有这些记录写入 1 个文件。如果没有聚合器,我的配置会为每个文件生成 1 条记录。这是我在这种状态下的代理:

这是 RenIqtFilterSequence:

我现在面临的问题是我无法使用 outSequence 来执行我的聚合器逻辑。事实是我不需要从 inSequence 为这个过程调用后端服务,所以我正在努力寻找一种干净的方式来访问我的 outSequence。有什么推荐吗?我试图在我的 inSequence 中包含聚合器调解器,但它不起作用,没有产生任何输出。我认为聚合器需要在 outSequence 中以捕获我过滤的所有记录,然后我可以将它们全部写入 1 个文件。我相信我需要在这里进行一些配置:

我已经尝试了几种基于在线文章和博客的方法,但我无法将最后的部分放在适当的位置。任何建议将不胜感激。感谢您花时间阅读本文。

0 投票
1 回答
738 浏览

spring-integration - Spring Integration Aggregator 在特定场景中将过期/超时消息发送到错误的通道

我将消息发送到聚合器的输入通道,然后聚合器将聚合的消息发布到输出通道。聚合器至少需要 2 条消息(用于聚合),否则等待 10 秒以等待超时。我也在使用 jdbc 消息存储。

以下是我测试过的场景。

方案 1 工作正常

发送消息 1 和 2 -> 输入通道 (input1) -> 聚合器 1 -> 输出通道 (output1)

方案 2 工作正常

发送消息 1 和 2 -> 输入通道 (input2) -> 聚合器 2 -> 输出通道 (output2)

场景 3 工作正常

仅发送消息 1 -> 输入通道 (input1) -> 聚合器 1 -> 输出通道 (output1)

场景 4 失败,因为它不是将过期消息发送到 output2,而是发送到 output1

仅发送消息 1 -> 输入通道 (input2) -> 聚合器 2 -> 输出通道 (output1)

任何人都可以提出为什么方案 4 失败了吗?

以下是我的配置

0 投票
1 回答
150 浏览

spring - Aggregator behavior on server restart - spring integration

Premise -

In spring integration,if i have a aggregator with a message group which is incomplete. Before group release stratergy is met, server is restarted.

  • Current Behavior-> all the messages posted to the aggregator go to the same message group and not a new one, since it is not marked complete, messages keep flowing in.
  • Expected-> If server is restarted, aggregator picks the left over messages from message store, marks already persisted ones complete & then cater new ones,

Is my expectation incorrect? Can somebody guide?

0 投票
1 回答
2293 浏览

java - Spring Integration Java DSL——聚合器的配置

我有一个非常简单的集成流程,其中使用发布-订阅通道将 RESTful 请求转发给两个提供者。然后将两个 RESTful 服务的结果聚合到一个数组中。集成流程示意图如下所示:

但是,在运行我的代码时,结果数组仅包含由 RESTful 服务之一返回的项目。我缺少任何配置步骤吗?

更新

考虑到 Artem 的评论,以下版本对应于完整的解决方案。

0 投票
1 回答
42 浏览

mongodb - 如何使用 MongoDB 聚合器按最大值和最小值之间的统一数据间隔进行分组?

假设我有一大堆数据为特定字段产生一系列整数值......我希望看到那些按发生间隔分组排列的数据,也许是因为我正在聚类......就像这样:

我如何编写一种快速而肮脏的方法来 A)使用 MongoDB 聚合器将集合数据按等距间隔分组 B)最小和最大范围?

0 投票
1 回答
392 浏览

spring-integration - Spring Integration Splitter 后消息丢失。数据未随机到达聚合器

我正在尝试并行处理 SQL 查询的输出。下面给出的是我的代码。我在聚合器中有 sysout。但是我随机看到 Aggregator 中的 sysout 没有被打印出来。此外,聚合器中的发布方法也不是打印系统输出。我想,我在某些地方丢失了消息。任何人都可以解释一下吗?

0 投票
2 回答
518 浏览

spring-integration - 无法让聚合器工作

我正在尝试了解聚合器的基础知识。以下是我尝试实现的用例:

1) 从队列中读取消息(订单详情)。

一条订单消息将包含多个orderItem。我们可以期待队列中有数百条订单消息。

2) 最终结果 ::

a) 每个订单项都应该写入一个文件。

b) 4 个这样的文件应该被写入一个唯一的文件夹。

举个例子,假设我们收到了两条订单消息——每条都包含三个orderitem

所以我们需要创建 2 个文件夹:

在“文件夹 1”中,应该有 4 个文件(每个文件中有 1 个订单项)

在“文件夹 2”中,应该有 2 个文件(每个文件中有 1 个订单项)。在这里为简单起见,我们假设没有更多的订单消息出现,我们可以在 5 分钟后写入。

执行:


  1. 我能够从队列(websphere MQ)中读取消息并成功解组消息。
  2. 使用拆分器根据订单项计数拆分消息。
  3. 使用 Aggregator 将消息分组为 4 大小。

根据我的理解,我无法让聚合器工作。

  1. 我在 4 个orderitem时推送一个订单,消息被正确聚合。
  2. 我推送一个带有 5 个orderitem的订单,前 4 个被聚合,但最后一个被发送到丢弃通道。这是预期的,因为 MessageGroup 被释放,所以最后一条消息被丢弃。
  3. 我推送两个订单,每个订单包含 2 个orderitem。最后 2 个订单项被发送到丢弃通道。
    关联策略是硬编码的(OrderAggregator.java),但上述情况应该有效。

需要有关如何实现此用例的指示,我可以将它们分组为 4 并写入唯一的文件夹。请注意,orderitem都是独立的书单,它们之间没有任何关系。

下面是配置。

spring-bean.xml

OrderAggregator.java

DisplayAggregatedList.java

0 投票
0 回答
301 浏览

spring-integration - Using JDBCMessageStore in Aggregator

I am trying to understand the behaviour of Aggregator when using JDBCMessageStore. Below is my use case:

1) Read message(order details) from queue.

#xA;

One order message will contain multiple orderItem.

2) Split the order message based on the count of orderItem

3) Aggregate into 4 orderItem message; messageCountReleaseStrategy is used with threshold = 4

4) Forward to aggregated message to service activator.

I was observing the data in 3 tables : INT_MESSAGE_GROUP, INT_MESSAGE and INT_GROUP_TO_MESSAGE and seems like data is initially inserted in these 3 tables and when the group is released , the date is removed/deleted from these tables.

I wanted to check DB insert failures in one of the tables (INT_GROUP_TO_MESSAGE). To simulate the error , I dropped the table INT_GROUP_TO_MESSAGE.

The below test was run:

1) Posted 1 order message with 3 orderItem

2) Got an error with message - Table does not exist - (Expected as INT_GROUP_TO_MESSAGE does not exist)

3) I could find records in 2 tables

a) INT_MESSAGE_GROUP had 1 record.

b) INT_MESSAGE had 23 records. Seems like as the message was retried multiple times, multiple entries was done.

4) Created the table INT_GROUP_TO_MESSAGE

5) Posted 1 order message with 1 orderItem

6) The original message(step 1) alongwith the new one (Step 5) was picked up from queue and processed.

7) Aggregator released a group a 4 messages.

8) However in database table - INT_MESSAGE there still exists 23 records, whereas other tables are empty.

I simulated the Db error my dropping the table , but in production we may encounter memory, tablespace issues that can cause these inserts to fail.

My question is can we set any parameter so that we have transaction in 3 tables : INT_MESSAGE_GROUP, INT_MESSAGE and INT_GROUP_TO_MESSAGE?

Below is my configuration

#xA;
0 投票
1 回答
182 浏览

spring - 最后一条记录过滤器时的Spring Integration聚合器

我们有一个 Spring Integration 项目,其中一个大的输入文件被分解成许多单独的文件,然后使用 Spring Integration Aggregator 重新聚合在一起。

管道中有许多过滤器可以过滤掉不需要的单个文件。我们跟踪为每个相关输入文件过滤的文件数量。我们的@ReleaseStrategy 检查我们是否收到了单个文件的数量减去过滤的单个文件的数量。

如果最后一个要处理的文件在到达我们的 ReleaseStrategy 之前被过滤,会发生什么?ReleaseStrategy 为到达它的每个单独文件调用,如果最后一个单独的文件被过滤,我将不会再次被轮询,但我也希望 Spring 已经预料到这个用例并为它做出了一些非骇客的规定,仍然提供我与@Aggregator 事件。如果我超时或者如果我让所有过滤点检查它们是否是最后一个文件,我不会收到 @Aggregator 事件。

谢谢!