3

我需要使用 Opus 解码器尽快解码音频数据。

目前我的应用程序不够快。解码速度尽可能快,但我需要提高速度。

我需要解码大约 100 段音频。这些部分不是连续的(它们彼此不相关)。

我正在考虑使用多线程,这样我就不必等到 100 个解码之一完成。在我的梦想中,我可以并行开始所有事情。我以前没有使用过多线程。

因此,我想问一下我的方法是否总体上没问题,或者某个地方是否存在思维错误。

谢谢你。

4

6 回答 6

3

你会按任务分解你的工作。让我们假设您的进程实际上是 CPU 绑定的(您表示它是但是……它通常不是那么简单)。

现在,您解码了 100 个部分:

我正在考虑使用多线程,这样我就不必等到 100 个解码之一完成。在我的梦想中,我可以并行开始所有事情。

实际上,您应该使用接近机器上核心数的数字。

假设一个现代桌面(例如 2-8 个内核),一次运行 100 个线程只会减慢它的速度;内核会浪费大量时间从一个线程切换到另一个线程,并且该进程也可能会使用更高的峰值资源并争夺相似的资源。

因此,只需创建一个任务池,将活动任务的数量限制为核心数量。每个任务将(通常)表示为一个输入文件(部分)执行的解码工作。这样,解码过程实际上并没有跨多个线程共享数据(允许您避免锁定和其他资源争用)。

完成后,返回并微调任务池中的进程数量(例如,在多台机器上使用完全相同的输入和秒表)。最快的可能低于或高于核心数(很可能是因为磁盘 I/O)。它也有助于分析。

因此,我想问一下我的方法是否总体上没问题,或者某个地方是否存在思维错误。

是的,如果问题是受 CPU 限制的,那通常没问题。这还假设您的解码器/相关软件能够以多线程运行。

如果这些是磁盘上的文件,您将意识到的问题是,您可能需要优化从多个内核读取(和写入?)文件的方式。因此,允许它同时运行 8 个作业可能会使您的问题成为磁盘绑定的问题——而同时 8 个读取器/写入器是使用硬盘的一种不好的方式,因此您可能会发现它并没有您预期的那么快。因此,您可能需要针对并发解码实现优化 I/O。在这方面,使用更大的缓冲区大小,但这是以内存为代价的。

于 2013-07-03T15:26:29.120 回答
3

这个答案可能需要社区的一些改进,因为我已经很长时间没有在这种环境中工作了,但这是一个开始 -

由于您不熟悉 C++ 中的多线程,因此请从一个简单的项目开始,以创建一组执行简单任务的 pthread。

这是一个创建 pthread 的快速小示例:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

void* ThreadStart(void* arg);

int main( int count, char** argv) {
        pthread_t thread1, thread2;

        int* threadArg1 = (int*)malloc(sizeof(int));
        int* threadArg2 = (int*)malloc(sizeof(int));

        *threadArg1 = 1;
        *threadArg2 = 2;

        pthread_create(&thread1, NULL, &ThreadStart, (void*)threadArg1 );
        pthread_create(&thread2, NULL, &ThreadStart, (void*)threadArg2 );

        pthread_join(thread1, NULL);
        pthread_join(thread2, NULL);
        free(threadArg1);
        free(threadArg2);
}

void* ThreadStart(void* arg) {

        int threadNum = *((int*)arg);
        printf("hello world from thread %d\n", threadNum);

        return NULL;
}

接下来,您将使用多个作品解码器。Opus 似乎是线程安全的,只要您为每个线程创建单独的 OpusDecoder 对象。

要将作业提供给您的线程,您需要一个可以以线程安全方式访问的待处理工作单元的列表。您可以使用std::vectoror std::queue,但在添加和删除时必须在其周围使用锁,并且您需要使用计数信号量,以便线程将阻塞,但保持活动状态,同时您慢慢添加工作单元到队列(例如,从磁盘读取的文件缓冲区)。

下面是一些与上面类似的示例代码,展示了如何使用共享队列,以及如何在填充队列时让线程等待:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <queue>
#include <semaphore.h>
#include <unistd.h>

void* ThreadStart(void* arg);

static std::queue<int> workunits;
static pthread_mutex_t workunitLock;
static sem_t workunitCount;

