I am currently reading from a kafka topic, processing the messages and writing them to another topic. This processing and producing logic is inside the test_saprk function. A code sample can be found below:
df_file = (
spark.readStream.format("kafka")
.option(
"kafka.bootstrap.servers",
os.environ.get("KAFKA_CONSUMER_BOOTSTRAP_SERVERS"),
)
.option(
"subscribe", 'TopicName'
)
.option(
"kafka.group.id", os.environ.get("KAFKA_CONSUMER_GROUP_ID")
)
.option(
"kafka.security.protocol",
os.environ.get("KAFKA_SASL_SECURITY_PROTOCOL"),
)
.option(
"kafka.sasl.mechanism",
os.environ.get("KAFKA_SASL_MECHANISM"),
)
.option(
"kafka.sasl.jaas.config",
f"""org.apache.kafka.common.security.scram.ScramLoginModule required username='{os.environ.get('KAFKA_SASL_USERNAME')}' password='{os.environ.get('KAFKA_SASL_PASSWORD')}';""",
)
.option("checkpointLocation","s3a://checkpoint_location")
.option("startingOffsets", "latest")
.option("failOnDataLoss", "false")
.load()
.selectExpr("CAST(value AS STRING)")
)
df_file.writeStream.trigger(processingTime="2 seconds")\
.foreachBatch(test_spark)\
.option("checkpointLocation", "s3a://checkpoint_location")\
.start().awaitTermination()
The problem is when I restart the service while it's processing the messages. What I wanted was for spark to pick up from the last read/processed message. Instead it processes them all from the beginning, so I end up with duplicated messages (if at the time of the service restart I had Y messages processed, I end up with TotalMessages+Y).
Shouldn't the checkpoints prevent this? Any ideas on how to fix it? Thanks
UPDATE: I think i have realised that it's only committing offsets for each batch. Sometimes I have only one batch, so that's why it's duplicating the messages. But this doesn't really apply for my use case, as I want to avoid duplicated messages as much as possible, so using this batch logic is not the best. But using a foreach would do it force me to work row by row, not being able to make use of the foreachPartition method that I use inside of the foreachBatch. Any ideas?