0

我正在设置一个流程以将数据从一个表中提取到另一个表中,标记已处理的行,然后通过电子邮件发送结果。我使用了聚合器(请参阅: 如何让 Mule 从 JDBC 查询中将多行作为单个事务返回?)将所有行合并到一封电子邮件中,但是在添加聚合器后,SMTP 端点无法在一秒钟内运行迭代...

  1. 启动骡流。
  2. 插入导致流运行的行。
  3. SQL 脚本和 SMTP 端点运行。
  4. 插入更多行,流程再次运行。
  5. 只有 SQL 脚本运行... SMTP 端点根本不运行。

    <flow name="Invoice_Workflow2Flow1" doc:name="Ross_invoice_Workflow2Flow1" processingStrategy="asynchronous">
        <jdbc:inbound-endpoint queryKey="GetUnprocessedInvoices" queryTimeout="10000" pollingFrequency="10000" connector-ref="Database" doc:name="Get invoice run">
            <jdbc:query key="GetUnprocessedInvoices" value="SELECT        INVOICE_NUMBER, ROWID FROM            FIN.LHF_INVOICE_WORKFLOW WHERE        (STATUS_FLAG = 'N')"/>
        </jdbc:inbound-endpoint>
        <jdbc:outbound-endpoint exchange-pattern="one-way" queryKey="insert_invoice_run" queryTimeout="10000" connector-ref="SPTSQL01_APPS_custom_app_data" doc:name="Load to custom_app_data">
            <jdbc:query key="insert_invoice_run" value="INSERT INTO lhf_ros_invoice_workflow ([INVOICE_NUMBER]  VALUES #[map-payload:INVOICE_NUMBER])"/>
        </jdbc:outbound-endpoint>
        <jdbc:outbound-endpoint exchange-pattern="one-way" queryKey="MarkAsProcessed" queryTimeout="10000" connector-ref="DatabaseMuleLogin" doc:name="Mark Processed in Ross">
            <jdbc:query key="MarkAsProcessed" value="UPDATE       FIN.LHF_INVOICE_WORKFLOW SET                STATUS_FLAG = 'P' WHERE        (ROWID = #[map-payload:ROWID])"/>
        </jdbc:outbound-endpoint>
        <message-properties-transformer doc:name="Message Properties">
            <add-message-property key="MULE_CORRELATION_GROUP_SIZE" value="1000"/>
            <add-message-property key="MULE_CORRELATION_ID" value="1"/>
        </message-properties-transformer>
        <collection-aggregator timeout="1000" failOnTimeout="false" doc:name="Collection Aggregator"/>
        <smtp:outbound-endpoint host="mail.example.com" to="test@example.com" from="muleservice@example.com" subject="[Invoice Workflow] Exceptions" responseTimeout="10000" doc:name="SMTP"/>
    </flow>
    

这是有用时会发生什么的日志...

(第一次运行正常,SMTP 端点运行)

INFO 2012-12-06 09:08:52,143 [[ross_invoice_workflow].DatabaseMuleLogin.dispatcher.02] org.mule.transport.jdbc.sqlstrategy.SimpleUpdateSqlStatementStrategy:执行 SQL 语句:1 行更新了 INFO 2012-12-06 09:08:52,159 [[ross_invoice_workflow].DatabaseMuleLogin.dispatcher.03] org.mule.transport.jdbc.sqlstrategy.SimpleUpdateSqlStatementStrategy:执行 SQL 语句:1 行更新了 INFO 2012-12-06 09:08:52,180 [ [ross_invoice_workflow].DatabaseMuleLogin.dispatcher.01] org.mule.transport.jdbc.sqlstrategy.SimpleUpdateSqlStatementStrategy:执行 SQL 语句:1 行更新了 INFO 2012-12-06 09:08:52,223 [[ross_invoice_workflow].SPTSQL01_APPS_custom_app_data。 dispatcher.01] org.mule.transport.jdbc.sqlstrategy.SimpleUpdateSqlStatementStrategy:执行 SQL 语句:1 行更新了 INFO 2012-12-06 09:08:52,223 [[ross_invoice_workflow].SPTSQL01_APPS_custom_app_data.dispatcher.03] org.mule.transport.jdbc.sqlstrategy.SimpleUpdateSqlStatementStrategy:执行 SQL 语句:1 行更新了 INFO 2012-12-06 09:08:52,223 [[ross_invoice_workflow]。 SPTSQL01_APPS_custom_app_data.dispatcher.02] org.mule.transport.jdbc.sqlstrategy.SimpleUpdateSqlStatementStrategy:执行 SQL 语句:更新了 1 行 INFO 2012-12-06 09:08:53,131 [[ross_invoice_workflow].connector.smtp.mule.default.dispatcher.01] org.mule.transport.service.DefaultTransportServiceDescriptor:加载默认出站变压器:org.mule.transport.email .transformers.ObjectToMimeMessage 信息 2012-12-06 09:08:53,138 [[ross_invoice_workflow].connector.smtp.mule.default.dispatcher.01] org.mule.lifecycle.AbstractLifecycleManager:初始化:'connector.smtp.mule.default .dispatcher.1935379626'。对象是:SmtpMessageDispatcher INFO 2012-12-06 09:08:53,174 [[ross_invoice_workflow].connector.smtp.mule.default.dispatcher.01] org.mule.lifecycle.AbstractLifecycleManager:开始:'connector.smtp.mule.default .dispatcher.1935379626'。对象是:SmtpMessageDispatcher

