
My application reads messages from one Kafka topic, stores them in MongoDB, performs some validation, and then writes to another topic. The problem I am facing is that the application goes into an infinite loop. My code is as follows:

Hosts zkHosts = new ZkHosts("localhost:2181");
String zkRoot = "/brokers/topics";
String clientRequestID = "reqtest";
String clientPendingID = "pendtest";
SpoutConfig kafkaRequestConfig = new SpoutConfig(zkHosts,"reqtest",zkRoot,clientRequestID);
SpoutConfig kafkaPendingConfig = new SpoutConfig(zkHosts,"pendtest",zkRoot,clientPendingID);

kafkaRequestConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
kafkaPendingConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaRequestSpout = new KafkaSpout(kafkaRequestConfig);
KafkaSpout kafkaPendingSpout = new KafkaSpout(kafkaPendingConfig);

MongoBolt mongoBolt = new MongoBolt();
DeviceFilterBolt deviceFilterBolt = new DeviceFilterBolt();
KafkaRequestBolt kafkaReqBolt = new KafkaRequestBolt();
abc1DeviceBolt abc1DevBolt = new abc1DeviceBolt();
DefaultTopicSelector defTopicSelector = new DefaultTopicSelector(xyzKafkaTopic.RESPONSE.name());
KafkaBolt kafkaRespBolt = new KafkaBolt()
    .withTopicSelector(defTopicSelector)
    .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());

TopologyBuilder topoBuilder = new TopologyBuilder();
topoBuilder.setSpout(xyzComponent.KAFKA_REQUEST_SPOUT.name(), kafkaRequestSpout);
topoBuilder.setSpout(xyzComponent.KAFKA_PENDING_SPOUT.name(), kafkaPendingSpout);
topoBuilder.setBolt(xyzComponent.KAFKA_PENDING_BOLT.name(),
    deviceFilterBolt, 1)
    .shuffleGrouping(xyzComponent.KAFKA_PENDING_SPOUT.name());
topoBuilder.setBolt(xyzComponent.abc1_DEVICE_BOLT.name(),
    abc1DevBolt, 1)
    .shuffleGrouping(xyzComponent.KAFKA_PENDING_BOLT.name(),
        xyzDevice.abc1.name());
topoBuilder.setBolt(xyzComponent.MONGODB_BOLT.name(), 
    mongoBolt, 1)
    .shuffleGrouping(xyzComponent.abc1_DEVICE_BOLT.name(),
        xyzStreamID.KAFKARESP.name());
topoBuilder.setBolt(xyzComponent.KAFKA_RESPONSE_BOLT.name(),
    kafkaRespBolt, 1)
    .shuffleGrouping(xyzComponent.abc1_DEVICE_BOLT.name(),
        xyzStreamID.KAFKARESP.name());

Config config = new Config();
config.setDebug(true);
config.setNumWorkers(1);

Properties props = new Properties();
props.put("metadata.broker.list", "localhost:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
config.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);

LocalCluster cluster = new LocalCluster();
try {
    cluster.submitTopology("demo", config, topoBuilder.createTopology());
} catch (Exception e) { // a catch (or throws) clause is required for this to compile
    e.printStackTrace();
}

In the code above, KAFKA_RESPONSE_BOLT is the component that writes to the output topic. It is fed by abc1_DEVICE_BOLT, which emits data to it as follows:

@Override
public void declareOutputFields(OutputFieldsDeclarer ofd) {
    Fields respFields = IoTFields.getKafkaResponseFieldsRTEXY();
    ofd.declareStream(IoTStreamID.KAFKARESP.name(), respFields);
}

@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
    List<Object> newTuple = new ArrayList<Object>();
    String params = tuple.getStringByField("params");
    newTuple.add(3, params);
    // ----
    collector.emit(IoTStreamID.KAFKARESP.name(), newTuple);
}

1 Answer


The same problem bothered me for a long time, and the answer is simple... you won't believe it.

As far as I understand it, the tuples that KafkaBolt receives must contain a field named "message", whether they come from a bolt or a spout. So you will have to make a few changes to your code; I have not gone through it in detail, but I believe this will help!

The specific reason is explained at https://mail-archives.apache.org/mod_mbox/incubator-storm-user/201409.mbox/%3C6AF1CAC6-60EA-49D9-8333-0343777B48A7@andrashatvani.com%3E
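The failure mode can be sketched in plain Java without a Storm dependency (`MapperSketch` and `extractMessage` are illustrative names, not Storm API): the no-arg constructor of FieldNameBasedTupleToKafkaMapper defaults to the field names "key" and "message", so a tuple declared with a different field name, such as "params" in the question, gives KafkaBolt nothing to publish.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for how the default FieldNameBasedTupleToKafkaMapper
// resolves the message: it looks up a field by name in the incoming tuple.
public class MapperSketch {
    static String extractMessage(Map<String, Object> tuple, String messageField) {
        Object v = tuple.get(messageField);
        return v == null ? null : v.toString();
    }

    public static void main(String[] args) {
        // Tuple declared with a custom field name, as in the question:
        Map<String, Object> badTuple = new HashMap<>();
        badTuple.put("params", "{\"deviceId\": 42}");
        System.out.println(extractMessage(badTuple, "message")); // prints: null

        // Tuple declared with the field name the default mapper expects:
        Map<String, Object> goodTuple = new HashMap<>();
        goodTuple.put("message", "{\"deviceId\": 42}");
        System.out.println(extractMessage(goodTuple, "message")); // prints the payload
    }
}
```

In the real topology the equivalent fix is to declare the stream with a field named "message" (and optionally "key"), or to construct the mapper with your own field names via its two-argument constructor.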

Answered on 2016-01-21T08:39:07.283