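I am new to Kafka and Samza. I tried the hello-samza example and it works. What I want now is a Samza task that reads messages from a Kafka topic. The task I added does not throw any errors, but it also does not read any messages from the topic. The YARN UI shows the job as ACCEPTED. I am not sure what I am doing wrong here.
Here is the class: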
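package samza.examples.wikipedia.task;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

public class MyTask implements StreamTask {
    @Override
    public void process(IncomingMessageEnvelope incomingMessageEnvelope, MessageCollector messageCollector, TaskCoordinator taskCoordinator) throws Exception {
        System.out.println(" key - " + incomingMessageEnvelope.getKey() + " | message " + incomingMessageEnvelope.getMessage());
    }
}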
Here is the properties file:
# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=addresses
# YARN
yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
# Task
task.class=samza.examples.wikipedia.task.MyTask
task.inputs=addressestopic
# Serializers
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
# Kafka System
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.samza.msg.serde=json
systems.kafka.consumer.zookeeper.connect=localhost:2181/
systems.kafka.producer.bootstrap.servers=localhost:9092
# Job Coordinator
job.coordinator.system=kafka
# Add configuration to disable checkpointing for this job once it is available in the Coordinator Stream model
# See https://issues.apache.org/jira/browse/SAMZA-465?focusedCommentId=14533346&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14533346 for more details
job.coordinator.replication.factor=1
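
For comparison, the hello-samza job configs name their inputs in `system.stream` form (for example `task.inputs=kafka.wikipedia-raw`), while the Task section above uses a bare topic name. A variant that follows that convention might look like the sketch below. It assumes the topic lives on the `kafka` system defined above, and it is not confirmed that this difference is what keeps the job from reading messages.

```properties
# Task (variant, assuming inputs are written as <system>.<stream>)
task.class=samza.examples.wikipedia.task.MyTask
# "kafka" is the system name from the systems.kafka.* keys; "addressestopic" is the topic
task.inputs=kafka.addressestopic
```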