4

这可能是一个微不足道的问题,但我无法找到解决这个看似简单的任务的方法。由于我是 ReactiveSwift 和响应式编程的新手,我可能会错过一些明显的东西。

基本上我想做的是这样的:

signal.collect(timeInterval: .seconds(5))

我想从信号中收集特定时间段内的所有值。结果信号将每 x 秒产生一个事件,该事件将包含从第一个信号收集的事件数组。

在 ReactiveSwift 中执行此操作的最佳方法是什么?

4

2 回答 2

4

ReactiveSwift 中没有用于此任务的内置运算符。相反,您可以使用以下方法,编写扩展:

import Foundation
import ReactiveSwift
import Result
public extension Signal {
    public func completeAfter(after: TimeInterval, onScheduler : DateSchedulerProtocol = QueueScheduler() ) -> Signal {
        let pipe : (Signal<(), NoError>, ReactiveSwift.Observer<(), NoError>) = Signal<(), NoError>.pipe()
        onScheduler.schedule(after: Date(timeIntervalSinceNow: after)) {
            pipe.1.sendCompleted()
        }
        return Signal { observer in
            return self.observe { event in
                switch event {
                case let .value(value):
                    observer.send(value: value)
                case .completed:
                    observer.sendCompleted()
                case let .failed(error):
                    observer.send(error: error)
                case .interrupted:
                    observer.sendInterrupted()
                }
            }
        }.take(until: pipe.0)
    }

    public func collectUntil(until: TimeInterval) -> Signal<[Value], Error> {
        return self.completeAfter(after: until).collect()
    }
}

然后使用signal.collectUntil(5)方法。

另一种方法是使用timerReactiveSwift 中的函数。示例(添加到相同的扩展名,如上):

public func collectUntil2(until: TimeInterval) -> Signal<[Value], Error> {
    var signal: Signal<(), NoError>? = nil
    timer(interval: until, on: QueueScheduler()).startWithSignal { innerSignal, _ in
        signal = innerSignal.map { _ in () }.take(first: 1)
    }
    return self.take(until: signal!).collect()
}

但是,我不喜欢这种方法,因为它是SignalProducer类型提取内部信号的伪装性质。

Signaltype 本身也有timeout功能,但是由于它会产生错误,因此很难使用它。如何使用它的示例(仍然,添加到相同的扩展名):

public func completeOnError() -> Signal<Value, Error> {
    return Signal { observer in
        return self.observe { event in
            switch(event) {
            case .value(let v): observer.send(value: v)
            case .failed(_): observer.sendCompleted()
            case .interrupted: observer.sendInterrupted()
            case .completed: observer.sendCompleted()
            }
        }
    }
}

public func collectUntil3(until: TimeInterval) -> Signal<[Value], Error> {
    return self
        .timeout(after: until,
                 raising: NSError() as! Error,
                 on: QueueScheduler())
        .completeOnError()
        .collect()
}

PS通过选择 3 个选项中的任何一个,请注意传递正确的调度程序或使用正确的调度程序参数化您的解决方案。

于 2017-03-12T11:03:03.627 回答
1

Based on the answer by Petro Korienev (which sadly wasn't quite what I was looking for), I've created an extension which solves my problem. The extension follows the structure of the ReactiveSwift collect functions, to stay as close as possible to the intents of ReactiveSwift.

It will collect all sent values over a given timeInterval and then send them as array. On a terminating event it will also send the remaining values, if there are any.

extension Signal {
    func collect(timeInterval: DispatchTimeInterval,
                 on scheduler: QueueScheduler = QueueScheduler()) -> Signal<[Value], Error> {
        return Signal<[Value], Error> { observer in
            var values: [Value] = []
            let sendAction: () -> Void = {
                observer.send(value: values)

                values.removeAll(keepingCapacity: true)
            }
            let disposable = CompositeDisposable()
            let scheduleDisposable = scheduler.schedule(
                    after: Date(timeInterval: timeInterval.timeInterval, since: scheduler.currentDate),
                    interval: timeInterval,
                    action: sendAction
            )

            disposable += scheduleDisposable
            disposable += self.observe { (event: Event<Value, Error>) in
                if event.isTerminating {
                    if !values.isEmpty {
                        sendAction()
                    }

                    scheduleDisposable?.dispose()
                }

                switch event {
                case let .value(value):
                    values.append(value)
                case .completed:
                    observer.sendCompleted()
                case let .failed(error):
                    observer.send(error: error)
                case .interrupted:
                    observer.sendInterrupted()
                }
            }

            return disposable
        }
    }
}

extension SignalProducer {
    func collect(timeInterval: DispatchTimeInterval,
                 on scheduler: QueueScheduler = QueueScheduler()) -> SignalProducer<[Value], Error> {
        return lift { (signal: ProducedSignal) in
            signal.collect(timeInterval: timeInterval, on: scheduler)
        }
    }
}

extension DispatchTimeInterval {
    var timeInterval: TimeInterval {
        switch self {
        case let .seconds(s):
            return TimeInterval(s)
        case let .milliseconds(ms):
            return TimeInterval(TimeInterval(ms) / 1000.0)
        case let .microseconds(us):
            return TimeInterval(UInt64(us) * NSEC_PER_USEC) / TimeInterval(NSEC_PER_SEC)
        case let .nanoseconds(ns):
            return TimeInterval(ns) / TimeInterval(NSEC_PER_SEC)
        }
    }
}
于 2017-03-14T10:37:05.043 回答