2

在我的用例中,我想持续监听 TCP 连接并接收值。期望值是一个对象。所以我正在使用 gob 解码器从连接中接收值。我想不断地监听连接并使用 go 例程接收对象。我在这里有代码片段[它是应用程序的一部分。代码片段无法编译]。它第一次获得价值,但没有收到后续对象。

func main() {

    //...
    // SOME CODE
    //...


    // All hosts who are connected; a map wherein
    // the keys are ip addreses and the values are
    //net.Conn objects
    allClients := make(map[string]net.Conn)

    tMaps := make(chan map[string]int64)

    for {
            select {
            // Accept new clients
            //
            case conn := <-newConnections:
            log.Printf("Accepted new client, #%s", hostIp)

            // Constantly read incoming messages from this
            // client in a goroutine and push those onto
            // the tMaps channel for broadcast to others.
            //
            go func(conn net.Conn) {
                    dec := gob.NewDecoder(conn)
                    for {
                            var tMap map[string]int64
                            err := dec.Decode(&tMap)
                            if err != nil {
                                    fmt.Println("Error in decoding ", err)
                                    break
                            }
                            log.Printf("Received values: %+v", tMap)
                            //update throttle map based on the received value
                            tMaps <- throttleMap
                    }

            }(conn)
    }
}

有人可以帮我吗?

4

1 回答 1

3

让我们看一下 Go 中 TCP 服务器的基础知识。

首先是“聆听”部分。我们可以这样设置:

package main

import (
    "fmt"
    "io"
    "net"
    "time"
)

func main() {
    ln, err := net.Listen("tcp", ":9000")
    if err != nil {
        panic(err)
    }
    defer ln.Close()

    for {
        conn, err := ln.Accept()
        if err != nil {
            panic(err)
        }

        io.WriteString(conn, fmt.Sprint("Hello World\n", time.Now(), "\n"))

        conn.Close()
    }
}

注意无限循环。它总是在该代码上运行和循环。被循环的代码是做什么的?如果正在侦听的端口上有连接,则接受该连接。然后我们对这个连接做一些事情。在这种情况下,我们使用 io.WriteString 写回它。对于这一连接,我们正在发送响应。然后我们关闭连接。如果有更多的连接,我们已经准备好接受它们。

现在让我们创建一个客户端来连接到 TCP 服务器。这称为“拨入”TCP 服务器。

要在您的机器上运行所有这些代码,请运行上面的 TCP 服务器代码。要运行代码,请转到您的终端并输入:go run main.go

现在将下面的代码直接放入另一个文件中。在终端中启动另一个选项卡。也可以通过输入运行该代码: go run main.go

下面“拨入”您的 TCP 服务器的代码将连接到服务器,TCP 服务器将响应,然后关闭连接。

以下是作为客户端拨入 TCP 服务器的代码:

package main

import (
    "fmt"
    "io/ioutil"
    "net"
)

func main() {
    conn, err := net.Dial("tcp", "localhost:9000")
    if err != nil {
        panic(err)
    }
    defer conn.Close()

    bs, _ := ioutil.ReadAll(conn)
    fmt.Println(string(bs))

}

我们可以掌握这些基础知识并开始享受乐趣。

让我们创建一个“echo”服务器。

这将说明接受许多连接。

package main

import (
    "io"
    "net"
)

func main() {
    ln, err := net.Listen("tcp", ":9000")
    if err != nil {
        panic(err)
    }
    defer ln.Close()

    for {
        conn, err := ln.Accept()
        if err != nil {
            panic(err)
        }

        // handles unlimited connections
        go func() {
            io.Copy(conn, conn)
            conn.Close()
        }()
    }
}

和之前一样运行上面的文件:go run main.go

如果您收到错误,请确保您已关闭我们在上一个示例中运行的 TCP 服务器。在终端中使用 ctrl+c 关闭 TCP 服务器。

现在您的新 TCP 服务器正在运行,让我们使用 Telnet 连接到它。

在windows上你可以安装telnet;在 Mac 上,它应该已经存在。使用此命令运行 telnet 并连接到您的 TCP 服务器:telnet localhost 9000

现在再举一个例子——像 Redis 这样的内存数据库:

package main

import (
    "bufio"
    "fmt"
    "io"
    "log"
    "net"
    "strings"
)

var data = make(map[string]string)

func handle(conn net.Conn) {
    defer conn.Close()

    scanner := bufio.NewScanner(conn)
    for scanner.Scan() {
        ln := scanner.Text()
        fs := strings.Fields(ln)

        if len(fs) < 2 {
            io.WriteString(conn, "This is an in-memory database \n" +
            "Use SET, GET, DEL like this: \n" +
            "SET key value \n" +
            "GET key \n" +
            "DEL key \n\n" +
            "For example - try these commands: \n" +
            "SET fav chocolate \n" +
            "GET fav \n\n\n")
            continue
        }

        switch fs[0] {
        case "GET":
            key := fs[1]
            value := data[key]
            fmt.Fprintf(conn, "%s\n", value)
        case "SET":
            if len(fs) != 3 {
                io.WriteString(conn, "EXPECTED VALUE\n")
                continue
            }
            key := fs[1]
            value := fs[2]
            data[key] = value
        case "DEL":
            key := fs[1]
            delete(data, key)
        default:
            io.WriteString(conn, "INVALID COMMAND "+fs[0]+"\n")
        }
    }
}

func main() {
    li, err := net.Listen("tcp", ":9000")
    if err != nil {
        log.Fatalln(err)
    }
    defer li.Close()

    for {
        conn, err := li.Accept()
        if err != nil {
            log.Fatalln(err)
        }
        handle(conn)
    }
}

并添加并发:

package main

import (
    "bufio"
    "fmt"
    "io"
    "log"
    "net"
    "strings"
)

type Command struct {
    Fields []string
    Result chan string
}

func redisServer(commands chan Command) {
    var data = make(map[string]string)
    for cmd := range commands {
        if len(cmd.Fields) < 2 {
            cmd.Result <- "Expected at least 2 arguments"
            continue
        }

        fmt.Println("GOT COMMAND", cmd)

        switch cmd.Fields[0] {
        // GET <KEY>
        case "GET":
            key := cmd.Fields[1]
            value := data[key]

            cmd.Result <- value

        // SET <KEY> <VALUE>
        case "SET":
            if len(cmd.Fields) != 3 {
                cmd.Result <- "EXPECTED VALUE"
                continue
            }
            key := cmd.Fields[1]
            value := cmd.Fields[2]
            data[key] = value
            cmd.Result <- ""
        // DEL <KEY>
        case "DEL":
            key := cmd.Fields[1]
            delete(data, key)
            cmd.Result <- ""
        default:
            cmd.Result <- "INVALID COMMAND " + cmd.Fields[0] + "\n"
        }
    }
}

func handle(commands chan Command, conn net.Conn) {
    defer conn.Close()

    scanner := bufio.NewScanner(conn)
    for scanner.Scan() {
        ln := scanner.Text()
        fs := strings.Fields(ln)

        result := make(chan string)
        commands <- Command{
            Fields: fs,
            Result: result,
        }

        io.WriteString(conn, <-result+"\n")
    }

}

func main() {
    li, err := net.Listen("tcp", ":9000")
    if err != nil {
        log.Fatalln(err)
    }
    defer li.Close()

    commands := make(chan Command)
    go redisServer(commands)

    for {
        conn, err := li.Accept()
        if err != nil {
            log.Fatalln(err)
        }

        go handle(commands, conn)
    }
}

请参阅我在 CSUF 课程中的讲座,在此处描述所有这些。还有一个很棒的资源

于 2015-12-10T16:21:46.420 回答