我想用nanomsg
/nng
作为一个完全分布式的点对点多节点网络的通信基础,帮助构建拓扑发现和维护的动态能力。现在我陷入了它的 Golang 包mangos
中。
在 Python 和 pynng(它是 nanomsg 的 python 绑定)中已经完成了相同的工作,但是当我使用 Go 并通过 mangos 调用相应的方法时,它们的行为完全不同。谜题主要有三方面:
- bus-type-Socket 的 Recv() 默认情况下以阻塞模式运行,似乎不能配置为非阻塞模式。文件说:
OptionRecvDeadline 是直到下一个 Recv 超时的时间。该值是时间。持续时间。可以传递零值以指示不应应用超时。负值表示非阻塞操作。默认情况下没有超时。
我相应地尝试了一个负值,但Recv()
仍然阻塞。我还应该做什么?以及如何理解“零超时”和“非阻塞”之间的区别?
dialer
返回的 by似乎(s *socket) NewDialer(...)
在调用之后会徘徊dialer.Close()
,因为调用下一个报告时会发生错误,dialer.Dial()
它仍然是“正在使用的地址”。但是当我再次尝试Close()
时dialer
,会发生错误并报告它已经关闭。我还尝试了以下选项的不同组合,但所有尝试都失败了
opts := make(map[string]interface{})
opts[mangos.OptionDialAsynch] = true // or false
opts[mangos.OptionMaxReconnectTime] = time.Millisecond // or zero
opts[mangos.OptionKeepAliveTime] = time.Millisecond // or even smaller
opts[mangos.OptionKeepAlive] = false // or true
当我想完全杀死拨号器,或者想在一段时间后重用“伪关闭”拨号器时,我应该怎么做?
- 总线类型的套接字
Send()
很奇怪。通常每个节点都应该定期在我的代码中发送一条消息。我从网络中关闭了一个节点(比如“Node-X”)的物理连接,使其离线一段时间,然后将其重新连接到网络。我发现 Node-X 重新连接后会立即重新发送大量消息。但我真正期望的是 Node-X 可以将这些消息发送到空中,即使它没有邻居。
我想知道是否有任何方法可以解决这些问题。我想它可能缺少一些选项或配置,但我没有弄清楚它们。
以下代码用于重现重新拨号和重新关闭错误。
package main
import (
"fmt"
"os"
"time"
"go.nanomsg.org/mangos/v3"
"go.nanomsg.org/mangos/v3/protocol/bus"
// register transports
_ "go.nanomsg.org/mangos/v3/transport/all"
)
var (
sock mangos.Socket
DialerMap map[string]*mangos.Dialer
opts map[string]interface{}
)
func main() {
var err error
opts = make(map[string]interface{})
opts[mangos.OptionDialAsynch] = true
opts[mangos.OptionMaxReconnectTime] = time.Millisecond
// opts[mangos.OptionKeepAliveTime] = time.Millisecond
opts[mangos.OptionKeepAlive] = false
DialerMap = make(map[string]*mangos.Dialer)
if sock, err = bus.NewSocket(); err != nil {
fmt.Println("bus.NewSocket error. ", err)
os.Exit(1)
}
TargetUUID := "node-A"
TargetAddr := "tcp://192.168.0.172:60000" // this should be changed to a available address
MyDial(TargetUUID, TargetAddr)
time.Sleep(time.Second * 2)
MyClose(TargetUUID, TargetAddr)
time.Sleep(time.Second * 2)
MyDial(TargetUUID, TargetAddr)
time.Sleep(time.Second * 2)
MyClose(TargetUUID, TargetAddr)
time.Sleep(100 * time.Second)
}
func MyDial(TargetUUID string, TargetAddr string) (mangos.Dialer, error) {
_, is_exist := DialerMap[TargetUUID]
var err error
var dialer mangos.Dialer
if !is_exist {
dialer, err = sock.NewDialer(TargetAddr, opts)
if err != nil {
} else {
DialerMap[TargetUUID] = &dialer
}
}
dialer = *DialerMap[TargetUUID]
err = dialer.Dial()
if err != nil {
fmt.Println("Dialer fails to dial()", err)
} else {
fmt.Println("Dialer succeeds to dial()")
}
return dialer, err
}
func MyClose(TargetUUID string, TargetAddr string) {
dialerAddr, is_exist := DialerMap[TargetUUID]
if !is_exist {
fmt.Println("Dialer does not exist")
}
dialer := *dialerAddr
err := dialer.Close()
if err != nil {
fmt.Println("dialer fails to close.", err)
} else {
fmt.Println("dialer succeeds to close")
}
}
和控制台输出是
Dialer succeeds to dial()
dialer succeeds to close
Dialer fails to dial() address in use
dialer fails to close. object closed