我写了一个风暴拓扑。我基本上想以字节数组的形式将 avro 模式中的元组发送到 kafka 主题。
这就是我设置螺栓的方式:
builder.setBolt(KAFKA_AVRO_BOLT_NAME, new KafkaBolt<String, byte[]>())
.fieldsGrouping(BOLT1, new Fields("key"));
这就是我转换为字节数组的方式
Schema schema = avroObject.getSchema();
DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
ByteArrayOutputStream out = new ByteArrayOutputStream();
Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
writer.write(ping, encoder);
encoder.flush();
byte[] message = out.toByteArray();
String key = new String(message, "UTF-8");
当我以以下方式发出元组时,我在 kafka 主题中看不到任何内容(向 kafka 发送字节流):
collector.emit(tuple, new Values(Obj.hashMD5(key), message));
但相反,如果我将字节数组转换为字符串,然后转换为 kafka 主题,它可以工作:
如下所示:
builder.setBolt(KAFKA_AVRO_BOLT_NAME, new KafkaBolt<String, String>())
.fieldsGrouping(BOLT1, new Fields("key"));
collector.emit(tuple, new Values(Obj.hashMD5(key), key));
我究竟做错了什么?如何使用 Storm kafka bolt 将字节流发送到 kafka 主题?