1

我在 GCD 中实现了这个读写器锁,但在并行测试中失败了。我可以解释为什么它失败了吗?

这是用于 iOS 开发的。该代码基于Objective C。我在GCD中编写了一个带有读/写锁的RWCache,用于数据保护。

@interface RWCache : NSObject

- (void)setObject:(id)object forKey:(id <NSCopying>)key;

- (id)objectForKey:(id <NSCopying>)key;

@end

@interface RWCache()
@property (nonatomic, strong) NSMutableDictionary *memoryStorage;
@property (nonatomic, strong) dispatch_queue_t storageQueue;
@end

@implementation RWCache

- (instancetype)init {
    self = [super init];
    if (self) {
        _memoryStorage = [NSMutableDictionary new];
        _storageQueue = dispatch_queue_create("Storage Queue", DISPATCH_QUEUE_CONCURRENT);
    }
    return self;
}

- (void)setObject:(id)object forKey:(id <NSCopying>)key {
    dispatch_barrier_async(self.storageQueue, ^{
        self.memoryStorage[key] = object;
    });
}

- (id)objectForKey:(id <NSCopying>)key {
    __block id object = nil;
    dispatch_sync(self.storageQueue, ^{
        object = self.memoryStorage[key];
    });
    return object;
}

@end


int main(int argc, const char * argv[]) {
    @autoreleasepool {
        RWCache *cache = [RWCache new];
        dispatch_queue_t testQueue = dispatch_queue_create("Test Queue", DISPATCH_QUEUE_CONCURRENT);
        dispatch_group_t group = dispatch_group_create();
        for (int i = 0; i < 100; i++) {
            dispatch_group_async(group, testQueue, ^{
                [cache setObject:@(i) forKey:@(i)];
            });
            dispatch_group_async(group, testQueue, ^{
                [cache objectForKey:@(i)];
            });
        }
        dispatch_group_wait(group, DISPATCH_TIME_FOREVER);
    }
    return 0;
}

如果没有死锁,程序会退出0,否则程序会挂起不退出。

4

1 回答 1

1

问题不在于读取器/写入器模式本身,而在于此代码中的一般线程爆炸。请参阅 WWDC 2015 视频Building Responsive and Efficient Apps with GCD中的“线程爆炸导致死锁”讨论。WWDC 2016 Concurrent Programming With GCD in Swift 3也是一个不错的视频。在这两个链接中,我会将您放在视频的相关部分,但都值得完整观看。

最重要的是,您正在耗尽 GCD 线程池中数量非常有限的工作线程。只有 64 个。但是您有 100 个带屏障的写入,这意味着在该队列上的所有其他内容完成之前,调度的块无法运行。它们散布着 100 次读取,因为它们是同步的,所以会阻塞您从中分派它的工作线程,直到它返回。

让我们将其简化为可以显示问题的更简单的东西:

dispatch_queue_t queue1 = dispatch_queue_create("queue1", DISPATCH_QUEUE_CONCURRENT);
dispatch_queue_t queue2 = dispatch_queue_create("queue2", DISPATCH_QUEUE_CONCURRENT);
for (int i = 0; i < 100; i++) {
    dispatch_async(queue2, ^{
        dispatch_barrier_async(queue1, ^{
            NSLog(@"barrier async %d", i);
        });
        dispatch_sync(queue1, ^{
            NSLog(@"sync %d", i);
        });
    });
}
NSLog(@"done dispatching all blocks to queue1");

这会产生类似的东西:

开始
完成将所有块分派到 queue1
屏障异步 0
同步 0

它陷入僵局。

但是,如果我们将其限制为不超过 30 个项目可以同时在 上运行queue2,那么问题就消失了:

dispatch_queue_t queue1 = dispatch_queue_create("queue1", DISPATCH_QUEUE_CONCURRENT);
dispatch_queue_t queue2 = dispatch_queue_create("queue2", DISPATCH_QUEUE_CONCURRENT);
dispatch_semaphore_t semaphore = dispatch_semaphore_create(30);

for (int i = 0; i < 100; i++) {
    dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER);
    dispatch_async(queue2, ^{
        dispatch_barrier_async(queue1, ^{
            NSLog(@"barrier async %d", i);
        });
        dispatch_sync(queue1, ^{
            NSLog(@"sync %d", i);
        });
        dispatch_semaphore_signal(semaphore);
    });
}
NSLog(@"done dispatching all blocks to queue1");

或者,另一种方法是使用dispatch_apply,它实际上是一个并行for循环,但将任何给定时刻的并发任务数量限制为机器上的内核数量(使我们远低于耗尽工作线程的阈值):

dispatch_queue_t queue1 = dispatch_queue_create("queue1", DISPATCH_QUEUE_CONCURRENT);
dispatch_queue_t queue2 = dispatch_queue_create("queue2", DISPATCH_QUEUE_CONCURRENT);

dispatch_apply(100, queue2, ^(size_t i) {
    dispatch_barrier_async(queue1, ^{
        NSLog(@"barrier async %ld", i);
    });
    dispatch_sync(queue1, ^{
        NSLog(@"sync %ld", i);
    });
});
NSLog(@"done dispatching all blocks to queue1");
于 2019-08-02T06:35:10.000 回答