I am porting a streaming application written in Scala to Python. I want to manually commit the offsets for the DStream. In Scala it is done like this:

val stream = KafkaUtils.createDirectStream(someConfigs)
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // some time later, after outputs have completed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

But I am unable to find a similar API in Python. Can you please guide me on how to manually commit the offsets using the Python client?

1 Answer

I solved this by going back to the pyspark 2.2 library, since it has an API for fetching the offsetRanges and for storing the offsets in Redis. I had to move back to Python 2.7, because there is no "long" support in Python 3.6.

import redis
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition, KafkaRDD


def get_offset_ranges(topic):
    """Load previously stored offsets for `topic` from Redis, if any."""
    ranges = None

    rk = '{topic}:offsets'.format(topic=topic)
    cache = redis.Redis()
    if cache.exists(rk):
        # The hash maps partition number -> last processed untilOffset
        mapping = cache.hgetall(rk)
        ranges = dict()
        for k, v in mapping.items():
            tp = TopicAndPartition(topic, int(k))
            ranges[tp] = long(v)  # Python 2 `long`; the reason for staying on 2.7

    return ranges


def update_offset_ranges(offset_ranges):
    """Persist each partition's untilOffset to Redis."""
    cache = redis.Redis()
    for rng in offset_ranges:
        rk = '{rng.topic}:offsets'.format(rng=rng)
        print("updating redis key: {}, partition: {}, lastOffset: {}".format(rk, rng.partition, rng.untilOffset))
        cache.hset(rk, rng.partition, rng.untilOffset)


def do_some_work(partition_iter):
    # Placeholder for the actual per-partition processing
    pass


def process_dstream(rdd):
    rdd.foreachPartition(do_some_work)

    # Re-wrap the plain RDD as a KafkaRDD to regain access to offsetRanges()
    krdd = KafkaRDD(rdd._jrdd, sc, rdd._jrdd_deserializer)
    off_ranges = krdd.offsetRanges()
    for o in off_ranges:
        print(str(o))
    update_offset_ranges(off_ranges)


sc = SparkContext(appName="mytstApp")
ssc = StreamingContext(sc, 1)

kafka_params = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "myUserGroup",
    "enable.auto.commit": "false",   # offsets are managed manually
    "auto.offset.reset": "smallest"  # used only when no stored offsets exist
}

topic = "mytopic"
# Resume from the offsets stored in Redis; None means start fresh
offset_ranges = get_offset_ranges(topic)
dstream = KafkaUtils.createDirectStream(ssc, [topic], kafka_params, fromOffsets=offset_ranges)
dstream.foreachRDD(process_dstream)
# Start the streaming context and block until it terminates
ssc.start()

try:
    ssc.awaitTermination()
except Exception as e:
    ssc.stop()
    raise e  # re-raise so the job exits with an error status
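
Since the Python direct-stream API exposes no equivalent of Scala's CanCommitOffsets.commitAsync, the offsets here live entirely in Redis. As a quick sanity check, the values written by update_offset_ranges can be read back directly; a minimal sketch, assuming the same mytopic:offsets hash key used above:

import redis

cache = redis.Redis()
# Each field in the hash is a partition number; each value is the
# untilOffset last persisted for that partition.
for partition, offset in cache.hgetall('mytopic:offsets').items():
    print("partition {}: resume from offset {}".format(partition, offset))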