int main( int count, char** argv) {
    pthread_t thread1, thread2;

    pthread_mutex_init(&workunitLock, NULL);
    sem_init(&workunitCount, 0, 0);

    pthread_create(&thread1, NULL, &ThreadStart, NULL);
    pthread_create(&thread2, NULL, &ThreadStart, NULL);

    // Make a bunch of workunits while the threads are running.
    for (int i = 0; i < 200; i++ ){
        pthread_mutex_lock(&workunitLock);

        workunits.push(i);
        sem_post(&workunitCount);

        pthread_mutex_unlock(&workunitLock);

        // Pretend that it takes some effort to create work units;
        // this shows that the threads really do block patiently
        // while we generate workunits.
        usleep(5000);
    }

    // Sometime in the next while, the threads will be blocked on
    // sem_wait because they're waiting for more workunits. None
    // of them are quitting because they never saw an empty queue.
    // Pump the semaphore once for each thread so they can wake
    // up, see the empty queue, and return.

    sem_post(&workunitCount);
    sem_post(&workunitCount);

    pthread_join(thread1, NULL);
    pthread_join(thread2, NULL);

    pthread_mutex_destroy(&workunitLock);
    sem_destroy(&workunitCount);

}

void* ThreadStart(void* arg) {

    int workUnit;
    bool haveUnit;

    do{
        sem_wait(&workunitCount);

        pthread_mutex_lock(&workunitLock);

        // Figure out if there's a unit, grab it under
        // the lock, then release the lock as soon as we can.
        // After we release the lock, then we can 'process'
        // the unit without blocking everybody else.
        haveUnit = !workunits.empty();

        if ( haveUnit ) {
            workUnit = workunits.front();
            workunits.pop();
        }
        pthread_mutex_unlock(&workunitLock);

        // Now that we're not under the lock, we can spend
        // as much time as we want processing the workunit.
        if ( haveUnit ) {
            printf("Got workunit %d\n", workUnit);
        }
    } 
    while(haveUnit);

    return NULL;
}
于 2013-07-03T16:05:35.393 回答
2

我建议您使用线程池并将解码任务交给池,而不是创建自己的线程并管理它们。池将任务分配给它和系统可以处理的尽可能多的线程。虽然有不同类型的线程池,但您可以设置一些参数,例如强制它使用特定数量的线程,或者是否应该允许池继续增加线程数。

要记住的一件事是,更多的线程并不意味着它们可以并行执行。我认为正确的术语是并发,除非您保证每个线程都在不同的 CPU 上运行(这将提供真正的并行性)

如果 IO 被阻止,您的整个池可能会停止。

于 2013-07-03T15:56:20.443 回答
0

在跳入多线程作为加速事物的解决方案之前,研究一下 Oversubscribing 和 under Subscribing 的概念。

如果 Audio 的处理涉及 .long 阻塞调用 IO ,那么多线程是值得的。

于 2013-07-01T05:44:42.180 回答
0

通常可以使用线程,但锁定存在一些问题。我将围绕 POSIX 线程和锁给出答案,但这是相当笼统的,您可以将这个想法移植到任何平台。但是,如果您的工作需要任何类型的锁定,您可能会发现以下内容很有用。此外,最好一次又一次地继续使用现有线程,因为创建线程的成本很高(请参阅线程池)。

对于“实时”音频,锁定通常是一个坏主意,因为它会增加延迟,但对于解码/编码的实时作业来说,它们是完全可以的,即使对于实时作业,您也可以通过使用一些线程来获得更好的性能并且不会丢帧知识。

对于音频,信号量是一个坏主意。当我尝试时,它们至少在我的系统(POSIX 信号量)上太慢了,但是如果您正在考虑跨线程锁定(不是锁定在一个线程中并在同一个线程中解锁的锁定类型),您将需要它们。POSIX 互斥锁只允许自锁和自解锁(您必须在同一个线程中同时执行),否则程序可能会工作,但它是未定义的行为,应该避免。

大多数无锁原子操作可能会让您从锁中获得足够的自由来使用某些功能(如锁定),但性能更好。

于 2013-07-06T08:35:14.640 回答
0

虽然你的问题含糊不清并没有真正帮助......怎么样:

Create a list of audio files to convert.

While there is a free processor, 
   launch the decoder application with the next file in the queue.   
Repeat until there is nothing else in the list

如果在测试期间您发现处理器并不总是 100% 忙碌,则每个处理器启动 2 次解码。

用一点 bash/tcl/python 就可以很容易地完成它。

于 2013-07-03T15:24:28.623 回答