提前致谢!
我正在尝试/需要将文件从 FTP 写入 AMQ 队列。原因 - 我正在尝试使用 Camel JMS AMQ 在我的路由上添加故障转移。我是 Apache ActiveMQ JMS 的新手。我在 2 个单独的节点上有 2 个 AMQ 代理。在其他 2 个节点/服务器上,我有我的 JBoss Fuse Karaf Containers 客户端应用程序。我正在从客户那里连接到经纪人。AMQ 控制台、日志等。但是,我无法从 FTP 或电子邮件路由将文件写入队列。我猜我做错了什么,希望你能帮助解决这个问题。我想知道这是否可以在我尝试这样做时完成?
AMQ 代理节点配置片段 - 两者都相同,我尝试删除任何相关的超时。没有差异。
活动MQ.xml
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" producerFlowControl="true">
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
<policyEntry queue=">" producerFlowControl="true" memoryLimit="1gb">
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage percentOfJvmHeap="90"/>
</memoryUsage>
<storeUsage>
<storeUsage limit="100 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="50 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
<transportConnectors>
<transportConnector name="openwire" uri="tcp://10.141.145.173:61617?connectionTimeout=0&keepAlive=true&useInactivityMonitor=false&wireFormat.maxInactivityDuration=0&enableStatusMonitor=true"/>
</transportConnectors>
节点 2
<transportConnectors>
<transportConnector name="openwire" uri="tcp://10.141.128.182:61617?connectionTimeout=0&keepAlive=true&useInactivityMonitor=false&wireFormat.maxInactivityDuration=0&enableStatusMonitor=true"/>
</transportConnectors>
系统配置文件
# node 1
activemq.port = 61617
#activemq.host = localhost
activemq.host = 10.141.145.173
activemq.url = tcp://${activemq.host}:${activemq.port}
#
# Activemq configuration node 2
#
activemq.port = 61617
#activemq.host = localhost
activemq.host = 10.141.128.182
activemq.url = tcp://${activemq.host}:${activemq.port}
使用故障转移传输的客户端
jmsSourceDestination=Fleet.InboundFile.Queue
clusteredJmsMaximumRedeliveries=5
amq.url=failover://(tcp://10.141.145.173:61617,tcp://10.141.128.182:61617)?initialReconnectDelay=1000&randomize=false&timeout=5000
amq.username=admin
amq.password=admin
<transportConnectors>
<transportConnector name="openwire" uri="tcp://10.141.145.173:61617?connectionTimeout=0&keepAlive=true&useInactivityMonitor=false&wireFormat.maxInactivityDuration=0&enableStatusMonitor=true"/>
</transportConnectors>
<transportConnectors>
<transportConnector name="openwire" uri="tcp://10.141.145.173:61617?connectionTimeout=0&keepAlive=true&useInactivityMonitor=false&wireFormat.maxInactivityDuration=0&enableStatusMonitor=true"/>
</transportConnectors>
路线形成 - 我有一个动态路线构建器,我将配置放入其中以创建路线
@Override
public void process(Exchange exchange) throws Exception {
final String endpointConfigurationStr = exchange.getIn().getBody(String.class);
LOG.info(endpointConfigurationStr);
final String fileName = (String) exchange.getIn().getHeader("CamelFileName");
if ((null != endpointConfigurationStr) && (null != fileName)) {
Properties props = new Properties();
props.load(new StringReader(endpointConfigurationStr));
if (validateProperties(props)) {
final String decoderName = props.getProperty("decoderName");
LOG.info("DECODER NAME: " + decoderName);
final String fileNameNoExtension = fileName.substring(0, fileName.lastIndexOf('.'));
final String routeIdStr = String.format("fleet.inboundFile.%s.%s.Route", fileNameNoExtension, props.getProperty("transport"));
if (props.getProperty("action").equalsIgnoreCase("activate")) {
ServiceStatus routeStatus = exchange.getContext().getRouteStatus(routeIdStr);
if ((null == routeStatus) || (routeStatus.isStopped())) {
exchange.getContext().addRoutes(new EndpointFileRouteBuilder(routeIdStr,
EndpointDescriptorFactory(props),
props.getProperty("errorArchive"),
props.getProperty("unhandledArchive"),
destinationEndpoint,
decoderName) );
} else {
LOG.info("Route " + routeIdStr + " already started");
}
} else if (props.getProperty("action").equalsIgnoreCase("deactivate")) {
ServiceStatus routeStatus = exchange.getContext().getRouteStatus(routeIdStr);
if (routeStatus.isStarted()) {
exchange.getContext().stopRoute(routeIdStr);
} else {
LOG.debug("Route " + routeIdStr + " already stopped");
}
} else {
LOG.error("Invalid Action in File Properties");
}
} else {
LOG.error("Invalid Properties File ");
}
} else {
LOG.error("File Configuration File or File Name is null");
}
}
从 STR 到 STR 的路线
if (validateConfiguration()) {
switch (transport.toLowerCase()) {
case "ftp":
fromStr = String.format("%s://%s@%s:%s/%s?password=RAW(%s)&recursive=%s&stepwise=%s&useList=%s&passiveMode=%s&disconnect=%s"
+ "&move=.processed"
+ "&maxMessagesPerPoll=1"
+ "&eagerMaxMessagesPerPoll=false"
+ "&sortBy=file:modified"
+ "&sendEmptyMessageWhenIdle=false"
+ "&delay=60000"
+ "&initialDelay=60000"
+ "&connectTimeout=15000"
+ "&localWorkDirectory=/tmp"
+ "&readLockMinLength=0"
, transport, username, host, port, path, password, recursive, stepwise, useList, passiveMode, disconnect);
break;
case "sftp":
fromStr = String.format("%s://%s@%s:%s/%s?password=RAW(%s)&recursive=%s&stepwise=%s&useList=%s&passiveMode=%s&disconnect=%s"
+ "&move=.processed"
+ "&maxMessagesPerPoll=1"
+ "&eagerMaxMessagesPerPoll=false"
+ "&sortBy=file:modified"
+ "&sendEmptyMessageWhenIdle=false"
+ "&delay=60000"
+ "&initialDelay=60000"
+ "&connectTimeout=15000"
+ "&localWorkDirectory=/tmp"
+ "&readLockMinLength=0"
, transport, username, host, port, path, password, recursive, stepwise, useList, passiveMode, disconnect);
break;
case "file":
fromStr = String.format("%s://%s/?recursive=%s"
+ "&move=.processed"
+ "&readLock=changed"
+ "&maxMessagesPerPoll=1"
+ "&sortBy=file:modified"
+ "&delay=60000"
+ "&initialDelay=60000"
+ "&renameUsingCopy=true"
,transport, path, recursive);
break;
default:
LOG.info("Unsupported transport, cannot establish from or source endpoint!");
throw new UnsupportedTransportException("Unsupported transport, cannot establish from or source endpoint!");
}
// Format the To Endpoint from Parameter(s).
final String toStr = String.format("%s", toEndpoint);
LOG.info("*** toStr - toEndpoint: " + toStr);
路线创建
if (Boolean.parseBoolean(isEncryptedWithCompression)) {
//Compression and Encryption
PGPDataFormat pgpVerifyAndDecrypt = new PGPDataFormat();
pgpVerifyAndDecrypt.setKeyFileName("keys/secring.gpg");
pgpVerifyAndDecrypt.setKeyUserid(pgpKeyUserId);
pgpVerifyAndDecrypt.setPassword(pgpPassword);
pgpVerifyAndDecrypt.setArmored(Boolean.parseBoolean(pgpArmored));
pgpVerifyAndDecrypt.setSignatureKeyFileName("keys/pubring.gpg");
pgpVerifyAndDecrypt.setSignatureKeyUserid(pgpKeyUserId);
pgpVerifyAndDecrypt.setSignatureVerificationOption(PGPKeyAccessDataFormat.SIGNATURE_VERIFICATION_OPTION_IGNORE);
from(fromStr)
// .routeId("Compression.with.Encryption")
.routeId(routeId)
.log("Message received ${file:name} for Compression and Encryption " + " from host " + host)
.unmarshal(pgpVerifyAndDecrypt).split(new ZipSplitter())
.streaming().convertBodyTo(String.class)
// .wireTap("file:" + fileArchive)
.split(body()).streaming()
.process(new EndpointParametersProcessor(decoderName))
.to(toStr);
} else if (Boolean.parseBoolean(isEncryptedOnly)) {
//Encryption Only
PGPDataFormat pgpVerifyAndDecrypt = new PGPDataFormat();
pgpVerifyAndDecrypt.setKeyFileName("keys/secring.gpg");
pgpVerifyAndDecrypt.setKeyUserid(pgpKeyUserId);
pgpVerifyAndDecrypt.setPassword(pgpPassword);
pgpVerifyAndDecrypt.setArmored(Boolean.parseBoolean(pgpArmored));
pgpVerifyAndDecrypt.setSignatureKeyFileName("keys/pubring.gpg");
pgpVerifyAndDecrypt.setSignatureKeyUserid(pgpKeyUserId);
pgpVerifyAndDecrypt.setSignatureVerificationOption(PGPKeyAccessDataFormat.SIGNATURE_VERIFICATION_OPTION_IGNORE);
from(fromStr)
// .routeId("Encryption.Only")
.routeId(routeId)
.log("Message received ${file:name} for Encryption Only " + " from host " + host)
.unmarshal(pgpVerifyAndDecrypt)
.convertBodyTo(String.class)
.choice()
.when(simple("${header.CamelFileName} ends with 'gpg'"))
.setHeader("CamelFileName", simple("${file:name.noext.single}"))
// .wireTap("file:" + fileArchive)
.split(body()).streaming()
.process(new EndpointParametersProcessor(decoderName))
.to(toStr);
} else if (Boolean.parseBoolean(isCompressedOnly)) { //Only Zipped or Compressed
ZipFileDataFormat zipFile = new ZipFileDataFormat();
zipFile.setUsingIterator(true);
from(fromStr)
.routeId(routeId)
// .routeId("Zipped.Only")
.log(LoggingLevel.INFO, "Message received ${file:name} for Only Zipped or Compressed files from host " + host)
.unmarshal(zipFile)
.split(body(Iterator.class))
.streaming()
.convertBodyTo(String.class)
// .wireTap("file:" + fileArchive)
.split(body().tokenize("\n"), new FleetAggregationStrategy()).streaming()
.process(new EndpointParametersProcessor(decoderName))
.end()
.choice()
.when(simple("${body.length} > '0'" ))
.to(toStr)
.end();
} else {
//No Compression No Encryption Basic plain data file
from(fromStr)
.routeId(routeId)
.log(LoggingLevel.INFO, "Message Received for No Compression No Encryption Basic plain data file " + " from " + host)
// .wireTap("file:" + fileArchive)
.split(body()).streaming()
.process(new EndpointParametersProcessor(decoderName))
.to(toStr);
}
TOSTR dynamic property
destinationEndpoint=activemq:queue:Fleet.InboundFile.Queue
有效路线
<route xmlns="http://camel.apache.org/schema/spring" customId="true" id="fleet.inboundFile.gcms-new-activate.sftp.Route">
<from uri="sftp://500100471@gemft.corporate.ge.com:10022/fromvan/gary?password=RAW(+W93j2Wa)&recursive=false&stepwise=false&useList=true&passiveMode=true&disconnect=false&move=.processed&maxMessagesPerPoll=1&eagerMaxMessagesPerPoll=false&sortBy=file:modified&sendEmptyMessageWhenIdle=false&delay=60000&initialDelay=60000&connectTimeout=15000&localWorkDirectory=/tmp&readLockMinLength=0"/>
<onException id="onException11">
<to id="to14" uri="file:/GlobalScapeSftpRepo/Fleet/ge-digital/fleet/core/error"/>
</onException>
<onException id="onException13">
<to id="to16" uri="file:///GlobalScapeSftpRepo/Fleet/ge-digital/fleet/core/unhandled"/>
</onException>
<log id="log15" loggingLevel="INFO" message="Message received ${file:name} for Only Zipped or Compressed files from host gemft.corporate.ge.com"/>
<unmarshal id="unmarshal1">
<gzip xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:type="dataFormat"/>
</unmarshal>
<split id="split1" streaming="true">
<simple>${bodyAs(java.util.Iterator)}</simple>
<convertBodyTo id="convertBodyTo1" type="java.lang.String"/>
<split id="split2" streaming="true">
<expressionDefinition>tokenize(simple{${body}},
)</expressionDefinition>
<process id="process1"/>
</split>
<choice id="choice1">
<when id="when1">
<simple>${body.length} > '0'</simple>
<to id="to18" uri="activemq:queue:Fleet.InboundFile.Queue"/>
</when>
</choice>
</split>
</route>
我在代理节点上得到什么
11:13:15,119 | WARN | d]-nio2-thread-2 | ServerSession | 156 - org.apache.sshd.core - 0.14.0.redhat-001 | Exception caught
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)[:1.7.0_241]
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)[:1.7.0_241]
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)[:1.7.0_241]
at sun.nio.ch.IOUtil.read(IOUtil.java:197)[:1.7.0_241]
at sun.nio.ch.UnixAsynchronousSocketChannelImpl.finishRead(UnixAsynchronousSocketChannelImpl.java:387)[:1.7.0_241]
at sun.nio.ch.UnixAsynchronousSocketChannelImpl.finish(UnixAsynchronousSocketChannelImpl.java:191)[:1.7.0_241]
at sun.nio.ch.UnixAsynchronousSocketChannelImpl.onEvent(UnixAsynchronousSocketChannelImpl.java:213)[:1.7.0_241]
at sun.nio.ch.EPollPort$EventHandlerTask.run(EPollPort.java:293)[:1.7.0_241]
at java.lang.Thread.run(Thread.java:748)[:1.7.0_241]
客户端似乎正在关闭连接而不传输文件?可以这样做还是完全可以这样做。文件将从 500K 到 1 GB。
系统在 xfr 尝试后挂起,必须回收。