4

当在 DISPATCH_QUEUE_PRIORITY_DEFAULT gcd 队列上运行的 dispatch_async 块中时:我创建两个 RACSubject 对象,使用 RACSignal 合并:然后订阅完成。然后,出于此测试的目的(并在我的实际代码中复制该场景),我对它们都发送了 sendComplete。合并的信号完成订阅永远不会触发。我独立地为主题附加了两个完成订阅,它们确实会触发。如果我在主线程而不是 gcd 队列上进行相同的测试,那么它会按预期工作。

有没有办法完成这项工作,或者我是否必须重构才能将所有主题都放在主线程上?

#import <ReactiveCocoa/ReactiveCocoa.h>

@interface rac_signal_testTests: SenTestCase
@end

@implementation rac_signal_testTests

- (void)setUp
{
    [super setUp];

    // Set-up code here.
}

- (void)tearDown
{
    // Tear-down code here.

    [super tearDown];
}

-(void)test_merged_subjects_will_complete_on_main_thread{
    RACSubject *subject1 = [[RACSubject subject] setNameWithFormat:@"subject1"];
    RACSubject *subject2 = [[RACSubject subject] setNameWithFormat:@"subject2"];

    RACSignal *merged = [RACSignal merge:@[subject1, subject2]];

    __block BOOL completed_fired = NO;

    [merged subscribeCompleted:^{
        completed_fired = YES;
    }];

    [subject1 sendNext:@"1"];
    [subject2 sendNext:@"2"];

    [subject1 sendCompleted];
    [subject2 sendCompleted];

    STAssertTrue(completed_fired, nil);
}

//test proving that throttling isn't breaking the merged signal (initial hypothesis).
-(void)test_merged_subjects_will_complete_if_one_of_them_has_a_throttled_subscriber_on_main_thread{
    RACSubject *subject1 = [[RACSubject subject] setNameWithFormat:@"subject1"];
    RACSubject *subject2 = [[RACSubject subject] setNameWithFormat:@"subject2"];

    __block NSString * hit_subject2_next = nil;
    [[subject2 throttle:.5] subscribeNext:^(NSString *value){
        hit_subject2_next = value;
    }];

    RACSignal *merged = [RACSignal merge:@[subject1, subject2]];

    __block BOOL completed_fired = NO;

    [merged subscribeCompleted:^{
        completed_fired = YES;
    }];

    [subject2 sendNext:@"2"];
    [subject2 sendCompleted];
    [subject1 sendCompleted];
    STAssertEqualObjects(@"2", hit_subject2_next, nil);
    STAssertTrue(completed_fired, nil);
}

-(void)test_merged_subjects_will_complete_if_on_gcd_queue{
    __block BOOL complete = NO;

    dispatch_queue_t global_default_queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);

    dispatch_async(global_default_queue, ^{
        RACSubject *subject1 = [[RACSubject subject] setNameWithFormat:@"subject1"];
        RACSubject *subject2 = [[RACSubject subject] setNameWithFormat:@"subject2"];

        __block NSString * hit_subject2_next = nil;

        RACScheduler *global_default_scheduler = [RACScheduler schedulerWithQueue:global_default_queue name:@"com.test.global_default"];

        RACSignal *sig1 = [subject1 deliverOn:RACScheduler.mainThreadScheduler];
        RACSignal *sig2 = [subject2 deliverOn:RACScheduler.mainThreadScheduler];

        [sig2    subscribeNext:^(NSString *value){
            hit_subject2_next = value;
        }];

        [sig2 subscribeCompleted:^{
            NSLog(@"hit sig2 complete");
        }];

        [sig1 subscribeCompleted:^{
            NSLog(@"hit sig1 complete");
        }];

        RACSignal *merged = [[RACSignal merge:@[sig1, sig2]] deliverOn:RACScheduler.mainThreadScheduler];

        [merged subscribeCompleted:^{
            complete = YES;
        }];

        [subject2 sendNext:@"2"];
//        if we dispatch the send complete calls to the main queue then this code works but that seems like it shoul be unnecessary.
//        dispatch_async(dispatch_get_main_queue(), ^{
            [subject1 sendCompleted];
            [subject2 sendCompleted];
//        });
    });

    NSDate *startTime = NSDate.date;
    do{
        [NSRunLoop.mainRunLoop runMode:NSDefaultRunLoopMode beforeDate:[NSDate dateWithTimeIntervalSinceNow:.5]];
    }while(!complete && [NSDate.date timeIntervalSinceDate:startTime] <= 10.0);

    STAssertTrue(complete, nil);
}

@end
4

1 回答 1

8

所以这是一个相当糟糕的案例,是由 GCD 和 RAC 的交互引起的。严格来说,没有bug。但这令人惊讶和奇怪。我们在https://github.com/ReactiveCocoa/ReactiveCocoa/blob/1bd47736f306befab64859602dbdea18f7f9a3f6/Documentation/DesignGuidelines.md#subscription-will-always-occur-on-a-scheduler的设计指南中讨论了这个要求。

关键是订阅必须始终发生在已知的调度程序上。这是 RAC 在内部强制执行的要求。如果您只是使用普通的旧 GCD,则没有已知的调度程序,因此 RAC 必须将订阅异步发送到调度程序。

所以去你的测试:

[merged subscribeCompleted:^{
    complete = YES;
}];

实际订阅是异步发生的,因为没有已知的调度程序。订阅最终发生通话之后,-sendCompleted并且完全错过了它们。这确实是一种竞争条件,但实际上你可能永远不会看到它成功。

解决方法是尽可能使用RACSchedulers 而不是 GCD。如果需要使用特定的 GCD 队列,可以使用RACTargetQueueScheduler. 例如,您的测试的有效简化版本:

-(void)test_merged_subjects_will_complete_if_on_gcd_queue{
    __block BOOL complete = NO;

    dispatch_queue_t global_default_queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);

    RACScheduler *scheduler = [[RACTargetQueueScheduler alloc] initWithName:@"testScheduler" targetQueue:global_default_queue];
    [scheduler schedule:^{
        RACSubject *subject1 = [[RACSubject subject] setNameWithFormat:@"subject1"];
        RACSubject *subject2 = [[RACSubject subject] setNameWithFormat:@"subject2"];

        RACSignal *merged = [RACSignal merge:@[subject1, subject2]];

        [merged subscribeCompleted:^{
            complete = YES;
        }];

        [subject1 sendCompleted];
        [subject2 sendCompleted];
    }];

    NSDate *startTime = NSDate.date;
    do{
        [NSRunLoop.mainRunLoop runMode:NSDefaultRunLoopMode beforeDate:[NSDate dateWithTimeIntervalSinceNow:.5]];
    }while(!complete && [NSDate.date timeIntervalSinceDate:startTime] <= 10.0);

    STAssertTrue(complete, nil);
}

由于订阅发生在调度程序中,因此subscribeCompleted:同步完成,获取已完成的事件,并且一切都按照您的预期进行。

如果您不需要使用特定的 GCD 队列并且只想在非主队列上完成,那么请执行以下操作:

[[RACScheduler scheduler] schedule:^{
    RACSubject *subject1 = [[RACSubject subject] setNameWithFormat:@"subject1"];
    RACSubject *subject2 = [[RACSubject subject] setNameWithFormat:@"subject2"];

    RACSignal *merged = [RACSignal merge:@[subject1, subject2]];

    [merged subscribeCompleted:^{
        complete = YES;
    }];

    [subject1 sendCompleted];
    [subject2 sendCompleted];
}];

我希望这能澄清你所看到的。让我知道是否需要重新措辞。

于 2013-08-01T22:52:56.147 回答