My application reads messages from a Kafka topic and, after storing them in MongoDB and doing some validation, writes them to another topic. The problem I am facing is that the application goes into an infinite loop. My code is as follows:
BrokerHosts zkHosts = new ZkHosts("localhost:2181");
String zkRoot = "/brokers/topics";
String clientRequestID = "reqtest";
String clientPendingID = "pendtest";
SpoutConfig kafkaRequestConfig = new SpoutConfig(zkHosts, "reqtest", zkRoot, clientRequestID);
SpoutConfig kafkaPendingConfig = new SpoutConfig(zkHosts, "pendtest", zkRoot, clientPendingID);
kafkaRequestConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
kafkaPendingConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaRequestSpout = new KafkaSpout(kafkaRequestConfig);
KafkaSpout kafkaPendingSpout = new KafkaSpout(kafkaPendingConfig);
MongoBolt mongoBolt = new MongoBolt();
DeviceFilterBolt deviceFilterBolt = new DeviceFilterBolt();
KafkaRequestBolt kafkaReqBolt = new KafkaRequestBolt();
abc1DeviceBolt abc1DevBolt = new abc1DeviceBolt();
DefaultTopicSelector defTopicSelector = new DefaultTopicSelector(xyzKafkaTopic.RESPONSE.name());
KafkaBolt kafkaRespBolt = new KafkaBolt()
        .withTopicSelector(defTopicSelector)
        .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
TopologyBuilder topoBuilder = new TopologyBuilder();
topoBuilder.setSpout(xyzComponent.KAFKA_REQUEST_SPOUT.name(), kafkaRequestSpout);
topoBuilder.setSpout(xyzComponent.KAFKA_PENDING_SPOUT.name(), kafkaPendingSpout);
topoBuilder.setBolt(xyzComponent.KAFKA_PENDING_BOLT.name(),
        deviceFilterBolt, 1)
        .shuffleGrouping(xyzComponent.KAFKA_PENDING_SPOUT.name());
topoBuilder.setBolt(xyzComponent.abc1_DEVICE_BOLT.name(),
        abc1DevBolt, 1)
        .shuffleGrouping(xyzComponent.KAFKA_PENDING_BOLT.name(),
                xyzDevice.abc1.name());
topoBuilder.setBolt(xyzComponent.MONGODB_BOLT.name(),
        mongoBolt, 1)
        .shuffleGrouping(xyzComponent.abc1_DEVICE_BOLT.name(),
                xyzStreamID.KAFKARESP.name());
topoBuilder.setBolt(xyzComponent.KAFKA_RESPONSE_BOLT.name(),
        kafkaRespBolt, 1)
        .shuffleGrouping(xyzComponent.abc1_DEVICE_BOLT.name(),
                xyzStreamID.KAFKARESP.name());
Config config = new Config();
config.setDebug(true);
config.setNumWorkers(1);
Properties props = new Properties();
props.put("metadata.broker.list", "localhost:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
config.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
LocalCluster cluster = new LocalCluster();
try {
    cluster.submitTopology("demo", config, topoBuilder.createTopology());
} catch (Exception e) {
    e.printStackTrace();
}
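For context on how the producer settings above reach the bolt: Storm's Config is essentially a HashMap, so the Properties object simply travels inside the topology configuration under the KafkaBolt.KAFKA_BROKER_PROPERTIES key (its string value being "kafka.broker.properties", if I read the storm-kafka source correctly). A minimal standalone sketch of that plumbing, with a plain HashMap standing in for Config:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class BrokerPropsDemo {
    public static void main(String[] args) {
        // Producer properties, as in the topology code above.
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");

        // Storm's Config extends HashMap<String, Object>; the KafkaBolt
        // later reads the Properties back out under this well-known key.
        Map<String, Object> conf = new HashMap<>();
        conf.put("kafka.broker.properties", props);

        Properties restored = (Properties) conf.get("kafka.broker.properties");
        System.out.println(restored.getProperty("metadata.broker.list"));
    }
}
```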
In the code above, KAFKA_RESPONSE_BOLT writes the data to the topic. It is fed by abc1_DEVICE_BOLT, which emits the data to KAFKA_RESPONSE_BOLT like this:
@Override
public void declareOutputFields(OutputFieldsDeclarer ofd) {
    Fields respFields = IoTFields.getKafkaResponseFieldsRTEXY();
    ofd.declareStream(IoTStreamID.KAFKARESP.name(), respFields);
}

@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
    List<Object> newTuple = new ArrayList<Object>();
    String params = tuple.getStringByField("params");
    newTuple.add(3, params);
    // ----
    collector.emit(IoTStreamID.KAFKARESP.name(), newTuple);
}
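For reference, `ArrayList.add(index, element)` only accepts an index up to the current size of the list, so the `newTuple.add(3, params)` call above only works if the elided code has already filled positions 0 through 2; on a freshly created list it throws `IndexOutOfBoundsException`. A minimal standalone sketch of that behavior:

```java
import java.util.ArrayList;
import java.util.List;

public class AddAtIndexDemo {
    public static void main(String[] args) {
        List<Object> tuple = new ArrayList<Object>();
        // add(index, element) requires 0 <= index <= size(); size is 0 here.
        try {
            tuple.add(3, "params");
        } catch (IndexOutOfBoundsException e) {
            System.out.println("index 3 out of bounds on empty list");
        }
        // After positions 0..2 are filled, index 3 becomes legal.
        tuple.add("a");
        tuple.add("b");
        tuple.add("c");
        tuple.add(3, "params");
        System.out.println(tuple.size()); // 4
    }
}
```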