根据您更新的问题,这是一种可能的方法。请注意,这两个额外的进程在这里由 go-routines 表示,但在实际情况下,您可以让它们成为单独的进程。我也省略了错误检查。
// This represent what would be the last process in your
// example.
go func() {
nc, _ := nats.Connect(nats.DefaultURL)
nc.Subscribe("bar", func(m *nats.Msg) {
fmt.Printf("Received request: %s, final stop, sending back to %v\n", m.Data, m.Reply)
nc.Publish(m.Reply, []byte("I'm here to help!"))
})
nc.Flush()
runtime.Goexit()
}()
// This would be the in-between process that receives
// the message triggered by the TCP accept
go func() {
nc, _ := nats.Connect(nats.DefaultURL)
nc.Subscribe("foo", func(m *nats.Msg) {
fmt.Printf("Received request: %s, forward to bar\n", m.Data)
nc.PublishRequest("bar", m.Reply, []byte(fmt.Sprintf("got %s", m.Data)))
})
nc.Flush()
runtime.Goexit()
}()
// This would be your TCP server
l, _ := net.Listen("tcp", "127.0.0.1:1234")
for {
c, _ := l.Accept()
go func(c net.Conn) {
// Close socket when done
defer c.Close()
// Connect to NATS
nc, _ := nats.Connect(nats.DefaultURL)
// Close NATS connection when done
defer nc.Close()
// Sends the request to first process. Note that this
// has a timeout and so if no response is received, the
// go-routine will exit, closing the TCP connection.
reply, err := nc.Request("foo", []byte("help"), 10*time.Second)
if err != nil {
fmt.Printf("Got error: %v\n", err)
} else {
fmt.Printf("Got reply: %s\n", reply.Data)
}
}(c)
}
请注意,通常不建议创建寿命很短的 NATS 连接。如果适合您的模型,您可能希望重用 NATS 连接。