4

我正在使用 Confluent 3.3.0。我的意图是使用 kafka-connect 将来自 Kafka 主题的值插入到 Oracle 表中。我的连接器适用于我使用 avro 控制台生成器生成的 avro 记录,如下所示:

./kafka-avro-console-producer --broker-list 192.168.0.1:9092 --topic topic6 --property value.schema='{"type":"record","name":"flights3","fields":[{"name":"flight_id","type":"string"},{"name":"flight_to", "type": "string"}, {"name":"flight_from", "type": "string"}]}'

我插入如下值:

{"flight_id":"1","flight_to":"QWE","flight_from":"RTY"}

我想要实现的是使用 Java 应用程序,使用对象插入相同的数据。下面是我的生产者代码:

/**
 * Produces one Flight record to topic "topic5" using the custom JSON
 * serializer (serializers.custom.FlightSerializer) for the value.
 */
public class Sender {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.0.1:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "serializers.custom.FlightSerializer");
        // NOTE(review): schema.registry.url is ignored by the custom JSON
        // serializer; it only matters for the Confluent Avro serializer.
        props.put("schema.registry.url", "http://192.168.0.1:8081");

        // try-with-resources guarantees the producer is flushed and closed,
        // so the record is actually sent before the JVM exits and the
        // network threads/buffers are released (the original leaked them).
        try (Producer<String, Flight> producer = new KafkaProducer<String, Flight>(props)) {
            Flight myflight = new Flight("testflight1", "QWE", "RTY");
            ProducerRecord<String, Flight> record =
                    new ProducerRecord<String, Flight>("topic5", "key", myflight);
            // get() blocks until the broker acks, surfacing send failures here.
            producer.send(record).get();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

以下是 Flight VO(值对象)类:

package vo;

/**
 * Value object describing a single flight; mirrors the Avro schema fields
 * flight_id / flight_to / flight_from (all strings).
 */
public class Flight {

    String flight_id;
    String flight_to;
    String flight_from;

    /** Creates a fully populated flight. */
    public Flight(String flight_id, String flight_to, String flight_from) {
        this.flight_id = flight_id;
        this.flight_to = flight_to;
        this.flight_from = flight_from;
    }

    /** No-arg constructor for frameworks (e.g. Jackson) that populate via setters. */
    public Flight() {
    }

    public String getFlight_id() {
        return flight_id;
    }

    public void setFlight_id(String flight_id) {
        this.flight_id = flight_id;
    }

    public String getFlight_to() {
        return flight_to;
    }

    public void setFlight_to(String flight_to) {
        this.flight_to = flight_to;
    }

    public String getFlight_from() {
        return flight_from;
    }

    public void setFlight_from(String flight_from) {
        this.flight_from = flight_from;
    }
}

最后,序列化器:

package serializers.custom;

import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;
import vo.Flight;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Kafka value serializer that encodes a {@link Flight} as JSON bytes.
 *
 * <p>NOTE(review): this produces plain JSON, NOT the Confluent Avro wire
 * format, so a JDBC sink/avro console consumer will not understand it.
 */
public class FlightSerializer implements Serializer<Flight> {

    // ObjectMapper is thread-safe and expensive to build; share one instance
    // instead of allocating a new one per serialize() call.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public void close() {
        // no resources to release
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no configuration needed
    }

    /**
     * Serializes the flight to UTF-8 JSON bytes.
     *
     * @param topic the topic the record is headed to (unused)
     * @param data  the flight to encode; null yields null (Kafka tombstone convention)
     * @return JSON bytes, or null on a null input or serialization failure
     */
    @Override
    public byte[] serialize(String topic, Flight data) {
        if (data == null) {
            return null;
        }
        try {
            // writeValueAsBytes always emits UTF-8, unlike the original
            // writeValueAsString(...).getBytes() which used the platform
            // default charset and could corrupt non-ASCII payloads.
            return MAPPER.writeValueAsBytes(data);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
}

但我的理解是,需要定义类似模式的东西,并使用一些 avro 序列化程序来获取确切的数据,就像我使用avro console consumer所做的那样。我已经浏览了一些示例代码,但没有一个对我有用。

编辑

我尝试了以下代码。但是在 avro 控制台消费者中什么也没有出现。

package producer.serialized.avro;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import vo.Flight;
import java.util.Properties;

/**
 * Produces one Avro GenericRecord to topic "topic6" via the Confluent
 * KafkaAvroSerializer, which registers/looks up the schema in the
 * Schema Registry and writes the Confluent Avro wire format.
 */
public class Sender {
    public static void main(String[] args) {
        // Must match the schema registered for the topic's value subject.
        String flightSchema = "{\"type\":\"record\"," + "\"name\":\"flights\","
                + "\"fields\":[{\"name\":\"flight_id\",\"type\":\"string\"},{\"name\":\"flight_to\",\"type\":\"string\"},{\"name\":\"flight_from\",\"type\":\"string\"}]}";

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class);
        props.put("schema.registry.url", "http://192.168.0.1:8081");

        Schema schema = new Schema.Parser().parse(flightSchema);
        GenericRecord avroRecord = new GenericData.Record(schema);
        avroRecord.put("flight_id", "1");
        avroRecord.put("flight_to", "QWE");
        avroRecord.put("flight_from", "RTY");
        ProducerRecord<String, GenericRecord> record = new ProducerRecord<>("topic6", avroRecord);

        // try-with-resources closes (and therefore flushes) the producer.
        // The original never flushed/closed, so the batched record was
        // still sitting in the client buffer when main() exited — which is
        // why nothing appeared in the avro console consumer.
        try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
            // get() blocks until the broker acks, surfacing failures here.
            producer.send(record).get();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
4

2 回答 2

1

模式(schema)没有定义,因此当 KafkaAvroSerializer 需要联系模式注册表(Schema Registry)提交模式时,它手上并没有这个模式。

您必须为您的 Flight 对象创建一个 Avro 模式。

下面的 file.avdl(avro 扩展文件之一)的示例就可以了:

// Avro IDL: the generated Java class will be vo.Flight,
// replacing the hand-written value object.
@namespace("vo")
protocol FlightSender {

    record Flight {
       // Each field is a nullable string defaulting to null,
       // matching the keys of the JSON payload in the question.
       union{null, string} flight_id = null;
       union{null, string} flight_to = null;
       union{null, string} flight_from = null;
    }
}

请参阅Avro IDL 文档

在编译时,当您使用 avro-maven-plugin 时,上面的 Avro 模式会为您生成 Java 的 Flight 类,因此您必须删除之前手工创建的那个类。

当它涉及您的主类时,您必须设置如下两个属性:

// Key and value serializers both point at Confluent's Avro serializer,
// which registers/fetches the schema in the Schema Registry.
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class); 

而在您的生产者(producer)中,您可以使用生成的特定 Avro 类:

Producer<String, Flight> producer = new KafkaProducer<String, Flight>(props);

希望这会有所帮助:-)

于 2018-01-11T17:49:58.977 回答
0

得到与我使用 avro 控制台消费者时完全相同的数据

您可以查看源代码


假设你想使用通用记录,这一切都是正确的,

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092");
...
// Confluent's Avro serializer handles both schema registration and encoding.
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://192.168.0.1:8081");

Producer<String, GenericRecord> producer = new KafkaProducer<>(props);

...

// Build a GenericRecord against the parsed schema and populate each field.
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("flight_id", "1");
avroRecord.put("flight_to", "QWE");
avroRecord.put("flight_from", "RTY");
ProducerRecord<String, GenericRecord> record = new ProducerRecord<>("topic6", avroRecord);

// NOTE(review): send() only enqueues; without flush()/close() afterwards the
// batched record may never actually leave the client before the JVM exits.
try {
    producer.send(record);
} catch (Exception e) {
    e.printStackTrace();
}

但是你遗漏了一个调用:producer.flush() 或 producer.close(),它们才会在最后把这批记录真正发送出去。

于 2018-10-01T00:31:53.530 回答