I want to run some performance tests with Kafka and Storm. In my WordCount test, a sequential number shows up between the word counts in the "output" topic:
uLK 1
1352
GaE 1
1353
IGF 1
1354
DtJ 2
1355
MVy 1
...
It should look like this:
uLK 1
GaE 1
IGF 1
DtJ 2
MVy 1
...
My Storm topology looks like this:
builder.setSpout("kafkaSpoutPerfTest", kafkaSpout, 1);
builder.setBolt("split", new SplitSentence(), paralellismHint).shuffleGrouping("kafkaSpoutPerfTest");
builder.setBolt("count", new WordCount(), paralellismHint).fieldsGrouping("split", new Fields("message"));
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "0");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaBolt kafkaBolt = new KafkaBolt()
        .withProducerProperties(props)
        .withTopicSelector(new DefaultTopicSelector("output"));
builder.setBolt("kafkaBoltOutput", kafkaBolt, 1).shuffleGrouping("count");
What am I doing wrong? Does anyone have a clue? Thanks!