第一次在这里的 Stack Overflow 提问者......将尝试包含尽可能多的细节。
我正在尝试通过 Avro 接收器将 Apache Flume 日志数据传输到 Node.js 服务器,并在特定端口上进行侦听。我打算使用Collective Media 的 node-avro 库来帮助在 Avro 的二进制格式和 JSON 之间进行序列化,这样我就可以处理 Node.js 中的数据(我通过socket.io pub/sub 将它传递给客户端)。
我很确定我已经正确配置了 Flume,因为我看到数据流过通道并输出到控制台(仅用于调试,我也将数据下沉到控制台)。但是,当我启用 Avro 接收器并启动侦听同一端口的 Node.js 服务器时,Flume 在尝试进行 Avro 传输时会引发异常:
2013-02-15 22:06:09,858 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to send events
at org.apache.flume.sink.AvroSink.process(AvroSink.java:325)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:722)
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: localhost, port: 4242 }: Failed to send batch
at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:236)
at org.apache.flume.sink.AvroSink.process(AvroSink.java:309)
... 3 more
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: localhost, port: 4242 }: Exception thrown from remote handler
at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:318)
at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:295)
at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:224)
... 4 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: NettyTransceiver closed
at org.apache.avro.ipc.CallFuture.get(CallFuture.java:128)
at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:310)
... 6 more
Caused by: java.io.IOException: NettyTransceiver closed
at org.apache.avro.ipc.NettyTransceiver.disconnect(NettyTransceiver.java:338)
at org.apache.avro.ipc.NettyTransceiver.access$200(NettyTransceiver.java:59)
at org.apache.avro.ipc.NettyTransceiver$NettyClientAvroHandler.handleUpstream(NettyTransceiver.java:496)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:792)
at org.jboss.netty.handler.codec.frame.FrameDecoder.cleanup(FrameDecoder.java:348)
at org.jboss.netty.handler.codec.frame.FrameDecoder.channelClosed(FrameDecoder.java:236)
at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:93)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
at org.jboss.netty.channel.Channels.fireChannelClosed(Channels.java:476)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.close(AbstractNioWorker.java:623)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:101)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:364)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:238)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:38)
at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
... 1 more
2013-02-15 22:06:14,895 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.AvroSink.createConnection(AvroSink.java:178)] Avro sink k1: Building RpcClient with hostname: 127.0.0.1, port: 4242
我不确定的是如何确定我的 Node.js 服务是否至少收到了消息。我对 Node.js 很陌生,所以这无济于事,但这是设置侦听器的代码片段:
var flumeSink = require('http').createServer(flumeHandler);
flumeSink.listen(8000);
function flumeHandler (req, res) {
console.log("Got it!");
//var schema = avro.prepareSchema("string");
//var buffer = schema.encode("foo");
//var value = schema.decode(buffer);
}
我在想我错误地设置了 Node.js 端。我正在使用 HTTP 模块,它可能不是正确的模块。也许我需要考虑在 Node.js 中编写自定义接收器?指针/帮助表示赞赏!