0

我正在尝试编写一个生产者、多个消费者管道,其中消费者在并行线程中运行。或者找到或分享一个简单的例子。使用Go 中相对简单的代码,输出清楚地显示了消费者并行工作。我认为它可能与 Boost 1.73 光纤类似,但我无法超越(不出所料)按顺序工作的代码:

#include <boost/fiber/buffered_channel.hpp>
#include <boost/fiber/fiber.hpp>

static void process(int item) {
    std::cout << "consumer processing " << item << std::endl;
    auto wasteOfTime = 0.;
    for (auto s = 0.; s < item; s += 1e-7) {
        wasteOfTime += sin(s);
    }
    if (wasteOfTime != 42) {
        std::cout << "consumer processed " << item << std::endl;
    }
}

static const std::uint32_t workers = 3;

int main() {
    boost::fibers::buffered_channel<int> channel { 2 };

    boost::fibers::fiber consumer[workers];
    for (int i = 0; i < workers; ++i) {
        consumer[i] = boost::fibers::fiber([&channel]() {
            for (auto item : channel) {
                process(item);
            }
        });
    }

    auto producer = boost::fibers::fiber([&channel]() {
        std::cout << "producer starting" << std::endl;
        channel.push(1);
        channel.push(2);
        channel.push(3);
        channel.close();
        std::cout << "producer ending" << std::endl;
    });

    producer.join();
    for (int i = 0; i < workers; ++i) {
        consumer[i].join();
    }
    return 0;
}

我尝试插入许多代码片段的变体来让工作线程调度纤程,但它们总是按顺序执行或根本不执行。来自关于逆问题的问题的代码似乎是朝着正确方向迈出的一步,虽然比 Go 复杂得多,但是(当使用 -DM_1_PI=3.14 编译时)该程序对我来说也只是闲置。

4

1 回答 1

0

原来我误解了 boost 用于调度光纤的代码片段。它似乎对我有用:

#include <boost/fiber/barrier.hpp>
#include <boost/fiber/buffered_channel.hpp>
#include <boost/fiber/fiber.hpp>
#include <boost/fiber/operations.hpp>
#include <boost/fiber/algo/work_stealing.hpp>
#include <optional>

static void process(int item) {
    std::cout << "processing " << item << std::endl;
    auto wasteOfTime = 0.;
    for (auto s = 0.; s < 1; s += 3e-9) {
        wasteOfTime += sin(s);
    }
    if (wasteOfTime != 42) {
        std::cout << "processed " << item << std::endl;
    }
}

static const std::uint32_t N_CONSUMING_FIBERS = 3;

void task() {
    boost::fibers::buffered_channel<int> channel{ 2 };

    boost::fibers::fiber fibers[1 + N_CONSUMING_FIBERS];
    fibers[0] = boost::fibers::fiber([&channel]() {
        std::cout << "producer starting" << std::endl;
        channel.push(1);
        channel.push(2);
        channel.push(3);
        channel.push(4);
        channel.push(5);
        channel.close();
        std::cout << "producer ending" << std::endl;
        });

    auto start = boost::fibers::barrier(N_CONSUMING_FIBERS);
    for (int i = 1; i <= N_CONSUMING_FIBERS; ++i) {
        fibers[i] = boost::fibers::fiber([&start, &channel]() {
            start.wait();
            for (auto item : channel) {
                process(item);
            }
            });
    }

    for (int i = 0; i <= N_CONSUMING_FIBERS; ++i) {
        fibers[i].join();
    }
}

class FiberExecutor {
    static std::vector<std::thread > extra_threads;
    static std::optional<boost::fibers::barrier> barrier;

public:
    static void init() {
        auto const N_THREADS = std::thread::hardware_concurrency();
        barrier.emplace(N_THREADS);
        extra_threads.reserve(N_THREADS - 1);
        for (auto i = decltype(N_THREADS){0}; i < N_THREADS - 1; ++i) {
            extra_threads.emplace_back([N_THREADS]() {
                boost::fibers::use_scheduling_algorithm<boost::fibers::algo::work_stealing>(N_THREADS);
                barrier->wait(); // start only after everyone has scheduled
                barrier->wait(); // end after main thread says to
                });
        }
        boost::fibers::use_scheduling_algorithm<boost::fibers::algo::work_stealing>(N_THREADS);
        barrier->wait(); // start only after everyone has scheduled
    }

    static void exit() {
        barrier->wait();
        for (auto& thread : extra_threads) {
            thread.join();
        }
    }
};

std::vector<std::thread> FiberExecutor::extra_threads;
std::optional<boost::fibers::barrier> FiberExecutor::barrier;

int Main() {
    FiberExecutor::init();

    std::cout << "Going once\n";
    task();
    std::cout << "Going twice\n";
    task();

    FiberExecutor::exit();
    return 0;
}

请注意,这可能不正确。它也不会将可靠的调度到所有线程上(至少在 Windows 上)。

实际上,它与关于逆问题的问题中的代码几乎相同,但需要更新。另外,我发现使用条件变量而不是障碍是不可靠的,因为主线程可能会先于其他线程运行。

我没有将初始化和清理建模为一个类,以避免产生可以多次实例化它的错觉(按顺序或并行)。work_stealing的构造函数使用全局变量。您不能干净地结束基于光纤的任务,然后启动另一个基于光纤的任务,即使在临时线程中编码所有内容并且不以任何方式污染主线程也是如此。

于 2020-05-04T14:24:21.140 回答