1

我正在尝试使用libchan库在使用类似 go 通道的传输的机器之间发送消息。

根据我收集到的信息,粗略的想法是这样的:

  1. 你有一个 SPDY 客户端,它通过 tcp 向一个地址发送一个序列化的命令对象。此命令对象包含一个名为 a 的 libchan 通道Pipe,通过该通道发送响应。
  2. 当服务器接收到一个传入连接时,它会等待一个命令对象。Pipe当它得到一个时,它会通过包含在对象中的内容发送响应。

这是我的困惑点。要让通道在两台机器之间持续存在,它们必须共享内存或至少共享连接它们两者的抽象。根据我对 libchan 代码库的了解,我不知道这怎么可能。

这是 repo 中示例的片段:

// client

    receiver, remoteSender := libchan.Pipe()
    command := &RemoteCommand{
        Cmd:        os.Args[1],
        Args:       os.Args[2:],
        Stdin:      os.Stdin,
        Stdout:     os.Stdout,
        Stderr:     os.Stderr,
        StatusChan: remoteSender,
    }

    err = sender.Send(command)
    if err != nil {
        log.Fatal(err)
    }
    err = receiver.Receive(response)
    if err != nil {
        log.Fatal(err)
    }

    os.Exit(response.Status)

和服务器:

// server
t := spdy.NewTransport(p)

        go func() {
            for {
                receiver, err := t.WaitReceiveChannel()
                if err != nil {
                    log.Print("receiver error")
                    log.Print(err)
                    break
                }
                log.Print("about to spawn receive proc")
                go func() {
                    for {
                        command := &RemoteReceivedCommand{}
                        err := receiver.Receive(command)
                        returnResult := &CommandResponse{}
                        if res != nil {
                            if exiterr, ok := res.(*exec.ExitError); ok {
                                returnResult.Status = exiterr.Sys(). 
                              (syscall.WaitStatus).ExitStatus()
                            } else {
                                log.Print("res")
                                log.Print(res)
                                returnResult.Status = 10
                            }
                        }
                        err = command.StatusChan.Send(returnResult)

我要磨练的重点在这里:

libchan.Pipe()

根据消息来源,这将返回一个通道。一个引用保存在客户端,另一个发送到服务器。然后使用此通道将值从后者传递到前者。这在实践中如何运作?

客户端服务器的完整代码

4

1 回答 1

0

首先,很高兴知道Pipe()所做的只是创建一个通道并返回内存中的发送器/接收器对。

来自inmem.go

// Pipe returns an inmemory Sender/Receiver pair.
func Pipe() (Receiver, Sender) {
    c := make(chan interface{})
    return pReceiver(c), pSender(c)
}

然后,您可以查看inmem_test.go一个简单的端到端示例。

这个结构相当于RemoteCommand来自演示。

type InMemMessage struct {
    Data   string
    Stream io.ReadWriteCloser
    Ret    Sender
}

TestInmemRetPipe()中,创建了一个简单的客户端和服务器。

客户端使用 来创建本地发送者/接收者对Pipe(),而服务器只使用结构中的libchan.Sender接口InMemMessage

请注意,客户端和服务器是分别接收SenderReceiver作为参数的函数。在下一个代码片段中了解更多信息。

func TestInmemRetPipe(t *testing.T) {
    client := func(t *testing.T, w Sender) {
        ret, retPipe := Pipe()
        message := &InMemMessage{Data: "hello", Ret: retPipe}

        err := w.Send(message)
        if err != nil {
            t.Fatal(err)
        }
        msg := &InMemMessage{}
        err = ret.Receive(msg)
        if err != nil {
            t.Fatal(err)
        }

        if msg.Data != "this better not crash" {
            t.Fatalf("%#v", msg)
        }

    }
    server := func(t *testing.T, r Receiver) {
        msg := &InMemMessage{}
        err := r.Receive(msg)
        if err != nil {
            t.Fatal(err)
        }

        if msg.Data != "hello" {
            t.Fatalf("Wrong message:\n\tExpected: %s\n\tActual: %s", "hello", msg.Data)
        }
        if msg.Ret == nil {
            t.Fatal("Message Ret is nil")
        }

        message := &InMemMessage{Data: "this better not crash"}
        if err := msg.Ret.Send(message); err != nil {
            t.Fatal(err)
        }
    }
    SpawnPipeTestRoutines(t, client, server)

}

SpawnPipeTestRoutines()执行客户端和服务器功能。在这个函数中,另一个发送者/接收者对通过Pipe().

在演示应用程序中,此处执行的功能Pipe()(即促进客户端和服务器实例之间的通信)改为通过网络通信处理。

func SpawnPipeTestRoutines(t *testing.T, s SendTestRoutine, r ReceiveTestRoutine) {
    end1 := make(chan bool)
    end2 := make(chan bool)

    receiver, sender := Pipe()

    go func() {
        defer close(end1)
        s(t, sender)
        err := sender.Close()
        if err != nil {
            t.Fatalf("Error closing sender: %s", err)
        }
    }()

    go func() {
        defer close(end2)
        r(t, receiver)
    }()
    ...

在演示应用程序中,通过调用客户端上的 Transport.NewSendChannel() 和 Transport.WaitReceiveChannel() 来促进通信它们分别返回 alibchan.Senderlibchan.Receiver。这些 libchan 实例处理通过网络促进“管道”。

从client.go:

sender, err := transport.NewSendChannel()
...
err = sender.Send(command)

从 server.go:

receiver, err := t.WaitReceiveChannel()
...
err := receiver.Receive(command)

在这两种情况下,先决条件的传输配置都是事先完成的(即绑定到套接字、利用 TLS 等)。

可能还值得注意的是,正在使用的 spdy 库是 libchan 发行版的一部分,因此它提供了 libchan 原语。

于 2018-06-24T23:07:11.523 回答