I'd like some guidance on the following problem. I'm trying to learn how to serialize Avro data with nodejs without a schema registry, publish it to a Kafka cluster, and then consume it in Kafka Streams (Java).
On the javascript side, I'm using kafka-node together with avsc for the serialization. In Kafka Streams I decided to implement a custom Serde, since, as far as I can tell, the Avro Serdes provided by the Streams API are designed to fetch the schema directly from the Schema Registry.
Here is a javascript snippet from a simple producer:
const avro = require('avsc');

const messageKey = "1";
const schemaType = avro.Type.forSchema({
  type: "record",
  name: "product",
  fields: [
    { name: "id", type: "int" },
    { name: "name", type: "string" },
    { name: "price", type: "double" },
    { name: "stock", type: "int" }
  ]
});

// example values; in the real app these come from elsewhere
const id = 1, name = "example product", price = 9.99, stock = 10;

const messageValueBuffer = schemaType.toBuffer({ id, name, stock, price });
const payload = [{ topic: 'product', key: messageKey, messages: messageValueBuffer, partition: 0 }];
producer.send(payload, sendCallback); // producer is a kafka-node Producer
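For what it's worth, here is my understanding of what `schemaType.toBuffer` emits — Avro's raw binary encoding, field by field, with no header at all. The sketch below (pure Node, no avsc needed; only an illustration of the wire format, not my production code) hand-encodes an Avro `int` the way the leading `id` field would be encoded:

```javascript
// Avro encodes an "int" as a zig-zag-mapped varint, with no framing around it.
function zigZag(n) {
  return (n << 1) ^ (n >> 31); // map signed 32-bit int to unsigned
}

function varint(u) {
  const bytes = [];
  while (u > 0x7f) {
    bytes.push((u & 0x7f) | 0x80); // low 7 bits, continuation flag set
    u >>>= 7;
  }
  bytes.push(u);
  return Buffer.from(bytes);
}

console.log(varint(zigZag(1)));   // <Buffer 02>
console.log(varint(zigZag(150))); // <Buffer ac 02>
```

So if `id` is 1, the buffer handed to kafka-node starts with the byte `0x02` — there is no magic number or schema preamble in front of it.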
And here is how I'm currently trying to implement the deserializer:
public Product deserialize(String topic, byte[] data) {
    SeekableByteArrayInput inputStream = new SeekableByteArrayInput(data);
    DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
    Product product = null;
    try {
        DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(inputStream, datumReader);
        GenericRecord record = null;
        while (dataFileReader.hasNext()) {
            record = dataFileReader.next(record); // reuse the record instance
            product = genericRecordToObject(record, new Product());
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
    return product;
}
However, when the streams application tries to deserialize the data, I run into the following error, specifically on the line where the DataFileReader is instantiated:
org.apache.avro.InvalidAvroMagicException: Not an Avro data file.
at org.apache.avro.file.DataFileStream.initialize(DataFileStream.java:111)
at org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:106)
at org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:98)
at myapps.ProductAvroSerde$ProductDeserializer.deserialize(ProductAvroSerde.java:138)
at myapps.ProductAvroSerde$ProductDeserializer.deserialize(ProductAvroSerde.java:128)
at myapps.ProductAvroSerde$ProductDeserializer.deserialize(ProductAvroSerde.java:1)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:60)
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:168)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:109)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:156)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:808)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:925)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:763)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
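From the stack trace, the check that fails is in DataFileStream.initialize; as far as I can tell from the Avro source, it compares the first four bytes of the input against the object-container-file magic, "Obj" followed by 0x01. A minimal sketch of that check (hypothetical helper name, pure Node), which I've been using to inspect my producer's output:

```javascript
// Mirrors the magic-byte check DataFileStream.initialize performs:
// an Avro object container file must begin with "Obj" + 0x01.
const AVRO_MAGIC = Buffer.from([0x4f, 0x62, 0x6a, 0x01]); // 'O','b','j',1

function looksLikeContainerFile(buf) {
  return buf.length >= 4 && buf.subarray(0, 4).equals(AVRO_MAGIC);
}

// Raw bytes from avsc's type.toBuffer() carry no such header:
console.log(looksLikeContainerFile(Buffer.from([0x02, 0x0e]))); // false
```

Running this against the buffers my producer sends always prints false, which seems consistent with the exception.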
I don't know how to proceed from here. Any advice would be much appreciated.