我正在编写一个 .NET XMPP 库是为了好玩,正如其他地方所讨论的那样,.NET 4.5 之前的版本中的XmlReader
实现不适合从 a 解析 XML,NetworkStream
因为它在填充内部 4KB 缓冲区或达到 EOF 之前不会开始解析.
其他图书馆根本不使用来解决这个问题XmlReader
。正如前面链接的问题中提到的,jabber-net 使用 Java XML 解析器的一个端口。我在搜索时发现的一个实现 Babel IM 使用它自己的简单 XML 解析器。我不确定 agsXMPP 是做什么的。
但是,随着 .NET 4.5 的发布,新的异步功能XmlReader
显然得到了升级,现在可以进行真正的异步解析。因此,我使用它来构建一个相当简单的 XMPP 客户端,该客户端可以连接到服务器并发送和接收消息。
然而,症结实际上似乎在于与服务器断开连接。在断开连接时,我通常只想Dispose()
使用我的XmlReader
实例和底层流。但是,Dispose()
实际上会抛出一个InvalidOperationException
带有消息“异步操作已经在进行中”的消息。如果您在异步时调用它...以及消息中的内容。然而,由于 XMPP 的性质,我XmlReader
基本上一直在执行异步操作,因为它等待来自服务器的 XML 节从管道中下来。
似乎没有任何方法可以用来告诉它取消任何挂起的XmlReader
异步操作,以便我可以Dispose()
干净地处理它。有没有比简单地不尝试处理这种情况更好的方法来处理这种情况XmlReader
?XMPP规范声明服务器应该</stream:stream>
在断开连接时发送结束标记。我可以将其用作不尝试执行另一次异步读取的信号,因为管道中不应有任何其他内容,但不能保证这一点。
这是一些示例代码。LongLivedTextStream
基本上模拟一个打开NetworkStream
,因为它永远不会到达 EOF,并且会阻塞直到可以读取至少 1 个字节。您可以将 XML 文本“注入”到其中,它XmlReader
会很乐意解析,但尝试处置阅读器将触发上述异常。
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;
namespace Example
{
class LongLivedTextStream : Stream
{
ManualResetEvent moarDatas = new ManualResetEvent(false);
List<byte> data = new List<byte>();
int pos = 0;
public void Inject(string text)
{
data.AddRange(new UTF8Encoding(false).GetBytes(text));
moarDatas.Set();
}
public override int Read(byte[] buffer, int offset, int count)
{
var bytes = GetBytes(count).ToArray();
for (int i = 0; offset + i < buffer.Length && i < bytes.Length; i++)
{
buffer[offset + i] = bytes[i];
}
return bytes.Length;
}
private IEnumerable<byte> GetBytes(int count)
{
int returned = 0;
while (returned == 0)
{
if (pos < data.Count)
{
while (pos < data.Count && returned < count)
{
yield return data[pos];
pos += 1; returned += 1;
}
}
else
{
moarDatas.Reset();
moarDatas.WaitOne();
}
}
}
#region Other Stream Members
public override bool CanRead
{
get { return true; }
}
public override bool CanSeek
{
get { return false; }
}
public override bool CanWrite
{
get { return false; }
}
public override void Flush() { }
public override long Length
{
get { throw new NotSupportedException(); }
}
public override long Position
{
get { throw new NotSupportedException(); }
set { throw new NotSupportedException(); }
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}
public override void SetLength(long value)
{
throw new NotSupportedException();
}
public override void Write(byte[] buffer, int offset, int count)
{
throw new NotSupportedException();
}
#endregion
}
public class Program
{
public static void Main(string[] args)
{
Test();
Console.ReadLine();
}
public static async void Test()
{
var stream = new LongLivedTextStream();
var reader = XmlReader.Create(stream, new XmlReaderSettings() { Async = true });
var t = Task.Run(() =>
{
stream.Inject("<root>");
Thread.Sleep(2000);
stream.Inject("value");
Thread.Sleep(2000);
stream.Inject("</root>");
Thread.Sleep(2000);
reader.Dispose(); // InvalidOperationException: "An asynchronous operation is already in progress."
Console.WriteLine("Disposed");
});
while (await reader.ReadAsync())
{
bool kill = false;
switch (reader.NodeType)
{
case XmlNodeType.Element:
Console.WriteLine("Start: " + reader.LocalName);
break;
case XmlNodeType.EndElement:
Console.WriteLine("End: " + reader.LocalName);
//kill = true; // I could use a particular EndElement as a signal to not try another read
break;
case XmlNodeType.Text:
Console.WriteLine("Text: " + await reader.GetValueAsync());
break;
}
if (kill) { break; }
}
}
}
}
编辑
此示例使用实际NetworkStream
并显示如果 IClose()
或Dispose()
基础流的ReadAsync()
调用XmlReader
不会像希望的那样返回 false,而是继续阻塞。
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;
namespace Example
{
public class Program
{
public static void Main(string[] args)
{
NetworkStream stream = null;
var endpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 50000);
var serverIsUp = new ManualResetEvent(false);
var doneWriting = new ManualResetEvent(false);
var t1 = Task.Run(() =>
{
var server = new TcpListener(endpoint);
server.Start();
serverIsUp.Set();
var client = server.AcceptTcpClient();
var writer = new StreamWriter(client.GetStream());
writer.Write("<root>"); writer.Flush();
Thread.Sleep(2000);
writer.Write("value"); writer.Flush();
Thread.Sleep(2000);
writer.Write("</root>"); writer.Flush();
Thread.Sleep(2000);
doneWriting.Set();
});
var t2 = Task.Run(() =>
{
doneWriting.WaitOne();
stream.Dispose();
Console.WriteLine("Disposed of Stream");
});
var t3 = Task.Run(async () =>
{
serverIsUp.WaitOne();
var socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
socket.Connect(endpoint);
stream = new NetworkStream(socket, true);
var reader = XmlReader.Create(stream, new XmlReaderSettings() { Async = true });
bool val;
while (val = await reader.ReadAsync())
{
bool kill = false;
switch (reader.NodeType)
{
case XmlNodeType.Element:
Console.WriteLine("Start: " + reader.LocalName);
break;
case XmlNodeType.EndElement:
Console.WriteLine("End: " + reader.LocalName);
//kill = true; // I could use a particular EndElement as a signal to not try another read
break;
case XmlNodeType.Text:
Console.WriteLine("Text: " + await reader.GetValueAsync());
break;
}
if (kill) { break; }
}
// Ideally once the underlying stream is closed, ReadAsync() would return false
// we would get here and could safely dispose the reader, but that's not the case
// ReadAsync() continues to block
reader.Dispose();
Console.WriteLine("Disposed of Reader");
});
Console.ReadLine();
}
}
}