0

我正在尝试简单的 spring XD 应用程序在 HDFS 中加载日志事件。spring-ampq/rabbit log4j appender我已经使用(类)配置了目标应用程序,org.springframework.amqp.rabbit.log4j.AmqpAppender以将日志消息泵送到预先配置的交换器。我设置了以下流以从 HDFS 中提取这些消息并将它们推送到 HDFS,其中 soruce 和 sink 模块都是现成的 XD 模块,

流定义,

xd:>stream create --name demoQ1 --definition "rabbit | hdfs --rollover=15 --directory=/user/root" --deploy

创建并部署了新的流“demoQ1”

xd:>stream list
  Stream Name  Stream Definition                                   Status
  -----------  --------------------------------------------------  --------
  demoQ1       rabbit | hdfs --rollover=15 --directory=/user/root  deployed

AMQP Appender 正在发布要交换的消息并将其路由到 demoQ1 队列,其中 rabbit 源正在接收第一条消息然后卡住,因为它不确认消息。可能是什么原因?

4

2 回答 2

0

在您的容器日志中,您是否看到:“未能将消息有效负载写入 HDFS”?

如果是这样,那么您需要使用模块之间的类型转换。从 rabbit 源到 hdfs sink,消息将只是字节数组。

您的流定义可能是,

流创建 --name demoQ1 --definition "rabbit --outputType=text/plain | hdfs --rollover=15 --directory=/user/root" --deploy

或者,

流创建 --name demoQ1 --definition "rabbit | hdfs --inputType=text/plain --rollover=15 --directory=/user/root" --deploy

分别注意 source/sink 中的 outputType 或 inputType 选项。在这种情况下,hdfs 接收器的 HdfsStoreMessageHandler 期望负载为字符串类型。

有关类型转换的更多详细信息,请查看: https ://github.com/spring-projects/spring-xd/wiki/Type-Conversion

于 2014-04-21T13:03:09.943 回答
-1

在运行rabbit模块的spring XD容器上启用了调试日志,它显示第一条消息重复发生以下异常并且消息被重新排队,因此消息处于未确认状态,并且rabbit源无法处理更多消息。

为了解决这个问题,我从 log4j Appender 属性中删除了这个属性,log4j.appender.amqp.contentEncoding=null. 该属性明确将编码器的名称指定为“null”,这似乎是一个错误。我期待 null 意味着没有指定编码器:)

日志中的异常,随着消息被拒绝并重新排队而不断重复..

19:29:17,713 DEBUG SimpleAsyncTaskExecutor-1 listener.BlockingQueueConsumer:268 - Received message: (Body:'Hello'MessageProperties [headers={categoryName=org.apache.hadoop.yarn.server.nodemanager.NodeManager, level=INFO}, timestamp=Sat Apr 19 19:21:52 PDT 2014, messageId=null, userId=null, appId=NodeManager, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=null, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=true, receivedExchange=test-exch, receivedRoutingKey=rk1, deliveryTag=184015, messageCount=0]) 19:29:17,715 WARN SimpleAsyncTaskExecutor-1 listener.SimpleMessageListenerContainer:530 - Execution of Rabbit message listener failed, and no ErrorHandler has been set. org.springframework.amqp.rabbit.listener.ListenerExecutionFailedException: Listener threw exception at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:751) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:690) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:583) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:75) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:154) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1111) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:556) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:904) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:888) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$500(SimpleMessageListenerContainer.java:75) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:989) at java.lang.Thread.run(Thread.java:722) Caused by: org.springframework.amqp.support.converter.MessageConversionException: failed to convert text-based Message content at org.springframework.amqp.support.converter.SimpleMessageConverter.fromMessage(SimpleMessageConverter.java:100) at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$1.onMessage(AmqpInboundChannelAdapter.java:73) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:688) ... 10 more Caused by: java.io.UnsupportedEncodingException: null at java.lang.StringCoding.decode(StringCoding.java:190) at java.lang.String.(String.java:416) at java.lang.String.(String.java:481) at org.springframework.amqp.support.converter.SimpleMessageConverter.fromMessage(SimpleMessageConverter.java:97) ... 12 more 19:29:17,715 DEBUG SimpleAsyncTaskExecutor-1 listener.BlockingQueueConsumer:657 - Rejecting messages (requeue=true)
于 2014-05-11T01:08:11.473 回答