我构建了一个 topo 以通过 kafka 获取消息,然后 grep 一些关键字,如果合适,写入本地文件。
我使用OpaqueTridentKafkaSpout
storm-kafka来保证元组不会丢失或重复,但是考虑一种情况:将消息写入本地文件时,发生一些错误(例如,空间不足)。此时,有些消息已写入本地文件,有些则没有,如果 spout 重新发送消息,则消息将被写入两次。
如何处理?
我构建了一个 topo 以通过 kafka 获取消息,然后 grep 一些关键字,如果合适,写入本地文件。
我使用OpaqueTridentKafkaSpout
storm-kafka来保证元组不会丢失或重复,但是考虑一种情况:将消息写入本地文件时,发生一些错误(例如,空间不足)。此时,有些消息已写入本地文件,有些则没有,如果 spout 重新发送消息,则消息将被写入两次。
如何处理?
这很简单。写入文件的代码需要执行以下操作:
1) 确认元组 - 仅当写入文件成功时。2) 使元组失败 - 如果写入文件不成功。
对于所有已确认的元组,Kafka spout 不会重新发送它们。失败的元组将由 spout 重置。
您必须为此设计锚定策略。我建议您可以从 kafkaspoutconfig 减少批量大小并将您选择的消息存储在列表中。当批量处理完所有消息后,您可以将列表内容写入本地文件。
如您所知,Trident 批量处理流,如果您的系统在处理流中的任何元组时抛出任何错误,所有批处理都将被丢弃。
在您的情况下,您可以尝试捕获负责写入本地文件的代码块,并且在 catch 块中您必须抛出backtype.storm.topology.ReportedFailedException。通过这种方式,您可以确保只有一个语义。
此外,您必须使用事务性 kafka spout 来确保完全符合一种语义。