我将尝试描述我面临的问题并用一些代码示例来支持它。我正在使用如下声明的 LinkedBlockingQueue:
private BlockingQueue<OutgoingMessage> outgoingMessageQueue = new LinkedBlockingQueue<OutgoingMessage>();
OutgoingMessage 表示要发送给客户的文本消息。它通过准备它的 Web 服务调用进入队列,将其保存到数据库并将其放入队列。应用程序部署在 tomcat 上,因此该线程是来自 HTTP 池的线程。
我创建了不同的线程来处理队列并进行实际的消息发送。它无限期地运行并在队列上调用 take() 方法,如下所示:
public void processOutgoingMessageQueue() {
try {
OutgoingMessage outgoingMessage = outgoingMessageQueue.take();
logger.debug(MessageFormat.format("Took outgoing message with id = [{0}] from queue", outgoingMessage.getId()));
this.forwardToCommunicator(outgoingMessage);
}
catch (InterruptedException e) {
logger.fatal("Interupted while waiting to process outgoing messages ", e); }
catch (Exception e) {
exceptionHandler.handle(e);
}
}
从线程调用方法 processOutgoingMessageQueue()。
这是一种功能,将消息放入队列并稍后定期发送,但客户端(调用 Web 服务方法的一侧)不会在传出消息放入队列后立即获得响应,而是在线程从队列中取出它完成它的处理。看起来tomcat的HTTP池中的线程正在等待其他线程完成消息处理,然后将Web服务响应返回给客户端。这会导致糟糕的用户体验,因为用户必须等待整个过程完成才能将另一条消息排入队列。
这是一个显示消息已成功放入队列的日志:
[DEBUG] 2012-07-08 23:09:51,707 [http-8080-8] SimpleServiceCommunicatorImpl: Received sendMessage request from serviceId = [3], charginId = [3], text [some text]
[DEBUG] 2012-07-08 23:09:51,721 [http-8080-8] SimpleServiceCommunicatorImpl: Request verification succeeded, creating outgoing message.
[INFO ] 2012-07-08 23:09:51,738 [http-8080-8] SimpleMessageCreatorImpl: Created outgoing message with id = [1,366] and payment invoice with id = [1,323]
[INFO ] 2012-07-08 23:09:51,738 [http-8080-8] Core: Enqueued outgoing message with id = [1,366]
这是显示正在执行发送消息请求的客户端日志:
DEBUG 2012-07-08 23:09:51,702 CoreAdapter: Sending message with serviceId = [3], chargingId = [3], text = [some text]
INFO 2012-07-08 23:10:06,477 SimpleMessageSenderImpl: Created answer with core id = [1,366]
INFO 2012-07-08 23:10:06,477 SMSChatServiceImpl: Response message with result = 1366 sent to the customer
它表明在传出消息大约 15 秒后返回的请求被放入队列中,尽管线程 HTTP 8080-8 没有更多工作要执行。
更新
这是将传出消息放入队列的方法:
public void enqueueOutgoingMessage(OutgoingMessage outgoingMessage) {
try {
outgoingMessageQueue.put(outgoingMessage);
logger.info(MessageFormat.format("Enqueued outgoing message with id = [{0}]", outgoingMessage.getId()));
}
catch (InterruptedException e) {
logger.fatal("Interupted while waiting to enqueue outgoing message. ", e);
}
}
这是处理队列的线程:
// outgoing message queue
this.outgoingMessageQueueProcessingThread = new Thread(new Runnable() {
public void run() {
while (!stopQueueProcessing) {
Core.this.processOutgoingMessageQueue();
}
}
});
this.outgoingMessageQueueProcessingThread.start();
stopQueueProcessing 是一个在应用关闭时设置为 false 的标志。
这可能是已知的 tomcat 或 LinkedBlockingQueue 问题吗?
有人有想法吗?需要更多细节,我很乐意提供。