重要的提示:
由于 Ikegami 的评论,此问题已被编辑。我希望实际问题现在更清楚了。请注意,在撰写本文时是上午 10 点,添加的代码没有经过 IDE 或任何类型的代码编辑器,因此可能会出现拼写错误、缩进错误和错误。遇到这种情况请谅解,谢谢。回到最初的问题。
我的大脑围绕着 AMQP,在尝试了几个库之后,rabbitmq-c让我最接近成功。我仍在研究这些示例,但即使是那些也让我感到悲伤,一些上下文代码,首先是工作版本:
主.cpp:
#include "handler.h"
#include <QTimer>
int main(int argc, char *argv[])
{
QCoreApplication a(argc, argv);
Handler* myHandler = new Handler();
return a.exec();
}
处理程序.h
#define HANDLER_H
#include <QObject>
#include "rabbitmq-c/amqp.h"
#include "rabbitmq-c/tcp_socket.h"
class Handler : public QObject
{
Q_OBJECT
private:
amqp_socket_t* socket;
amqp_connection_state_t connectionState;
char const *hostname;
int port, status;
char const *exchange;
char const *routingkey;
char const *messagebody;
public:
explicit Handler(QObject *parent = nullptr);
void sendMessage(QString queue_name, QString message);
signals:
};
#endif // HANDLER_H
和 Handler.cpp
#include "rabbitmq-c/amqp.h"
#include "rabbitmq-c/tcp_socket.h"
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <iostream>
#include<QDebug>
Handler::Handler(QObject *parent) : QObject(parent)
{
hostname = "ip.to.rabbitmq.server";
port = 5672;
exchange = "";
routingkey = "someRoutingKey";
messagebody = "HOOH";
connectionState = amqp_new_connection();
socket = amqp_tcp_socket_new(connectionState);
if (!socket) {
std::cout << "die: socket is false";
}
status = amqp_socket_open(socket, hostname, port);
if (status) {
std::cout << "die: opening TCP socket" << std::endl;
}
amqp_login(connectionState, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
"guest", "guest");
amqp_channel_open(connectionState, 1);
amqp_get_rpc_reply(connectionState);
{
amqp_basic_properties_t props;
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
props.content_type = amqp_cstring_bytes("text/plain");
props.delivery_mode = 2; /* persistent delivery mode */
amqp_basic_publish(connectionState, 1, amqp_cstring_bytes(exchange),
amqp_cstring_bytes(routingkey), 0, 0,
&props, amqp_cstring_bytes(messagebody));
}
amqp_channel_close(connectionState, 1, AMQP_REPLY_SUCCESS);
amqp_connection_close(connectionState, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(connectionState);
}
void Handler::sendMessage(QString queue_name, QString message)
{
}
我真正想做的和我没能做的看起来像:
主文件
#include <QCoreApplication>
#include "handler.h"
#include <QTimer>
int main(int argc, char *argv[])
{
QCoreApplication a(argc, argv);
Handler* myHandler = new Handler();
QTimer::singleShot(5000, [myHandler]()->void
{
myHandler->sendMessage("someRoutingKey", "HOOH");
QTimer::singleShot(5000, [myHandler]()->void
{
myHandler->close();
QCoreApplication::quit();
}
});
return a.exec();
}
main 的详细信息在这里并不重要,文件在这里只是为了显示我想要使用我的 Handler 类的方式:
- 创建处理程序类,
- 延迟一段时间以确保我们不会陷入跨网络通信的延迟,然后将消息发送到队列。最后
- 延迟一段时间,然后关闭连接并关闭程序
处理程序.h
#define HANDLER_H
#include <QObject>
#include "rabbitmq-c/amqp.h"
#include "rabbitmq-c/tcp_socket.h"
class Handler : public QObject
{
Q_OBJECT
private:
amqp_socket_t* socket;
amqp_connection_state_t connectionState;
char const *hostname;
int port, status;
char const *exchange;
char const *routingkey;
char const *messagebody;
public:
explicit Handler(QObject *parent = nullptr);
void sendMessage(QString queue_name, QString message);
void close();
signals:
};
#endif // HANDLER_H
和 handler.cpp
#include "handler.h"
#include "rabbitmq-c/amqp.h"
#include "rabbitmq-c/tcp_socket.h"
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <iostream>
#include<QDebug>
Handler::Handler(QObject *parent) : QObject(parent)
{
hostname = "ip.to.rabbitmq.server";
port = 5672;
exchange = "";
routingkey = "someRoutingKey";
messagebody = "HOOH";
connectionState = amqp_new_connection();
socket = amqp_tcp_socket_new(connectionState);
if (!socket) {
std::cout << "die: socket is false";
}
status = amqp_socket_open(socket, hostname, port);
if (status) {
std::cout << "die: opening TCP socket" << std::endl;
}
amqp_login(connectionState, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
"guest", "guest");
amqp_channel_close(connectionState, 1, AMQP_REPLY_SUCCESS);
amqp_connection_close(connectionState, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(connectionState);
}
void Handler::sendMessage(QString queue_name, QString message)
{
amqp_channel_open(connectionState, 1);
amqp_get_rpc_reply(connectionState);
{
amqp_basic_properties_t props;
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
props.content_type = amqp_cstring_bytes("text/plain");
props.delivery_mode = 2; /* persistent delivery mode */
amqp_basic_publish(connectionState, 1, amqp_cstring_bytes(exchange),
amqp_cstring_bytes(routingkey), 0, 0,
&props, amqp_cstring_bytes(messagebody));
}
}
void Handler::close()
{
amqp_channel_close(connectionState, 1, AMQP_REPLY_SUCCESS);
amqp_connection_close(connectionState, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(connectionState);
}
您可以看到当前工作的实现与我想要的实现之间的差异。据我了解,将指令从一个功能拆分为多个不应该影响功能,但它似乎会。我尝试过使用和不使用 QTimers,但均无济于事。
这里有什么问题?我想将连接的形成,消息的发送和关闭连接分开到他们自己的功能中,但是无论我尝试什么,它都不起作用。我正在使用的库示例可以在这里找到。
如前所述,我尝试了不同的库: QAMQP 的两个版本都在这里和这里找到,以及原始 C++ 实现在这里找到。在这三者中,rabbitmq-c 让我最接近突破。
任何帮助将不胜感激,谢谢。
编辑
我重做了代码示例以使我的观点更清楚