我是三叉戟的新手。我正在编写一个从 kafka 读取数据的三叉戟拓扑。主题名称是“测试”。我有本地卡夫卡设置。我在本地启动了 zookeeper,kafka。并在 kafka 中创建了一个主题“测试”并打开了生产者并输入了消息“Hello Kafka!”。
我想使用 trident 从“测试”主题中读取消息“Hello Kafka”。
下面是我的代码。我得到空元组。
TridentTopology topology = new TridentTopology();
BrokerHosts brokerHosts = new ZkHosts("localhost:2181");
TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, "test");
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
kafkaConfig.bufferSizeBytes = 1024 * 1024 * 4;
kafkaConfig.fetchSizeBytes = 1024 * 1024 * 4;
kafkaConfig.forceFromStart = false;
OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);
topology.newStream("TestSpout", opaqueTridentKafkaSpout).parallelismHint(1)
.each(new Fields(), new TestFilter()).parallelismHint(1)
.each(new Fields(), new Utils.PrintFilter());
这是我的 TestFilter 类代码
public TestFilter()
{
//
}
@Override
public boolean isKeep(TridentTuple tuple) {
boolean isKeep=true;
System.out.println("TestFilter is called...");
if (tuple != null && tuple.getValues().size()>0) {
System.out.println("data from kafka ::: "+tuple.getValues());
}
return isKeep;
}
每当我在 kafka 生产者中向“测试”主题键入消息时,首先打印 sysout,但它没有通过 if 循环。我只是收到消息'TestFilter 被调用...'仅此而已。
我想将我生成的实际数据用于“测试”主题。如何?