Java 7, Glassfish 3.1.2
The input consists of messages like:
public class Message {
    private final String contextId;
    private final String name;
    ...
}
These messages should be processed by a worker. For a message with a new contextId, a new thread should be started. For a contextId that already exists, the already existing thread should be used, and it should process the messages with that contextId sequentially.
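To make the intended behaviour concrete, here is a minimal sketch in plain Java SE, outside the container. It assumes one single-threaded executor per contextId is acceptable. `ContextDispatcher`, `submit`, `shutdownAndWait` and `ContextDispatcherDemo` are hypothetical names used only for illustration. They are not part of the Worker EJB, and creating unmanaged threads like this is generally discouraged inside an application server.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical dispatcher: one single-threaded executor per contextId. */
class ContextDispatcher {
    private final ConcurrentMap<String, ExecutorService> executors = new ConcurrentHashMap<>();

    /** Queues the task on the executor owned by contextId, creating it on first use. */
    public void submit(String contextId, Runnable task) {
        ExecutorService executor = executors.get(contextId);
        if (executor == null) {
            ExecutorService created = Executors.newSingleThreadExecutor();
            // putIfAbsent is atomic, so only one executor per contextId is ever used.
            executor = executors.putIfAbsent(contextId, created);
            if (executor == null) {
                executor = created;
            } else {
                created.shutdown(); // another caller won the race; discard the spare
            }
        }
        // A single-threaded executor runs its tasks one at a time, in submission order.
        executor.execute(task);
    }

    /** Stops accepting work and waits until all queued tasks have finished. */
    public boolean shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException {
        for (ExecutorService e : executors.values()) {
            e.shutdown();
        }
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        for (ExecutorService e : executors.values()) {
            long remaining = Math.max(deadline - System.nanoTime(), 0);
            if (!e.awaitTermination(remaining, TimeUnit.NANOSECONDS)) {
                return false;
            }
        }
        return true;
    }
}

class ContextDispatcherDemo {
    public static void main(String[] args) throws InterruptedException {
        ContextDispatcher dispatcher = new ContextDispatcher();
        final List<String> log = Collections.synchronizedList(new ArrayList<String>());
        for (int i = 1; i <= 3; i++) {
            final String name = "message " + i;
            // Same contextId: all three tasks run on the same thread, one after another.
            dispatcher.submit("ctx-A", new Runnable() {
                @Override
                public void run() {
                    log.add(Thread.currentThread().getName() + ": " + name);
                }
            });
        }
        dispatcher.shutdownAndWait(5, TimeUnit.SECONDS);
        for (String line : log) {
            System.out.println(line);
        }
    }
}
```

In this sketch the executors are never removed from the map, so each contextId keeps its idle thread until `shutdownAndWait` is called. A real solution would need some eviction strategy and, in GlassFish, container-managed threads instead.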
Here is my latest Worker version, which does not work.
@Stateless
@LocalBean
public class Worker {
    private static final Map<String, Future<Result>> MAP = new ConcurrentHashMap<>();

    @EJB
    private Worker worker;

    @Asynchronous
    public void work(Message message) {
        System.out.println(Thread.currentThread().getName() + ": A message: " + message.toString() + " should be processed");
        Future<Result> sameContext = MAP.get(message.getContextId());
        if (sameContext != null) {
            waitForSameContextId(message, sameContext);
        }
        MAP.put(message.getContextId(), worker.doWork(message));
    }

    @Asynchronous
    public Future<Result> doWork(Message message) {
        System.out.println(Thread.currentThread().getName() + ": Processing the message: " + message.toString());
        AsyncResult<Result> asyncResult = new AsyncResult<>(new Result());
        try {
            Thread.sleep(15000);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
        MAP.remove(message.getContextId()); // We are done, remove the entry
        System.out.println(Thread.currentThread().getName() + ": The message: " + message.toString() + " was processed");
        return asyncResult;
    }

    private void waitForSameContextId(Message message, Future<Result> result) {
        try {
            System.out.println(Thread.currentThread().getName() + ": message with id: " + message.toString()
                    + " is already in work, blocking Thread until it is finished");
            Result get = result.get(); // blocks the thread
        } catch (InterruptedException | ExecutionException ex) {
            ex.printStackTrace();
            // Do some failure management
        }
    }
}
Test class:
public class MessageReceiver {
    private static String ID = "#########";

    @EJB
    private Worker worker;

    public void receive(Message message) {
        worker.work(message);
    }

    @PostConstruct
    void init() {
        receive(new Message(ID, "message 1"));
        receive(new Message(ID, "message 2"));
        receive(new Message(ID, "message 3"));
        ...
    }
}