Your situation is a bit more complex than the one outlined in Apple's documentation, but it's still worth reading (and if you find yourself saying "Huh?" afterwards, read this SO answer as well) to understand the intended pattern. In short, the general idea is that the producer "drives" the chain, and GCD's hooks into the OS help ensure that work gets dispatched appropriately based on the state of various things in the kernel.
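To make that shape concrete, here's a minimal sketch (my own illustration, not code from the docs) of a purely producer-driven pipeline: a dispatch_io channel reads chunks as the OS makes them available and pushes them onto a serial queue, so the reader alone paces the work. The path and ProcessChunk are placeholders.

#import <Foundation/Foundation.h>
#include <fcntl.h>

static void ProcessChunk(dispatch_data_t chunk)
{
    // Placeholder for whatever per-chunk work the consumer does
    NSLog(@"Got %zu bytes", dispatch_data_get_size(chunk));
}

int main(int argc, const char* argv[])
{
    @autoreleasepool
    {
        dispatch_queue_t processingQueue = dispatch_queue_create("processing", DISPATCH_QUEUE_SERIAL);

        // The producer: an I/O channel that delivers data as the kernel reads it...
        dispatch_io_t channel = dispatch_io_create_with_path(DISPATCH_IO_STREAM, "/path/to/input.dat", O_RDONLY, 0, processingQueue, ^(int error) {
            // Cleanup handler; nothing to do in this sketch
        });

        // ...and the consumer simply reacts to whatever arrives, in order, on the serial queue
        dispatch_io_read(channel, 0, SIZE_MAX, processingQueue, ^(bool done, dispatch_data_t data, int error) {
            if (data)
                ProcessChunk(data);
            if (done)
                dispatch_io_close(channel, 0);
        });

        dispatch_main(); // never returns; fine for a sketch
    }
    return 0;
}
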
The problem with this approach, as it relates to your question, is that it's not straightforward to have the producer side drive things here, because your consumer is driven in real time by the vertical-blanking callback rather than purely by the availability of consumable resources. This is further complicated by the inherently serial nature of your workflow: even if you could, in theory, parallelize the decoding of frame data into images, the images still have to be handed serially to the next stage of the pipeline, and that's a situation the GCD API doesn't handle well in a streaming context. (That is, if you could hold everything in memory at once, this would be easy with dispatch_apply, but that cuts to the heart of the problem: you need this to happen in a quasi-streaming context.)
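To illustrate the contrast, here's roughly what the "everything already in memory" case looks like. The names (BlendFrames, BlendOverlapInMemory) are made up, and the "frames" are just strings, mirroring the text-file simulation further down; the point is that dispatch_apply only helps once every input is already resident.

#import <Foundation/Foundation.h>

// Stand-in for the real per-frame cross-fade; the "frames" here are just strings
static NSString* BlendFrames(NSString* frameA, NSString* frameB)
{
    return [frameA stringByAppendingString: frameB];
}

// If the overlapping frames of both clips were already sitting in memory,
// the blend would be a trivially parallel loop over indexes.
static NSArray* BlendOverlapInMemory(NSArray* tailOfClipA, NSArray* headOfClipB)
{
    NSUInteger count = MIN(tailOfClipA.count, headOfClipB.count);

    NSMutableArray* blended = [NSMutableArray arrayWithCapacity: count];
    for (NSUInteger i = 0; i < count; ++i)
        [blended addObject: [NSNull null]]; // placeholders, replaced below

    dispatch_queue_t resultsQueue = dispatch_queue_create("results", DISPATCH_QUEUE_SERIAL);

    dispatch_apply(count, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^(size_t i) {
        // The expensive blend runs concurrently...
        NSString* result = BlendFrames(tailOfClipA[i], headOfClipB[i]);
        // ...and only the cheap write into the shared array is serialized
        dispatch_sync(resultsQueue, ^{
            blended[i] = result;
        });
    });

    return blended;
}

Note that even here the only shared mutation (writing into the result array) has to be funneled through a serial queue; that's exactly the re-serialization constraint that bites in the streaming case.
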
While trying to think through how one might handle this, I came up with the example below, which attempts to simulate your situation using text files, where each line of a file is a "frame" of video, and which "cross-fades" the two clips by concatenating strings. What follows is a complete, working (at least for me) version. The code is meant to illustrate how you could build such a processing pipeline using only GCD primitives, following a (mostly) producer-driven pattern, while still hooking it up to a CVDisplayLink-based consumer.
It's not bulletproof (among many other things, it doesn't tolerate files with fewer frames than the overlap requires), and it may completely fail to meet your real-time or memory-usage requirements (those would be hard for me to reproduce and test without doing more work than I'm willing to do :) ). It also makes no attempt to address the issue I mentioned above, where you might be able to parallelize work that then has to be re-serialized before the next pipeline stage. (The code also assumes ARC.) With all those caveats, hopefully there are still some interesting/relevant ideas here for you. Here's the code:
#import <Cocoa/Cocoa.h>
#import <CoreVideo/CoreVideo.h>
#include <fcntl.h>   // O_RDONLY, O_NONBLOCK
#include <string.h>  // strerror
#include <stdlib.h>  // exit

// Assumes the usual Xcode app-delegate template, i.e. SOAppDelegate is declared elsewhere as
// @interface SOAppDelegate : NSObject <NSApplicationDelegate> @end

static void DieOnError(int error);
static NSString* NSStringFromDispatchData(dispatch_data_t data);
static dispatch_data_t FrameDataFromAccumulator(dispatch_data_t* accumulator);
static CVReturn MyDisplayLinkCallback(CVDisplayLinkRef displayLink, const CVTimeStamp* now, const CVTimeStamp* outputTime, CVOptionFlags flagsIn, CVOptionFlags* flagsOut, void* displayLinkContext);
static const NSUInteger kFramesToOverlap = 15;

@implementation SOAppDelegate
{
    // Display link state
    CVDisplayLinkRef mDisplayLink;

    // State for our file reading process -- protected via mFrameReadQueue
    dispatch_queue_t mFrameReadQueue;
    NSUInteger mFileIndex; // keep track of what file we're reading
    dispatch_io_t mReadingChannel; // channel for reading
    dispatch_data_t mFrameReadAccumulator; // keep track of left-over data across read operations

    // State for processing raw frame data delivered by the read process - protected via mFrameDataProcessingQueue
    dispatch_queue_t mFrameDataProcessingQueue;
    NSMutableArray* mFilesForOverlapping;
    NSMutableArray* mFrameArraysForOverlapping;

    // State for blending frames (or passing them through)
    dispatch_queue_t mFrameBlendingQueue;

    // Delivery state
    dispatch_queue_t mFrameDeliveryQueue; // Is suspended/resumed to deliver one frame at a time
    dispatch_queue_t mFrameDeliveryStateQueue; // Protects access to the iVars
    dispatch_data_t mDeliveredFrame; // Data of the frame that has been delivered, but not yet picked up by the CVDisplayLink
    NSInteger mLastFrameDelivered; // Counter of frames delivered
    NSInteger mLastFrameDisplayed; // Counter of frames displayed
}

- (void)applicationDidFinishLaunching:(NSNotification *)aNotification
{
    mFileIndex = 1;
    mLastFrameDelivered = -1;
    mLastFrameDisplayed = -1;

    mFrameReadQueue = dispatch_queue_create("mFrameReadQueue", DISPATCH_QUEUE_SERIAL);
    mFrameDataProcessingQueue = dispatch_queue_create("mFrameDataProcessingQueue", DISPATCH_QUEUE_SERIAL);
    mFrameBlendingQueue = dispatch_queue_create("mFrameBlendingQueue", DISPATCH_QUEUE_SERIAL);
    mFrameDeliveryQueue = dispatch_queue_create("mFrameDeliveryQueue", DISPATCH_QUEUE_SERIAL);
    mFrameDeliveryStateQueue = dispatch_queue_create("mFrameDeliveryStateQueue", DISPATCH_QUEUE_SERIAL);

    CVDisplayLinkCreateWithActiveCGDisplays(&mDisplayLink);
    CVDisplayLinkSetOutputCallback(mDisplayLink, &MyDisplayLinkCallback, (__bridge void*)self);

    [self readNextFile];
}

- (void)dealloc
{
    if (mDisplayLink)
    {
        if (CVDisplayLinkIsRunning(mDisplayLink))
        {
            CVDisplayLinkStop(mDisplayLink);
        }
        CVDisplayLinkRelease(mDisplayLink);
    }
}

- (void)readNextFile
{
    dispatch_async (mFrameReadQueue, ^{
        NSURL* url = [[NSBundle mainBundle] URLForResource: [NSString stringWithFormat: @"File%lu", (unsigned long)mFileIndex++] withExtension: @"txt"];

        if (!url)
            return;

        if (mReadingChannel)
        {
            dispatch_io_close(mReadingChannel, DISPATCH_IO_STOP);
            mReadingChannel = nil;
        }

        // We don't care what queue the cleanup handler gets called on, because we know there's only ever one file being read at a time
        mReadingChannel = dispatch_io_create_with_path(DISPATCH_IO_STREAM, [[url path] fileSystemRepresentation], O_RDONLY|O_NONBLOCK, 0, mFrameReadQueue, ^(int error) {
            DieOnError(error);

            mReadingChannel = nil;

            // Start the next file
            [self readNextFile];
        });

        // We don't care what queue the read handlers get called on, because we know they're inherently serial
        dispatch_io_read(mReadingChannel, 0, SIZE_MAX, mFrameReadQueue, ^(bool done, dispatch_data_t data, int error) {
            DieOnError(error);

            // Grab frames
            dispatch_data_t localAccumulator = mFrameReadAccumulator ? dispatch_data_create_concat(mFrameReadAccumulator, data) : data;
            dispatch_data_t frameData = nil;
            do
            {
                frameData = FrameDataFromAccumulator(&localAccumulator);
                mFrameReadAccumulator = localAccumulator;
                [self processFrameData: frameData fromFile: url];
            } while (frameData);

            if (done)
            {
                dispatch_io_close(mReadingChannel, DISPATCH_IO_STOP);
            }
        });
    });
}

- (void)processFrameData: (dispatch_data_t)frameData fromFile: (NSURL*)file
{
    if (!frameData || !file)
        return;

    // We want the data blobs constituting each frame to be processed serially
    dispatch_async(mFrameDataProcessingQueue, ^{
        mFilesForOverlapping = mFilesForOverlapping ?: [NSMutableArray array];
        mFrameArraysForOverlapping = mFrameArraysForOverlapping ?: [NSMutableArray array];

        NSMutableArray* arrayToAddTo = nil;
        if ([file isEqual: mFilesForOverlapping.lastObject])
        {
            arrayToAddTo = mFrameArraysForOverlapping.lastObject;
        }
        else
        {
            arrayToAddTo = [NSMutableArray array];
            [mFilesForOverlapping addObject: file];
            [mFrameArraysForOverlapping addObject: arrayToAddTo];
        }

        [arrayToAddTo addObject: frameData];

        // We've gotten to file two, and we have enough frames to process the overlap
        if (mFrameArraysForOverlapping.count == 2 && [mFrameArraysForOverlapping[1] count] >= kFramesToOverlap)
        {
            NSMutableArray* fileOneFrames = mFrameArraysForOverlapping[0];
            NSMutableArray* fileTwoFrames = mFrameArraysForOverlapping[1];

            for (NSUInteger i = 0; i < kFramesToOverlap; ++i)
            {
                [self blendOneFrame: fileOneFrames[0] withOtherFrame: fileTwoFrames[0]];
                [fileOneFrames removeObjectAtIndex: 0];
                [fileTwoFrames removeObjectAtIndex: 0];
            }

            [mFilesForOverlapping removeObjectAtIndex: 0];
            [mFrameArraysForOverlapping removeObjectAtIndex: 0];
        }

        // We're pulling in frames from file 1, haven't gotten to file 2 yet, have more than enough to overlap
        while (mFrameArraysForOverlapping.count == 1 && [mFrameArraysForOverlapping[0] count] > kFramesToOverlap)
        {
            NSMutableArray* frameArray = mFrameArraysForOverlapping[0];
            dispatch_data_t first = frameArray[0];
            [mFrameArraysForOverlapping[0] removeObjectAtIndex: 0];
            [self blendOneFrame: first withOtherFrame: nil];
        }
    });
}

- (void)blendOneFrame: (dispatch_data_t)frameA withOtherFrame: (dispatch_data_t)frameB
{
    dispatch_async(mFrameBlendingQueue, ^{
        NSString* blendedFrame = [NSString stringWithFormat: @"%@%@", [NSStringFromDispatchData(frameA) stringByReplacingOccurrencesOfString: @"\n" withString: @""], NSStringFromDispatchData(frameB)];
        // Use the UTF-8 byte count (not the UTF-16 length) when wrapping the string back up as dispatch_data
        dispatch_data_t blendedFrameData = dispatch_data_create(blendedFrame.UTF8String, [blendedFrame lengthOfBytesUsingEncoding: NSUTF8StringEncoding], NULL, DISPATCH_DATA_DESTRUCTOR_DEFAULT);
        [self deliverFrameForDisplay: blendedFrameData];
    });
}

- (void)deliverFrameForDisplay: (dispatch_data_t)frame
{
    // By suspending the queue from within the block, and by virtue of this being a serial queue, we guarantee that
    // only one task will get called for each call to dispatch_resume on the queue...
    dispatch_async(mFrameDeliveryQueue, ^{
        dispatch_suspend(mFrameDeliveryQueue);
        dispatch_sync(mFrameDeliveryStateQueue, ^{
            mLastFrameDelivered++;
            mDeliveredFrame = frame;
        });

        if (!CVDisplayLinkIsRunning(mDisplayLink))
        {
            CVDisplayLinkStart(mDisplayLink);
        }
    });
}

- (dispatch_data_t)getFrameForDisplay
{
    __block dispatch_data_t frameData = nil;
    dispatch_sync(mFrameDeliveryStateQueue, ^{
        if (mLastFrameDelivered > mLastFrameDisplayed)
        {
            frameData = mDeliveredFrame;
            mDeliveredFrame = nil;
            mLastFrameDisplayed = mLastFrameDelivered;
        }
    });

    // At this point, I've either got the next frame or I don't...
    // resume the delivery queue so it will deliver the next frame
    if (frameData)
    {
        dispatch_resume(mFrameDeliveryQueue);
    }

    return frameData;
}
@end

static void DieOnError(int error)
{
    if (error)
    {
        NSLog(@"Error in %s: %s", __PRETTY_FUNCTION__, strerror(error));
        exit(error);
    }
}

static NSString* NSStringFromDispatchData(dispatch_data_t data)
{
    if (!data || !dispatch_data_get_size(data))
        return @"";

    const char* buf = NULL;
    size_t size = 0;
    // Keep the map object alive (precise lifetime) until we've copied the bytes out of the mapped buffer
    __attribute__((objc_precise_lifetime)) dispatch_data_t mapped = dispatch_data_create_map(data, (const void**)&buf, &size);
    #pragma unused(mapped)
    NSString* str = [[NSString alloc] initWithBytes: buf length: size encoding: NSUTF8StringEncoding];
    return str;
}

// Peel off a frame if there is one, and put the left-overs back.
static dispatch_data_t FrameDataFromAccumulator(dispatch_data_t* accumulator)
{
    __block dispatch_data_t frameData = dispatch_data_create(NULL, 0, NULL, NULL); // empty
    __block dispatch_data_t leftOver = dispatch_data_create(NULL, 0, NULL, NULL); // empty

    __block BOOL didFindFrame = NO;

    dispatch_data_apply(*accumulator, ^bool(dispatch_data_t region, size_t offset, const void *buffer, size_t size) {
        ssize_t newline = -1;
        for (size_t i = 0; !didFindFrame && i < size; ++i)
        {
            if (((const char *)buffer)[i] == '\n')
            {
                newline = i;
                break;
            }
        }

        if (newline == -1)
        {
            if (!didFindFrame)
            {
                frameData = dispatch_data_create_concat(frameData, region);
            }
            else
            {
                leftOver = dispatch_data_create_concat(leftOver, region);
            }
        }
        else if (newline >= 0)
        {
            didFindFrame = YES;
            frameData = dispatch_data_create_concat(frameData, dispatch_data_create_subrange(region, 0, newline + 1));
            leftOver = dispatch_data_create_concat(leftOver, dispatch_data_create_subrange(region, newline + 1, size - newline - 1));
        }

        return true;
    });

    *accumulator = leftOver;

    return didFindFrame ? frameData : nil;
}

static CVReturn MyDisplayLinkCallback(CVDisplayLinkRef displayLink, const CVTimeStamp* now, const CVTimeStamp* outputTime, CVOptionFlags flagsIn, CVOptionFlags* flagsOut, void* displayLinkContext)
{
    SOAppDelegate* self = (__bridge SOAppDelegate*)displayLinkContext;

    dispatch_data_t frameData = [self getFrameForDisplay];

    NSString* dataAsString = NSStringFromDispatchData(frameData);

    if (dataAsString.length == 0)
    {
        NSLog(@"Dropped frame...");
    }
    else
    {
        NSLog(@"Drawing frame in CVDisplayLink. Contents: %@", dataAsString);
    }

    return kCVReturnSuccess;
}
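
To try it out, add a handful of text files named File1.txt, File2.txt, and so on to the app bundle, each containing a few dozen short lines (one per "frame"). While a clip is playing solo you'll see its lines logged one at a time from the CVDisplayLink callback; during the overlap you'll see lines from the end of one file concatenated with lines from the start of the next, and "Dropped frame..." whenever the display link fires before a new frame has been delivered.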
In theory, GCD should balance these queues for you. For example, if letting the "producer" queue run ahead caused memory usage to climb, GCD would (in theory) start letting the other queues make progress and hold the producer queue back. In practice, that mechanism is opaque to us, so who knows how well it will work for you under real-world conditions, especially in the face of your real-time constraints.
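If that implicit balancing turns out not to be enough to keep your memory bounded, one explicit option (not used in the code above) is to cap the number of frames in flight with a counting semaphore: the producer waits on it before handing off another frame, and the consumer signals it each time a frame is actually consumed. Here's a standalone sketch of just that mechanism, with NSLog standing in for the per-frame work and 30 as a made-up bound:

#import <Foundation/Foundation.h>

int main(int argc, const char* argv[])
{
    @autoreleasepool
    {
        // Allow at most 30 work items "in flight" between producer and consumer
        // (30 is a made-up bound; you'd tune it to your memory budget)
        dispatch_semaphore_t backpressure = dispatch_semaphore_create(30);
        dispatch_queue_t consumerQueue = dispatch_queue_create("consumer", DISPATCH_QUEUE_SERIAL);

        for (int frame = 0; frame < 1000; frame++)
        {
            // Producer: block here whenever the consumer has fallen 30 frames behind
            dispatch_semaphore_wait(backpressure, DISPATCH_TIME_FOREVER);

            dispatch_async(consumerQueue, ^{
                // Stand-in for the real per-frame work (blend, draw, etc.)
                NSLog(@"Consumed frame %d", frame);

                // Consumer: each completed frame makes room for the producer to enqueue another
                dispatch_semaphore_signal(backpressure);
            });
        }

        // Wait for the consumer to drain before exiting this toy example
        dispatch_sync(consumerQueue, ^{});
    }
    return 0;
}

The trade-off is that the producer can now block, which is fine on a reading queue but is something you'd never want to do from the display-link callback itself.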
If anything specific here is unclear, please leave a comment and I'll try to elaborate.