0

提前致谢!
我需要把来自 FTP 的文件写入 AMQ 队列。原因是:我想借助 Camel JMS AMQ 为我的路由添加故障转移。我是 Apache ActiveMQ JMS 的新手。我在 2 个独立的节点上各部署了 1 个 AMQ 代理(broker);在另外 2 个节点/服务器上运行着我的 JBoss Fuse Karaf 容器客户端应用程序。客户端到代理的连接已经建立(可在 AMQ 控制台、日志等处确认)。但是,我无法从 FTP 或电子邮件路由把文件写入队列。我猜是我哪里做错了,希望你能帮忙解决这个问题。我还想知道,按我现在尝试的这种方式,到底能不能实现?

AMQ 代理节点配置片段 - 两个节点的配置相同。我已尝试去掉所有相关的超时设置,但没有任何区别。

activemq.xml

    <destinationPolicy>
        <policyMap>
          <policyEntries>
            <policyEntry topic=">" producerFlowControl="true">
              <pendingMessageLimitStrategy>
                <constantPendingMessageLimitStrategy limit="1000"/>
              </pendingMessageLimitStrategy>
            </policyEntry>
            <policyEntry queue=">" producerFlowControl="true" memoryLimit="1gb">
            </policyEntry>
          </policyEntries>
        </policyMap>
    </destinationPolicy>

    <systemUsage>
        <systemUsage>
            <memoryUsage>
                <memoryUsage percentOfJvmHeap="90"/>
            </memoryUsage>
            <storeUsage>
                <storeUsage limit="100 gb"/>
            </storeUsage>
            <tempUsage>
                <tempUsage limit="50 gb"/>
            </tempUsage>
        </systemUsage>
    </systemUsage>

    <transportConnectors>
        <transportConnector name="openwire" uri="tcp://10.141.145.173:61617?connectionTimeout=0&amp;keepAlive=true&amp;useInactivityMonitor=false&amp;wireFormat.maxInactivityDuration=0&amp;enableStatusMonitor=true"/>
    </transportConnectors>

节点 2

<transportConnectors>
            <transportConnector name="openwire" uri="tcp://10.141.128.182:61617?connectionTimeout=0&amp;keepAlive=true&amp;useInactivityMonitor=false&amp;wireFormat.maxInactivityDuration=0&amp;enableStatusMonitor=true"/>
        </transportConnectors>

系统配置文件

# node 1
activemq.port = 61617
#activemq.host = localhost
activemq.host = 10.141.145.173
activemq.url = tcp://${activemq.host}:${activemq.port}

#
# Activemq configuration node 2
#
activemq.port = 61617
#activemq.host = localhost
activemq.host = 10.141.128.182
activemq.url = tcp://${activemq.host}:${activemq.port}

使用故障转移传输的客户端

jmsSourceDestination=Fleet.InboundFile.Queue
clusteredJmsMaximumRedeliveries=5
amq.url=failover://(tcp://10.141.145.173:61617,tcp://10.141.128.182:61617)?initialReconnectDelay=1000&randomize=false&timeout=5000
amq.username=admin
amq.password=admin


<!-- OpenWire connector. Inside an XML attribute value every '&' separating URI
     options must be written as '&amp;', otherwise the document is not well-formed. -->
<transportConnectors>
    <transportConnector name="openwire" uri="tcp://10.141.145.173:61617?connectionTimeout=0&amp;keepAlive=true&amp;useInactivityMonitor=false&amp;wireFormat.maxInactivityDuration=0&amp;enableStatusMonitor=true"/>
</transportConnectors>

<!-- NOTE(review): this second connector uses the same host as the one above;
     presumably it should carry the node 2 address (10.141.128.182) - confirm. -->
<transportConnectors>
    <transportConnector name="openwire" uri="tcp://10.141.145.173:61617?connectionTimeout=0&amp;keepAlive=true&amp;useInactivityMonitor=false&amp;wireFormat.maxInactivityDuration=0&amp;enableStatusMonitor=true"/>
</transportConnectors>

路线形成 - 我有一个动态路线构建器,我将配置放入其中以创建路线

/**
 * Activates or deactivates a dynamically built file route.
 *
 * <p>The message body is parsed as a {@link Properties} document and the
 * {@code CamelFileName} header supplies the name used to derive the route id
 * ({@code fleet.inboundFile.<fileNameWithoutExtension>.<transport>.Route}).
 * When the {@code action} property is {@code activate}, a new
 * {@code EndpointFileRouteBuilder} is added unless a route with that id is
 * already running; when it is {@code deactivate}, a started route is stopped.
 * Any other situation is logged and the exchange is left untouched.
 *
 * @param exchange the exchange carrying the configuration text and file name
 * @throws Exception if the properties cannot be loaded or the Camel context
 *         fails to add or stop the route
 */
