0

重要的提示:

由于 Ikegami 的评论,此问题已被编辑。我希望实际问题现在更清楚了。请注意,在撰写本文时是上午 10 点,添加的代码没有经过 IDE 或任何类型的代码编辑器,因此可能会出现拼写错误、缩进错误和错误。遇到这种情况请谅解,谢谢。回到最初的问题。

我的大脑围绕着 AMQP,在尝试了几个库之后,rabbitmq-c让我最接近成功。我仍在研究这些示例,但即使是那些也让我感到悲伤,一些上下文代码,首先是工作版本:

主.cpp:

#include "handler.h"
#include <QTimer>
int main(int argc, char *argv[])
{
    QCoreApplication a(argc, argv);
    Handler* myHandler = new Handler();
    return a.exec();
}

处理程序.h

#define HANDLER_H

#include <QObject>
#include "rabbitmq-c/amqp.h"
#include "rabbitmq-c/tcp_socket.h"

class Handler : public QObject
{
    Q_OBJECT

private:
    amqp_socket_t* socket;
    amqp_connection_state_t connectionState;
    char const *hostname;
    int port, status;
    char const *exchange;
    char const *routingkey;
    char const *messagebody;

public:
    explicit Handler(QObject *parent = nullptr);
    void sendMessage(QString queue_name, QString message);

signals:

};

#endif // HANDLER_H

和 Handler.cpp

#include "rabbitmq-c/amqp.h"
#include "rabbitmq-c/tcp_socket.h"
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <iostream>
#include<QDebug>


Handler::Handler(QObject *parent) : QObject(parent)
{

      hostname = "ip.to.rabbitmq.server";
      port = 5672;
      exchange = "";
      routingkey = "someRoutingKey";
      messagebody = "HOOH";

      connectionState = amqp_new_connection();

      socket = amqp_tcp_socket_new(connectionState);
      if (!socket) {
        std::cout << "die: socket is false";
      }

      status = amqp_socket_open(socket, hostname, port);
      if (status) {
        std::cout << "die: opening TCP socket" << std::endl;
      }

      amqp_login(connectionState, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
                                   "guest", "guest");
      amqp_channel_open(connectionState, 1);
      amqp_get_rpc_reply(connectionState);

      {
        amqp_basic_properties_t props;
        props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
        props.content_type = amqp_cstring_bytes("text/plain");
        props.delivery_mode = 2; /* persistent delivery mode */
        amqp_basic_publish(connectionState, 1, amqp_cstring_bytes(exchange),
                                        amqp_cstring_bytes(routingkey), 0, 0,
                                        &props, amqp_cstring_bytes(messagebody));
      }

      amqp_channel_close(connectionState, 1, AMQP_REPLY_SUCCESS);
      amqp_connection_close(connectionState, AMQP_REPLY_SUCCESS);
      amqp_destroy_connection(connectionState);
}

void Handler::sendMessage(QString queue_name, QString message)
{
    
}

我真正想做的和我没能做的看起来像:

主文件

#include <QCoreApplication>
#include "handler.h"
#include <QTimer>
int main(int argc, char *argv[])
{
    QCoreApplication a(argc, argv);
    Handler* myHandler = new Handler();
    QTimer::singleShot(5000, [myHandler]()->void
    {
        myHandler->sendMessage("someRoutingKey", "HOOH");
        QTimer::singleShot(5000, [myHandler]()->void
        {
             myHandler->close();
             QCoreApplication::quit();
        }
    });
    return a.exec();
}

main 的详细信息在这里并不重要,文件在这里只是为了显示我想要使用我的 Handler 类的方式:

  1. 创建处理程序类,
  2. 延迟一段时间以确保我们不会陷入跨网络通信的延迟,然后将消息发送到队列。最后
  3. 延迟一段时间,然后关闭连接并关闭程序

处理程序.h

#define HANDLER_H

#include <QObject>
#include "rabbitmq-c/amqp.h"
#include "rabbitmq-c/tcp_socket.h"

class Handler : public QObject
{
    Q_OBJECT

private:
    amqp_socket_t* socket;
    amqp_connection_state_t connectionState;
    char const *hostname;
    int port, status;
    char const *exchange;
    char const *routingkey;
    char const *messagebody;

public:
    explicit Handler(QObject *parent = nullptr);
    void sendMessage(QString queue_name, QString message);
    void close();

signals:

};

#endif // HANDLER_H

和 handler.cpp

#include "handler.h"
#include "rabbitmq-c/amqp.h"
#include "rabbitmq-c/tcp_socket.h"
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <iostream>
#include<QDebug>


Handler::Handler(QObject *parent) : QObject(parent)
{

      hostname = "ip.to.rabbitmq.server";
      port = 5672;
      exchange = "";
      routingkey = "someRoutingKey";
      messagebody = "HOOH";

      connectionState = amqp_new_connection();

      socket = amqp_tcp_socket_new(connectionState);
      if (!socket) {
        std::cout << "die: socket is false";
      }

      status = amqp_socket_open(socket, hostname, port);
      if (status) {
        std::cout << "die: opening TCP socket" << std::endl;
      }

      amqp_login(connectionState, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
                                   "guest", "guest");
      

      amqp_channel_close(connectionState, 1, AMQP_REPLY_SUCCESS);
      amqp_connection_close(connectionState, AMQP_REPLY_SUCCESS);
      amqp_destroy_connection(connectionState);
}

void Handler::sendMessage(QString queue_name, QString message)
{
    amqp_channel_open(connectionState, 1);
      amqp_get_rpc_reply(connectionState);

      {
        amqp_basic_properties_t props;
        props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
        props.content_type = amqp_cstring_bytes("text/plain");
        props.delivery_mode = 2; /* persistent delivery mode */
        amqp_basic_publish(connectionState, 1, amqp_cstring_bytes(exchange),
                                        amqp_cstring_bytes(routingkey), 0, 0,
                                        &props, amqp_cstring_bytes(messagebody));
      }
}

void Handler::close()
{
      amqp_channel_close(connectionState, 1, AMQP_REPLY_SUCCESS);
      amqp_connection_close(connectionState, AMQP_REPLY_SUCCESS);
      amqp_destroy_connection(connectionState);
}


您可以看到当前工作的实现与我想要的实现之间的差异。据我了解,将指令从一个功能拆分为多个不应该影响功能,但它似乎会。我尝试过使用和不使用 QTimers,但均无济于事。

这里有什么问题?我想将连接的形成,消息的发送和关闭连接分开到他们自己的功能中,但是无论我尝试什么,它都不起作用。我正在使用的库示例可以在这里找到。

如前所述,我尝试了不同的库: QAMQP 的两个版本都在这里这里找到,以及原始 C++ 实现在这里找到。在这三者中,rabbitmq-c 让我最接近突破。

任何帮助将不胜感激,谢谢。

编辑

我重做了代码示例以使我的观点更清楚

4

0 回答 0