17

Throttle如果其他人跟随得太快,方法会跳过可观察序列中的值。但我需要一种方法来延迟它们。也就是说,我需要设置项目之间的最小延迟,而不跳过任何.

实际示例:有一个 Web 服务接受请求的速度不超过每秒一次;有一个用户可以添加请求,单个或批量。如果没有 Rx,我将创建一个列表和一个计时器。当用户添加请求时,我会将它们添加到列表中。在计时器事件中,我会检查列表是否为空。如果不是,我将发送请求并删除相应的项目。有锁和所有这些东西。现在,使用 Rx,我可以Subject在用户添加请求时创建、添加项目。但我需要一种方法来确保 Web 服务不会因应用延迟而被淹没。

我是 Rx 的新手,所以也许我遗漏了一些明显的东西。

4

6 回答 6

7

有一种相当简单的方法可以使用EventLoopScheduler.

我从一个 observable 开始,它将每 0 到 3 秒随机产生一次值。

var rnd = new Random();

var xs =
    Observable
        .Generate(
            0,
            x => x < 20,
            x => x + 1,
            x => x,
            x => TimeSpan.FromSeconds(rnd.NextDouble() * 3.0));

现在,要立即生成此输出值,除非最后一个值在一秒钟内,否则我这样做了:

var ys =
    Observable.Create<int>(o =>
    {
        var els = new EventLoopScheduler();
        return xs
            .ObserveOn(els)
            .Do(x => els.Schedule(() => Thread.Sleep(1000)))
            .Subscribe(o);
    });

这有效地观察了源上的源EventLoopScheduler,然后在每次之后将其休眠 1 秒钟,OnNext以便它只能OnNext在唤醒后开始下一个。

我测试了它是否可以与以下代码一起使用:

ys
    .Timestamp()
    .Select(x => x.Timestamp.Second + (double)x.Timestamp.Millisecond/1000.0)
    .Subscribe(x => Console.WriteLine(x));

我希望这有帮助。

于 2012-07-26T02:28:07.827 回答
5

一个简单的扩展方法怎么样:

public static IObservable<T> StepInterval<T>(this IObservable<T> source, TimeSpan minDelay)
{
    return source.Select(x => 
        Observable.Empty<T>()
            .Delay(minDelay)
            .StartWith(x)
    ).Concat();
}

用法:

var bufferedSource = source.StepInterval(TimeSpan.FromSeconds(1));
于 2012-07-26T10:51:04.613 回答
2

我想建议一种使用方法Observable.Zip

// Incoming requests
var requests = new[] {1, 2, 3, 4, 5}.ToObservable();

// defines the frequency of the incoming requests
// This is the way to emulate flood of incoming requests.
// Which, by the way, uses the same approach that will be used in the solution
var requestsTimer = Observable.Interval(TimeSpan.FromSeconds(0.1)); 
var incomingRequests = Observable.Zip(requests, requestsTimer, (number, time) => {return number;});
incomingRequests.Subscribe((number) =>
{
    Console.WriteLine($"Request received: {number}");
});

// This the minimum interval at which we want to process the incoming requests
var processingTimeInterval = Observable.Interval(TimeSpan.FromSeconds(1));

// Zipping incoming requests with the interval
var requestsToProcess = Observable.Zip(incomingRequests, processingTimeInterval, (data, time) => {return data;});

requestsToProcess.Subscribe((number) =>
{
    Console.WriteLine($"Request processed: {number}");
});
于 2017-06-18T20:51:20.830 回答
1

我正在玩这个,发现 .Zip (如前所述)是最简单的方法:

var stream = "ThisFastObservable".ToObservable();
var slowStream = 
    stream.Zip(
        Observable.Interval(TimeSpan.FromSeconds(1)), //Time delay 
        (x, y) => x); // We just care about the original stream value (x), not the interval ticks (y)

slowStream.TimeInterval().Subscribe(x => Console.WriteLine($"{x.Value} arrived after {x.Interval}"));

输出:

T arrived after 00:00:01.0393840
h arrived after 00:00:00.9787150
i arrived after 00:00:01.0080400
s arrived after 00:00:00.9963000
F arrived after 00:00:01.0002530
a arrived after 00:00:01.0003770
s arrived after 00:00:00.9963710
t arrived after 00:00:01.0026450
O arrived after 00:00:00.9995360
b arrived after 00:00:01.0014620
s arrived after 00:00:00.9993100
e arrived after 00:00:00.9972710
r arrived after 00:00:01.0001240
v arrived after 00:00:01.0016600
a arrived after 00:00:00.9981140
b arrived after 00:00:01.0033980
l arrived after 00:00:00.9992570
e arrived after 00:00:01.0003520
于 2017-10-07T17:53:54.387 回答
-1

使用可观察计时器从阻塞队列中获取数据怎么样?下面的代码未经测试,但应该让您了解我的意思......

//assuming somewhere there is 
BlockingCollection<MyWebServiceRequestData> workQueue = ...

Observable
  .Timer(new TimeSpan(0,0,1), new EventLoopScheduler())
  .Do(i => myWebService.Send(workQueue.Take()));

// Then just add items to the queue using workQueue.Add(...)
于 2012-07-25T16:50:26.053 回答
-1
.Buffer(TimeSpan.FromSeconds(0.2)).Where(i => i.Any())
.Subscribe(buffer => 
{
     foreach(var item in buffer) Console.WriteLine(item)
});
于 2013-05-01T19:08:00.013 回答