I am trying out the kafka-storm spout described at https://github.com/nathanmarz/storm-contrib/tree/master/storm-kafka, with the configuration shown below.
    BrokerHosts brokerHosts = KafkaConfig.StaticHosts.fromHostString(
            ImmutableList.of("localhost"), 1);
    SpoutConfig spoutConfig = new SpoutConfig(
            brokerHosts,    // list of Kafka
            "test",         // topic to read from
            "/kafkastorm",  // the root path in Zookeeper for the spout to
            "discovery");   // an id for this consumer for storing the
                            // consumer offsets in Zookeeper
    spoutConfig.scheme = new StringScheme();
    spoutConfig.stateUpdateIntervalMs = 1000;
    KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

    TridentTopology topology = new TridentTopology();
    InetSocketAddress inetSocketAddress = new InetSocketAddress(
            "localhost", 6379);
    TridentState wordsCount = topology
            .newStream(SPOUT_FIRST, kafkaSpout)
            .parallelismHint(1)
            .each(new Fields("str"), new TestSplit(), new Fields("words"))
            .groupBy(new Fields("words"))
            .persistentAggregate(
                    RedisState.transactional(inetSocketAddress),
                    new Count(), new Fields("counts"))
            .parallelismHint(100);

    Config conf = new Config();
    conf.setMaxTaskParallelism(200);
    // conf.setDebug( true );
    // conf.setMaxSpoutPending(20);

    // This topology can only be run as local because it is a toy example
    LocalDRPC drpc = new LocalDRPC();
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("symbolCounter", conf, topology.build());
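However, this spout fetches messages from the Kafka topic at only about 7,000 messages per second, whereas I expect a load of about 50,000 messages per second. I tried various options for increasing the fetch buffer size in spoutConfig, with no visible effect.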
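To put the gap in numbers, here is a rough sketch of the arithmetic (not a measurement). The class and method names are made up for illustration, and the consumer estimate assumes that throughput scales linearly with the number of parallel spout tasks and Kafka partitions, which I have not verified.

```java
public class ThroughputGap {

    // Rate (messages/second) at which unread messages pile up in the topic
    // when the producer is faster than the consumer; 0 if the consumer keeps up.
    public static long backlogGrowthPerSecond(long producedPerSec, long consumedPerSec) {
        return Math.max(0L, producedPerSec - consumedPerSec);
    }

    // Smallest number of parallel consumers needed to match the producer,
    // ASSUMING each consumer sustains perConsumerRate and scaling is linear.
    public static long consumersNeeded(long producedPerSec, long perConsumerRate) {
        if (perConsumerRate <= 0) {
            throw new IllegalArgumentException("perConsumerRate must be positive");
        }
        // Integer ceiling division.
        return (producedPerSec + perConsumerRate - 1) / perConsumerRate;
    }

    public static void main(String[] args) {
        long produced = 50_000L; // expected load from the post
        long consumed = 7_000L;  // observed spout rate from the post

        System.out.println("Backlog growth (msgs/s): "
                + backlogGrowthPerSecond(produced, consumed));
        System.out.println("Consumers needed at the observed rate: "
                + consumersNeeded(produced, consumed));
    }
}
```

With the figures above, the backlog would grow by 43,000 messages every second. Under the linear-scaling assumption, about 8 spout tasks would be needed, and the topic would need at least as many partitions, whereas the configuration above uses a single partition and parallelismHint(1) on the spout.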
Has anyone run into a similar problem, where Storm cannot consume a Kafka topic as fast as the producer writes to it?