Apache Pulsar 函数无法将计数器值读取为context.getCounter()
.
使用本地机器、单一功能进行测试,并从 CLI 运行测试器生产者和消费者。函数正在读取订阅并打印它的函数名称(来自上下文),所以它似乎只是在访问计数器值时遇到了问题。
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
public class App implements Function<String, String> {
@Override
public String process(String input, Context context) {
System.out.println(context.getFunctionName()); //this works
context.incrCounter("ABC",1);
Long n = context.getCounter("ABC");
System.out.println(context.getCounter("ABC")); //this doesn't
return n.toString(); //consumer on the topic doesn't get any messages
}
}
(CLI 命令都在同一个 AWS ec2 上)
bin/pulsar-admin functions localrun \
--jar /home/ubuntu/population_watch/0_ingestion/pulsar-functions-0.0.1-SNAPSHOT.jar \
--className xyz.datapipeline.pulsar_functions.App \
--inputs persistent://public/default/testinput1 \
--output persistent://public/default/testoutput1 \
--name myTestFunction
(消费者)
bin/pulsar-client consume -n 10 -s “test” persistent://public/default/testoutput1
(制片人)
bin/pulsar-client produce -m “Hello” persistent://public/default/testinput1
每次发送消息时,函数应将键“ABC”的计数器增加 1,随后getCounter()
应返回该值(即 1... 然后 2.... 然后 3... 生产者发送的每条消息)。
编辑:终止我的 AWS 实例并重新创建我的集群解决了这个问题,所以它似乎与配置相关。