1

我是三叉戟的新手。我正在编写一个从 kafka 读取数据的三叉戟拓扑。主题名称是“测试”。我有本地卡夫卡设置。我在本地启动了 zookeeper,kafka。并在 kafka 中创建了一个主题“测试”并打开了生产者并输入了消息“Hello Kafka!”。

我想使用 trident 从“测试”主题中读取消息“Hello Kafka”。

下面是我的代码。我得到空元组。

    TridentTopology topology = new TridentTopology();
    BrokerHosts brokerHosts = new ZkHosts("localhost:2181");

    TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, "test");
    kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    kafkaConfig.bufferSizeBytes = 1024 * 1024 * 4;
    kafkaConfig.fetchSizeBytes = 1024 * 1024 * 4;
    kafkaConfig.forceFromStart = false;
    OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);

    topology.newStream("TestSpout", opaqueTridentKafkaSpout).parallelismHint(1)
      .each(new Fields(), new TestFilter()).parallelismHint(1)
      .each(new Fields(), new Utils.PrintFilter());

这是我的 TestFilter 类代码

public TestFilter()
{
    //
}

@Override
public boolean isKeep(TridentTuple tuple) {
    boolean isKeep=true;
    System.out.println("TestFilter is called...");
    if (tuple != null && tuple.getValues().size()>0) {
        System.out.println("data from kafka ::: "+tuple.getValues());
    } 
    return isKeep;
}

每当我在 kafka 生产者中向“测试”主题键入消息时,首先打印 sysout,但它没有通过 if 循环。我只是收到消息'TestFilter 被调用...'仅此而已。

我想将我生成的实际数据用于“测试”主题。如何?

4

1 回答 1

1

问题在于 Stream.each 的参数。该方法的 javadoc 的相关部分是:

each(Fields inputFields, Filter filter)

文档不太清楚,但语义是您应该使用inputFields参数指定过滤器使用的所有字段。

Storm 然后将在输入元组上应用投影并将其转发给过滤器

鉴于您没有指定任何输入字段,投影会导致一个空元组,从而导致tuple.getValues().size()>0过滤器内的条件失败。

值得一提的是每个的其他变体:

each(Fields inputFields, Function function, Fields functionFields)
each(Function function, Fields functionFields)

这些将在输入元组的投影上应用提供的函数,将结果元组附加到原始输入元组,将新字段重命名为functionFields(即投影仅用于应用函数)。

特别是第二个版本相当于调用 each 并将inputFields设置为 null (或new Fields()),并将导致一个空元组被传递给function

于 2015-10-24T10:49:20.423 回答