@Override
public void process(Exchange exchange) throws Exception {

    final String endpointConfigurationStr = exchange.getIn().getBody(String.class);
    // NOTE(review): this logs the raw configuration text; if it can contain
    // credentials, consider masking them before logging.
    LOG.info(endpointConfigurationStr);
    final String fileName = (String) exchange.getIn().getHeader("CamelFileName");

    if ((null == endpointConfigurationStr) || (null == fileName)) {
        LOG.error("File Configuration File or File Name is null");
        return;
    }

    final Properties props = new Properties();
    props.load(new StringReader(endpointConfigurationStr));

    if (!validateProperties(props)) {
        LOG.error("Invalid Properties File ");
        return;
    }

    final String decoderName = props.getProperty("decoderName");
    LOG.info("DECODER NAME: " + decoderName);

    // A file name without any '.' previously caused substring(0, -1) to throw;
    // in that case the whole name is used.
    final int extensionStart = fileName.lastIndexOf('.');
    final String fileNameNoExtension =
            (extensionStart >= 0) ? fileName.substring(0, extensionStart) : fileName;

    final String routeIdStr = String.format("fleet.inboundFile.%s.%s.Route", fileNameNoExtension, props.getProperty("transport"));

    // Constant-first comparison so a missing "action" property falls through
    // to the invalid-action branch instead of throwing a NullPointerException.
    final String action = props.getProperty("action");

    if ("activate".equalsIgnoreCase(action)) {

        final ServiceStatus routeStatus = exchange.getContext().getRouteStatus(routeIdStr);

        if ((null == routeStatus) || (routeStatus.isStopped())) {
            exchange.getContext().addRoutes(new EndpointFileRouteBuilder(routeIdStr,
            EndpointDescriptorFactory(props),
            props.getProperty("errorArchive"),
            props.getProperty("unhandledArchive"),
            destinationEndpoint,
            decoderName) );
        } else {
            LOG.info("Route " + routeIdStr + " already started");
        }

    } else if ("deactivate".equalsIgnoreCase(action)) {

        final ServiceStatus routeStatus = exchange.getContext().getRouteStatus(routeIdStr);

        // The status is null when no route with this id is known to the
        // context (the activate branch relies on the same fact), so it must
        // be checked before being dereferenced.
        if (null == routeStatus) {
            LOG.debug("Route " + routeIdStr + " not found, nothing to stop");
        } else if (routeStatus.isStarted()) {
            exchange.getContext().stopRoute(routeIdStr);
        } else {
            LOG.debug("Route " + routeIdStr + " already stopped");
        }

    } else {
        LOG.error("Invalid Action in File Properties");
    }

}

