I have a simple listener-pattern implementation which, in condensed form, looks like this:
    public class EmailProducer implements Runnable {

        private List<Consumer<EmailMessage>> listeners = new ArrayList<>();
        private IdleManager idleManager;

        public void registerListener(Consumer<EmailMessage> listener) {
            listeners.add(listener);
        }

        public void run() {
            Session session = Session.getDefaultInstance(new Properties());
            idleManager = new IdleManager(session, Executors.newCachedThreadPool());

            // use IMAP IDLE to listen for new incoming emails
            Folder inbox = openInbox();
            inbox.addMessageCountListener(new MessageCountAdapter() {
                public void messagesAdded(MessageCountEvent ev) {
                    Message[] msgs = ev.getMessages();
                    for (Message msg : msgs) {
                        // convert to the internal format, then notify every listener
                        EmailMessage info = EmailMessage.from(msg);
                        for (Consumer<EmailMessage> listener : listeners) {
                            listener.accept(info);
                        }
                    }
                }
            });

            try {
                idleManager.watch(inbox);
            } catch (FolderClosedException|StoreClosedException ex) {
                run(); // restart, need to re-establish connection
            }
        }
    }
So in essence this grabs email from my inbox, extracts information from each email and converts that into an internal message format.
A typical Consumer would then persist that message, another would display the information on an HTML page, and yet another might trigger third-party systems to take some action. All of this works fine.
Now I'm trying to add some resiliency in the face of a Consumer failure. For example, I want my EmailMessage to live on if the persistence consumer fails (throws an exception) and processing ends. In that case, I want processing to be retried after a pause interval. Right now, if a Consumer fails, the message is lost.
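To make the behaviour I'm after concrete, here is a minimal sketch in plain Java (no Reactor) of retrying a failing consumer after a pause. The class name RetryingConsumer and the parameters maxAttempts and pauseMillis are hypothetical and not part of my current code; this is only one naive way to do it, not what I consider a proper solution.

```java
import java.util.function.Consumer;

// Hypothetical helper (not in the code above): wraps a Consumer and
// retries it after a fixed pause whenever it throws a RuntimeException.
final class RetryingConsumer<T> implements Consumer<T> {

    private final Consumer<T> delegate;
    private final int maxAttempts;   // assumed parameter: total number of tries
    private final long pauseMillis;  // assumed parameter: pause between tries

    RetryingConsumer(Consumer<T> delegate, int maxAttempts, long pauseMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        this.delegate = delegate;
        this.maxAttempts = maxAttempts;
        this.pauseMillis = pauseMillis;
    }

    @Override
    public void accept(T item) {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                delegate.accept(item);
                return; // delivered successfully, stop retrying
            } catch (RuntimeException ex) {
                last = ex;
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(pauseMillis); // pause before the next try
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        break; // give up early if the thread is interrupted
                    }
                }
            }
        }
        throw last; // every attempt failed (or we were interrupted)
    }
}
```

A listener would then be registered as registerListener(new RetryingConsumer<>(persistenceConsumer, 3, 5000)), where persistenceConsumer is a placeholder. The drawbacks are that Thread.sleep blocks the thread delivering the mail event and that the message still lives only in memory, which is why I'm looking for something better.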
I recently found out about Reactor, and I think its orchestration would preserve the message and enable me to do what I need. However, I fail to see how I would adapt my current Runnable to its model. From the documentation, it seems I need a RingBufferProcessor to hold my messages. The only way I can find is to pass the processor to my Runnable so that it calls the processor's onNext() method instead of each listener's accept() method. Am I missing something, or is this the way it's supposed to work?
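To show the shape I have in mind, here is a rough sketch that uses the JDK's own java.util.concurrent.SubmissionPublisher (Java 9+) as a stand-in for the Reactor processor. It is not Reactor code and I have not verified the Reactor equivalent. The names PublisherHandOffSketch, Producer and demo are made up, and plain strings stand in for EmailMessage.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical sketch: the producer is handed a publisher and pushes each
// message into it, instead of looping over registered listeners itself.
final class PublisherHandOffSketch {

    // Stand-in for EmailProducer; Strings stand in for EmailMessage.
    static final class Producer implements Runnable {
        private final SubmissionPublisher<String> out;
        private final List<String> incoming;

        Producer(SubmissionPublisher<String> out, List<String> incoming) {
            this.out = out;
            this.incoming = incoming;
        }

        @Override
        public void run() {
            for (String message : incoming) {
                out.submit(message); // replaces the listener.accept(info) loop
            }
        }
    }

    static List<String> demo() {
        List<String> persisted = new CopyOnWriteArrayList<>();
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

        // Subscribe a consumer; the future completes once the publisher is
        // closed and all submitted items have been processed.
        CompletableFuture<Void> done = publisher.consume(persisted::add);

        new Producer(publisher, List.of("mail-1", "mail-2")).run();
        publisher.close();
        done.join();
        return persisted;
    }
}
```

The point is only the hand-off: the producer no longer knows its consumers, and whatever sits between them (here the publisher, in my case presumably the Reactor processor) is where buffering and retry would have to live.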
(Extra cookies for showing me how to retry in the face of a consumer failure.)