1

Apache Pulsar 函数无法将计数器值读取为context.getCounter().

使用本地机器、单一功能进行测试,并从 CLI 运行测试器生产者和消费者。函数正在读取订阅并打印它的函数名称(来自上下文),所以它似乎只是在访问计数器值时遇到了问题。

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class App implements Function<String, String> {
    @Override
    public String process(String input, Context context) {
        System.out.println(context.getFunctionName()); //this works


        context.incrCounter("ABC",1);

        Long n = context.getCounter("ABC");

        System.out.println(context.getCounter("ABC")); //this doesn't

        return n.toString(); //consumer on the topic doesn't get any messages
    }
}

(CLI 命令都在同一个 AWS ec2 上)

bin/pulsar-admin functions localrun \
  --jar /home/ubuntu/population_watch/0_ingestion/pulsar-functions-0.0.1-SNAPSHOT.jar \
  --className xyz.datapipeline.pulsar_functions.App \
  --inputs persistent://public/default/testinput1 \
  --output persistent://public/default/testoutput1 \
  --name myTestFunction

(消费者)

bin/pulsar-client consume -n 10 -s “test” persistent://public/default/testoutput1

(制片人)

bin/pulsar-client produce -m “Hello” persistent://public/default/testinput1

每次发送消息时,函数应将键“ABC”的计数器增加 1,随后getCounter()应返回该值(即 1... 然后 2.... 然后 3... 生产者发送的每条消息)。

编辑:终止我的 AWS 实例并重新创建我的集群解决了这个问题,所以它似乎与配置相关。

4

0 回答 0