0

我试图建立一个命名管道服务器和客户端来在两个程序之间发送数据。我的问题是,当我收到数据时,例如。BeginRead 命令 om 服务器在我从客户端序列化一个对象后触发,它会为同一条消息触发 20 次回调。目标是客户端程序将命令发送到服务器程序。当服务器处理任务时,它会在有连接时将状态更新发送回客户端。

这是我当前的测试程序。

class Program
{
    static void Main(string[] args)
    {
        var server = new PipeServer();
        server.Init();

        var client = new PipeClient();
        if (client.Connect())
        {
            Console.WriteLine("Connected to server.");
        }
        else
        {
            Console.WriteLine("Connection failed.");
            return;
        }

        while (true)
        {
            Console.Write(" \\> ");
            string input = Console.ReadLine();
            if (string.IsNullOrEmpty(input)) break;

            var arr = input.Split(new char[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
            int value = 0;

            if (arr.Length != 2) break;
            if (!int.TryParse(arr[1], out value)) break;

            var obj = new PipeObject { Name = arr[0], Value = value };
            client.Send(obj);

            //string result = f.Deserialize(client) as string;
            //Console.WriteLine(result);
        }
    }
}

internal class PipeServer
{
    IFormatter Formatter = new BinaryFormatter();
    public NamedPipeServerStream Instance { get; internal set; }
    public bool IsConnected { get; internal set; }
    byte[] buffer = new byte[65535];
    public object Message { get; set; }

    StreamReader sr;
    StreamWriter sw;

    internal PipeServer()
    {
        IsConnected = false;
    }

    public void Init()
    {
        var ps = new PipeSecurity();
        ps.AddAccessRule(new PipeAccessRule(WindowsIdentity.GetCurrent().User, PipeAccessRights.FullControl, AccessControlType.Allow));
        ps.AddAccessRule(new PipeAccessRule(new SecurityIdentifier(WellKnownSidType.AuthenticatedUserSid, null), PipeAccessRights.ReadWrite, AccessControlType.Allow));

        Instance = new NamedPipeServerStream("Levscan4Pipe", PipeDirection.InOut, 1, PipeTransmissionMode.Message, PipeOptions.Asynchronous | PipeOptions.WriteThrough, 65535, 65535, ps);

        sr = new StreamReader(Instance);
        sw = new StreamWriter(Instance);

        Instance.BeginWaitForConnection(OnClientConnected, Instance);

        Thread t = new Thread(Run);
        t.Start();
    }

    void Run()
    {
        int index = 0;
        if (IsConnected)
        {
            try
            {
                Instance.BeginRead(buffer, 0, buffer.Length, OnRead_Completed, Instance);
                //index += Instance.Read(buffer, 0, buffer.Length);
                //try
                //{
                //    using (var ms = new MemoryStream(buffer))
                //    {
                //        Message = Formatter.Deserialize(ms);
                //        index = 0;
                //    }
                //}
                //catch (Exception e)
                //{
                //    Debug.WriteLine(e.Message);
                //    Debug.WriteLine(e.StackTrace);
                //}
            }
            catch (IOException)
            {
                IsConnected = false;
                Instance.Disconnect();
            }
        }

        Thread.Sleep(Timeout.Infinite);
        //Instance.WaitForConnection();
        //Thread t = new Thread(Run);
        //t.Start();
    }

    void OnClientConnected(IAsyncResult ar)
    {
        Instance.EndWaitForConnection(ar);
        IsConnected = true;
    }

    void OnRead_Completed(IAsyncResult ar)
    {
        var bytes = Instance.EndRead(ar);
        Debug.WriteLine("{1} > Read completed - bytes read: {0}".FormatWith(bytes, DateTime.Now.ToString()));

        //try
        //{
        //    using (var ms = new MemoryStream(buffer))
        //    {
        //        Message = Formatter.Deserialize(ms);
        //    }
        //}
        //catch (Exception e)
        //{
        //    Debug.WriteLine(e.Message);
        //    Debug.WriteLine(e.StackTrace);
        //}
    }
}

internal class PipeClient
{
    IFormatter f = new BinaryFormatter();
    public NamedPipeClientStream Instance { get; internal set; }
    StreamWriter sw;
    StreamReader sr;

    public PipeClient()
    {
        Instance = new NamedPipeClientStream(".", "Levscan4Pipe", PipeDirection.InOut, PipeOptions.Asynchronous | PipeOptions.WriteThrough);
        sr = new StreamReader(Instance);
        sw = new StreamWriter(Instance);
    }

    public bool Connect()
    {
        try
        {
            Instance.Connect(5000);
            Instance.ReadMode = PipeTransmissionMode.Message;
            Instance.WaitForPipeDrain();
            return true;
        }
        catch
        {
            return false;
        }
    }

    public void Send(object obj)
    {
        f.Serialize(Instance, obj);
        Instance.Flush();
        Instance.WaitForPipeDrain();
    }
}

编辑

将 while 循环更改为 if,以启动 BeginRead。这解决了多个回调,但我仍然没有收到完整的消息。

4

1 回答 1

1

如果服务器正在写入流,例如:

write field 1
write field 2
write field 3
etc.

写入之间有一段时间,接收器(您的程序)可以读取前三个字段,而服务器仍在写入其他字段。管道流不知道服务器何时完成写入,因此它无法缓冲所有内容并将其一大块发送给您。

当服务器首先将所有内容写入内存流然后将内存流复制到管道流时,您的程序可以一次获取所有内容。也许。如果服务器正在发送一个非常大的数据包,您可能只读取其中的一部分。

管道流只是一个字节流。它不会对数据强加任何格式。它没有任何记录或类似的概念。因此,您必须将其视为字节流并自己编写记录等。

如果您需要知道从服务器发送的记录的大小,服务器必须为您将该信息放入流中。通常,服务器会先写入长度,然后再写入数据。然后接收器可以读取长度,将其转换为整数,然后从流中读取那么多字节。而且,是的,可能需要多次读取才能获取所有字节。这只是字节流的本质。

处理此问题的另一种方法是使用记录结束标记。所以服务器发送它的数据并且你的程序读取直到它找到表示记录结束的字节序列。但是,您必须小心,因为服务器可能正在发送多条记录,而您的读取可能会抓取一条记录的结尾以及下一条记录的开头。

使用字节流可能需要做很多工作,因为您必须在读取字节后重建记录。如果可以的话,使用现有框架(如评论之一中提到的 WCF)要容易得多。

于 2013-08-23T13:57:17.503 回答