onErrorResume() 优于 onErrorContinue()。
那么问题是你不能在那里提交偏移量，因为此时接收器不再处于活动状态。
这对我有用...
// Counts consecutive pipeline errors: incremented in doOnError, reset to 0 when
// a fresh (non-skipped) record begins processing. Not read elsewhere in this
// snippet — presumably kept for diagnostics; confirm before removing.
private final AtomicInteger atomicInteger = new AtomicInteger();
// Consumes Kafka records one at a time. A record that fails processing is
// retried (via retryWhen) up to 5 times with exponential backoff; once retries
// are exhausted the record is remembered in `failed`, and its next redelivery
// is acknowledged (skipped) instead of reprocessed, so consumption can proceed.
public void consumeRecords(ReceiverOptions<String, String> receiverOptions) {
// Up to 5 retries, 1s initial backoff; transientErrors(true) treats errors as
// transient per Reactor's semantics (retry counter resets between failures).
RetryBackoffSpec retrySpec = Retry.backoff(5, Duration.ofSeconds(1)).transientErrors(true);
KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions);
// Holds the record whose retries were exhausted; checked on the next delivery.
AtomicReference<ReceiverRecord<?, ?>> failed = new AtomicReference<>();
receiver.receive()
.subscribeOn(Schedulers.single())
.doOnNext(record -> {
System.out.println(record.value() + "@" + record.offset());
if (failed.get() != null) {
// Redelivery of the record that exhausted its retries: acknowledge it so
// its offset gets committed, then clear the marker.
System.out.println("Committing failed record offset " + record.value()
+ "@" + record.offset());
record.receiverOffset().acknowledge();
failed.set(null);
}
else {
atomicInteger.set(0);
try {
processRecord(record.value());
record.receiverOffset().acknowledge();
}
catch (Exception e) {
// Wrap so the failing record travels with the error to onErrorResume.
throw new ReceiverRecordException(record, e);
}
}
})
.doOnError(ex -> atomicInteger.incrementAndGet())
.retryWhen(retrySpec)
.onErrorResume(e -> {
// Retries exhausted: remember the record and swallow the error so the
// sequence completes normally (letting repeat() resubscribe below).
ReceiverRecordException ex = (ReceiverRecordException) e.getCause();
ReceiverRecord<?, ?> record = ex.getRecord();
System.out.println("Retries exhausted for " + record.value()
+ "@" + record.offset());
failed.set(record);
return Mono.empty();
})
// Resubscribe after completion so consumption continues past the failed record.
.repeat()
.subscribe();
}
// Simulated processing step: unconditionally fails, to demonstrate the
// retry-then-skip flow in consumeRecords().
public void processRecord(String record) {
    throw new RuntimeException("Throwing exception!");
}
}
/**
 * Runtime wrapper that carries the Kafka record whose processing failed, so
 * downstream error handlers (e.g. onErrorResume) can recover the record and
 * decide whether to skip or commit it.
 *
 * Fix: use {@code ReceiverRecord<?, ?>} instead of the raw type — callers
 * already consume it as a wildcard-typed record, so this is source-compatible.
 */
@SuppressWarnings("serial")
class ReceiverRecordException extends RuntimeException {

    /** The record that was in flight when the wrapped exception was thrown. */
    private final ReceiverRecord<?, ?> record;

    ReceiverRecordException(ReceiverRecord<?, ?> record, Throwable t) {
        super(t);
        this.record = record;
    }

    /**
     * @return the record associated with this failure
     */
    public ReceiverRecord<?, ?> getRecord() {
        return this.record;
    }
}
编辑
这是完整的应用程序...
/**
 * Demo application: publishes four records ("foo", "bar", "fail", "baz") to
 * topic {@code so67373188}, then consumes them with a retry-then-skip strategy.
 * The "fail" record exhausts its retries and is acknowledged on the next
 * redelivery, after which consumption continues with "baz".
 */
