9

我正在使用 Apache Camel SQL 批量插入过程。

  1. 我的应用程序正在从包含大约 2000 个票证的 Active MQ 中读取票证。

  2. 我已将批次更新为 100。

  3. 我正在触发的查询如下:

    sql.subs.insertCdr= insert into subscription_logs(master_id,request_type,req_desc,msisdn,amount,status,resp_code,resp_desc,channel,transaction_id,se_mode,be_mode,sub_type,sub_timeleft,srv_name,srv_id,start_date,end_date,operator,circle,country,time_offset,retry_count,user_status,previous_state,se_reqrecvtime,se_respsenttime,be_reqsenttime,be_resprecvtime,cp_id,cp_name,sub_srvname,sub_srvid,msg_senderid,msg_text,call_back_url,call_back_resp,client_ip,se_sysIp,language,cp_callbackurlhittime,action,alert,notification_url,notification_resp) values(:#masterId, :#requestType,:#reqDesc,:#msisdnCdr,:#price,:#status,:#responseCode,:#reason,:#channel,:#transactionId,:#seMode,:#beMode,:#subType,:#subTimeLeft,:#serviceName,:#serviceId,:#subStartDate,:#cdrEndDate,:#operator,:#circle,:#country,:#timeOffset,:#retryCount,:#userStatus,:#previousState,:#seReqRecvTime,:#seRespSentTime,:#beReqSentTime,:#beRespRecvTime,:#cpId,:#cpName,:#subServiceName,:#subServiceId,:#shortCode,:#message,:#callBackUrl,:#callBackResp,:#clientIp,:#seSysIp,:#language,:#cpCallbackUrlHitTime,:#action,:#alert,:#notificationUrl,:#notificationResponse)

  4. SQL批处理路由定义如下:

    <pipeline>
       <log message="Going to insert in database"></log>
       <transform>
          <method ref="insertionBean" method="subsBatchInsertion"></method>
       </transform>
       <choice>
           <when>
               <simple>${in.header.subsCount} == ${properties:batch.size}</simple>
               <to uri="sql:{{sql.subs.insertCdr}}?batch=true"></to>
               <log message="Inserted rows ${body}"></log>
           </when>
       </choice>
    </pipeline>
    
  5. 下面是我的java代码:

    public List<Map<String, Object>> subsBatchInsertion(Exchange exchange) {
    if (subsBatchCounter > batchSize) {
        subsPayLoad.clear();
        subsBatchCounter = 1;
    }
    subsPayLoad.add(generateInsert(exchange.getIn().getBody(SubscriptionCdr.class)));
    exchange.getIn().setHeader("subsCount", subsBatchCounter);
    subsBatchCounter++;
    return subsPayLoad;
    }
    
    public Map<String, Object> generateInsert(Cdr cdr) {
    Map<String, Object> insert = new HashMap<String, Object>();
    try {
        insert = BeanUtils.describe(cdr);
    } catch (Exception e) {
        Logger.sysLog(LogValues.error, this.getClass().getName()+" | "+Thread.currentThread().getStackTrace()[1].getMethodName(), coreException.GetStack(e));
    } 
    for (String name : insert.keySet()) {
        Logger.sysLog(LogValues.APP_DEBUG, this.getClass().getName(), name + ":"+ insert.get(name) + "\t");
    }
    return insert;
    }
    

现在的问题是当 ActiveMQ 中有大约 120 张票时,SQL 批处理应该已经开始将值插入到数据库中。但这需要更多的时间。当 ActiveMQ 中有大约 500 张票时,它开始插入过程。任何人都可以帮助优化插入过程吗?还是有其他方法?

4

1 回答 1

6

问题出在 ActiceMQ 消费者数量上。

当我将消费者计数改回 1 时,批次会按时更新。

实际上,当消费者数量为 10 时,门票是并行消费的。这意味着,对于 10 个消费者从 activemq 消费的 100 张票,每个消费者大约有 10 张票,因此增加了更多时间。当任何一个消费者获得 100 张票时,该批次正在更新。

因此,将消费者计数更改为 1 使得所有票证都由单个消费者处理,从而可以很好地执行批量更新。

于 2016-11-07T07:15:33.310 回答