我用 3 个分区和 3 个复制创建了一个 Kafka 主题
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test
并编写了 Kafka-Producer Program 从 API 获取数据,它工作正常。我的数据成功存储在 3 个分区和 3 个复制中。
现在我必须处理分区 Kafka 日志中的数据。我有一个风暴拓扑。我的拓扑如何接受来自日志的数据作为输入。
最初我创建了一个具有 1 个分区、1 个复制的主题,并且我的风暴拓扑工作正常。但是当我将分区保持为 3 并且复制为 3 时,数据不会进入风暴拓扑。如何解决它。
我的拓扑代码
public static void main(String[] args) throws Exception{
Map map = Maps.newHashMap();
map.put("dataSourceClassName", "org.postgresql.ds.PGSimpleDataSource");
map.put("dataSource.url","jdbc:postgresql://localhost:5432/test?user=postgres");
ConnectionProvider cp = new MyConnectionProvider(map);
String argument = args[0];
Config conf = new Config();
conf.put(JDBC_CONF, map);
conf.setDebug(true);
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 3);
//set the number of workers
conf.setNumWorkers(3);
TopologyBuilder builder = new TopologyBuilder();
//Setup Kafka spout
BrokerHosts hosts = new ZkHosts("localhost:2181");
String topic = "test";
String zkRoot = "";
String consumerGroupId = "group1";
SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, zkRoot, consumerGroupId);
spoutConfig.scheme = new RawMultiScheme();
spoutConfig.scheme = new SchemeAsMultiScheme(new Search_Parser());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
builder.setSpout("KafkaSpout", kafkaSpout);
帮我解决这个问题
提前致谢