我已使用 vcpkg 构建 rabbitmq-c,如https://github.com/alanxz/rabbitmq-c构建说明中所述:
git clone https://github.com/Microsoft/vcpkg.git
cd vcpkg
./bootstrap-vcpkg.sh
./vcpkg integrate install
./vcpkg install librabbitmq
./vcpkg install boost
然后我构建了https://github.com/alanxz/SimpleAmqpClient(最新发布版本 2.5.1)
我面临的问题是 CorrelationId、Type 等消息属性没有发送到 RabbitMq 集线器。在我升级到最新版本之前,相同的代码以前可以工作。
这是我正在使用的代码:我使用以下代码发布消息:
void RabbitMqNode::SendResponse(google::protobuf::Message *response, NodeEnvelope *envelope, std::map<string, string> headers)
{
auto message = GetBasicResponseMessage(response, envelope, &headers);
auto returnAddress = envelope->returnAddress;
channel->BasicPublish(std::string(), returnAddress, message);
}
AmqpClient::BasicMessage::ptr_t RabbitMqNode::GetBasicResponseMessage(google::protobuf::Message *response, NodeEnvelope * envelope, std::map<string, string>* headers)
{
auto message = GetBasicMessage(response, headers);
message->CorrelationId(envelope->corelationId);
return message;
}
AmqpClient::BasicMessage::ptr_t RabbitMqNode::GetBasicMessage(google::protobuf::Message *inputMessage, std::map<string, string> *headers)
{
auto body = serializer.SerializeMessageToJson(inputMessage);
return GetBasicMessage(inputMessage, body, headers);
}
AmqpClient::BasicMessage::ptr_t RabbitMqNode::GetBasicMessage(google::protobuf::Message *inputMessage, std::string messageBody, std::map<string, string> *headers)
{
auto name = GetMessageName(inputMessage);
auto message = AmqpClient::BasicMessage::Create(messageBody);
message->DeliveryMode(AmqpClient::BasicMessage::delivery_mode_t::dm_nonpersistent);
message->ContentType(MIME_JSON);
message->Type(name);
message->AppId(nodeId);
auto headerTable = message->HeaderTable();
InitializeHeaders(&headerTable, headers);
message->HeaderTable(headerTable);
message->Timestamp(utility.GetCurrentUnixTimestampMilliseconds());
return message;
}
void RabbitMqNode::InitializeHeaders(AmqpClient::Table *headersTable, std::map<string, string> *headers)
{
if (headers->empty())
{
return;
}
std::map<string, string>::iterator it = headers->begin();
while (it != headers->end())
{
auto key = (std::string)it->first;
auto value = it->second;
headersTable->insert(std::pair<string, string>(key, value));
// Increment the Iterator to point to next entry
it++;
}
}
注意:邮件正文和标头正在发送,没有任何问题。
观察:如果我不初始化标头,则仅正确发送 AppId 属性;但是如果我初始化标头,那么除了标头之外,没有其他属性被发送到集线器。正文始终正确发送;没问题。
有人可以找出错误吗?
PS:我为此在 Git 存储库中创建了一个问题:https ://github.com/alanxz/SimpleAmqpClient/issues/287