0

我想用nanomsg/nng作为一个完全分布式的点对点多节点网络的通信基础,帮助构建拓扑发现和维护的动态能力。现在我陷入了它的 Golang 包mangos中。

在 Python 和 pynng(它是 nanomsg 的 python 绑定)中已经完成了相同的工作,但是当我使用 Go 并通过 mangos 调用相应的方法时,它们的行为完全不同。谜题主要有三方面:

  1. bus-type-Socket 的 Recv() 默认情况下以阻塞模式运行,似乎不能配置为非阻塞模式。文件说:

OptionRecvDeadline 是直到下一个 Recv 超时的时间。该值是时间。持续时间。可以传递零值以指示不应应用超时。负值表示非阻塞操作。默认情况下没有超时。

我相应地尝试了一个负值,但Recv()仍然阻塞。我还应该做什么?以及如何理解“零超时”和“非阻塞”之间的区别?

  1. dialer返回的 by似乎(s *socket) NewDialer(...)在调用之后会徘徊dialer.Close(),因为调用下一个报告时会发生错误,dialer.Dial()它仍然是“正在使用的地址”。但是当我再次尝试Close()dialer,会发生错误并报告它已经关闭。我还尝试了以下选项的不同组合,但所有尝试都失败了
opts := make(map[string]interface{})
opts[mangos.OptionDialAsynch] = true                    // or false
opts[mangos.OptionMaxReconnectTime] = time.Millisecond  // or zero 
opts[mangos.OptionKeepAliveTime] = time.Millisecond     // or even smaller
opts[mangos.OptionKeepAlive] = false                    // or true

当我想完全杀死拨号器,或者想在一段时间后重用“伪关闭”拨号器时,我应该怎么做?

  1. 总线类型的套接字Send()很奇怪。通常每个节点都应该定期在我的代码中发送一条消息。我从网络中关闭了一个节点(比如“Node-X”)的物理连接,使其离线一段时间,然后将其重新连接到网络。我发现 Node-X 重新连接后会立即重新发送大量消息。但我真正期望的是 Node-X 可以将这些消息发送到空中,即使它没有邻居。

我想知道是否有任何方法可以解决这些问题。我想它可能缺少一些选项或配置,但我没有弄清楚它们。

以下代码用于重现重新拨号和重新关闭错误。

package main

import (
    "fmt"
    "os"
    "time"

    "go.nanomsg.org/mangos/v3"
    "go.nanomsg.org/mangos/v3/protocol/bus"

    // register transports
    _ "go.nanomsg.org/mangos/v3/transport/all"
)

var (
    sock      mangos.Socket
    DialerMap map[string]*mangos.Dialer
    opts      map[string]interface{}
)

func main() {
    var err error
    opts = make(map[string]interface{})
    opts[mangos.OptionDialAsynch] = true
    opts[mangos.OptionMaxReconnectTime] = time.Millisecond
    // opts[mangos.OptionKeepAliveTime] = time.Millisecond
    opts[mangos.OptionKeepAlive] = false
    DialerMap = make(map[string]*mangos.Dialer)

    if sock, err = bus.NewSocket(); err != nil {
        fmt.Println("bus.NewSocket error. ", err)
        os.Exit(1)
    }
    TargetUUID := "node-A"
    TargetAddr := "tcp://192.168.0.172:60000"   // this should be changed to a available address
    MyDial(TargetUUID, TargetAddr)
    time.Sleep(time.Second * 2)
    MyClose(TargetUUID, TargetAddr)
    time.Sleep(time.Second * 2)
    MyDial(TargetUUID, TargetAddr)
    time.Sleep(time.Second * 2)
    MyClose(TargetUUID, TargetAddr)
    time.Sleep(100 * time.Second)

}
func MyDial(TargetUUID string, TargetAddr string) (mangos.Dialer, error) {
    _, is_exist := DialerMap[TargetUUID]
    var err error
    var dialer mangos.Dialer
    if !is_exist {
        dialer, err = sock.NewDialer(TargetAddr, opts)
        if err != nil {
        } else {
            DialerMap[TargetUUID] = &dialer
        }
    }
    dialer = *DialerMap[TargetUUID]
    err = dialer.Dial()
    if err != nil {
        fmt.Println("Dialer fails to dial()", err)
    } else {
        fmt.Println("Dialer succeeds to dial()")
    }

    return dialer, err
}

