1

所以我想实现一个简单的应用程序,将通知 kafka 生产者发送给 kafka 消费者。到目前为止,我已经成功地将字符串消息发送给生产者给消费者。但是当我尝试发送通知对象时,kafka 消费者没有收到任何对象。这是代码我用过。

public class Notification implements Serializable{

    private String name;
    private String message;
    private long currentTimeStamp;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public long getCurrentTimeStamp() {
        return currentTimeStamp;
    }

    public void setCurrentTimeStamp(long currentTimeStamp) {
        this.currentTimeStamp = currentTimeStamp;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        Notification that = (Notification) o;

        if (currentTimeStamp != that.currentTimeStamp) return false;
        if (message != null ? !message.equals(that.message) : that.message != null) return false;
        if (name != null ? !name.equals(that.name) : that.name != null) return false;

        return true;
    }

    @Override
    public int hashCode() {
        int result = name != null ? name.hashCode() : 0;
        result = 31 * result + (message != null ? message.hashCode() : 0);
        result = 31 * result + (int) (currentTimeStamp ^ (currentTimeStamp >>> 32));
        return result;
    }

    @Override
    public String toString() {
        return "Notification{" +
                "name='" + name + '\'' +
                ", message='" + message + '\'' +
                ", currentTimeStamp=" + currentTimeStamp +
                '}';
    }
}

这是制片人

public class KafkaProducer {
    static String topic = "kafka-tutorial";


    public static void main(String[] args) {
        System.out.println("Start Kafka producer");
        Properties properties = new Properties();
        properties.put("metadata.broker.list", "localhost:9092");
        properties.put("serializer.class", "dev.innova.kafka.tutorial.producer.CustomSerializer");
        ProducerConfig producerConfig = new ProducerConfig(properties);
        kafka.javaapi.producer.Producer<String, Notification> producer = new kafka.javaapi.producer.Producer<String, Notification>(producerConfig);
        KeyedMessage<String, Notification> message = new KeyedMessage<String, Notification>(topic, createNotification());
        System.out.println("send Message to broker");
        producer.send(message);
        producer.close();

    }

    private static Notification createNotification(){
        Notification notification = new Notification();
        notification.setMessage("Sample Message");
        notification.setName("Sajith");
        notification.setCurrentTimeStamp(System.currentTimeMillis());
        return notification;
    }
}

这是消费者

public class KafkaConcumer extends Thread {
    final static String clientId = "SimpleConsumerDemoClient";
    final static String TOPIC = "kafka-tutorial";
    ConsumerConnector consumerConnector;


    public KafkaConcumer() {
        Properties properties = new Properties();
        properties.put("zookeeper.connect","localhost:2181");
        properties.put("group.id","test-group");
        properties.put("serializer.class", "dev.innova.kafka.tutorial.producer.CustomSerializer");
        properties.put("zookeeper.session.timeout.ms", "400");
        properties.put("zookeeper.sync.time.ms", "200");
        properties.put("auto.commit.interval.ms", "1000");
        ConsumerConfig consumerConfig = new ConsumerConfig(properties);
        consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
    }

    @Override
    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(TOPIC, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream =  consumerMap.get(TOPIC).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        System.out.println("It :" + it.size());
        while(it.hasNext()){
            System.out.println(new String(it.next().message()));
        }
    }

    private static void printMessages(ByteBufferMessageSet messageSet) throws UnsupportedEncodingException {
        for(MessageAndOffset messageAndOffset: messageSet) {
            ByteBuffer payload = messageAndOffset.message().payload();
            byte[] bytes = new byte[payload.limit()];
            payload.get(bytes);
            System.out.println(new String(bytes, "UTF-8"));
        }
    }
}

最后我使用customserializer 序列化和反序列化对象。

public class CustomSerializer implements Encoder<Notification>, Decoder<Notification> {
    public CustomSerializer(VerifiableProperties verifiableProperties) {
        /* This constructor must be present for successful compile. */
    }
    @Override
    public byte[] toBytes(Notification o) {
        return new byte[0];
    }

    @Override
    public Notification fromBytes(byte[] bytes) {
        return null;
    }
}

有人可以告诉我是什么问题吗?这是正确的方法吗?

4

3 回答 3

3

你有两个问题。

首先,您的反序列化器没有任何逻辑。它为它序列化的每个对象返回一个空字节数组,并在要求反序列化对象时返回一个空对象。您需要将实际序列化和反序列化对象的代码放在那里。

其次,如果您计划使用来自 JVM 的本机 JVM 序列化和反序列化逻辑,则需要将 serialVersionUID 添加到要传输的 bean 中。像这样的东西:

private static final long serialVersionUID = 123L;

你可以使用任何你喜欢的值。当一个对象被 JVM 反序列化时,对象中的 serialVersionId 将与加载的类定义中指定的值进行比较。如果两者不同,则 JVM 假定即使您加载了类定义,您也没有加载正确版本的类定义,并且序列化将失败。如果您没有在类定义中为 serialVersionID 指定值,那么 JVM 将为您组成一个,而两个不同的 JVM(一个与生产者有关,一个与消费者有关)几乎肯定会为您组成不同的值。

编辑

如果你想利用默认的 Java 序列化,你需要让你的序列化器看起来像这样:

public class CustomSerializer implements Encoder<Notification>, Decoder<Notification> {
    public CustomSerializer(VerifiableProperties verifiableProperties) {
        /* This constructor must be present for successful compile. */
    }

@Override
public byte[] toBytes(Notification o) {
    try {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(baos);
        oos.writeObject(o);
        oos.close();
        byte[] b = baos.toByteArray();
        return b;
    } catch (IOException e) {
        return new byte[0];
    }
}

@Override
public Notification fromBytes(byte[] bytes) {
    try {
        return (Notification) new ObjectInputStream(new ByteArrayInputStream(b)).readObject();
    } catch (Exception e) {
        return null;
    }    
}
于 2015-11-30T12:07:57.587 回答
1

创建一个自定义的反序列化器,Kafka需要一种序列化和反序列化的方法。到目前为止我们必须提供这两种实现需要添加库来获取对象映射器类

FasterXML 杰克逊 – 2.8.6

示例 -序列化程序

public class PayloadSerializer implements org.apache.kafka.common.serialization.Serializer { @Override public byte[] serialize(String arg0, Object arg1) { byte[] retVal = null; ObjectMapper objectMapper = new ObjectMapper(); TestModel model =(TestModel) arg1; try { retVal = objectMapper.writeValueAsString(model).getBytes(); } catch (Exception e) { e.printStackTrace(); } return retVal; } @Override public void close() { } @Override public void configure(Map map, boolean bln) { } }

解串器

public class PayloadDeserializer implements Deserializer { @Override public void close() { } @Override public TestModel deserialize(String arg0, byte[] arg1) { ObjectMapper mapper = new ObjectMapper(); TestModel testModel = null; try { testModel = mapper.readValue(arg1, TestModel.class); } catch (Exception e) { e.printStackTrace(); } return testModel; } @Override public void configure(Map map, boolean bln) { } } 最后我们必须将解串器类传递给接收器

ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG - PayloadDeserializer.class

或者

deserializer.class - 类路径.PayloadDeserializer

于 2018-05-09T05:21:28.887 回答
1

我强烈建议您在发送之前将对象转换为 Avro 对象。

这并不难,是Kafka传输对象的方式。

于 2018-05-09T06:10:09.667 回答