0
import kafka.serializer.Encoder
import kafka.utils.VerifiableProperties
import org.jnetpcap.packet.PcapPacket
class PcapEncoder(verifiableProperties: VerifiableProperties) extends Encoder[PcapPacket]  { override def toBytes(customMessage: PcapPacket): Array[Byte] = customMessage.transferStateAndDataFrom(PcapPacket : Array[Byte]) }

`这是我编写的编码器,用于对使用 jnetpcap 库捕获的数据包进行编码,以将其传递给 kafka 消费者。但是我有错误,这是实现编码器的方法吗?

4

1 回答 1

1

假设您已经安装并运行了 Kafka,您将使用生产者接口之一(可能是 kafka.javaapi.producer.Producer)建立到 Kafka 到特定主题的连接。然后,当您获得捕获的数据包时,将其传递给 API。Kafka 对原始字节没有问题。完成后,您通过该 API 关闭连接。

于 2014-08-22T05:43:08.237 回答