
I am currently reading from a Kafka topic, processing the messages, and writing them to another topic. The processing and producing logic lives inside the test_spark function. A code sample is below:

df_file = (
    spark.readStream.format("kafka")
    .option(
        "kafka.bootstrap.servers",
        os.environ.get("KAFKA_CONSUMER_BOOTSTRAP_SERVERS"),
    )
    .option(
        "subscribe", 'TopicName'
    )
    .option(
        "kafka.group.id", os.environ.get("KAFKA_CONSUMER_GROUP_ID")
    )
    .option(
        "kafka.security.protocol",
        os.environ.get("KAFKA_SASL_SECURITY_PROTOCOL"),
    )
    .option(
        "kafka.sasl.mechanism",
        os.environ.get("KAFKA_SASL_MECHANISM"),
    )
    .option(
        "kafka.sasl.jaas.config",
        f"""org.apache.kafka.common.security.scram.ScramLoginModule required username='{os.environ.get('KAFKA_SASL_USERNAME')}' password='{os.environ.get('KAFKA_SASL_PASSWORD')}';""",
    )
    .option("checkpointLocation","s3a://checkpoint_location")
    .option("startingOffsets", "latest")
    .option("failOnDataLoss", "false")
    .load()
    .selectExpr("CAST(value AS STRING)")
)
df_file.writeStream.trigger(processingTime="2 seconds")\
    .foreachBatch(test_spark)\
    .option("checkpointLocation", "s3a://checkpoint_location")\
    .start().awaitTermination()

The problem occurs when I restart the service while it is processing messages. I wanted Spark to pick up from the last read/processed message. Instead, it processes them all from the beginning, so I end up with duplicated messages (if Y messages had been processed at the time of the restart, I end up with TotalMessages + Y).

Shouldn't the checkpoints prevent this? Any ideas on how to fix it? Thanks

UPDATE: I think I have realised that offsets are only committed once per batch. Sometimes I have only one batch, which is why the messages get duplicated: a restart in the middle of that batch replays the whole batch. This batch-level behaviour doesn't suit my use case, as I want to avoid duplicated messages as much as possible. However, switching to foreach would force me to work row by row, so I could no longer use the foreachPartition method that I call inside foreachBatch. Any ideas?
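
To illustrate the batch-level behaviour described in the update, here is a minimal pure-Python sketch (no Spark or Kafka involved). It assumes the function given to foreachBatch is called with the same batch id when a batch is replayed after a restart, and that the sink can record which batch ids it has already written. IdempotentSink and process_batch are hypothetical names used only for illustration, and a plain Kafka topic would not give this guarantee by itself.

```python
# Pure-Python simulation of replaying a micro-batch; not real Spark code.
# IdempotentSink and process_batch are hypothetical names for illustration.


class IdempotentSink:
    """Keeps the written rows together with the ids of batches already written."""

    def __init__(self):
        self.rows = []
        self.committed_batch_ids = set()

    def write_batch(self, batch_id, rows):
        # A replayed batch arrives with the same batch_id, so it is skipped.
        if batch_id in self.committed_batch_ids:
            return False
        # Assumption: in a real sink these two steps form one atomic transaction.
        self.rows.extend(rows)
        self.committed_batch_ids.add(batch_id)
        return True


def process_batch(rows, batch_id, sink):
    """Stand-in for the function that foreachBatch calls with (batch, batch_id)."""
    processed = [value.upper() for value in rows]
    return sink.write_batch(batch_id, processed)


sink = IdempotentSink()
batch = ["a", "b", "c"]

first = process_batch(batch, 0, sink)     # original run of batch 0
replayed = process_batch(batch, 0, sink)  # batch 0 replayed after a restart
```

With this guard the replayed batch is skipped instead of being written a second time. It only helps if the write and the batch-id record succeed or fail together; a batch that was partially written before the restart would still need a transactional or otherwise idempotent sink.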