func MyClose(TargetUUID string, TargetAddr string) {
    dialerAddr, is_exist := DialerMap[TargetUUID]
    if !is_exist {
        fmt.Println("Dialer does not exist")
    }
    dialer := *dialerAddr
    err := dialer.Close()

    if err != nil {
        fmt.Println("dialer fails to close.", err)
    } else {
        fmt.Println("dialer succeeds to close")
    }

}

和控制台输出是

Dialer succeeds to dial()
dialer succeeds to close
Dialer fails to dial() address in use
dialer fails to close. object closed
4

2 回答 2

1

我通常不会针对此类问题监控 stackoverflow 或 reddit —— 我们确实有一个不和谐频道(来自 mangos 和 NNG 主页的链接),以及一个邮件列表。

话虽如此,让我看看我是否可以提供帮助(我是 NNG 和 mangos 的作者):

  1. 总线支持 OptionRecvDeadline。但是,您是正确的,它不支持具有负值的非阻塞模式,而是将负值视为零,并充当阻塞。这是一个文档错误。要实现逻辑非阻塞,请使用值“1”,表示一纳秒,这在逻辑上等同于非阻塞,尽管粒度可能会受到调度程序延迟的限制。(在这种情况下,这就像做一个“关闭(通道); <-channel” - 几乎是非阻塞的。

我会看到有关修复文档的信息。

  1. 在拨号器上调用 Close() 是正确的做法。它会一直持续到管道关闭,它会自动关闭。您使用非常短的重拨时间可能会混淆这一点 - 老实说,我没有考虑过微小的重拨时间 - 通常这样做是不好的形式,因为这意味着如果对等方不是可用您的代码将在尝试重新连接的处理器上快速旋转。我通常建议至少10 毫秒的重试间隔上限。(mangos.OptionMaxReconnectTime)

  2. 我想你看到了排队的效果,但我不是 100% 确定——我需要看到一个测试用例重现这个。当然,总线协议是尽力而为的传递,如果没有连接的对等点,则消息将被丢弃。(刚刚重新检查以确定。)

于 2021-06-30T14:33:17.157 回答
0

感谢 @ 的回复Garrett D'Amore,我现在可以用另一种方式解决我的问题,并且我(作为一个对底层通信层知之甚少的 Golang 新粉丝)想为这样一个简单而愚蠢的问题给您带来困扰表示歉意。

作者很好地回答了问题(1)。

问题(3)可能与问题(2)结合在一起,因为作者将机制描述如下,从而消除了发送缓冲累积的可能性。

当然,总线协议是尽力而为的传递,如果没有连接的对等点,则消息将被丢弃。(刚刚重新检查以确定。)

问题(2),我第一次尝试设置mangos.OptionMaxReconnectTime100 ms但问题仍然存在。第二次,我尝试了各种options组合来配置套接字和拨号器,但尝试也失败了。

最后,由于作者指出

在拨号器上调用 Close() 是正确的做法。它会一直持续到管道关闭,它会自动关闭。您使用非常短的重拨时间可能会混淆这一点。

我转向另一种关闭旧拨号程序的方法,即显式关闭它拥有的所有管道。为此,可以定义一个回调处理程序,如

var pipe_c chan
func callbackHandler(event mangos.PipeEvent, pipe mangos.Pipe) {
    pAddr := &pipe
    pipe_c <- pAddr
}

然后将 callbackHandler 附加到套接字

sock.SetPipeEventHook(callbackHandler)

通过这样做,用户可以获得(private var)管道。当一个人想关闭拨号连接时,他或她可以这样做

dialer.Close()                    // try best to close a dialer automatically
for pAddr, num := range pipeSet {
    (*pAddr).Close()              // explicitly close all the pipes of the dialer
}

并且只留下“伪关闭”拨号器。当人们想再次连接到远程地址时,可以创建和使用一个新的拨号器。

我不知道旧的“伪关闭”拨号器是否会在内存中累积。但这已经是我能找到的唯一解决方案。

于 2021-07-01T04:15:04.123 回答