
I have a Mule flow in which I need to fetch rows from a database and write them to a file. The database currently holds 100 rows. I need to fetch 5 rows at a time and write them to a file, then after an interval of, say, 30 seconds, fetch the next 5 rows and write that payload to a file. My flow currently looks like this:

 <spring:beans>
        <spring:bean id="DB_Source" name="DB_Source" class="org.enhydra.jdbc.standard.StandardDataSource">
            <spring:property name="url" value="${url}"/>
            <spring:property name="driverName" value="${driverName}"/>
        </spring:bean>
     </spring:beans>
    <jdbc-ee:connector name="Database_Global" dataSource-ref="DB_Source" validateConnections="true" queryTimeout="-1" pollingFrequency="0" doc:name="Database" transactionPerMessage="true">
        <!-- Here transactionPerMessage="false" would make it retrieve and return all the rows at once -->
         <jdbc-ee:query key="RetriveQuery" value="select * from getData"/>  <!-- or we can use CALL sp_retrieveData(@Id=13) -->
    </jdbc-ee:connector>
    <context:property-placeholder location="classpath:conf/DBConnectionProp.properties"/>



    <flow name="InboundJDBC" doc:name="InboundJDBC" initialState="started">
        <jdbc-ee:inbound-endpoint  queryTimeout="-1" pollingFrequency="1000" doc:name="Database"   connector-ref="Database_Global" queryKey="RetriveQuery">

         <jdbc-ee:transaction action="ALWAYS_BEGIN" />

        <!--  <property key="receiveMessageInTransaction" value="true"/> --><!-- Uncomment this to receive all the rows at once -->
        </jdbc-ee:inbound-endpoint>
        <mulexml:object-to-xml-transformer doc:name="Object to XML"/>

      <message-properties-transformer doc:name="Message Properties"> 
      <add-message-property key="MULE_CORRELATION_GROUP_SIZE" value="5"/> <!-- Set the number of rows to be returned at a time -->
      <add-message-property key="MULE_CORRELATION_ID" value="1"/> 
      </message-properties-transformer> 
      <collection-aggregator timeout="5000" failOnTimeout="false" doc:name="Collection Aggregator"/>

        <logger message="JDBC Transaction #[message.payload] **************" level="INFO" doc:name="Logger"/>
        <file:outbound-endpoint path="E:\backup\test\ss" outputPattern="#[java.util.UUID.randomUUID().toString()].txt" responseTimeout="10000" doc:name="File"/>

    </flow>  
</mule>

The problem now is that when the application starts, it fetches only 5 of the 100 rows from the database and writes them to a file; it then neither fetches the remaining rows nor creates new files. What I want is to fetch 5 rows every 30 seconds and, at the end, write each batch to a new file. Am I doing something wrong? I used the following as a reference: How to get Mule to return multiple rows from a JDBC query as a single transaction?

Updated flow:

<flow name="InboundJDBC" doc:name="InboundJDBC" initialState="started">
        <jdbc-ee:inbound-endpoint  queryTimeout="-1" pollingFrequency="1000" doc:name="Database"   connector-ref="Database_Global" queryKey="RetriveQuery">

         <jdbc-ee:transaction action="ALWAYS_BEGIN" />

     <!--  <property key="receiveMessageInTransaction" value="true"/> --><!-- Uncomment this to receive all the rows at once -->
        </jdbc-ee:inbound-endpoint>
        <set-property propertyName="#[message.inboundProperties['requestId']]" value="#[java.util.UUID.randomUUID().toString()]" doc:name="Property"/>

        <mulexml:object-to-xml-transformer doc:name="Object to XML"/>

      <message-properties-transformer doc:name="Message Properties"> 

      <add-message-property key="MULE_CORRELATION_GROUP_SIZE" value="5"/> <!-- Set the number of rows to be returned at a time -->
      <add-message-property key="MULE_CORRELATION_ID" value="#[message.inboundProperties['requestId']]"/> 
      </message-properties-transformer> 
      <collection-aggregator timeout="5000" failOnTimeout="false" doc:name="Collection Aggregator"/>

        <logger message="JDBC Transaction #[message.payload] **************" level="INFO" doc:name="Logger"/>
        <file:outbound-endpoint path="E:\backup\test\ss" outputPattern="#[java.util.UUID.randomUUID().toString()].txt" responseTimeout="10000" doc:name="File"/>

    </flow>

Now it is creating one file per row...


2 Answers


I see several problems:

  • the query selects all rows instead of only 5,
  • there is no update query to mark the selected records, so the same records will be picked up again and again,
  • The correlation ID is fixed with: <add-message-property key="MULE_CORRELATION_ID" value="1"/>. Since the correlation ID is fixed, the collection-aggregator will aggregate 5 messages for this ID and will stop there. Any new message coming for this ID will be discarded. Instead use an MEL expression that produces the same value for the five rows: what expression to use is up to you, it can for example be some sort of modulo of time that provides a constant value for a 30 seconds time window...
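Putting these points together, a minimal Mule 3 sketch might look like the following. Note that the `processed` flag, the `id` column, the SQL Server–style `top 5` syntax, and the exact window expression are all illustrative assumptions, not part of the original flow; the `.ack` query key follows the Mule 3 JDBC transport's acknowledgment convention:

```xml
<jdbc-ee:connector name="Database_Global" dataSource-ref="DB_Source"
                   validateConnections="true" queryTimeout="-1"
                   pollingFrequency="30000" doc:name="Database">
    <!-- Fetch only 5 not-yet-processed rows per poll (every 30 s) -->
    <jdbc-ee:query key="RetriveQuery"
                   value="select top 5 * from getData where processed = 0"/>
    <!-- Acknowledgment query: mark each selected row so it is not re-read -->
    <jdbc-ee:query key="RetriveQuery.ack"
                   value="update getData set processed = 1 where id = #[message.payload.id]"/>
</jdbc-ee:connector>
...
<message-properties-transformer doc:name="Message Properties">
    <add-message-property key="MULE_CORRELATION_GROUP_SIZE" value="5"/>
    <!-- Same value for every row within a 30-second window, so the
         aggregator groups each poll's 5 rows into one collection -->
    <add-message-property key="MULE_CORRELATION_ID"
                          value="#[System.currentTimeMillis() / 30000]"/>
</message-properties-transformer>
```

With a constant-per-window correlation ID, each 30-second batch of 5 rows shares one ID and is aggregated into a single file, while the next poll gets a fresh ID and therefore a fresh file.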
Answered 2014-07-23T19:22:55.027

So, following David's suggestion on point 3, I addressed the correlation ID set with: <add-message-property key="MULE_CORRELATION_ID" value="1"/>.
Now that the correlation ID is fixed, the collection-aggregator aggregates all messages for this ID and stops there.
Hope this helps others with the same problem; it worked for me.

Answered 2017-04-09T19:39:56.070