5

我正在为我的全双工服务器编写测试,当我执行多个(顺序)调用(虽然被一条链覆盖)时,我从文件中async_write得到以下断言错误boost::beastboost/beast/websocket/detail/stream_base.hpp

// If this assert goes off it means you are attempting to
// simultaneously initiate more than one of same asynchronous
// operation, which is not allowed. For example, you must wait
// for an async_read to complete before performing another
// async_read.
//
BOOST_ASSERT(id_ != T::id);

要在您的机器上重现该问题:可在此处找到重现此问题 (MCVE) 的完整客户端代码。它在链接中不起作用,因为您需要一个服务器(在您自己的机器上,抱歉,因为无法方便地在线执行此操作,这更好地客观地表明问题出在客户端,而不是服务器,如果我把它包括在这里)。我使用websocketd./websocketd --ssl --sslkey /path/to/server.key --sslcert /path/to/server.crt --port=8085 ./prog.py使用命令where创建服务器./prog.py一个简单的 python 程序,用于打印和刷新(我从websocketd 主页获得)。

在客户端进行写入的调用如下所示:

  std::vector<std::vector<std::future<void>>> clients_write_futures(
      clients_count);
  for (int i = 0; i < clients_count; i++) {
    clients_write_futures[i] = std::vector<std::future<void>>(num_of_messages);
    for (int j = 0; j < num_of_messages; j++) {
      clients_write_futures[i][j] =
          clients[i]->write_data_async_future("Hello"); // writing here
    }
  }

请注意,我在示例中仅使用了 1 个客户端。客户端数组只是在测试时对服务器施加更多压力的概括。

我对这个问题的评论:

  1. 循环是顺序的;这不像我在多个线程中这样做
  2. 应该可以以全双工形式进行通信,其中无限数量的消息被发送到服务器。全双工通讯还能怎么做?
  3. 我正在使用 strands 来包装我的异步调用,以防止通过 io_service/io_context 在套接字中发生任何冲突
  4. 使用调试器对此进行调查显示循环的第二次迭代始终失败,这意味着我在做一些根本错误的事情,但我不知道它是什么。换句话说:这显然是一个确定性问题。

我在这里做错了什么?如何向我的 websocket 服务器写入不定数量的消息?


编辑:

Sehe,我想首先为代码混乱道歉(没有意识到这很糟糕),并感谢您为此付出的努力。我希望你问我为什么它同时以这种(可能)有组织且混乱的方式构建,答案很简单:主要是一个 gtest 代码,用于查看我用来强调的通用、多功能 websocket 客户端是否工作-测试我的服务器(它使用大量多线程 io_service 对象,我认为这些对象很敏感,需要进行广泛的测试)。我计划在实际生产测试期间同时用许多客户端轰炸我的服务器。我发布这个问题是因为我不理解客户的行为。我在这个文件中所做的是创建一个 MCVE(人们一直要求 SO)。我花了两个小时剥离我的代码来创建它,

那么为什么我不捕捉异常呢?因为 gtest 会捕获它们并认为测试失败。主要不是生产代码,而是客户端。我从你提到的东西中学到了很多,我不得不说扔和接是愚蠢的,但我不知道 std::make_exception_ptr(),所以我找到了我的 (dumm) 方法来实现相同的结果:- )。为什么有太多无用的功能:在这个测试/示例中它们在这里没用,但通常我可以稍后将它们用于其他事情,因为这个客户端不仅适用于这种情况。

现在回到问题:我不明白的是,为什么我们必须在async_writestrand_ 在主线程的循环中顺序使用它时覆盖它(我错误地表达了我只覆盖了处理程序)。我会理解为什么要覆盖处理程序,因为套接字不是线程安全的,并且多线程io_service会在那里产生竞争。我们也知道它io_service::post本身是线程安全的(这就是我认为不需要包装 async_write 的原因)。你能解释一下在我们需要包装 async_write 本身时,什么不是线程安全的吗?我知道你已经知道这一点,但同样的断言仍在触发。我们对处理程序和异步队列进行了顺序化,客户端仍然不喜欢进行多次写入调用。还能缺少什么?

(顺便说一句,如果你写,然后得到未来,然后读,然后再写,它会起作用。这就是为什么我使用期货来准确定义测试用例并定义我的测试的时间顺序。我很偏执这里。)

4

1 回答 1

2

async_write用一根绳子盖住你的。但你没有做这样的事。您所看到的只是将完成处理程序包装在该 strand 中。但是您直接发布异步操作。

更糟糕的是,您是从主线程执行此操作的,而与您的实例关联的任何线程上正在进行异步操作,WSClient这意味着您正在同时访问非线程安全的对象实例。

这是一场数据竞赛,所以你得到了Undefined Behavior

一个天真的修复可能是:

std::future<void> write_data_async_future(const std::string &data) {
    // shared_ptr is used to ensure data's survival
    std::shared_ptr<std::string> data_ptr = std::make_shared<std::string>(data);
    std::shared_ptr<std::promise<void> > write_promise = std::make_shared<std::promise<void> >();

    post(strand_, [=,self=shared_from_this()] {
        websock.async_write(
            boost::asio::buffer(*data_ptr),
            boost::asio::bind_executor(strand_, std::bind(&WSClientSession::on_write_future, self,
                                                          std::placeholders::_1, std::placeholders::_2, data_ptr,
                                                          write_promise)));
    });

    return write_promise->get_future();
}

