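I have a job that publishes events from an outbox table (in a MySQL DB) to Kafka. I am using Spring Kafka. My code reads records from the outbox table, creates executor threads, divides the outbox rows among those threads, and publishes the events. I want to send two types of events, one Avro and one JSON, and if both sends succeed I want to delete the record from the table. I want this code to run asynchronously.
Code that fetches the events, partitions them across threads, and runs the threads: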
final List<OutboxEntity> allEvents = getAllEvents();
List<EventProducerTask> tasks = new ArrayList<>();
HashSet<UUID> seen = new HashSet<>();
// Keep only the first event for each object id.
allEvents.removeIf(e -> !seen.add(e.getObjectId()));
// Partition the events list and create one task per partition
// so that the events are published to Azure Event Hubs in parallel.
if (allEvents.size() > 0) {
    int partitionSize = (int) Math.ceil((double) allEvents.size() / this.threadPoolSize);
    for (List<OutboxEntity> events : Lists.partition(allEvents, partitionSize)) {
        EventProducerTask eventProducerTask = EventProducerTask.builder()
                .allEvents(events)
                .outboxRepo(this.outboxRepo)
                .localUserService(this.localUserService)
                .jsonKafkaTemplate(this.jsonKafkaTemplate)
                .eventHubName(this.eventHubName)
                .kafkaTemplate(this.kafkaTemplate)
                .build();
        tasks.add(eventProducerTask);
    }
    // invokeAll blocks until every task has completed
    List<Future<Void>> futures = executorService.invokeAll(tasks);
}
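The partitioning arithmetic above can be checked in isolation. The following is a minimal sketch in plain Java, assuming nothing from the real project: `PartitionSketch`, `partitionSize`, and `partition` are hypothetical names, and `partition` is a simple stand-in for Guava's `Lists.partition` rather than its actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

final class PartitionSketch {

    // Ceiling division: the largest number of events any single task will receive.
    static int partitionSize(int totalEvents, int threadPoolSize) {
        return (int) Math.ceil((double) totalEvents / threadPoolSize);
    }

    // Hypothetical stand-in for Guava's Lists.partition:
    // consecutive chunks of at most `size` elements, the last one possibly shorter.
    static <T> List<List<T>> partition(List<T> items, int size) {
        List<List<T>> chunks = new ArrayList<>();
        for (int start = 0; start < items.size(); start += size) {
            int end = Math.min(start + size, items.size());
            chunks.add(new ArrayList<>(items.subList(start, end)));
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Integer> events = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            events.add(i);
        }
        int size = partitionSize(events.size(), 4);
        System.out.println(size + " events per chunk, " + partition(events, size).size() + " chunks");
    }
}
```

With 10 events and a pool size of 4 this gives chunks of 3, 3, 3 and 1. Because of the rounding, the number of chunks can be smaller than the pool size: 9 events and a pool size of 4 give only 3 chunks of 3.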
Code that runs on each thread and produces both types of events (Avro and JSON) to Kafka:
private void publishEvent() {
    final Iterator<OutboxEntity> uniqueEventsIterator =
            allEvents.iterator();
    final int totalUniqueEvents = allEvents.size();
    while (uniqueEventsIterator.hasNext()) {
        // iterate over the unique events
        final OutboxEntity outboxEntity = uniqueEventsIterator.next();
        try {
            // only send user-related events
            if (StringUtils.equals(outboxEntity.getActionType(),
                    UserUpsertedActionEnum.UserUpserted.toString())
                    || StringUtils.equals(outboxEntity.getActionType(),
                    UserDeletedActionEnum.UserDeleted.toString())) {
                // Avro event (these Avro events need to be retired at some point)
                final ProducerEventDto avroProducerEventDto =
                        localUserService.buildUserChangedSerializedEvent(outboxEntity.getObjectId());
                ListenableFuture<SendResult<String, byte[]>> avroFuture =
                        this.sendAvroEvent(outboxEntity,
                                () -> avroProducerEventDto,
                                kafkaTemplate);
                // JSON event
                final AuthnEvent userEvent =
                        localUserService.buildUserAuthnEvent(outboxEntity.getObjectId());
                ListenableFuture<SendResult<String, Object>> jsonFuture =
                        this.sendJsonEvent(outboxEntity,
                                () -> userEvent,
                                jsonKafkaTemplate);
                log.info(format("Published Event Successfully for Object Id: %s",
                        outboxEntity.getObjectId()));
                // Remove the event after sending. After exiting the loop, allEvents
                // will contain only the unsuccessful (failed) events.
                uniqueEventsIterator.remove();
            }
        } catch (Exception ex) {
            log.error(String.format("Error in publishing event...[Object Id: %s]",
                    outboxEntity.getObjectId()), ex);
        }
    }
}
@Async
public ListenableFuture<SendResult<String, byte[]>> sendAvroEvent(
        final OutboxEntity outboxEntity,
        final Supplier<ProducerEventDto> supplier,
        final KafkaTemplate<String, byte[]> kafkaTemplate) {
    final List<Header> headers = new ArrayList<>();
    headers.add(new RecordHeader(HEADER_KEY, supplier.get().eventType.getBytes()));
    final ProducerRecord<String, byte[]> producerRecord =
            new ProducerRecord<>(eventHubName, null, outboxEntity.getObjectId().toString(),
                    supplier.get().data, headers);
    ListenableFuture<SendResult<String, byte[]>> future =
            kafkaTemplate.send(producerRecord);
    future.addCallback(new ListenableFutureCallback<SendResult<String, byte[]>>() {
        @Override
        public void onSuccess(final SendResult<String, byte[]> message) {
            if (outboxEntity.getCount() == 0) {
                outboxEntity.setCount(1);
                outboxRepo.saveAndFlush(outboxEntity);
            } else {
                outboxRepo.delete(outboxEntity);
            }
        }

        @Override
        public void onFailure(final Throwable throwable) {
        }
    });
    return future;
}
@Async
public ListenableFuture<SendResult<String, Object>> sendJsonEvent(
        final OutboxEntity outboxEntity,
        final Supplier<AuthnEvent> supplier,
        final KafkaTemplate<String, Object> kafkaTemplate) {
    final ProducerRecord<String, Object> jsonRecord = new ProducerRecord<>(eventHubName,
            null, outboxEntity.getObjectId().toString(), supplier.get(),
            Collections.singletonList(new RecordHeader(HEADER_KEY,
                    supplier.get().getAction().name().getBytes())));
    ListenableFuture<SendResult<String, Object>> future =
            kafkaTemplate.send(jsonRecord);
    future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
        @Override
        public void onSuccess(final SendResult<String, Object> message) {
            if (outboxEntity.getCount() == 0) {
                outboxEntity.setCount(1);
                outboxRepo.saveAndFlush(outboxEntity);
            } else {
                outboxRepo.delete(outboxEntity);
            }
        }

        @Override
        public void onFailure(final Throwable throwable) {
        }
    });
    return future;
}
@Transactional
@Override
public Void call() {
    try {
        this.publishEvent();
    } catch (Exception exp) {
        log.error("Error in publishing events in parallel...", exp);
    }
    return null;
}
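As you can see, I publish the events asynchronously. When either event (Avro or JSON) is published, I set the count on the outbox entity to 1; if the count is already 1, I delete the record, because that means one type of event (i.e. Avro or JSON) has already been published and this is the second one. This way I can make sure that both events have been published. What I am not sure about is how this will behave in a multi-threaded environment. Is this the right approach?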
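To make the goal concrete (delete the row only once both sends have succeeded), here is a minimal, self-contained sketch that waits for both results together instead of counting callbacks. It is an illustration under assumptions, not the code from the job above: `BothSendsSketch`, `send`, `publishBoth`, and the in-memory `outbox` set are hypothetical stand-ins, and plain `CompletableFuture` takes the place of Spring's `ListenableFuture`.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

final class BothSendsSketch {

    // Hypothetical stand-in for the outbox table: ids of rows still waiting to be published.
    static final Set<String> outbox = ConcurrentHashMap.newKeySet();

    // Hypothetical stand-in for an asynchronous Kafka send; fails when `succeed` is false.
    static CompletableFuture<String> send(String format, String id, boolean succeed) {
        return CompletableFuture.supplyAsync(() -> {
            if (!succeed) {
                throw new IllegalStateException(format + " send failed for " + id);
            }
            return format + ":" + id;
        });
    }

    // Removes the row only after BOTH sends completed normally.
    // The returned future yields true if the row was removed, false if it was kept.
    static CompletableFuture<Boolean> publishBoth(String id, boolean avroOk, boolean jsonOk) {
        CompletableFuture<String> avro = send("avro", id, avroOk);
        CompletableFuture<String> json = send("json", id, jsonOk);
        // allOf completes exceptionally if either send failed.
        return CompletableFuture.allOf(avro, json).handle((ignored, error) -> {
            if (error == null) {
                outbox.remove(id); // both sends succeeded: safe to delete
                return true;
            }
            return false;          // at least one send failed: keep the row for a retry
        });
    }

    public static void main(String[] args) {
        outbox.add("a");
        outbox.add("b");
        System.out.println(publishBoth("a", true, true).join());
        System.out.println(publishBoth("b", true, false).join());
        System.out.println(outbox);
    }
}
```

In this sketch the delete happens in exactly one place, after both futures have completed, so there is no shared counter that two callbacks could read at the same time. Row "a" is removed and row "b" stays behind because its JSON send failed. Whether the same shape fits the real job depends on how the Spring futures and the surrounding transaction are wired, which this sketch does not model.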