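I'm trying to use the checkpoint tool, as described here and here, to rewind the input feed of one of my Samza jobs. For some reason the checkpoint tool doesn't output any offsets as promised, even though I know the job has already consumed several messages from the partition in question.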
Here is a truncated version of the output the checkpoint tool gives me:
2015-06-11 16:31:04 ZkClient [INFO] zookeeper state changed (SyncConnected)
2015-06-11 16:31:04 ZkEventThread [INFO] Terminate ZkClient event thread.
2015-06-11 16:31:04 ZooKeeper [INFO] Session: 0x14de25b502e01b4 closed
2015-06-11 16:31:04 ClientCnxn [INFO] EventThread shut down
2015-06-11 16:31:04 KafkaCheckpointManager [INFO] Checkpoint topic __samza_checkpoint_ver_1_for_test-job1_1 already exists.
2015-06-11 16:31:04 KafkaCheckpointManager [INFO] Validating checkpoint topic __samza_checkpoint_ver_1_for_test-job1_1.
2015-06-11 16:31:04 KafkaCheckpointManager [INFO] Successfully validated checkpoint topic __samza_checkpoint_ver_1_for_test-job1_1.
2015-06-11 16:31:04 KafkaCheckpointManager [INFO] Reading checkpoint for taskName Partition 0
2015-06-11 16:31:04 KafkaCheckpointManager [INFO] No TaskName to checkpoint mapping provided. Reading for first time.
2015-06-11 16:31:04 KafkaCheckpointManager [INFO] Connecting to leader pavels-mbp.it.local:9092 for topic __samza_checkpoint_ver_1_for_test-job1_1 and to fetch all checkpoint messages.
2015-06-11 16:31:04 KafkaCheckpointManager [INFO] Got offset 0 for topic __samza_checkpoint_ver_1_for_test-job1_1 and partition 0. Attempting to fetch messages for checkpoint log.
2015-06-11 16:31:04 KafkaCheckpointManager [INFO] Get latest offset 31 for topic __samza_checkpoint_ver_1_for_test-job1_1 and partition 0.
2015-06-11 16:31:04 KafkaCheckpointManager [INFO] Got checkpoint state for taskName Partition 0: Checkpoint [offsets={}]
Here is my test_job.properties file:
# Job
job.factory.class=org.apache.samza.job.local.ThreadJobFactory
job.name=test-job1
# Task
task.class=com.xim.test.TestTaskClass
task.inputs=kafka.EnergyPurchaseEvent
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.consumer.zookeeper.connect=localhost:2181/
systems.kafka.producer.bootstrap.servers=localhost:9092
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
task.checkpoint.system=kafka
task.checkpoint.replication.factor=1
As you can see, checkpointing is enabled.
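For reference, the rewind itself is, as I understand the Samza checkpointing docs, done by handing the checkpoint tool a properties file with one line per task and input partition, keyed as `tasknames.<task>.systems.<system>.streams.<stream>.partitions.<partition>=<offset>`. Below is a minimal Java sketch that generates such a file for this job's single input (`kafka.EnergyPurchaseEvent`, task `Partition 0`). The key format is my reading of the docs, the assumption that the tool parses the file with standard Java properties rules is unverified, and `RewindOffsets`, `offsetKey` and `render` are hypothetical helpers, not part of Samza.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.util.Properties;

/**
 * Hypothetical helper that builds a new-offsets file for Samza's checkpoint tool.
 * Assumed key format (from my reading of the Samza checkpointing docs):
 *   tasknames.<task>.systems.<system>.streams.<stream>.partitions.<partition>=<offset>
 */
public class RewindOffsets {

    // Build the property key for one task/system/stream/partition combination.
    static String offsetKey(String taskName, String system, String stream, int partition) {
        return "tasknames." + taskName
                + ".systems." + system
                + ".streams." + stream
                + ".partitions." + partition;
    }

    // Render the offsets in properties-file syntax. Properties.store escapes the
    // space in "Partition 0" as "Partition\ 0", which Properties.load reads back
    // as a plain space. It also writes a comment line and a timestamp line.
    static String render(Properties offsets) {
        StringWriter out = new StringWriter();
        try {
            offsets.store(out, "new offsets for checkpoint-tool");
        } catch (IOException e) {
            // StringWriter does not do real I/O, so this should not happen.
            throw new UncheckedIOException(e);
        }
        return out.toString();
    }

    public static void main(String[] args) {
        Properties offsets = new Properties();
        // Rewind partition 0 of kafka.EnergyPurchaseEvent (the job's only input) to offset 0.
        offsets.setProperty(offsetKey("Partition 0", "kafka", "EnergyPurchaseEvent", 0), "0");
        System.out.print(render(offsets));
    }
}
```

The printed text would be saved to a file and passed to the tool alongside the job config, per the docs roughly as `checkpoint-tool.sh --config-path=file:///path/to/test_job.properties --new-offsets=file:///path/to/offsets.properties`. Of course this only helps once the tool can read the existing checkpoint, which is exactly what fails above.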