路由的 fromStr(源端点)与 toStr(目标端点)的构造

    if (validateConfiguration()) {

        // Build the consumer ("from") endpoint URI for the configured transport.
        // Locale.ROOT keeps the case folding independent of the JVM default
        // locale (e.g. "FILE" would not lower-case to "file" under a Turkish locale).
        switch (transport.toLowerCase(java.util.Locale.ROOT)) {
        case "ftp":
        case "sftp":
                    // Both remote transports use exactly the same option set, so
                    // they share one template; the scheme comes from `transport`.
                    fromStr = String.format("%s://%s@%s:%s/%s?password=RAW(%s)&recursive=%s&stepwise=%s&useList=%s&passiveMode=%s&disconnect=%s"
                    + "&move=.processed"
                    + "&maxMessagesPerPoll=1"
                    + "&eagerMaxMessagesPerPoll=false"
                    + "&sortBy=file:modified"
                    + "&sendEmptyMessageWhenIdle=false"
                    + "&delay=60000"
                    + "&initialDelay=60000"
                    + "&connectTimeout=15000"
                    + "&localWorkDirectory=/tmp"
                    + "&readLockMinLength=0"
                    , transport, username, host, port, path, password, recursive, stepwise, useList, passiveMode, disconnect);
                    break;
        case "file":
                    // Local directory consumer: only path and recursion are configurable.
                    fromStr = String.format("%s://%s/?recursive=%s"
                    + "&move=.processed"
                    + "&readLock=changed"
                    + "&maxMessagesPerPoll=1"
                    + "&sortBy=file:modified"
                    + "&delay=60000"
                    + "&initialDelay=60000"
                    + "&renameUsingCopy=true"
                    ,transport, path, recursive);
                    break;
        default:
            LOG.info("Unsupported transport, cannot establish from or source endpoint!");
            throw new UnsupportedTransportException("Unsupported transport, cannot establish from or source endpoint!");
        }

        // Format the To Endpoint from Parameter(s).
        final String toStr = String.format("%s", toEndpoint);
        LOG.info("*** toStr - toEndpoint: " + toStr);

路线创建

            // Pick one of four route shapes depending on how the incoming files are
            // packaged: PGP + zip, PGP only, zip only, or plain. Every variant consumes
            // from fromStr, runs EndpointParametersProcessor and sends to toStr.
            if (Boolean.parseBoolean(isEncryptedWithCompression)) { 
                // Compression and Encryption: PGP-decrypt first, then split the zip entries.
                PGPDataFormat pgpVerifyAndDecrypt = new PGPDataFormat();
                pgpVerifyAndDecrypt.setKeyFileName("keys/secring.gpg");
                pgpVerifyAndDecrypt.setKeyUserid(pgpKeyUserId);
                pgpVerifyAndDecrypt.setPassword(pgpPassword);
                pgpVerifyAndDecrypt.setArmored(Boolean.parseBoolean(pgpArmored));
                pgpVerifyAndDecrypt.setSignatureKeyFileName("keys/pubring.gpg");
                pgpVerifyAndDecrypt.setSignatureKeyUserid(pgpKeyUserId);
                // Signature verification option is set to the IGNORE constant.
                pgpVerifyAndDecrypt.setSignatureVerificationOption(PGPKeyAccessDataFormat.SIGNATURE_VERIFICATION_OPTION_IGNORE);

                from(fromStr)
//              .routeId("Compression.with.Encryption")
                .routeId(routeId)
                .log("Message received ${file:name} for Compression and Encryption " + " from host " + host)
                // Decrypt, then split with ZipSplitter in streaming mode.
                .unmarshal(pgpVerifyAndDecrypt).split(new ZipSplitter())
                .streaming().convertBodyTo(String.class)
//              .wireTap("file:" + fileArchive)
                // Second split over the String body, also streaming.
                .split(body()).streaming()
                .process(new EndpointParametersProcessor(decoderName))
                .to(toStr);

            } else if (Boolean.parseBoolean(isEncryptedOnly)) { 
                // Encryption Only: same PGP setup as the branch above, no zip handling.
                PGPDataFormat pgpVerifyAndDecrypt = new PGPDataFormat();
                pgpVerifyAndDecrypt.setKeyFileName("keys/secring.gpg");
                pgpVerifyAndDecrypt.setKeyUserid(pgpKeyUserId);
                pgpVerifyAndDecrypt.setPassword(pgpPassword);
                pgpVerifyAndDecrypt.setArmored(Boolean.parseBoolean(pgpArmored));
                pgpVerifyAndDecrypt.setSignatureKeyFileName("keys/pubring.gpg");
                pgpVerifyAndDecrypt.setSignatureKeyUserid(pgpKeyUserId);
                pgpVerifyAndDecrypt.setSignatureVerificationOption(PGPKeyAccessDataFormat.SIGNATURE_VERIFICATION_OPTION_IGNORE);

                from(fromStr)
//              .routeId("Encryption.Only")
                .routeId(routeId)
                .log("Message received ${file:name} for Encryption Only " + " from host " + host)
                .unmarshal(pgpVerifyAndDecrypt)
                .convertBodyTo(String.class)
                // NOTE(review): the choice()/when() opened here is never closed with
                // end()/endChoice(), so the setHeader, split, process and to steps below
                // appear to be chained onto the when clause; files whose name does not
                // end with 'gpg' would then not reach toStr - confirm this is intended.
                .choice()
                .when(simple("${header.CamelFileName} ends with 'gpg'"))
                // Replace the file name header with ${file:name.noext.single}.
                .setHeader("CamelFileName", simple("${file:name.noext.single}"))
//              .wireTap("file:" + fileArchive)
                .split(body()).streaming()
                .process(new EndpointParametersProcessor(decoderName))
                .to(toStr);

            } else if (Boolean.parseBoolean(isCompressedOnly)) { //Only Zipped or Compressed

                // Zip data format configured with usingIterator=true; the route below
                // splits on the body as an Iterator.
                ZipFileDataFormat zipFile = new ZipFileDataFormat();
                zipFile.setUsingIterator(true);

                from(fromStr)
                .routeId(routeId)
//              .routeId("Zipped.Only")
                .log(LoggingLevel.INFO, "Message received ${file:name} for Only Zipped or Compressed files from host " + host)
                .unmarshal(zipFile)
                .split(body(Iterator.class))
                .streaming()
                .convertBodyTo(String.class)
//              .wireTap("file:" + fileArchive)
                // Tokenize the text on newlines, process each token and combine the
                // results with FleetAggregationStrategy; end() closes this inner split.
                .split(body().tokenize("\n"), new FleetAggregationStrategy()).streaming()
                .process(new EndpointParametersProcessor(decoderName))
                .end()
                // Only forward when the body-length predicate holds.
                // NOTE(review): the right-hand side is the quoted literal '0' - confirm
                // the comparison behaves numerically as expected.
                .choice()
                .when(simple("${body.length} > '0'" ))
                .to(toStr)
                .end();

            } else {
                //No Compression No Encryption Basic plain data file
                from(fromStr)
                .routeId(routeId)
                .log(LoggingLevel.INFO, "Message Received for No Compression No Encryption Basic plain data file " +  " from " + host)
//              .wireTap("file:" + fileArchive)
                // Split the body in streaming mode, process each part and send it on.
                .split(body()).streaming()
                .process(new EndpointParametersProcessor(decoderName))
                .to(toStr);
            }

TOSTR dynamic property
destinationEndpoint=activemq:queue:Fleet.InboundFile.Queue

有效路线

<route xmlns="http://camel.apache.org/schema/spring" customId="true" id="fleet.inboundFile.gcms-new-activate.sftp.Route">
    <from uri="sftp://500100471@gemft.corporate.ge.com:10022/fromvan/gary?password=RAW(+W93j2Wa)&amp;recursive=false&amp;stepwise=false&amp;useList=true&amp;passiveMode=true&amp;disconnect=false&amp;move=.processed&amp;maxMessagesPerPoll=1&amp;eagerMaxMessagesPerPoll=false&amp;sortBy=file:modified&amp;sendEmptyMessageWhenIdle=false&amp;delay=60000&amp;initialDelay=60000&amp;connectTimeout=15000&amp;localWorkDirectory=/tmp&amp;readLockMinLength=0"/>
    <onException id="onException11">
        <to id="to14" uri="file:/GlobalScapeSftpRepo/Fleet/ge-digital/fleet/core/error"/>
    </onException>
    <onException id="onException13">
        <to id="to16" uri="file:///GlobalScapeSftpRepo/Fleet/ge-digital/fleet/core/unhandled"/>
    </onException>
    <log id="log15" loggingLevel="INFO" message="Message received ${file:name} for Only Zipped or Compressed files from host gemft.corporate.ge.com"/>
    <unmarshal id="unmarshal1">
        <gzip xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:type="dataFormat"/>
    </unmarshal>
    <split id="split1" streaming="true">
        <simple>${bodyAs(java.util.Iterator)}</simple>
        <convertBodyTo id="convertBodyTo1" type="java.lang.String"/>
        <split id="split2" streaming="true">
            <expressionDefinition>tokenize(simple{${body}},
)</expressionDefinition>
            <process id="process1"/>
        </split>
        <choice id="choice1">
            <when id="when1">
                <simple>${body.length} &gt; '0'</simple>
                <to id="to18" uri="activemq:queue:Fleet.InboundFile.Queue"/>
            </when>
        </choice>
    </split>
</route>

我在代理节点上得到什么

11:13:15,119 | WARN  | d]-nio2-thread-2 | ServerSession                    | 156 - org.apache.sshd.core - 0.14.0.redhat-001 | Exception caught
java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method)[:1.7.0_241]
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)[:1.7.0_241]
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)[:1.7.0_241]
        at sun.nio.ch.IOUtil.read(IOUtil.java:197)[:1.7.0_241]
        at sun.nio.ch.UnixAsynchronousSocketChannelImpl.finishRead(UnixAsynchronousSocketChannelImpl.java:387)[:1.7.0_241]
        at sun.nio.ch.UnixAsynchronousSocketChannelImpl.finish(UnixAsynchronousSocketChannelImpl.java:191)[:1.7.0_241]
        at sun.nio.ch.UnixAsynchronousSocketChannelImpl.onEvent(UnixAsynchronousSocketChannelImpl.java:213)[:1.7.0_241]
        at sun.nio.ch.EPollPort$EventHandlerTask.run(EPollPort.java:293)[:1.7.0_241]
        at java.lang.Thread.run(Thread.java:748)[:1.7.0_241]

客户端似乎在没有传输文件的情况下就关闭了连接?这种做法可行吗,或者说到底能不能这样做?文件大小从 500K 到 1 GB 不等。
系统在尝试传输(xfr)后挂起,必须重启才能恢复。

4

0 回答 0