1

我有一个服务激活器,它使用轮询器从通道中提取消息。该通道有一个队列,该队列由数据库的持久存储支持。轮询器还配置了一个任务执行器,以便为来自通道的消息处理添加一些并发性。

任务执行器配置有队列容量。

由于轮询器从数据库中的通道检索消息并且这被配置为事务性的,因此如果任务执行器没有更多可用线程,那么在任务执行器中排队的消息的事务会发生什么。线程的任务执行器上的请求被排队,由于这些消息没有自己的线程,事务会发生什么?我假设轮询器从持久通道存储中删除由任务执行器(在)排队的消息将被提交。因此,如果服务器在任务执行器中排队运行可运行对象时出现故障,它们会丢失吗?

由于事务性持久通道队列的想法是确保在服务器出现故障时不会丢失任何消息,因此如何根据通道数据库支持的队列/存储上的活动事务处理排队的消息(在任务执行器中)?

    <bean id="store" class="org.springframework.integration.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="channelServerDataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
    <property name="region" value="${user.name}_${channel.queue.region:default}"/>
    <property name="usingIdCache" value="false"/>
</bean> 

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit expression="@store.removeFromIdCache(headers.id.toString())" />
    <int:after-rollback expression="@store.removeFromIdCache(headers.id.toString())"/> 
</int:transaction-synchronization-factory>

<int:channel id="transacitonAsyncServiceQueue">
    <int:queue message-store="store"/> 
    <!--  <int:queue/>  --> 
</int:channel>

<bean id="rxPollingTrigger" class="org.springframework.scheduling.support.PeriodicTrigger">
    <constructor-arg value="500"/>
    <constructor-arg value="MILLISECONDS"/>
    <property name = "initialDelay" value = "30000"/> 
    <!-- initialDelay important to ensure channel doesnt start processing before the datasources have been initialised becuase we
         now persist transactions in the queue, at startup (restart) there maybe some ready to go which get processed before the
         connection pools have been created which happens when the servlet is first hit -->
</bean> 

<int:service-activator ref="asyncChannelReceiver" method="processMessage" input-channel="transacitonAsyncServiceQueue">
    <int:poller trigger="rxPollingTrigger" max-messages-per-poll="20"  task-executor="taskExecutor" receive-timeout="400">
        <int:transactional transaction-manager="transactionManagerAsyncChannel" /> 
    </int:poller>
    <int:request-handler-advice-chain>
        <ref bean="databaseSessionContext" />
    </int:request-handler-advice-chain>
</int:service-activator>

<task:executor id="taskExecutor" pool-size="100-200" queue-capacity="200" keep-alive="1" rejection-policy="CALLER_RUNS" />  
4

1 回答 1

0

...如果任务执行器没有更多可用线程,那么在任务执行器中排队的消息的事务会发生什么。线程的任务执行器上的请求被排队,由于这些消息没有自己的线程,事务会发生什么?

它不是那样工作的 - 事务直到任务执行(包括receive()来自通道)才开始。

轮询器调度任务,任务启动,事务启动,工作继续,事务提交/回滚。

请参阅AbstractPollingEndpoint - 这pollingTask.call()是事务开始的地方。

于 2015-04-23T07:56:26.540 回答