0

我的目标是读取 csv 文件,将其转换为 Java 对象(PO​​JO)并将 Java 对象一一发送到 ActiveMQ 队列。下面是代码:

public void configure() throws Exception {
    from("file:src/main/resources?fileName=data.csv")               
    .unmarshal(bindy)
    .split(body())
    .to("file:src/main/resources/?fileName=equityfeeds.txt")
    .split().tokenize(",").streaming().to("jms:queue:javaobjects.upstream.queue");          
}

问题: 1.当我执行代码时,没有文件(equityfeeds.txt)被创建,也没有对象进入队列。怎么了?我现在不需要做任何处理。我只需要将 csv 解组到 POJO 并将 Java 对象一一发送到 ActiveMQ 队列。

EquityFeeds (POJO)

@CsvRecord(separator = ",",skipFirstLine = true)
public class EquityFeeds {

    @DataField(pos = 1) 
    private String externalTransactionId;

    @DataField(pos = 2)
    private String clientId;

    @DataField(pos = 3)
    private String securityId;

    @DataField(pos = 4)
    private String transactionType;

    @DataField(pos = 5, pattern = "dd/MM/YY")
    private Date transactionDate;

    @DataField(pos = 6)
    private float marketValue; 

    @DataField(pos = 7)
    private String priorityFlag;

请帮助。请告诉我哪里出错了。

@pvpkiran:以下是我的生产者骆驼代码:

public void configure() throws Exception {
            from("file:src/main/resources?fileName=data.csv")               
                .unmarshal(bindy)
                .split(body())
                .streaming().to("jms:queue:javaobjects.upstream.queue");
}

以下是我的消费者代码(使用 JMS API):

@JmsListener(destination = "javaobjects.upstream.queue")
public void javaObjectsListener(final Message objectMessage) throws JMSException {
        Object messageData = null;
        if(objectMessage instanceof ObjectMessage) {
            ObjectMessage objMessage = (ObjectMessage) objectMessage;
            messageData = objMessage.getObject();
        }
        System.out.println("Object: "+messageData.toString());
    }

我没有使用 Camel 来使用 JMSMessage。在消费者中,我使用 JMS API 来消费消息。我也没有测试代码。消息已进入 ActiveMQ,我正在使用 JMS API(如上)来使用消息。在终端中我得到 NullPointerException。还有 2 条消息进入ActiveMQ.DLQ,给出以下错误消息:

java.lang.Throwable:Delivery[7] 超过重新传递策略限制:RedeliveryPolicy {destination = null,collisionAvoidanceFactor = 0.15,maximumRedeliveries = 6,maximumRedeliveryDelay = -1,initialRedeliveryDelay = 1000,useCollisionAvoidance = false,useExponentialBackOff = false,backOffMultiplier = 5.0, redeliveryDelay = 1000,preDispatchCheck = true},原因:null

4

1 回答 1

0

尝试这个。这应该工作

from("file:src/main/resources?fileName=equityfeeds.csv")
                    .unmarshal(new BindyCsvDataFormat(EquityFeeds.class))
                    .split(body())
                    .streaming().to("jms:queue:javaobjects.upstream.queue");
// This route is for Testing
from("jms:queue:javaobjects.upstream.queue").to("bean:camelBeanComponent?method=processRoute"); 

并编写一个消费者组件bean

@Component
public class CamelBeanComponent {
    public void processRoute(Exchange exchange) {
        System.out.println(exchange.getIn().getBody());
    }
}

此打印(toString()如果您需要这样的输出,您需要添加)

EquityFeeds(externalTransactionId=SAPEXTXN1, clientId=GS, securityId=ICICI, transactionType=BUY, transactionDate=Sun Dec 30 00:00:00 CET 2012, marketValue=101.9, priorityFlag=Y)
EquityFeeds(externalTransactionId=SAPEXTXN2, clientId=AS, securityId=REL, transactionType=SELL, transactionDate=Sun Dec 30 00:00:00 CET 2012, marketValue=121.9, priorityFlag=N)

如果您使用.split().tokenize(",")then,则每行(不是完整行)中的每个字段都将转换为 EquityFeeds 对象(其他字段为空)作为消息发送到队列

于 2020-03-26T10:17:28.040 回答