1

Java 7, Glassfish 3.1.2

Input are messages like:

public class Message {

    private final String contextId;
    private final String name;
    ...
}

This messages should be processed by a worker. For messages with new contextId a new thread should be started. For contextId that already exists use a already existing thread. The already existing thread should process the message with the same contextId sequential.

Hier my last, not working, Worker version.

@Stateless
@LocalBean
public class Worker {

private static final Map<String, Future<Result>> MAP = new ConcurrentHashMap<>();
@EJB
private Worker worker;

@Asynchronous
public void work(Message message) {
    System.out.println(Thread.currentThread().getName() + ": A  message: " + message.toString()+ " should be processed");
    Future<Result> sameContext = MAP.get(message.getContextId());
    if (sameContext != null) {
        waitForSameContextId(message, sameContext);
    }
    MAP.put(message.getContextId(), worker.doWork(message));
}

@Asynchronous
public Future<Result> doWork(Message message) {
    System.out.println(Thread.currentThread().getName() + ": Processing the message:  " + message.toString());

    AsyncResult<Result> asyncResult = new AsyncResult<>(new Result());
    try {
        Thread.sleep(15000);
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }

    MAP.remove(message.getContextId());  //We are done removing 
    System.out.println(Thread.currentThread().getName() + ": The message: " + message.toString()+ " was processed");
    return asyncResult;
}

private void waitForSameContextId(Message message, Future<Result> result) {
    try {
        System.out.println(Thread.currentThread().getName() + ": message with id: " + message.toString()
                + " is already in work, blocking Thread until it is finished");
        Result get = result.get(); //blocks thread
    } catch (InterruptedException | ExecutionException ex) {
        ex.printStackTrace();
        // Do some failure management
    }
}

Test class:

public class MessageReceiver {

private static String ID = "#########";

@EJB
private Worker worker;

public void receive(Message message) {

    worker.work(message);
}

@PostConstruct
void init() {

    receive(new Message(ID, "message 1"));
    receive(new Message(ID, "message 2"));
    receive(new Message(ID, "message 3"));
 ...
 }
4

1 回答 1

0

我认为您可以使用 CDI @Observes 模式调用 @Asynchronous 服务。如需深入解释,请查看此链接http://www.devchronicles.com/2011/12/javaee-revisits-design-patterns_28.html

于 2013-04-28T21:21:57.197 回答