I resolved this by moving back to the pyspark 2.2 library, since it has an API to get the offsetRanges and store the offsets in redis. I also had to move back to Python 2.7, because there is no 'long' type in Python 3.6.
import redis
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition, KafkaRDD

def get_offset_ranges(topic):
    """Load the last committed offset of each partition from redis (None on first run)."""
    ranges = None
    rk = '{topic}:offsets'.format(topic=topic)
    cache = redis.Redis()
    if cache.exists(rk):
        mapping = cache.hgetall(rk)
        ranges = dict()
        for k, v in mapping.items():
            tp = TopicAndPartition(topic, int(k))
            ranges[tp] = long(v)
    return ranges

def update_offset_ranges(offset_ranges):
    """Persist the end offset of every partition to a per-topic redis hash."""
    cache = redis.Redis()
    for rng in offset_ranges:
        rk = '{rng.topic}:offsets'.format(rng=rng)
        print("updating redis_key: {}, partition: {}, lastOffset: {}".format(rk, rng.partition, rng.untilOffset))
        cache.hset(rk, rng.partition, rng.untilOffset)

def do_some_work(partition_iter):
    # Placeholder for the real per-partition processing.
    pass

def process_dstream(rdd):
    # Process the batch first, so offsets are only committed after the work is done.
    rdd.foreachPartition(lambda partition: do_some_work(partition))
    # Re-wrap the underlying Java RDD as a KafkaRDD to get access to offsetRanges().
    krdd = KafkaRDD(rdd._jrdd, sc, rdd._jrdd_deserializer)
    off_ranges = krdd.offsetRanges()
    for o in off_ranges:
        print(str(o))
    update_offset_ranges(off_ranges)

sc = SparkContext(appName="mytstApp")
ssc = StreamingContext(sc, 1)  # 1-second batch interval

kafka_params = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "myUserGroup",
    "enable.auto.commit": "false",
    "auto.offset.reset": "smallest"
}

topic = "mytopic"
offset_ranges = get_offset_ranges(topic)
dstream = KafkaUtils.createDirectStream(ssc, [topic], kafka_params, fromOffsets=offset_ranges)
dstream.foreachRDD(process_dstream)
# Start our streaming context and wait for it to 'finish'
ssc.start()
# Wait for the job to finish
try:
    ssc.awaitTermination()
except Exception as e:
    ssc.stop()
    raise e  # to exit with error condition
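
A quick way to check the bookkeeping between runs is to read the hash back with plain redis-py (a minimal standalone sketch, assuming the same local redis instance and the 'mytopic' topic used in the script above):

import redis

# One hash per topic ('mytopic:offsets'): field = partition number,
# value = last consumed offset (redis returns both as strings).
cache = redis.Redis()
print(cache.hgetall('mytopic:offsets'))  # e.g. {'0': '1024', '1': '998'}

Note also that KafkaUtils here is the Kafka 0.8 direct-stream integration, so the matching jar has to be on the classpath when you submit, e.g. spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 (adjust to your Spark and Scala versions).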