10

我正在处理一些通过 websockets 与远程 API 交互的代码。我的数据层负责建立和监控 websocket 连接。它还包含应用程序可用于将要发送的 websocket 消息排入队列的方法。应用程序代码不应负责检查 websocket 连接的状态,即即发即弃。

理想情况下,我希望数据层的功能如下:

  • 当数据层没有与 websocket 端点 ( self.isConnected == NO) 的连接时,消息会在内部缓冲。
  • 当连接可用时 ( self.isConnected == YES),缓冲的消息会立即发送,并且任何后续消息都会立即发送。

这是我能想到的:

#import "RACSignal+Buffering.h"

@implementation RACSignal (Buffering)

- (RACSignal*)bufferWithSignal:(RACSignal*)shouldBuffer
{
    return [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {

        RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];

        NSMutableArray* bufferedValues = [[NSMutableArray alloc] init];
        __block BOOL buffering = NO;

        void (^bufferHandler)() = ^{
            if (!buffering)
            {
                for (id val in bufferedValues)
                {
                    [subscriber sendNext:val];
                }

                [bufferedValues removeAllObjects];
            }
        };

        RACDisposable* bufferDisposable = [shouldBuffer subscribeNext:^(NSNumber* shouldBuffer) {

            buffering = shouldBuffer.boolValue;
            bufferHandler();

        }];

        if (bufferDisposable)
        {
            [disposable addDisposable:bufferDisposable];
        }

        RACDisposable* valueDisposable = [self subscribeNext:^(id x) {

            [bufferedValues addObject:x];
            bufferHandler();

        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{
            [subscriber sendCompleted];
        }];

        if (valueDisposable)
        {
            [disposable addDisposable:valueDisposable];
        }

        return disposable;
    }];
}

@end

最后,这是如何使用它的伪代码:

@interface APIManager ()

@property (nonatomic) RACSubject* requests;

@end

@implementation WebsocketDataLayer

- (id)init
{
    self = [super init];

    if (self) {

        RACSignal* connectedSignal = RACObserve(self, connected);

        self.requests = [[RACSubject alloc] init];

        RACSignal* bufferedApiRequests = [self.requests bufferWithSignal:connectedSignal];

        [self rac_liftSelector:@selector(sendRequest:) withSignalsFromArray:@[bufferedApiRequests]];
    }
    return self;
}

- (void)enqueueRequest:(NSString*)request
{
    [self.requests sendNext:request];
}

- (void)sendRequest:(NSString*)request
{
    DebugLog(@"Making websocket request: %@", request);
}

@end

我的问题是:这是缓冲值的正确方法吗?有没有更惯用的 RAC 方式来处理这个问题?

4

2 回答 2

11

缓冲可以被认为是适用于单个请求的东西,这导致使用-flattenMap:and的自然实现RACObserve

@weakify(self);
RACSignal *bufferedRequests = [self.requests flattenMap:^(NSString *request) {
    @strongify(self);

    // Waits for self.connected to be YES, or checks that it already is,
    // then forwards the request.
    return [[[[RACObserve(self, connected)
        ignore:@NO]
        take:1]
        // Replace the property value with our request.
        mapReplace:request];
}];

如果订购很重要,您可以替换-flattenMap:-map:plus -concat。这些实现避免了对任何自定义运算符的需要,并且无需手动订阅(众所周知的混乱)即可工作。

于 2013-10-23T20:58:51.437 回答
0

您所做的与操作中实现的几乎完全相同bufferWithTime:,我想不出任何现有的操作可以更惯用地实现它。(可能这就是以这种方式实现的原因bufferWithTime。)使用该实现检查您的代码可能会发现一些您没有想到的错误。

但老实说,这不应该那么难。应该存在一个缓冲操作,在触发信号触发时缓冲输出并喷出内容。可能大多数缓冲都可以根据此功能实现,因此拥有它会为框架增加价值。

于 2013-10-23T16:33:41.543 回答