1

当发生一些奇怪的崩溃和未定义的行为时,我使用Windows Fiber实现了我自己的任务调度程序。为了简单起见,我开始了一个新项目并编写了一个执行以下操作的简单程序:

  1. 主线程创建一堆纤维,然后启动两个线程
  2. 主线程一直等到你杀死程序
  3. 每个工作线程都将自己转换为纤维
  4. 每个工作线程试图找到一个空闲的光纤,然后切换到这个新的空闲光纤
  5. 一旦线程切换到新的光纤,它会将之前的光纤推入空闲光纤容器
  6. 每个工作线程进入第 4 步

如果您不熟悉光纤概念,本次讲座是一个好的开始

数据

每个线程都有自己的ThreadData数据结构来存储其先前的、当前的纤程实例及其线程索引。我尝试了几种在执行期间检索ThreadData数据结构的方法:

  • 我使用线程本地存储来存储 ThreadData 指针
  • 我使用了一个将 thread_id 与 ThreadData 结构相关联的容器

问题

当第一次输入纤程时(查看FiberFunc函数),使用该纤程的线程必须将其前一个纤程推入自由纤程容器中。但是碰巧有时之前的光纤是空的,这是不可能的。这是不可能的,因为在切换到新光纤之前,线程将其先前的光纤值设置为其当前的光纤值(并将其当前的光纤值设置为新的光纤值)。

因此,如果一个线程进入一个全新的光纤,其先前的光纤设置为空,这意味着它来自无处(这没有任何意义)。

ThreadData在进入全新的 Fiber 时将其先前的 Fiber值设置为 null的唯一原因是另一个线程将其设置为 null 或者编译器在后台重新排序了指令。

我检查了程序集,似乎编译器不负责。

有几个我无法解释的错误:

  1. 如果我使用第一个GetThreadData()函数来检索 ThreadData 结构,我可以检索其索引与线程本地索引不同的实例(这些索引已在线程启动时设置)。这将使程序断言 (assert(threadData->index == localThreadIndex))。

  2. 如果我使用任何其他函数来检索 ThreadData 结构,我将在FiberFunc函数中断言,因为先前的 Fiber值为 null (assert(threadData->previousFiber))。

你知道为什么这段代码不起作用吗?我花了无数个小时试图找出问题所在,但我没有看到我的错误。

规格

操作系统:Windows 10

IDE:Visual Studio 2015 和 Visual Studio 2017

编译器:VC++

配置:发布

请注意,调试配置中没有错误。

编码

在断言触发之前,您可能会尝试多次运行它。

#include "Windows.h"
#include <vector>
#include <thread>
#include <mutex>
#include <cassert>
#include <iostream>
#include <atomic>

struct Fiber
{
    void* handle;
};

struct ThreadData
{
    Fiber*  previousFiber{ nullptr };
    Fiber*  currentFiber{ nullptr };
    Fiber   fiber{ };
    unsigned int index{};
};

//Threads
std::vector<std::pair<std::thread::id, unsigned int>> threadsinfo{};

//threads data container
ThreadData  threadsData[8];

//Fibers
std::mutex  fibersLock{};
std::vector<Fiber> fibers{};
std::vector<Fiber*> freeFibers{};

thread_local unsigned int localThreadIndex{};
thread_local Fiber* debug_localTheadLastFiber{};
thread_local ThreadData* localThreadData{};

using WindowsThread = HANDLE;
std::vector<WindowsThread> threads{};

//This is the first way to retrieve the current thread's ThreadData structure using thread_id
//ThreadData* GetThreadData()
//{
//  std::thread::id threadId( std::this_thread::get_id());
//  for (auto const& pair : threadsinfo)
//  {
//      if (pair.first == threadId)
//      {
//          return &threadsData[pair.second];
//      }
//  }
//
//  //It is not possible to assert
//  assert(false);
//  return nullptr;
//}

//This is the second way to retrieve the current thread's ThreadData structure using thread local storage
//ThreadData* GetThreadData()
//{
//  return &threadsData[localThreadIndex];
//}


//This is the third way to retrieve the current thread's ThreadData structure using thread local storage
ThreadData* GetThreadData()
{
    return localThreadData;
}


//Try to pop a free fiber from the container, thread safe due to mutex usage
bool  TryPopFreeFiber(Fiber*& fiber)
{
    std::lock_guard<std::mutex> guard(fibersLock);
    if (freeFibers.empty()) { return false; }
    fiber = freeFibers.back();
    assert(fiber);
    assert(fiber->handle);
    freeFibers.pop_back();
    return true;
}


