
I have a JSON record like the one below:

val warningJson = """{"p1":"{"trasanction_id": 197, "customer_id": 27, "datetime": "1576499008876", "amount": 6094, "state": "SUCCESS"}","p2":"{"trasanction_id": 197, "customer_id": 27, "datetime": "1576499017565", "amount": 547, "state": "SUCCESS"}","p3":"{"trasanction_id": 198, "customer_id": 27, "datetime": "1576499029116", "amount": 6824, "state": "SUCCESS"}"}"""

However, I want to parse it into an Avro data record by passing a schema, like this:

val outputSchemaStringTestData = """{"type":"record","name":"Warning","namespace":"test","fields":[{"name":"p1","type":"string"},{"name":"p2","type":"string"},{"name":"p3","type":"string"}]}"""
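The outputSchemaTestData used below is presumably this string parsed into an Avro Schema; with Avro's Schema.Parser that would look like:

// Assumed definition: parse the schema JSON string into an Avro Schema object.
val outputSchemaTestData: Schema = new Schema.Parser().parse(outputSchemaStringTestData)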

When I use this schema to create a generic record with the following code, it works:

val genericRecord: GenericRecord = new GenericData.Record(outputSchemaTestData)
 genericRecord.put("p1", """{"trasanction_id": 197, "customer_id": 27, "datetime": "1576499008876", "amount": 6094, "state": "SUCCESS"}""")
 genericRecord.put("p2", """{"trasanction_id": 197, "customer_id": 27, "datetime": "1576499017565", "amount": 547, "state": "SUCCESS"}""")
 genericRecord.put("p3", """{"trasanction_id": 198, "customer_id": 27, "datetime": "1576499029116", "amount": 6824, "state": "SUCCESS"}""")

But when I parse warningJson against the same schema, passing outputSchemaStringTestData to the code mentioned below, I get the following error:

com.fasterxml.jackson.core.JsonParseException: Unexpected character ('t' (code 116)): was expecting comma to separate Object entries
 at [Source: (String)"{"p1":"{"trasanction_id": 197, "customer_id": 27, "datetime": "1576499008876", "amount": 6094, "state": "SUCCESS"}","p2":"{"trasanction_id": 197, "customer_id": 27, "datetime": "1576499017565", "amount": 547, "state": "SUCCESS"}","p3":"{"trasanction_id": 198, "customer_id": 27, "datetime": "1576499029116", "amount": 6824, "state": "SUCCESS"}"}"; line: 1, column: 11]
org.apache.avro.SchemaParseException: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('t' (code 116)): was expecting comma to separate Object entries
 at [Source: (String)"{"p1":"{"trasanction_id": 197, "customer_id": 27, "datetime": "1576499008876", "amount": 6094, "state": "SUCCESS"}","p2":"{"trasanction_id": 197, "customer_id": 27, "datetime": "1576499017565", "amount": 547, "state": "SUCCESS"}","p3":"{"trasanction_id": 198, "customer_id": 27, "datetime": "1576499029116", "amount": 6824, "state": "SUCCESS"}"}"; line: 1, column: 11]
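Note that warningJson itself is not valid JSON: the quotes around the embedded records are not escaped, so Jackson closes the string value of p1 right after the inner "{" and then trips over the t of trasanction_id (column 11). Any Jackson-based parse of this string, whether Avro's JSON decoder or its schema parser, fails at the same spot. A sketch of an escaped version that is valid JSON (keeping the original field spellings; escapedWarningJson is just an illustrative name):

// Scala triple-quoted strings do not process escapes, so each \" below ends up
// in the string literally, which is exactly what the embedded JSON needs.
val escapedWarningJson = """{"p1":"{\"trasanction_id\": 197, \"customer_id\": 27, \"datetime\": \"1576499008876\", \"amount\": 6094, \"state\": \"SUCCESS\"}","p2":"{\"trasanction_id\": 197, \"customer_id\": 27, \"datetime\": \"1576499017565\", \"amount\": 547, \"state\": \"SUCCESS\"}","p3":"{\"trasanction_id\": 198, \"customer_id\": 27, \"datetime\": \"1576499029116\", \"amount\": 6824, \"state\": \"SUCCESS\"}"}"""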

My conversion code:

package com.Izac.Cep.KafkaSourceAndSink;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.*;

import java.io.*;
import java.nio.charset.StandardCharsets;

public class JsonToAvro {
    public static GenericRecord createGenericRecord(final String schemastr, final String json) throws Exception {
        // Schema.parse(String) is deprecated; use Schema.Parser instead.
        Schema schema = new Schema.Parser().parse(schemastr);

        // Re-encode the JSON as Avro binary, then decode it into a GenericRecord.
        byte[] avroByteArray = fromJsonToAvro(json, schemastr);
        DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
        Decoder decoder = DecoderFactory.get().binaryDecoder(avroByteArray, null);
        return reader.read(null, decoder);
    }

    private static byte[] fromJsonToAvro(String json, String schemastr) throws Exception {
        Schema schema = new Schema.Parser().parse(schemastr);

        // Decode the JSON text against the schema.
        InputStream input = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
        Decoder decoder = DecoderFactory.get().jsonDecoder(schema, input);
        DatumReader<Object> reader = new GenericDatumReader<Object>(schema);
        Object datum = reader.read(null, decoder);

        // Re-encode the datum as Avro binary.
        GenericDatumWriter<Object> writer = new GenericDatumWriter<Object>(schema);
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);
        writer.write(datum, encoder);
        encoder.flush();
        return outputStream.toByteArray();
    }
}
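For completeness, a hypothetical call site in Scala, assuming the escaped string sketched above:

import org.apache.avro.generic.GenericRecord
import com.Izac.Cep.KafkaSourceAndSink.JsonToAvro

// Sketch: decode the escaped JSON into a GenericRecord and read one field back.
val record: GenericRecord = JsonToAvro.createGenericRecord(outputSchemaStringTestData, escapedWarningJson)
println(record.get("p1"))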