0

我是 apache camel 和 apache kafka 的新手,并为我的项目做一个小的 POC。尝试使用 Camel-kafka 组件从 kafka 读取时,我收到以下问题错误日志。

[2016-01-20 08:47:10,979] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
[2016-01-20 08:47:44,643] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
[2016-01-20 08:47:54,545] ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)
java.io.IOException: Broken pipe
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
    at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
    at sun.nio.ch.IOUtil.write(IOUtil.java:65)
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:470)
    at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:123)
    at kafka.network.MultiSend.writeTo(Transmission.scala:101)
    at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:231)
    at kafka.network.Processor.write(SocketServer.scala:472)
    at kafka.network.Processor.run(SocketServer.scala:342)
    at java.lang.Thread.run(Thread.java:745)

我的java代码如下:

public class Main {
public static void main(String[] args) throws Exception {

    CamelContext context = new DefaultCamelContext();
    context.addRoutes(new RouteBuilder() {
        public void configure() {
            from("kafka:127.0.0.1:9092?topic=TEST&zookeeperHost=localhost&zookeeperPort=2181&groupId=group1")
                    /*.marshal(xmlJsonFormat)*/
            .process(new XmlToJson())
                    /*.to("kafka:localhost:9092?topic=TestJson&zookeeperHost=localhost&zookeeperPort=2181&groupId=group1");*/
            .to("file:/Users/himanshu/Desktop/TransCamelFuse/test.txt");
        }
    });
    context.start();
    Thread.sleep(10000);
    context.stop();
}

}

我从 kafka 生产者控制台工具中放了一些 txt,并尝试使用 kafka 的骆驼组件进行阅读。

4

2 回答 2

0

这不是错误。由于您提供的数据类型而出现此错误。您正在将数据一一手动地提供给 Kafka。

于 2016-12-21T13:29:06.327 回答
0

您需要确保 kafka 服务器和 kafka 客户端版本与我面临的每个版本都兼容,我的 kafka 服务器是 0.8 并且一些 spring bean 使用 kafka 2.4 库,因此每当 2.4 客户端请求任何 kafka 0.8 操作 server.log 时都会记录此错误

解决方案 确保在代码中没有 bean 使用不兼容的客户端进行通信。

于 2020-04-23T13:16:10.637 回答