但这还不够。现在您可以确定您的任何异步操作或其完成都不会同时运行,但您仍然可以在调用第一个完成处理程序之前发布下一个异步操作。

要解决这个问题,你只需要排队。

老实说,我不确定您为什么如此关注使用期货的同步。这使得实现这一目标变得非常困难。如果您可以描述您/功能上/想要实现的目标,我可以提出一个可能要短得多的解决方案。

代码审查说明

在我了解代码的全部内容之前,我花了很多时间阅读您的代码。我不想抢走我一路上做的笔记。

警告:这是一个相当漫长的代码潜水。我提供它是因为其中一些见解可能会帮助您了解需要如何重构代码。

我开始阅读异步代码链,直到on_handshake设置started_promise值)。

然后我进入了你的main职能部门。你的主要功能是50行代码?!有几个并行容器并通过它们重复手动嵌套循环?

这是我经过一些重构后得到的:

int main() {
    std::vector<actor> actors(1);

    for (auto& a : actors) {
        a.client = std::make_shared<WSClient>();
        a.session_start_future = a.client->start("127.0.0.1", "8085");
        a.messages.resize(50);
    }

    for (auto& a : actors) { a.session_start_future.get(); }

    for (auto& a : actors) { for (auto& m : a.messages) {
        m.write_future = a.client->write_data_async_future("Hello");
    } }

    for (auto& a : actors) { for (auto& m : a.messages) {
        m.read_future = a.client->read_data_async_future();
    } }

    for (auto& a : actors) { for (auto& m : a.messages) {
        m.write_future.get();
        std::string result = m.read_future.get();
    } }
}

所有数据结构都已折叠到小助手中actor

struct actor {
    std::shared_ptr<WSClient> client;
    std::future<void> session_start_future;

    struct message {
        std::string message = GenerateRandomString(20);
        std::future<void> write_future;
        std::future<std::string> read_future;
    };

    std::vector<message> messages;
};

我们现在大约进行了一个小时的代码审查,没有任何收获,除了我们现在可以告诉main正在做什么,并且有信心循环变量或其他东西没有一些微不足道的错误。

拾取备份

在写作的开头:write_data_async_future。等待。还有write_data_asyncwrite_data_sync。为什么?你会想读

更糟糕的是,WSClient仅将这些转发给假定的单个会话。为什么在这一点上WSClient有区别?WSClientSession我说,没有。

进一步蒸发 30 行不太有用的代码,我们仍然遇到同样的失败,所以这很好。

我们刚刚说到哪了。write_data_async_future. 哦,是的,我们需要非未来版本吗?不。所以,还有 40 行代码消失了。

现在,说真的write_data_async_future::

std::future<void> write_data_async_future(const std::string &data) {
    // shared_ptr is used to ensure data's survival
    std::shared_ptr<std::string> data_ptr = std::make_shared<std::string>(data);
    std::shared_ptr<std::promise<void> > write_promise = std::make_shared<std::promise<void> >();
    websock.async_write(
        boost::asio::buffer(*data_ptr),
        boost::asio::bind_executor(strand_, std::bind(&WSClientSession::on_write_future, shared_from_this(),
                                                      std::placeholders::_1, std::placeholders::_2, data_ptr,
                                                      write_promise)));
    return write_promise->get_future();
}

看起来……还行。等等,有on_write_future吗?这可能意味着我们需要蒸发更多未使用的代码行。看着……是的。噗,没了

到目前为止,diffstat 看起来像这样:

  test.cpp | 683 +++++++++++++++++++++++----------------------------------------
  1 file changed, 249 insertions(+), 434 deletions(-)

回到那个函数,让我们看看on_write_future

void on_write_future(boost::system::error_code ec, std::size_t bytes_transferred,
                     std::shared_ptr<std::string> data_posted,
                     std::shared_ptr<std::promise<void> > write_promise) {
    boost::ignore_unused(bytes_transferred);
    boost::ignore_unused(data_posted);

    if (ec) {
        try {
            throw std::runtime_error("Error thrown while performing async write: " + ec.message());
        } catch (...) {
            write_promise->set_exception(std::current_exception());
        }
        return;
    }
    write_promise->set_value();
}

几个问题。过去的一切都被忽略了。我知道你传递 shared_ptrs 的目的,但也许你应该将它们作为操作对象的一部分传递,以避免有这么多单独的 shared-ptrs。

抛出异常只是为了捕获它?嗯。对此我不确定。也许只是设置一个新的例外:

if (ec) {
    write_promise->set_exception(
            std::make_exception_ptr(std::system_error(ec, "async write failed")));
} else {
    write_promise->set_value();
}

即使这样,现在也存在概念问题。您随意使用get()而不捕获的main方式意味着任何连接中的任何错误都将中止所有操作。简单地中止一个连接/会话/客户端的错误将非常有用。其中,在您的代码中都是非常同义的(也与io_contextand thread)。

旁注:您将线程存储为成员,但始终将其分离。这意味着该成员从那时起就没有用了。

在这一点上,我从复习中休息了一下,碰巧我得到了向我展示问题的脑电波。我锻炼的半生不熟的结果就在这里。请注意,您不能使用它,因为它实际上并不能解决问题。但它可能会在其他方面有所帮助吗?

于 2018-06-06T23:39:29.623 回答