
I am looking for some guidance on the following problem. I am trying to learn how to serialize Avro data with Node.js without a schema registry, publish it to a Kafka cluster, and then consume it from Kafka Streams (Java).

On the JavaScript side I am using kafka-node together with avsc for the serialization. On the Kafka Streams side I decided to implement a custom Serde, because as far as I can tell the Avro Serdes provided by the Streams API are designed to fetch schemas directly from a Schema Registry.

Here is the JavaScript snippet of a simple producer:

const avro = require('avsc');
const kafka = require('kafka-node');

// kafka-node producer setup (broker address here is just an example)
const client = new kafka.KafkaClient({ kafkaHost: 'localhost:9092' });
const producer = new kafka.Producer(client);

const messageKey = "1";

const schemaType = avro.Type.forSchema({
    type: "record",
    name: "product",
    fields: [
        { name: "id", type: "int" },
        { name: "name", type: "string" },
        { name: "price", type: "double" },
        { name: "stock", type: "int" }
    ]
});

// Example values; toBuffer() yields the raw Avro binary encoding of the record
const messageValueBuffer = schemaType.toBuffer({ id: 1, name: "apple", price: 1.25, stock: 10 });

const payload = [{ topic: 'product', key: messageKey, messages: messageValueBuffer, partition: 0 }];

producer.on('ready', () => {
    producer.send(payload, (err, result) => {
        if (err) console.error(err);
        else console.log(result);
    });
});

And here is how I am currently trying to implement the deserializer:

public Product deserialize(String topic, byte[] data) {
    SeekableByteArrayInput inputStream = new SeekableByteArrayInput(data);

    DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);

    DataFileReader<GenericRecord> dataFileReader;

    Product product = null;
    try {
        dataFileReader = new DataFileReader<GenericRecord>(inputStream, datumReader);
        GenericRecord record = new GenericData.Record(schema);

        while (dataFileReader.hasNext()) {
            record = dataFileReader.next(record);
            product = genericRecordToObject(record, new Product());
        }

    } catch (IOException e) {
        e.printStackTrace();
    }
    return product;
}

However, when the Streams application tries to deserialize the data I get the following error, specifically on the line where the DataFileReader is instantiated:

org.apache.avro.InvalidAvroMagicException: Not an Avro data file.
    at org.apache.avro.file.DataFileStream.initialize(DataFileStream.java:111)
    at org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:106)
    at org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:98)
    at myapps.ProductAvroSerde$ProductDeserializer.deserialize(ProductAvroSerde.java:138)
    at myapps.ProductAvroSerde$ProductDeserializer.deserialize(ProductAvroSerde.java:128)
    at myapps.ProductAvroSerde$ProductDeserializer.deserialize(ProductAvroSerde.java:1)
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:60)
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
    at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:168)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:109)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:156)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:808)
    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:925)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:763)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)

I am not sure how to proceed from here. Any advice would be appreciated.


1 Answer


Maybe I'm wrong, but I don't think you should use a DataFileReader here, only a DatumReader.
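Adapted to your deserialize method, that change would look roughly like the sketch below. It assumes the same schema field, Product class and genericRecordToObject helper you already have, and treats the incoming bytes as a plain Avro binary payload with no container-file header:

public Product deserialize(String topic, byte[] data) {
    // Uses org.apache.avro.generic.GenericDatumReader and org.apache.avro.io.DecoderFactory
    DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null);

    try {
        // Read the raw Avro binary encoding produced by avsc's toBuffer()
        GenericRecord record = datumReader.read(null, decoder);
        return genericRecordToObject(record, new Product());
    } catch (IOException e) {
        e.printStackTrace();
        return null;
    }
}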

I did something similar with a plain Kafka consumer (not Kafka Streams); maybe it gives you some ideas:

The full (very simple) example is here: https://github.com/anigmo97/KafkaRecipes/blob/master/java/consumers/StringKeyAvroValueConsumers/StandardAvro/StandardAvroConsumer.java

As you can see, I don't create a serializer class; I just deserialize the value in the consumer and get a GenericRecord back.

public static void main(String[] args) {
    final KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(getConsumerProperties());
    consumer.subscribe(Collections.singleton(TOPIC));
    ConsumerRecords<String, byte[]> consumerRecords;

    // The value schema, parsed from a JSON literal (no schema registry involved)
    String valueSchemaString = "{\"type\": \"record\",\"namespace\": \"example.avro\",\"name\": \"test_record\","
            + "\"fields\":[" + "{\"name\": \"id\",\"type\": \"int\"},"
            + "{\"name\": \"date\",\"type\": [\"int\", \"null\"]}," + "{\"name\": \"info\",\"type\": \"string\"}"
            + "]}";
    Schema avroValueSchema = new Schema.Parser().parse(valueSchemaString);
    SpecificDatumReader<GenericRecord> datumReader = new SpecificDatumReader<>(avroValueSchema);

    try {
        while (true) {
            consumerRecords = consumer.poll(1000);

            consumerRecords.forEach(record -> {
                // Decode the raw Avro binary value with a DatumReader (no DataFileReader)
                ByteArrayInputStream inputStream = new ByteArrayInputStream(record.value());
                BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(inputStream, null);
                GenericRecord deserializedValue = null;
                try {
                    deserializedValue = datumReader.read(null, binaryDecoder);
                } catch (IOException e) {
                    e.printStackTrace();
                }
                System.out.printf("Consumer Record:(%s, %s)\n", record.key(), deserializedValue);
            });

            consumer.commitAsync();
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        consumer.close();
        System.out.println("DONE");
    }
}
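If you also need the producing side in Java, it is symmetric: a DatumWriter with a BinaryEncoder emits the same raw binary layout that avsc's toBuffer() produces. A rough sketch (the helper name is just an example):

// Hypothetical helper: encode a GenericRecord to raw Avro binary, mirroring the consumer above.
// Uses org.apache.avro.generic.GenericDatumWriter and org.apache.avro.io.EncoderFactory.
public static byte[] toAvroBytes(GenericRecord record, Schema schema) throws IOException {
    DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    datumWriter.write(record, encoder);
    encoder.flush();
    return out.toByteArray();
}

For Kafka Streams you can then wrap such a Serializer/Deserializer pair into a Serde with Serdes.serdeFrom(serializer, deserializer) and pass it via Consumed.with(...).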

I hope it helps.

Answered 2020-03-08T10:39:04.133