0

我想使用 AsyncLocal 通过异步工作流传递信息以进行跟踪。现在我遇到了RX的问题。
Thios 是我的测试代码:

using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

public class RxTest
{
    private readonly Subject<int> test = new Subject<int>();

    private readonly AsyncLocal<int> asyncContext = new AsyncLocal<int>();

    public void Test()
    {
        this.test
             // .ObserveOn(Scheduler.Default)
            .Subscribe(this.OnNextNormal);
        this.test
             // .ObserveOn(Scheduler.Default)
            .Delay(TimeSpan.FromMilliseconds(1))
            .Subscribe(this.OnNextDelayed);

        for (var i = 0; i < 2; i++)
        {
            var index = i;
            Task.Run(() =>
            {
                this.asyncContext.Value = index;
                Console.WriteLine(
                    $"Main\t\t{index} (Thread: {Thread.CurrentThread.ManagedThreadId}): AsyncLocal.Value => {this.asyncContext.Value}");
                this.test.OnNext(index);
            });
        }

        Console.ReadKey();
    }

    private void OnNextNormal(int obj)
    {
        Console.WriteLine(
            $"OnNextNormal\t{obj} (Thread: {Thread.CurrentThread.ManagedThreadId}): AsyncLocal.Value => {this.asyncContext.Value}");
    }

    private void OnNextDelayed(int obj)
    {
        Console.WriteLine(
            $"OnNextDelayed\t{obj} (Thread: {Thread.CurrentThread.ManagedThreadId}): AsyncLocal.Value => {this.asyncContext.Value}");
    }
}

输出是:

Main 0(线程:5):AsyncLocal.Value => 0
Main 1(线程:6):AsyncLocal.Value => 1
OnNextNormal 0(线程:5):AsyncLocal.Value => 0
OnNextNormal 1(线程:6): AsyncLocal.Value => 1
OnNextDelayed 0(线程:4):AsyncLocal.Value => 0
OnNextDelayed 1(线程:4):AsyncLocal.Value => 0

如您所见, AsyncLocal.Value 不会流向延迟订阅的方法。
=> AsyncValue 在延迟轨道上丢失

据我了解,普通的 Subscribe() 不使用调度程序,而 Delay() 使用调度程序。
当我对两个调用都使用 ObserveOn() 时,两者的输出如下

Main 0(线程:5):AsyncLocal.Value => 0
Main 1(线程:7):AsyncLocal.Value => 1
OnNextNormal 0(线程:9):AsyncLocal.Value => 0
OnNextNormal 1(线程:9): AsyncLocal.Value => 0
OnNextDelayed 0(线程:4):AsyncLocal.Value => 0
OnNextDelayed 1(线程:4):AsyncLocal.Value => 0

=> AsyncValue 在每条轨道上都会丢失

有没有办法让 ExecutionContext 与 RX 一起流动?
我只发现了这一点,但这是另一个问题。他们解决了观察者的上下文如何流动的问题。我想流动发布者的上下文。

我想要实现的是:

  1. 来自“外部”的消息为我服务
  2. 在服务内分发消息 (RX)
  3. 记录消息时,使用 MessageId 格式化日志消息
  4. 我不想到处传递信息

提前感谢您的回答。

4

1 回答 1

0

Rx 中自由流动的执行上下文使其非常适合大多数多线程场景。您可以通过绕过预定的方法来强制执行线程上下文,如下所示:

public static class Extensions
{
    public static IObservable<T> TaskPoolDelay<T>(this IObservable<T> observable, TimeSpan delay)
    {
        return Observable.Create<T>(
            observer => observable.Subscribe(
                onNext: value => Task.Delay(delay).ContinueWith(_ => observer.OnNext(value)),
                onError: observer.OnError,
                onCompleted: observer.OnCompleted
            )
        );
    }
}

输出 :

OnNextDelayed   2 (Thread: 6): AsyncLocal.Value => 2
OnNextDelayed   3 (Thread: 10): AsyncLocal.Value => 3
OnNextDelayed   1 (Thread: 7): AsyncLocal.Value => 1
OnNextDelayed   0 (Thread: 5): AsyncLocal.Value => 0

这确实会继承上下文,但对于较大的查询,它很快就会变得复杂。我不确定IScheduler在通知时实现一个保留上下文的方法是否能正常工作。如果消息复制不是太多开销,那可能最适合 Rx。

于 2018-07-16T19:18:54.637 回答