5

我正在尝试配置从 mongo 副本集的主节点和两个辅助节点读取以提供更好的负载平衡。3 个节点中的每一个都位于具有 IP 地址的不同机器上:ip1、ip2、ip3。

我的GoLang网站,它是martini具有两个 url/insert和的 Web 服务器/get

package main

import (
    "github.com/go-martini/martini"
    "gopkg.in/mgo.v2"
    "gopkg.in/mgo.v2/bson"
    "net/http"
)

const (
    dialStr        = "ip1:port1,ip2:port2,ip3:port3"
    dbName         = "test"
    collectionName = "test"
    elementsCount  = 1000
)

var mainSessionForSave *mgo.Session

func ConnectToMongo() {
    var err error
    mainSessionForSave, err = mgo.Dial(dialStr)
    mainSessionForSave.SetMode(mgo.Monotonic, true)
    if err != nil {
        panic(err)
    }
}

func GetMgoSessionPerRequest() *mgo.Session {
    var sessionPerRequest *mgo.Session
    sessionPerRequest = mainSessionForSave.Copy()
    return sessionPerRequest
}

func main() {
    ConnectToMongo()
    prepareMartini().Run()
}

type Element struct {
    I int `bson:"I"`
}

func prepareMartini() *martini.ClassicMartini {
    m := martini.Classic()
    sessionPerRequest := GetMgoSessionPerRequest()
    m.Get("/insert", func(w http.ResponseWriter, r *http.Request) {
        for i := 0; i < elementsCount; i++ {
            e := Element{I: i}
            err := collection(sessionPerRequest).Insert(&e)
            if err != nil {
                panic(err)
            }
        }
        w.Write([]byte("data inserted successfully"))
    })
    m.Get("/get", func(w http.ResponseWriter, r *http.Request) {
        var element Element
        const findI = 500
        err := collection(sessionPerRequest).Find(bson.M{"I": findI}).One(&element)
        if err != nil {
            panic(err)
        }
        w.Write([]byte("get data successfully"))

    })

    return m
}

func collection(s *mgo.Session) *mgo.Collection {
    return s.DB(dbName).C(collectionName)
}

我使用命令运行这个GoLang站点go run site.go并准备​​我请求的实验http://localhost:3000/insert- 大约一分钟后我的测试数据被插入。

然后我开始测试从辅助节点和主节点的读取attacker.go

package main

import (
    "fmt"
    "time"

    vegeta "github.com/tsenart/vegeta/lib"
)

func main() {

    rate := uint64(4000) // per second
    duration := 4 * time.Second
    targeter := vegeta.NewStaticTargeter(&vegeta.Target{
        Method: "GET",
        URL:    "http://localhost:3000/get",
    })
    attacker := vegeta.NewAttacker()

    var results vegeta.Results
    for res := range attacker.Attack(targeter, rate, duration) {
        results = append(results, res)
    }

    metrics := vegeta.NewMetrics(results)
    fmt.Printf("99th percentile: %s\n", metrics.Latencies.P99)
}

运行它,go run attacker.go我每秒请求 URL http://localhost:3000/get 4000次。当攻击者工作时,我打开了我所有的 3 台服务器并运行htop命令来观察资源消耗。PRIMARY 节点显示它处于高负载状态,CPU 大约为 80%。中学很平静。

为什么?

正如我用mgo.Monotonic...

mainSessionForSave.SetMode(mgo.Monotonic, true)

...我希望从所有节点读取:ip1, ip2, ip3并且我希望在相同负载和相同 CPU 消耗下观察所有节点。但事实并非如此。我配置错了什么?实际上mgo.Monotonic在我的情况下不起作用,我只从PRIMARY节点读取。

4

2 回答 2

5

sessionPerRequest仅创建一次:prepareMartini在服务器启动时调用,然后sessionPerRequest设置。传递给m.Get()访问该变量的闭包。然后,在第一次写入之后(在您的测试设置期间),mgo将只访问主要的

如果可能,单调一致性将从 slave 开始读取,以便更好地分配负载,并且一旦发生第一次写入,连接就会切换到 master

(如果mgo只是在写入主节点后继续从辅助节点读取,则读取不一定反映您刚刚进行的写入,这可能会很痛苦。切换到主节点只会让您获得比您从主节点获得的数据更新的数据次要的,永远不会旧的,它保持单调性。无论如何,这就是它理想的工作方式;有关更多信息,请参阅下面的“未解决问题”链接。)

解决方案是将创建会话推送到您的处理程序中,例如,删除sessionPerRequest并在每个处理程序顶部放置一些明确的内容,例如

coll := mainSessionForSave.Copy().DB(dbName).Collection(collName)

所有一致性承诺都应该根据MongoDB 一致性的开放性问题来阅读:现在,在网络分区期间,读取可以看到旧数据和稍后将回滚的写入,即使mgo是在尝试从主数据库读取时也是如此。(比较和设置没有这个问题,但当然这是一个更大更慢的操作。)同样值得仔细阅读这篇文章只是为了讨论一致性级别和描述不同的数据库行为可能如何体现在应用程序的终端用户。

于 2015-05-02T02:54:03.920 回答
0

使用后忘记关闭连接:

defer mainSessionForSave.Close()

可能,这可能是一个原因。

PS确保所有节点都可用:)

于 2015-04-29T06:43:15.353 回答