//Try to push a free fiber to the container, thread safe due to mutex usage
bool PushFreeFiber(Fiber* fiber)
{
    std::lock_guard<std::mutex> guard(fibersLock);
    freeFibers.push_back(fiber);
    return true;
}


//the __declspec(noinline) is used to inspect code in release mode, comment it if you want
__declspec(noinline) void  _SwitchToFiber(Fiber* newFiber)
{
    //You want to switch to another fiber
    //You first have to save your current fiber instance to release it once you will be in the new fiber
    {
        ThreadData* threadData{ GetThreadData() };
        assert(threadData->index == localThreadIndex);
        assert(threadData->currentFiber);
        threadData->previousFiber = threadData->currentFiber;
        threadData->currentFiber = newFiber;
        debug_localTheadLastFiber = threadData->previousFiber;
        assert(threadData->previousFiber);
        assert(newFiber);
        assert(newFiber->handle);
    }

    //You switch to the new fiber
    //this call will either make you enter in the FiberFunc function if the fiber has never been used
    //Or you will continue to execute this function if the new fiber has been already used (not that you will have a different stack so you can't use the old threadData value)
    ::SwitchToFiber(newFiber->handle);

    {
        //You must get the current ThreadData* again, because you come from another fiber (the previous statement is a switch), this fiber could have been used by any other thread
        ThreadData* threadData{ GetThreadData() };

        //THIS ASSERT WILL FIRES IF YOU USE THE FIRST GetThreadData METHOD, WHICH IS IMPOSSIBLE....
        assert(threadData->index == localThreadIndex);

        assert(threadData);
        assert(threadData->previousFiber);

        //We release the previous fiber
        PushFreeFiber(threadData->previousFiber);
        debug_localTheadLastFiber = nullptr;
        threadData->previousFiber = nullptr;
    }

}


void ExecuteThreadBody()
{
    Fiber*  newFiber{};

    if (TryPopFreeFiber(newFiber))
    {
        _SwitchToFiber(newFiber);
    }
}


DWORD __stdcall ThreadFunc(void* data)
{
    int const index{ *static_cast<int*>(data)};

    threadsinfo[index] = std::make_pair(std::this_thread::get_id(), index);

    //setting up the current thread data
    ThreadData* threadData{ &threadsData[index] };
    threadData->index = index;

    void*   threadAsFiber{ ConvertThreadToFiber(nullptr) };
    assert(threadAsFiber);

    threadData->fiber = Fiber{ threadAsFiber };
    threadData->currentFiber = &threadData->fiber;

    localThreadData = threadData;
    localThreadIndex = index;

    while (true)
    {
        ExecuteThreadBody();
    }

    return DWORD{};
}


//The entry point of all fibers
void __stdcall FiberFunc(void* data)
{
    //You enter to the fiber for the first time

    ThreadData* threadData{ GetThreadData() };

    //Making sure that the thread data structure is the good one
    assert(threadData->index == localThreadIndex);

    //Here you will assert
    assert(threadData->previousFiber);

    PushFreeFiber(threadData->previousFiber);
    threadData->previousFiber = nullptr;

    while (true)
    {
        ExecuteThreadBody();
    }
}


__declspec(noinline) void main()
{
    constexpr unsigned int threadCount{ 2 };
    constexpr unsigned int fiberCount{ 20 };

    threadsinfo.resize(threadCount);

    fibers.resize(fiberCount);
    for (auto index = 0; index < fiberCount; ++index)
    {
        fibers[index] = { CreateFiber(0, FiberFunc, nullptr) };
    }

    freeFibers.resize(fiberCount);
    for (auto index = 0; index < fiberCount; ++index)
    {
        freeFibers[index] = std::addressof(fibers[index]);
    }

    threads.resize(threadCount);

    std::vector<int>    threadParamss(threadCount);



    for (auto index = 0; index < threadCount; ++index)
    {
        //threads[index] = new std::thread{ ThreadFunc, index };
        threadParamss[index] = index;
        threads[index] = CreateThread(NULL, 0, &ThreadFunc, &threadParamss[index], 0, NULL);
        assert(threads[index]);
    }

    while (true);

    //I know, it is not clean, it will leak
}
4

1 回答 1

0

嗯,几个月后。我发现声明为thread_local的变量是罪魁祸首。如果您使用纤程,请忘记thread_local变量并使用您在创建它们时分配的每个纤程内存。我现在将我当前的线程索引存储在 per-fiber 结构实例中。

于 2019-01-19T15:00:37.470 回答