我正在使用 Apache Kafka,并且一直在尝试使用 Kafka Streams 功能。我要实现的目标非常简单,至少在文字上是这样,并且可以通过常规的普通消费者/生产者方法轻松实现:
- 从主题的动态列表中读取
- 对消息做一些处理
- 将消息推送到另一个主题,该主题的名称是根据消息内容计算出来的
最初我以为我可以创建一个自定义接收器或注入某种端点解析器,以便以编程方式为每条消息定义主题名称,尽管最终找不到任何方法来做到这一点。因此,我深入研究了代码并找到了ProducerInterceptor类(引用自JavaDoc):
一个插件接口,允许您在生产者收到的记录发布到 Kafka 集群之前拦截(并可能改变)这些记录。
它是onSend方法:
这是从 KafkaProducer.send(ProducerRecord) 和 KafkaProducer.send(ProducerRecord, Callback) 方法调用的,在键和值被序列化并分配分区之前(如果在 ProducerRecord 中未指定分区)。
这对我来说似乎是一个完美的解决方案,因为我可以有效地返回一个带有我想要的主题名称的新ProducerRecord 。虽然显然有一个错误(我在他们的 JIRA 上打开了一个问题:KAFKA-4691)并且当键和值已经被序列化时调用该方法。太糟糕了,因为我认为此时进行额外的反序列化是可以接受的。
我对您更有经验和知识渊博的用户的问题将是您的意见和想法以及任何关于如何以高效和优雅的方式实现它的建议。
提前感谢您的帮助/意见/建议/想法。
以下是我尝试过的一些代码片段:
public static void main(String[] args) throws Exception {
StreamsConfig streamingConfig = new StreamsConfig(getProperties());
StringDeserializer stringDeserializer = new StringDeserializer();
StringSerializer stringSerializer = new StringSerializer();
MyObjectSerializer myObjectSerializer = new MyObjectSerializer();
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.addSource("SOURCE", stringDeserializer, myObjectSerializer, Pattern.compile("input-.*"));
.addProcessor("PROCESS", MyCustomProcessor::new, "SOURCE");
System.out.println("Starting PurchaseProcessor Example");
KafkaStreams streaming = new KafkaStreams(topologyBuilder, streamingConfig);
streaming.start();
System.out.println("Now started PurchaseProcessor Example");
}
private static Properties getProperties() {
Properties props = new Properties();
.....
.....
props.put(StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG), "com.test.kafka.streams.OutputTopicRouterInterceptor");
return props;
}
OutputTopicRouterInterceptor onSend 实现:
@Override
public ProducerRecord<String, MyObject> onSend(ProducerRecord<String, MyObject> record) {
MyObject obj = record.value();
String topic = computeTopicName(obj);
ProducerRecord<String, MyObject> newRecord = new ProducerRecord<String, MyObject>(topic, record.partition(), record.timestamp(), record.key(), obj);
return newRecord;
}