我想使用 AsyncLocal 通过异步工作流传递信息以进行跟踪。现在我遇到了RX的问题。
Thios 是我的测试代码:
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;
public class RxTest
{
private readonly Subject<int> test = new Subject<int>();
private readonly AsyncLocal<int> asyncContext = new AsyncLocal<int>();
public void Test()
{
this.test
// .ObserveOn(Scheduler.Default)
.Subscribe(this.OnNextNormal);
this.test
// .ObserveOn(Scheduler.Default)
.Delay(TimeSpan.FromMilliseconds(1))
.Subscribe(this.OnNextDelayed);
for (var i = 0; i < 2; i++)
{
var index = i;
Task.Run(() =>
{
this.asyncContext.Value = index;
Console.WriteLine(
$"Main\t\t{index} (Thread: {Thread.CurrentThread.ManagedThreadId}): AsyncLocal.Value => {this.asyncContext.Value}");
this.test.OnNext(index);
});
}
Console.ReadKey();
}
private void OnNextNormal(int obj)
{
Console.WriteLine(
$"OnNextNormal\t{obj} (Thread: {Thread.CurrentThread.ManagedThreadId}): AsyncLocal.Value => {this.asyncContext.Value}");
}
private void OnNextDelayed(int obj)
{
Console.WriteLine(
$"OnNextDelayed\t{obj} (Thread: {Thread.CurrentThread.ManagedThreadId}): AsyncLocal.Value => {this.asyncContext.Value}");
}
}
输出是:
Main 0(线程:5):AsyncLocal.Value => 0
Main 1(线程:6):AsyncLocal.Value => 1
OnNextNormal 0(线程:5):AsyncLocal.Value => 0
OnNextNormal 1(线程:6): AsyncLocal.Value => 1
OnNextDelayed 0(线程:4):AsyncLocal.Value => 0
OnNextDelayed 1(线程:4):AsyncLocal.Value => 0
如您所见, AsyncLocal.Value 不会流向延迟订阅的方法。
=> AsyncValue 在延迟轨道上丢失
据我了解,普通的 Subscribe() 不使用调度程序,而 Delay() 使用调度程序。
当我对两个调用都使用 ObserveOn() 时,两者的输出如下
Main 0(线程:5):AsyncLocal.Value => 0
Main 1(线程:7):AsyncLocal.Value => 1
OnNextNormal 0(线程:9):AsyncLocal.Value => 0
OnNextNormal 1(线程:9): AsyncLocal.Value => 0
OnNextDelayed 0(线程:4):AsyncLocal.Value => 0
OnNextDelayed 1(线程:4):AsyncLocal.Value => 0
=> AsyncValue 在每条轨道上都会丢失
有没有办法让 ExecutionContext 与 RX 一起流动?
我只发现了这一点,但这是另一个问题。他们解决了观察者的上下文如何流动的问题。我想流动发布者的上下文。
我想要实现的是:
- 来自“外部”的消息为我服务
- 在服务内分发消息 (RX)
- 记录消息时,使用 MessageId 格式化日志消息
- 我不想到处传递信息
提前感谢您的回答。