3

我想使用 ruby​​ kafka 客户端库来生成事件,但遇到了一个我不知道如何解决的问题。任何帮助,将不胜感激。

我尝试过使用 kafka-rb(acrosa、mheffner 和 bpot 叉子)。问题是,无论我通过图书馆发送什么,例如

require 'kafka'
host = 'localhost'
port = 9092
producer = Kafka::Producer.new(

        :topic => 'login',
        :host => host,
        :port => port
)
producer.send([Kafka::Message.new("aaaaa")])

我得到一个:

java.nio.BufferUnderflowException
    at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:127)
    at java.nio.ByteBuffer.get(ByteBuffer.java:675)
    at kafka.api.ApiUtils$.readShortString(ApiUtils.scala:22)
    at kafka.api.ProducerRequest$.readFrom(ProducerRequest.scala:34)
    at kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys.scala:34)
    at kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys.scala:34)
    at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:48)
    at kafka.network.Processor.read(SocketServer.scala:321)
    at kafka.network.Processor.run(SocketServer.scala:231)
    at java.lang.Thread.run(Thread.java:680)

在服务器上。在同一台服务器上,我可以通过提供的控制台生产者发送文本而不会出现任何问题。

如果您之前看到过此内容,我将不胜感激。由于我对 Scala 不是很熟悉,所以我不确定问题出在哪里,但在我看来,引发此异常的那一行与从套接字读取 clientId 有关,而且在我看来, ruby 客户端没有发送这样的东西。

当我查看在 tcpdump 表单 kafka-rb 和提供的生产者上生成的消息时。红宝石的看起来更短。此外,无论我使用 kafka-0.7 还是 0.8,我都会得到完全相同的行为。

4

1 回答 1

1

我可以用 Kafka 0.8 重现你的错误,但是当我尝试这个实现时:

require 'rubygems'
require 'kafka'
producer = Kafka::Producer.new({ :host => "localhost", :port => 9092, :topic => "test" , :compression => 0 })
message = Kafka::Message.new("some random message content")
producer.send(message)

它适用于 Kafka 0.7:

消费者示例

根据 Kafka 文档,您提到的客户端仅支持 0.7.x ( https://cwiki.apache.org/KAFKA/clients.html#Clients-Ruby )。0.8 部分说:

0.8 版本相当大地改变了协议。这个版本还没有发布。此处记录了新协议。许多客户端正在进行中,我们将在完成后更新。

所以我认为https://github.com/acrosa/kafka-rb在这种情况下不起作用:-(。

最好的

于 2013-02-08T15:19:18.843 回答