3

我有两个线程。一个线程读取请求并使用消息队列将其传递给服务器,另一个线程从消息队列中读取响应并将其发送回。在同一进程中,调用者类方法将请求写入管道(使用第一个线程共享的服务器管道流),然后使用第二个线程共享的客户端管道流读取响应。这可以使用 Java PipeInputStream 和 PipeOutputStream 轻松完成,如下所示。本质上,我正在寻找等效于 C# 中的 Java 逻辑。我尝试在 C# 中使用匿名管道失败。

RequestHandlerThread(上面提到的Thread1)

out = new PipedOutputStream();
readPipeIs = new PipedInputStream(out);
readDataIs = new DataInputStream(readPipeIs);
// read data from readDataIs
// Send it to server over message queue
// Share 'out' so that other class method can write to it. 

响应处理程序(如上所述的线程 2)

in = new PipedInputStream();
writePipeOs = new PipedOutputStream(in);
writeDataOs = new DataOutputStream(writePipeOs);

// Wait and read from message queue
// write received data to 'writeDataOs'
// Share 'in' so that other class method can read from it. 

我不确定 C# 管道是否受限于两个进程之间的通信。上述所有逻辑都在同一个进程中,只是有两个线程与消息服务器通信。

我在两个线程中都尝试了一对 AnonymousPipeServerStream 和 AnonymousPipeClientStream 对。我共享了用于写入的服务器流和用于通过其他类方法读取的客户端流。

上述逻辑有什么明显的缺陷,或者对 IPC 的选择有什么建议吗?

添加源代码 这里是测试类

class Test
{
    private static byte[] ret;
    private static bool ready;

    Stream outStream;
    Stream inStream;


    private void clientConnReqHandler()
    {
        AnonymousPipeServerStream pipeServer = new
              AnonymousPipeServerStream(PipeDirection.Out);

        outStream = pipeServer;

        string pipeHandle = 
                     pipeServer.GetClientHandleAsString();

        AnonymousPipeClientStream pipeClient =
            new AnonymousPipeClientStream(PipeDirection.In, 
                       pipeHandle);

        pipeServer.DisposeLocalCopyOfClientHandle();

        ready = false;
        BinaryReader binReader = new BinaryReader(pipeClient);
        int mesgSize = binReader.ReadInt32();
        System.Console.WriteLine("Message Lenght To read: " + 
                                  mesgSize);
        byte[] buffer = binReader.ReadBytes(mesgSize);
        System.Console.WriteLine("Message read: " + 
                    buffer.ToString());
        // Simulate some processing 
        Thread.Sleep(5000);
        mesgProcessing(buffer);

    }
    private static void mesgProcessing(byte[] buffer)
    {

        System.Text.UTF8Encoding encoding = new 
                            System.Text.UTF8Encoding();
        byte[] extra = encoding.GetBytes("Echo : ");

        ret = new byte[buffer.Length + extra.Length];
        System.Buffer.BlockCopy(extra, 0, ret, 0, extra.Length);
        System.Buffer.BlockCopy(buffer, 0, ret, extra.Length, 
                                buffer.Length);
        ready = true;
    }


    private void clientConnRespHandler()
    {
        AnonymousPipeServerStream pipeServer = new 
                AnonymousPipeServerStream(PipeDirection.Out);

        string pipeHandle = 
                  pipeServer.GetClientHandleAsString();

        AnonymousPipeClientStream pipeClient =
            new AnonymousPipeClientStream(PipeDirection.In, 
                  pipeHandle);

        inStream = pipeClient;
        pipeServer.DisposeLocalCopyOfClientHandle();

        while (ready)
        {
            BinaryWriter binWriter = new 
                           BinaryWriter(pipeServer);
            binWriter.Write(ret.Length);
            binWriter.Write(ret);
            ready = false;
        }
    }

    public static void Main()
    {
        Test setup = new Test();
        setup.threadTest();

        Test2 threadTest = new Test2();
        // This method will do actuall read and write. 
        threadTest.runTest(setup.inStream, setup.outStream);
    }
    public void threadTest()
    {
        Thread reqHandlerThread = new Thread(new 
                ThreadStart(clientConnReqHandler));
        Thread respHandlerThread = new Thread(new 
               ThreadStart(clientConnRespHandler));

        reqHandlerThread.Start();
        respHandlerThread.Start();

    }
}

读/写的类:

class Test2
{

    internal void runTest(System.IO.Stream inStream, 
                  System.IO.Stream outStream)
    {
        BinaryWriter writer = new BinaryWriter(outStream);

        System.Text.UTF8Encoding encoding = new 
                 System.Text.UTF8Encoding();
        byte[] mesg = encoding.GetBytes("Hello World!!!");

        writer.Write(mesg.Length);
        writer.Write(mesg);

        BinaryReader reader = new BinaryReader(inStream);
        int mesgSize = reader.ReadInt32();
        System.Console.WriteLine("Message Lenght To read: " + 
                      mesgSize);
        byte[] buffer = reader.ReadBytes(mesgSize);
        System.Console.WriteLine("Message read: " + 
                    buffer.ToString());
    }
}

谢谢

4

1 回答 1

0

好的。它在摆脱 DisposeLocalCopyOfClientHandle() 后起作用。当然必须修复while循环条件的一些新错误,以检查数据是否准备好并从字节数组中正确打印字符串。

于 2012-04-17T02:28:23.773 回答