当发生一些奇怪的崩溃和未定义的行为时,我使用Windows Fiber实现了我自己的任务调度程序。为了简单起见,我开始了一个新项目并编写了一个执行以下操作的简单程序:
- 主线程创建一堆纤维,然后启动两个线程
- 主线程一直等到你杀死程序
- 每个工作线程都将自己转换为纤维
- 每个工作线程试图找到一个空闲的光纤,然后切换到这个新的空闲光纤
- 一旦线程切换到新的光纤,它会将之前的光纤推入空闲光纤容器
- 每个工作线程进入第 4 步
如果您不熟悉光纤概念,本次讲座是一个好的开始。
数据
每个线程都有自己的ThreadData数据结构来存储其先前的、当前的纤程实例及其线程索引。我尝试了几种在执行期间检索ThreadData数据结构的方法:
- 我使用线程本地存储来存储 ThreadData 指针
- 我使用了一个将 thread_id 与 ThreadData 结构相关联的容器
问题
当第一次输入纤程时(查看FiberFunc函数),使用该纤程的线程必须将其前一个纤程推入自由纤程容器中。但是碰巧有时之前的光纤是空的,这是不可能的。这是不可能的,因为在切换到新光纤之前,线程将其先前的光纤值设置为其当前的光纤值(并将其当前的光纤值设置为新的光纤值)。
因此,如果一个线程进入一个全新的光纤,其先前的光纤设置为空,这意味着它来自无处(这没有任何意义)。
ThreadData在进入全新的 Fiber 时将其先前的 Fiber值设置为 null的唯一原因是另一个线程将其设置为 null 或者编译器在后台重新排序了指令。
我检查了程序集,似乎编译器不负责。
有几个我无法解释的错误:
如果我使用第一个GetThreadData()函数来检索 ThreadData 结构,我可以检索其索引与线程本地索引不同的实例(这些索引已在线程启动时设置)。这将使程序断言 (assert(threadData->index == localThreadIndex))。
如果我使用任何其他函数来检索 ThreadData 结构,我将在FiberFunc函数中断言,因为先前的 Fiber值为 null (assert(threadData->previousFiber))。
你知道为什么这段代码不起作用吗?我花了无数个小时试图找出问题所在,但我没有看到我的错误。
规格
操作系统:Windows 10
IDE:Visual Studio 2015 和 Visual Studio 2017
编译器:VC++
配置:发布
请注意,调试配置中没有错误。
编码
在断言触发之前,您可能会尝试多次运行它。
#include "Windows.h"
#include <vector>
#include <thread>
#include <mutex>
#include <cassert>
#include <iostream>
#include <atomic>
struct Fiber
{
void* handle;
};
struct ThreadData
{
Fiber* previousFiber{ nullptr };
Fiber* currentFiber{ nullptr };
Fiber fiber{ };
unsigned int index{};
};
//Threads
std::vector<std::pair<std::thread::id, unsigned int>> threadsinfo{};
//threads data container
ThreadData threadsData[8];
//Fibers
std::mutex fibersLock{};
std::vector<Fiber> fibers{};
std::vector<Fiber*> freeFibers{};
thread_local unsigned int localThreadIndex{};
thread_local Fiber* debug_localTheadLastFiber{};
thread_local ThreadData* localThreadData{};
using WindowsThread = HANDLE;
std::vector<WindowsThread> threads{};
//This is the first way to retrieve the current thread's ThreadData structure using thread_id
//ThreadData* GetThreadData()
//{
// std::thread::id threadId( std::this_thread::get_id());
// for (auto const& pair : threadsinfo)
// {
// if (pair.first == threadId)
// {
// return &threadsData[pair.second];
// }
// }
//
// //It is not possible to assert
// assert(false);
// return nullptr;
//}
//This is the second way to retrieve the current thread's ThreadData structure using thread local storage
//ThreadData* GetThreadData()
//{
// return &threadsData[localThreadIndex];
//}
//This is the third way to retrieve the current thread's ThreadData structure using thread local storage
ThreadData* GetThreadData()
{
return localThreadData;
}
//Try to pop a free fiber from the container, thread safe due to mutex usage
bool TryPopFreeFiber(Fiber*& fiber)
{
std::lock_guard<std::mutex> guard(fibersLock);
if (freeFibers.empty()) { return false; }
fiber = freeFibers.back();
assert(fiber);
assert(fiber->handle);
freeFibers.pop_back();
return true;
}
//Try to push a free fiber to the container, thread safe due to mutex usage
bool PushFreeFiber(Fiber* fiber)
{
std::lock_guard<std::mutex> guard(fibersLock);
freeFibers.push_back(fiber);
return true;
}
//the __declspec(noinline) is used to inspect code in release mode, comment it if you want
__declspec(noinline) void _SwitchToFiber(Fiber* newFiber)
{
//You want to switch to another fiber
//You first have to save your current fiber instance to release it once you will be in the new fiber
{
ThreadData* threadData{ GetThreadData() };
assert(threadData->index == localThreadIndex);
assert(threadData->currentFiber);
threadData->previousFiber = threadData->currentFiber;
threadData->currentFiber = newFiber;
debug_localTheadLastFiber = threadData->previousFiber;
assert(threadData->previousFiber);
assert(newFiber);
assert(newFiber->handle);
}
//You switch to the new fiber
//this call will either make you enter in the FiberFunc function if the fiber has never been used
//Or you will continue to execute this function if the new fiber has been already used (not that you will have a different stack so you can't use the old threadData value)
::SwitchToFiber(newFiber->handle);
{
//You must get the current ThreadData* again, because you come from another fiber (the previous statement is a switch), this fiber could have been used by any other thread
ThreadData* threadData{ GetThreadData() };
//THIS ASSERT WILL FIRES IF YOU USE THE FIRST GetThreadData METHOD, WHICH IS IMPOSSIBLE....
assert(threadData->index == localThreadIndex);
assert(threadData);
assert(threadData->previousFiber);
//We release the previous fiber
PushFreeFiber(threadData->previousFiber);
debug_localTheadLastFiber = nullptr;
threadData->previousFiber = nullptr;
}
}
void ExecuteThreadBody()
{
Fiber* newFiber{};
if (TryPopFreeFiber(newFiber))
{
_SwitchToFiber(newFiber);
}
}
DWORD __stdcall ThreadFunc(void* data)
{
int const index{ *static_cast<int*>(data)};
threadsinfo[index] = std::make_pair(std::this_thread::get_id(), index);
//setting up the current thread data
ThreadData* threadData{ &threadsData[index] };
threadData->index = index;
void* threadAsFiber{ ConvertThreadToFiber(nullptr) };
assert(threadAsFiber);
threadData->fiber = Fiber{ threadAsFiber };
threadData->currentFiber = &threadData->fiber;
localThreadData = threadData;
localThreadIndex = index;
while (true)
{
ExecuteThreadBody();
}
return DWORD{};
}
//The entry point of all fibers
void __stdcall FiberFunc(void* data)
{
//You enter to the fiber for the first time
ThreadData* threadData{ GetThreadData() };
//Making sure that the thread data structure is the good one
assert(threadData->index == localThreadIndex);
//Here you will assert
assert(threadData->previousFiber);
PushFreeFiber(threadData->previousFiber);
threadData->previousFiber = nullptr;
while (true)
{
ExecuteThreadBody();
}
}
__declspec(noinline) void main()
{
constexpr unsigned int threadCount{ 2 };
constexpr unsigned int fiberCount{ 20 };
threadsinfo.resize(threadCount);
fibers.resize(fiberCount);
for (auto index = 0; index < fiberCount; ++index)
{
fibers[index] = { CreateFiber(0, FiberFunc, nullptr) };
}
freeFibers.resize(fiberCount);
for (auto index = 0; index < fiberCount; ++index)
{
freeFibers[index] = std::addressof(fibers[index]);
}
threads.resize(threadCount);
std::vector<int> threadParamss(threadCount);
for (auto index = 0; index < threadCount; ++index)
{
//threads[index] = new std::thread{ ThreadFunc, index };
threadParamss[index] = index;
threads[index] = CreateThread(NULL, 0, &ThreadFunc, &threadParamss[index], 0, NULL);
assert(threads[index]);
}
while (true);
//I know, it is not clean, it will leak
}