我对带有本地 C++ CMS 客户端 3.9.3 的 ActiveMQ 5.11 代理有疑问。我修改了官方网站上的示例代码,使用 pthread_create 函数生成一个新线程并尝试从新线程确认消息(CLIENT_ACK 模式)。事实证明存在分段错误。我们如何实现从新生成的线程而不是当前线程返回 ack?ActiveMQ C++ 客户端是否支持多线程确认消息?
void* sendAckThreadFunc(void *pMessage) {
sleep(1);
const Message* message = (const Message*) pMessage;
message->acknowledge();
printf("ACK sent out.");
return NULL;
}
virtual void onMessage(const Message* message) {
static int count = 0;
try {
count++;
const TextMessage* textMessage = dynamic_cast<const TextMessage*>(message);
string text = "";
if (textMessage != NULL) {
text = textMessage->getText();
} else {
text = "NOT A TEXTMESSAGE!";
}
if (clientAck) {
//message->acknowledge(); --> instead of ack the message in the onMessage function, they use pthread_create to generate a new thread and trying to ack the message from there. Is is a supported way??
pthread_t sendAckThread;
if (pthread_create(&sendAckThread, NULL, sendAckThreadFunc,
(void*) message)) {
printf("Error occured when create threads.");
}
}
printf("A Message #%d Received: %s\n", count, text.c_str());
} catch (CMSException& e) {
e.printStackTrace();
}
}
当我运行消费者时,它甚至无法尝试确认一条消息:
[root@amq6-283-1 examples]# ./simple_async_consumer
=====================================================
Starting the example:
-----------------------------------------------------
The Connection's Transport has been Restored.
Press 'q' to quit
A Message #1 Received: Hello world! from thread 140486368756208
Segmentation fault (core dumped)
这里的事情是,一旦消息对象退出 OnMessage 函数,所有的资源都没有了,无法传递给其他线程。
CMS API 文档清楚地说明了这一点:
/**
* Called asynchronously when a new message is received, the message
* reference can be to any of the Message types. a dynamic cast is used
* to find out what type of message this is. The lifetime of this
* object is only guaranteed to be for the life of the onMessage function
* after this call-back returns the message may no longer exist. Users should
* copy the data or clone the message if they wish to retain information that
* was contained in this Message.
*
* It is considered a programming error for this method to throw an
* exception. The method has been tagged with the 'throw()' qualifier,
* this implies that you application will segfault if you throw an error
* from an implementation of this method.
*
* @param message
* Message object {const} pointer recipient does not own.
*/
我了解示例仅用于串行处理,但我真诚地要求进行并行处理,这意味着所有事情都不是在单个线程中完成的。如果是串行的,在当前消息被处理并返回 ack 之前,当前线程无法接收更多批次的消息。确实不能满足客户的性能需求。
那么任何人都可以说明 CMS API 是如何设计来处理并行性的吗?Receiver 线程只专注于接收OnMessage
函数内部的消息,而其他业务线程则专注于业务处理并根据结果返回 ack。我只想知道 CMS API 如何处理并行性。这就是他们使用 CLIENT ACK 模式的方式。任何人都可以提供一个并行示例吗?