0

I've a simple Listener pattern implementation which (in a condensed form) looks like this:

public class EmailProducer implements Runnable {
    private List<Consumer<EmailMessage>> listeners = new ArrayList<>();
    private IdleManager idleManager;

    public void registerListener(Consumer<EmailMessage> listener) {
        listeners.add(listener);
    }

    public void run() {
        Session session = Session.getDefaultInstance(new Properties());
        idleManager = new IdleManager(session, Executors.newCachedThreadPool());
        // use IMAP IDLE to listen for new incoming emails
        Folder inbox = openInbox();
        inbox.addMessageCountListener(new MessageCountAdapter() {
            public void messagesAdded(MessageCountEvent ev) {
                Message[] msgs = ev.getMessages();
                for (Message msg : msgs) {
                    EmailMessage info = EmailMessage.from(msg);
                    for (Consumer<EmailMessage> listener : listeners) {
                        listener.accept(info);
                    }
                }
            }
        });
        try {
            idleManager.watch(inbox);
        catch (FolderClosedException|StoreClosedException ex) {
            run(); // restart, need to re-establish connection
        }
    }
}

So in essence this grabs email from my inbox, extracts information from each email and converts that into an internal message format.

A typical Consumer would then persist that message, another would display the information on a HTML page, and yet another might trigger 3rd party systems to do some action. All of this works fine.

Now, I'm trying to get some more resiliency in face of a Consumer failure. For example, I want my EmailMessage to live on if the persistence consumer failed (threw an exception), ending processing. In that case, I want processing to be retried after a pause interval. Right now, if a Consumer fails, the message would be lost.

I recently found out about Reactor, and I think it's orchestration would save the message and enable me to do what I need. However, I fail to see how I would adapt my current Runnable into it's model. From the documentation it seems like I need a RingBufferProcessor to hold my messages. I can't find another way then to pass the processor to my Runnable for it call the onNext() method instead of the separate listener's accept method(s). Am I missing something or is this the way it's supposed to work?

(Extra cookies for showing me how I retry in the face of consumer failure)

4

1 回答 1

1

You might take a look at the RingBufferProcessor which will allows you to do parallel tasks with a single EmailMessage. You'd set it up something like:

ReactorProcessor<Message, Message> processor = RingBufferProcessor.share("email-processor", 1024);
Stream<Message> s = Streams.wrap(processor);

s.consume(m -> firstTask(m));
s.consume(m -> secondTask(m));
s.consume(m -> thirdTask(m));

This will parallelize the work onto 3 separate threads (1 per Consumer). To do retries or capture errors, you'd need either Stream.retry or Stream.retryWhen (for backoff) or just Stream.when (which doesn't give you the message that caused the error).

s.retry(t -> t instanceof IllegalStateException)
 .consume(m -> retryMessage(m));

s.when(NumberFormatException.class, e -> e.printStackTrace());

Then to publish messages to this Processor, just call onNext:

for(EmailMessage msg : msgs) {
    processor.onNext(msg);
}
于 2015-06-08T18:35:08.783 回答