1

我正在开发一个使用原始 TCP 套接字与中央服务器通信的客户端应用程序。应用程序消息被序列化,然后加上长度前缀以创建传递到 TCP 流中的帧。

处理此问题的一种经典方法是直接在套接字类上调用 Receive 或 BeginReceive,在回调上反序列化消息,将消息传递到单独的队列中以供另一个线程处理,然后让回调再次在套接字上开始另一个接收。

这种方法的简单实现对我来说并不理想——它将消息序列化和反序列化与套接字紧密耦合,并且需要相当多的“管道”才能让队列在不同的线程/回调中发挥良好的作用。它也有点抽象——它需要调用代码了解底层套接字,而不是输入和输出消息的“数据流”。

鉴于我完全在 .NET 4.5 中工作,使用 TPL (TaskFactory.FromAsync) 包装 Socket 的 Begin 和 End 异步方法是一个明显的选择。但是,由于多种原因,我不清楚如何从这一点着手:

  1. 我需要一个永远不会完成的异步“任务”来接收数据。只要连接了套接字,我就想要处理消息流。任何中断(断开连接、套接字错误或取消请求)都将是异常,而不是传统的任务完成。根据 Stephen Toub ( http://blogs.msdn.com/b/pfxteam/archive/2011/10/02/10218999.aspx ) 的说法,我应该始终完成我的任务。这产生了一些问题——传统意义上的套接字接收永远不会完成。斯蒂芬似乎在他的“等待套接字操作”帖子中与自己略有矛盾,他在其中展示了一个套接字读取,它永远不会在没有套接字错误的情况下完成(http://blogs.msdn.com/b/pfxteam/archive/2011/12/15 /10248293.aspx)。
  2. 我需要一种同步“排队”要发送的数据的方法。调用者应该能够发送要传输的消息而不会阻塞,并且消息应该通过套接字顺序传输。换句话说,由于消息框架,一次只能在套接字本身上发送一个。TPL 数据流是否适合,或者我应该使用不同的排队模式?
  3. 我希望将消息序列化和消息传输之间的关注点完全分开。

我还没有看到很多使用这种策略的示例,只有“直接”套接字 I/O 或琐碎的实现。我的直觉告诉我,TPL Dataflow 非常适合,因为序列化和反序列化可以流水线化。

我不清楚如何将有效的无限链接收任务连接到 TPL 数据流或类似的东西。

有任何想法吗?

4

1 回答 1

3

目前,还没有针对此的好方法的示例。一旦你得到一个解决方案,我建议你在某个地方发布它。

1.我需要一个永远不会完成的异步“任务”来接收数据......

我不同意“传统意义上的套接字接收永远不会完成”的说法。较低级别的“接收”操作(例如,围绕/或 Toub's的FromAsync包装器)返回将在有可用数据、套接字关闭或出现错误时完成的操作。更高级别的“接收器”操作(例如,来自 Toub 的帖子)将在套接字关闭或出现错误时完成。BeginReceiveEndReceiveReceiveAsyncTaskReadAsync

可以拥有Task可能需要无限时间才能完成的 s,但只要它们完成就可以了。Toub 在这篇文章中指出的是你Task的 s 应该最终完成(特别是处理错误情况)。这与 Rx 的方法不同,在 Rx 的方法中,拥有一个永远不会产生数据且永远不会结束的 observable 是完全有效的。

2.我需要一种同步“排队”发送数据的方法……

从技术上讲,它是“序列化”(“按顺序”),而不是“同步”。理想的解决方案是异步序列化。有一些方法可以解决这个问题。

我会说 TPL 数据流非常适合。请注意,每个套接字实际上有两个逻辑“流”(读取和写入是独立的)。我在基于 Dataflow 的套接字包装器上取得了一些成功,但还没有时间使其达到生产质量。由于有两个流(我的每个可插入“块”必须有两个 TPL 数据流块,一个用于输入,一个用于输出),API 相当尴尬。

另一种方法是Rx。Rx 比 plain s 或 TPL Dataflow 具有更高的学习曲线Task,但提供了相当多的功能(和效率)。几年前我玩过基于 Rx 的套接字,但从来没有任何工作。这些天,Rx 的文档和示例要好得多,所以我认为它今天是一个可行的选择。

还有async直接使用方法的方法。您将使用同步而不是队列。例如,您可以使用 aSemaphoreSlim来确保SendAsync对于给定的套接字一次只能运行一个方法。但是,这确实改变了语义,将一些“序列化”推向了您的调用代码:而不是简单的入队和完成任务(除非您达到发送限制,否则它将始终同步完成),您有一个异步等待发送然后完成的任务(几乎总是异步等待)。async你可以通过建立一个生产者/消费者队列(就像我写的那样)来缓解这种情况,但是你有一个单独的消费者Task您需要跟踪,此时您只是在重写 TPL 数据流。

我不清楚如何将有效的“无尽”接收任务链“桥接”到 TPL 数据流或类似的东西。

没有一个好的内置方法可以做到这一点。

一个简单的解决方案是拥有一个“接收器” Task,它单独负责将数据推送到 TPL 数据流管道中。但是,您必须对其进行监控Task以确保在发生某些错误时不会放弃管道,并且您需要一种方法来干净地关闭它。

我确实编写了一种FuncBlock类型来处理这种情况(想法是使用它进行套接字读取和其他基于 I/O 的数据流输入)。解决方法和 Dataflow 块如何交互的所有语义async(特别是在取消/错误/完成方面)需要一段时间,但我认为它会很有用。我会很感激任何反馈。

于 2013-06-25T21:10:56.493 回答