0

我有一个关于春季消息队列处理的问题 - 我对此很陌生。我正在尝试增加从队列读取并写入数据库实时财务信息的应用程序的吞吐量。问题是消息必须保持有序,即先进先出。这意味着我最初增加并发消费者数量的方法是不可行的,因为可能会出现订购丢失的情况。

将并发消费者从 1 更改为 5 意味着我可以处理 10,000 个并节省大量时间。(约 20 分钟)

作为 Spring 的新手并且是一名研究生开发人员(在我的第一年),我不确定还有哪些选择。Spring batch 已经出现了,但是因为金融信息交易需要尽快从队列中处理,所以我不能等待一批 500 来填满。

请有人建议使用弹簧的这种情况下哪些方法是可行的?

谢谢

4

2 回答 2

0

下面是如何使用消息驱动通道适配器和 Jdbc 出站适配器使用 Spring Integration 进行此类消费的示例。以下是一些会影响您的表现和吞吐量的关键因素;

  • 事务 - 如果它通过队列进入数据库,这将增加一些开销
  • 转换 - 将消息映射转换为表需要多少处理

这是 Spring 集成示例;

<int-jms:message-driven-channel-adapter channel="trade.input.channel"
    concurrent-consumers="1" connection-factory="connectionFactory"
    destination="issue.queue"/>

<int:channel id="trade.input.channel"/>

<int-jdbc:outbound-channel-adapter 
    channel="trade.input.channel"
    data-source="dataSource" query="insert into target_table (issue_code,issue_price,transaction_timestamp) values (:issue_code,:issue_price,:issue_timestamp)"
    sql-parameter-source-factory="spelFactory"/>    

<bean id="spelFactory" class="org.springframework.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
    <property name="parameterExpressions">
        <map>
            <entry key="issue_code" value="payload.toString().split(',')[0]"/>
            <entry key="issue_price" value="payload.toString().split(',')[1]"/>
            <entry key="issue_timestamp" value="payload.toString().split(',')[2]"/>
        </map>
    </property>
</bean>

这是示例消息

MSFT,100.00,1373761697932

它非常粗糙,但可能是一个起点。简单单元测试的性能平均每秒大约 200 条消息,但这有点依赖于硬件。

于 2013-07-14T00:51:47.490 回答
0

如果要求以与请求相同的顺序写入数据库,那么您完全是单线程的。

但是,如果您希望在从数据库中读回数据时能够确定顺序,那么您只需要确保正确维护一个 order 字段(注意时间戳通常是不够的,因为您可能会在最小精度),因此时间戳加上额外的订单值将允许您确定确切的订单。

所以单线程输入,添加时间戳和订单值,然后将请求传递给数据库写入器池。

订单值可以每分钟或任何适当的时间段重置。

于 2013-07-17T06:25:46.920 回答