0

我要做的是创建一个“监听器”,它一次监听几个不同的 Tcp 端口,并将消息通过管道传递给任何观察者。

伪代码:

private bool _Listen = false;
public void Start()
{
    _Listen = true;
    Task.Factory.StartNew(() => Listen(1);
    Task.Factory.StartNew(() => Listen(2);
}

public void Stop()
{
    _Listen = false;
}

private async void Listen(int port)
{
     var tcp = new TcpClient();
     while(_Listen)
     {
          await tcp.ConnectAsync(ip, port);
          using (/*networkStream, BinaryReader, etc*/)
          {
               while(_Listen)
               {
                   //Read from binary reader and OnNext to IObservable
               }
          }
     }
}

(为简洁起见,我在两个 while 中省略了 try/catch,两者都检查了标志)

我的问题是:我是否应该锁定标志,如果是,那么它如何与 async/await 位相关联?

4

4 回答 4

4

首先,您应该将返回类型更改为 Task,而不是 void。async void方法本质上是一劳永逸的,不能等待或取消。它们的存在主要是为了允许创建异步事件处理程序或类似事件的代码。它们不应该用于正常的异步操作。

协同取消/中止/停止异步操作的 TPL 方法是使用CancellationToken。您可以检查令牌的IsCancellationRequested属性以查看是否需要取消操作并停止。

更好的是,框架提供的大多数异步方法都接受 CancellationToken,因此您可以立即停止它们而无需等待它们返回。您可以使用 NetworkStream 的ReadAsync(Byte[], Int32, Int32, CancellationToken)读取数据并在有人调用您的 Stop 方法时立即取消。

您可以将代码更改为以下内容:

    CancellationTokenSource _source;

    public void Start()
    {
        _source = new CancellationTokenSource();            
        Task.Factory.StartNew(() => Listen(1, _source.Token),_source.Token);
        Task.Factory.StartNew(() => Listen(2, _source.Token), _source.Token);
    }

    public void Stop()
    {
        _source.Cancel();
    }


    private async Task Listen(int port,CancellationToken token)
    {
        var tcp = new TcpClient();
        while(!token.IsCancellationRequested)
        {
            await tcp.ConnectAsync(ip, port);
            using (var stream=tcp.GetStream())
            {
                ...
                try
                {
                    await stream.ReadAsync(buffer, offset, count, token);
                }
                catch (OperationCanceledException ex)
                {
                    //Handle Cancellation
                }
                ...
            }
        }
    }

您可以在托管线程中的取消中阅读更多关于取消的信息,包括有关如何轮询、注册取消回调、侦听多个令牌等的建议。

try/catch块之所以存在,是因为await如果取消任务,则会引发异常。您可以通过对 ReadAsync 返回的任务调用 ContinueWith 并检查 IsCanceled 标志来避免这种情况:

    private async Task Listen(int port,CancellationToken token)
    {
        var tcp = new TcpClient();
        while(!token.IsCancellationRequested)
        {
            await tcp.ConnectAsync(ip, port);
            using (var stream=tcp.GetStream())
            {
                ///...
                await stream.ReadAsync(buffer, offset, count, token)
                    .ContinueWith(t =>
                    {
                        if (t.IsCanceled)
                        {
                            //Do some cleanup?
                        }
                        else
                        {
                            //Process the buffer and send notifications
                        }
                    });
                ///...
            }
        }
    }

await现在等待一个简单Task的在延续完成时完成

于 2013-08-23T08:03:21.797 回答
1

ManualResetEventSlim当您可能处理多个线程时,某种形式的事件(例如)将是更明显的选择。

private ManualResetEventSlim _Listen;
public void Start()
{
    _Listen = new ManualResetEventSlim(true);
    Task.Factory.StartNew(() => Listen(1);
    Task.Factory.StartNew(() => Listen(2);
}

public void Stop()
{
    _Listen.Reset();
}

private async void Listen(int port)
{
     var tcp = new TcpClient();
     while(_Listen.IsSet)
     {
于 2013-08-23T07:15:26.270 回答
1

你可能会更好地坚持使用 RX 而不是使用 Task。这是我为使用 RX 连接到 UDP 套接字而编写的一些代码。

public IObservable<UdpReceiveResult> StreamObserver
(int localPort, TimeSpan? timeout = null)
{


    return Linq.Observable.Create<UdpReceiveResult>(observer =>
    {
        UdpClient client = new UdpClient(localPort);

        var o = Linq.Observable.Defer(() => client.ReceiveAsync().ToObservable());
        IDisposable subscription = null;
        if ((timeout != null)) {
            subscription = Linq.Observable.Timeout(o.Repeat(), timeout.Value).Subscribe(observer);
        } else {
            subscription = o.Repeat().Subscribe(observer);
        }

        return Disposable.Create(() =>
        {
            client.Close();
            subscription.Dispose();
            // Seems to take some time to close a socket so
            // when we resubscribe there is an error. I
            // really do NOT like this hack. TODO see if
            // this can be improved
            Thread.Sleep(TimeSpan.FromMilliseconds(200));
        });
    });
}
于 2013-08-23T08:29:26.487 回答
1

我应该锁定标志吗?如果是这样,它如何与 async/await 位相关联?

您需要以某种方式同步对标志的访问。如果不这样做,则允许编译器进行以下优化:

bool compilerGeneratedLocal = _Listen;
while (compilerGeneratedLocal)
{
    // body of the loop
}

这会使您的代码出错。

一些选项如何解决这个问题:

  1. 标记bool标志volatile。这将确保始终读取标志的当前值。
  2. 使用CancellationToken(由 Panagiotis Kanavos 建议)。这将确保您以线程安全的方式访问底层标志。它还具有许多异步方法支持的优点CancellationToken,因此您也可以取消它们。
于 2013-08-23T13:13:26.087 回答