4

我刚刚开始使用已合并 ffmpeg-mt 的最新版本的 ffmpeg。

但是,由于我的应用程序使用 TBB(英特尔线程构建块),具有新线程创建和同步的 ffmpeg-mt 实现不太合适,因为它可能会阻止我执行解码功能的 TBB 任务。它还会不必要地破坏缓存。

我在 pthread.c 中四处寻找,它似乎实现了 ffmpeg 用来启用多线程的接口。

我的问题是是否可以创建一个实现相同功能但使用 tbb 任务而不是显式线程的 tbb.c?

我对 C 没有经验,但我的猜测是不可能轻松地将 tbb(即 C++)编译成 ffmpeg。所以也许在运行时以某种方式覆盖 ffmpeg 函数指针会是要走的路吗?

对于将 TBB 实施到 ffmpeg 线程 api 中的任何建议或意见,我将不胜感激。

4

2 回答 2

7

所以我通过阅读ffmpeg代码想出了如何做到这一点。

基本上你所要做的就是包含下面的代码并使用tbb_avcodec_open/tbb_avcodec_close而不是 ffmpegs' avcodec_open/avcodec_close

这将使用 TBB 任务并行执行解码。

 // Author Robert Nagy

#include "tbb_avcodec.h"

#include <tbb/task.h>
#include <tbb/atomic.h>

extern "C" 
{
    #define __STDC_CONSTANT_MACROS
    #define __STDC_LIMIT_MACROS
    #include <libavformat/avformat.h>
}

int task_execute(AVCodecContext* s, std::function<int(void* arg, int arg_size, int jobnr, int threadnr)>&& func, void* arg, int* ret, int count, int size)
{   
    tbb::atomic<int> counter;
    counter = 0;

    // Execute s->thread_count number of tasks in parallel.
    tbb::parallel_for(0, s->thread_count, 1, [&](int threadnr) 
    {
        while(true)
        {
            int jobnr = counter++;
            if(jobnr >= count)
                break;

            int r = func(arg, size, jobnr, threadnr);
            if (ret)
                ret[jobnr] = r;
        }
    });

    return 0;
}

int thread_execute(AVCodecContext* s, int (*func)(AVCodecContext *c2, void *arg2), void* arg, int* ret, int count, int size)
{
    return task_execute(s, [&](void* arg, int arg_size, int jobnr, int threadnr) -> int
    {
        return func(s, reinterpret_cast<uint8_t*>(arg) + jobnr*size);
    }, arg, ret, count, size);
}

int thread_execute2(AVCodecContext* s, int (*func)(AVCodecContext* c2, void* arg2, int, int), void* arg, int* ret, int count)
{
    return task_execute(s, [&](void* arg, int arg_size, int jobnr, int threadnr) -> int
    {
        return func(s, arg, jobnr, threadnr);
    }, arg, ret, count, 0);
}

void thread_init(AVCodecContext* s)
{
    static const size_t MAX_THREADS = 16; // See mpegvideo.h
    static int dummy_opaque;

    s->active_thread_type = FF_THREAD_SLICE;
    s->thread_opaque      = &dummy_opaque; 
    s->execute            = thread_execute;
    s->execute2           = thread_execute2;
    s->thread_count       = MAX_THREADS; // We are using a task-scheduler, so use as many "threads/tasks" as possible.
}

void thread_free(AVCodecContext* s)
{
    s->thread_opaque = nullptr;
}

int tbb_avcodec_open(AVCodecContext* avctx, AVCodec* codec)
{
    avctx->thread_count = 1;
    if((codec->capabilities & CODEC_CAP_SLICE_THREADS) && (avctx->thread_type & FF_THREAD_SLICE))
        thread_init(avctx);
// ff_thread_init will not be executed since thread_opaque != nullptr || thread_count == 1.
    return avcodec_open(avctx, codec); 
}

int tbb_avcodec_close(AVCodecContext* avctx)
{
    thread_free(avctx);
    // ff_thread_free will not be executed since thread_opaque == nullptr.
    return avcodec_close(avctx); 
}
于 2011-05-19T13:35:35.720 回答
2

在这里重新发布我在 TBB 论坛上对您的回复,以供 SO 的任何人感兴趣。

您在上面答案中的代码对我来说看起来不错;在考虑到本机线程设计的上下文中使用 TBB 的巧妙方法。我想知道它是否可以变得更加TBBish,可以这么说。如果您有时间和愿望,我有一些想法可以尝试。

如果希望/需要控制线程数,则可能会感兴趣以下两项。

  • 在 thread_init 中,创建一个堆分配tbb::task_scheduler_init(TSI) 对象,并根据需要使用尽可能多的线程对其进行初始化(不需要 MAX_THREADS)。s->thread_opaque如果可能/允许,请保留此对象的地址;如果不是,一个可能的解决方案是映射AVCodecContext*到相应地址的全局映射task_scheduler_init
  • 对应在thread_free中,获取并移除TSI对象。

与上述无关,另一个潜在的变化是如何调用tbb::parallel_for. 而不是仅仅使用它来创建足够的线程,它不能用于它的直接目的,如下所示?

int task_execute(AVCodecContext* s,
                 std::function<int(void*, int, int, int)>&& f,
                 void* arg, int* ret, int count, int size)   
{      
    tbb::atomic<int> counter;   
    counter = 0;   

    // Execute 'count' number of tasks in parallel.   
    tbb::parallel_for(tbb::blocked_range<int>(0, count, 2),
                      [&](const tbb::blocked_range<int> &r)    
    {   
        int threadnr = counter++;   
        for(int jobnr=r.begin(); jobnr!=r.end(); ++jobnr)
        {   
            int r = func(arg, size, jobnr, threadnr);   
            if (ret)   
                ret[jobnr] = r;   
        }
        --counter;
    });   

    return 0;   
}

count如果显着大于,这可以更好地执行thread_count,因为 a) 更多的并行松弛意味着 TBB 更有效地工作(您显然知道),并且 b) 集中原子计数器的开销分布在更多的迭代中。blocked_range请注意,我为;选择了 2 的粒度。这是因为计数器在循环体内既递增又递减,因此每个任务至少需要两次迭代(相应地,count>=2*thread_count)才能“匹配”您的变体。

于 2011-05-24T11:28:48.720 回答