@SpringBootApplication
public class So67373188Application {
private static final Logger log = LoggerFactory.getLogger(So67373188Application.class);
public static void main(String[] args) throws InterruptedException {
SpringApplication.run(So67373188Application.class, args);
// Keep the JVM alive long enough for the async producer/consumer demo to run.
Thread.sleep(120_000);
}
// Declares the demo topic: single partition so all records share one offset stream.
@Bean
public NewTopic topic() {
return TopicBuilder.name("so67373188").partitions(1).replicas(1).build();
}
// Producer side: sends four records, including "fail" which triggers the
// consumer's retry/skip path.
@Bean
public ApplicationRunner runner2() {
return args -> {
SenderOptions<String, String> so = SenderOptions.create(
Map.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class));
KafkaSender<String, String> sender = KafkaSender.create(so);
Disposable subscribed = sender.send(Flux.just(pr("foo"), pr("bar"), pr("fail"), pr("baz")))
.subscribe(result -> {
System.out.println(result.recordMetadata());
});
// Give the asynchronous sends time to complete before disposing.
Thread.sleep(5000);
subscribed.dispose();
};
}
// Consumer side: builds receiver options (one record per poll, commit after
// every acknowledgement) and starts the reactive consumption pipeline.
@Bean
public ApplicationRunner runner3(KafkaOperations<String, String> template) {
return args -> {
// NOTE(review): dlpr is created but never wired into the pipeline below —
// presumably left over from a dead-letter-topic variant; confirm intent.
DeadLetterPublishingRecoverer dlpr = new DeadLetterPublishingRecoverer(template);
ReceiverOptions<String, String> ro = ReceiverOptions.<String, String> create(
Map.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
ConsumerConfig.GROUP_ID_CONFIG, "so67373188",
ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"))
.withKeyDeserializer(new StringDeserializer())
.withValueDeserializer(new StringDeserializer())
.addAssignListener(assignments -> log.info("Assigned: " + assignments))
.commitBatchSize(1)
.subscription(Collections.singletonList("so67373188"));
consumeRecords(ro);
};
}
// Helper to build a SenderRecord for the demo topic (partition 0, value used
// as both payload and correlation metadata).
private SenderRecord<String, String, String> pr(String value) {
return SenderRecord.create("so67373188", 0, null, null, value, value + ".corr");
}
// Counts consecutive pipeline errors; reset when a fresh record starts
// processing. Not read elsewhere here — presumably diagnostic; confirm.
private final AtomicInteger atomicInteger = new AtomicInteger();
// Consumes records one at a time; a failing record is retried up to 5 times
// with backoff, then remembered so its next redelivery is acknowledged
// (skipped) instead of reprocessed.
public void consumeRecords(ReceiverOptions<String, String> receiverOptions) {
// transientErrors(true) treats errors as transient per Reactor's semantics.
RetryBackoffSpec retrySpec = Retry.backoff(5, Duration.ofSeconds(1)).transientErrors(true);
KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions);
// Holds the record whose retries were exhausted; checked on the next delivery.
AtomicReference<ReceiverRecord<?, ?>> failed = new AtomicReference<>();
receiver.receive()
.subscribeOn(Schedulers.single())
.doOnNext(record -> {
System.out.println(record.value() + "@" + record.offset());
if (failed.get() != null) {
// Redelivery of the exhausted record: acknowledge so its offset is
// committed, then clear the marker.
System.out.println("Committing failed record offset " + record.value()
+ "@" + record.offset());
record.receiverOffset().acknowledge();
failed.set(null);
}
else {
atomicInteger.set(0);
try {
processRecord(record.value());
record.receiverOffset().acknowledge();
}
catch (Exception e) {
// Wrap so the failing record travels with the error to onErrorResume.
throw new ReceiverRecordException(record, e);
}
}
})
.doOnError(ex -> atomicInteger.incrementAndGet())
.retryWhen(retrySpec)
.onErrorResume(e -> {
// Retries exhausted: remember the record and complete normally so the
// outer repeat() resubscribes the pipeline.
ReceiverRecordException ex = (ReceiverRecordException) e.getCause();
ReceiverRecord<?, ?> record = ex.getRecord();
System.out.println("Retries exhausted for " + record.value()
+ "@" + record.offset());
failed.set(record);
return Mono.empty();
})
// Resubscribe after completion so consumption continues past the failure.
.repeat()
.subscribe();
}
// Simulated processing: fails only for the "fail" record.
public void processRecord(String record) {
// might throw an exception
if (record.equals("fail")) {
throw new RuntimeException("Throwing exception!");
}
}
}
/**
 * Runtime wrapper that carries the Kafka record whose processing failed, so
 * downstream error handlers (e.g. onErrorResume) can recover the record and
 * decide whether to skip or commit it.
 *
 * Fix: use {@code ReceiverRecord<?, ?>} instead of the raw type — callers
 * already consume it as a wildcard-typed record, so this is source-compatible.
 */
@SuppressWarnings("serial")
class ReceiverRecordException extends RuntimeException {

    /** The record that was in flight when the wrapped exception was thrown. */
    private final ReceiverRecord<?, ?> record;

    ReceiverRecordException(ReceiverRecord<?, ?> record, Throwable t) {
        super(t);
        this.record = record;
    }

    /**
     * @return the record associated with this failure
     */
    public ReceiverRecord<?, ?> getRecord() {
        return this.record;
    }
}
结果:
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
[32m :: Spring Boot :: [39m [2m (v2.4.5)[0;39m
so67373188-0@16
so67373188-0@17
so67373188-0@18
so67373188-0@19
foo@16
bar@17
fail@18
fail@18
fail@18
fail@18
fail@18
fail@18
Retries exhausted for fail@18
fail@18
Committing failed record offset fail@18
baz@19