(插入更多行,SQL 端点运行但 SMTP 没有...)

INFO 2012-12-06 09:09:22,111 [[ross_invoice_workflow].DatabaseMuleLogin.dispatcher.03] org.mule.transport.jdbc.sqlstrategy.SimpleUpdateSqlStatementStrategy:执行 SQL 语句:1 行更新了 INFO 2012-12-06 09:09:22,129 [[ross_invoice_workflow].DatabaseMuleLogin.dispatcher.02] org.mule.transport.jdbc.sqlstrategy.SimpleUpdateSqlStatementStrategy:执行 SQL 语句:1 行更新信息 2012-12-06 09:09:22,146 [ [ross_invoice_workflow].DatabaseMuleLogin.dispatcher.01] org.mule.transport.jdbc.sqlstrategy.SimpleUpdateSqlStatementStrategy:执行 SQL 语句:1 行更新了 INFO 2012-12-06 09:09:22,168 [[ross_invoice_workflow].SPTSQL01_APPS_custom_app_data。 dispatcher.03] org.mule.transport.jdbc.sqlstrategy.SimpleUpdateSqlStatementStrategy:执行 SQL 语句:1 行更新了 INFO 2012-12-06 09:09:22,168 [[ross_invoice_workflow].SPTSQL01_APPS_custom_app_data.dispatcher.01] org.mule.transport.jdbc.sqlstrategy.SimpleUpdateSqlStatementStrategy:执行 SQL 语句:1 行更新了 INFO 2012-12-06 09:09:22,168 [[ross_invoice_workflow]。 SPTSQL01_APPS_custom_app_data.dispatcher.02] org.mule.transport.jdbc.sqlstrategy.SimpleUpdateSqlStatementStrategy:执行 SQL 语句:更新了 1 行

(插入更多行,同样的事情)

INFO 2012-12-06 09:09:22,111 [[ross_invoice_workflow].DatabaseMuleLogin.dispatcher.03] org.mule.transport.jdbc.sqlstrategy.SimpleUpdateSqlStatementStrategy:执行 SQL 语句:1 行更新了 INFO 2012-12-06 09:09:22,129 [[ross_invoice_workflow].DatabaseMuleLogin.dispatcher.02] org.mule.transport.jdbc.sqlstrategy.SimpleUpdateSqlStatementStrategy:执行 SQL 语句:1 行更新信息 2012-12-06 09:09:22,146 [ [ross_invoice_workflow].DatabaseMuleLogin.dispatcher.01] org.mule.transport.jdbc.sqlstrategy.SimpleUpdateSqlStatementStrategy:执行 SQL 语句:1 行更新了 INFO 2012-12-06 09:09:22,168 [[ross_invoice_workflow].SPTSQL01_APPS_custom_app_data。 dispatcher.03] org.mule.transport.jdbc.sqlstrategy.SimpleUpdateSqlStatementStrategy:执行 SQL 语句:1 行更新了 INFO 2012-12-06 09:09:22,168 [[ross_invoice_workflow].SPTSQL01_APPS_custom_app_data.dispatcher.01] org.mule.transport.jdbc.sqlstrategy.SimpleUpdateSqlStatementStrategy:执行 SQL 语句:1 行更新了 INFO 2012-12-06 09:09:22,168 [[ross_invoice_workflow]。 SPTSQL01_APPS_custom_app_data.dispatcher.02] org.mule.transport.jdbc.sqlstrategy.SimpleUpdateSqlStatementStrategy:执行 SQL 语句:更新了 1 行

4

1 回答 1

1

问题来自于MULE_CORRELATION_ID设置为1. 由于该关联ID的群组已被处理,因此该群组下发后无法继续使用该ID。在我的条带化测试中,当我在第一组发布后尝试提供更多具有相同相关 ID 的事件时,我实际上收到了错误。

我建议您在选择查询中添加一列,该列为所有选定的行返回相同的值,但每次 Mule 运行查询时都会更改。例如,查询可以接收 Mule 生成的 UUID,其表达式为 like#[java.util.UUID.randomUUID().toString()]并返回列中的值。然后您可以将此值用作相关 ID,因为它对于所选记录组是相同的。

假设添加的列称为“CID”,那么配置将是:

<set-property propertyName="MULE_CORRELATION_GROUP_SIZE" value="3" />
<set-property propertyName="MULE_CORRELATION_ID" value="#[message.payload.CID]" />

请注意,这set-property是一个比 更具表现力的配置元素message-properties-transformer

于 2012-12-07T19:29:04.700 回答