1

我正在编写一个快速写入 mongodb 的应用程序。mongodb 和 mgo 处理速度太快。我的问题是,有没有办法让我确定 mongo 无法跟上并开始阻塞?但我也不想不必要地阻塞。这是一个模拟问题的代码示例:

package main

import (
  "labix.org/v2/mgo"
  "time"
  "fmt"
)

// in database name is a string and age is an int

type Dog struct{
  Breed string "breed"
}

type Person struct{
  Name string "name"
  Pet Dog `bson:",inline"`
  Ts        time.Time
}

func insert(session *mgo.Session, bob Person){
  err := session.DB("db_log").C("people").Insert(&bob)
  if err != nil {
    panic("Could not insert into database")
  }
}

func main() {
  session, _ := mgo.Dial("localhost:27017")
  bob := Person{Name : "Robert", Pet : Dog{}}
  i := 0
  for {
    time.Sleep(time.Duration(1) * time.Microsecond)
    i++
    go insert(session, bob)
  }
}

我经常收到如下错误:

panic: Could not insert into database

或者

panic: write tcp 127.0.0.1:27017: i/o timeout
4

2 回答 2

6

我怀疑如果您允许 Go 使用多个线程Copy() 然后 Close()您的会话,您将获得更好的性能。

要回答您的问题,这可能是频道的完美用例。在一个 goroutine 中将项目输入通道并在另一个 goroutine 中使用它们/将它们写入 Mongo。您可以调整通道的大小以满足您的需要。一旦通道已满,生产者线程将在尝试发送给它时阻塞。

您可能还想使用Safe()方法设置。设置 W:0 将使 Mongo 处于“即发即弃”模式,这将大大提高性能,但可能会丢失一些数据。您还可以更改超时时间。

于 2014-01-25T05:27:30.993 回答
0

我还没有测试过,但我认为这段代码应该可以工作。长时间保持会话后,我遇到了这个问题,以便我有计时器每隔一定时间更新会话。

package main

import (
  "gopkg.in/mgo.v2"
  "time"
  "fmt"
)

// in database name is a string and age is an int

type Dog struct{
  Breed string "breed"
}

type Person struct{
  Name string "name"
  Pet Dog `bson:",inline"`
  Ts        time.Time
}

func insert(session *mgo.Session, bob Person){
  err := session.DB("db_log").C("people").Insert(&bob)
  if err != nil {
    panic("Could not insert into database")
  }
}

func main() {
  current_session, _ := mgo.Dial("localhost:27017")
  using_session := current_session
  bob := Person{Name : "Robert", Pet : Dog{}}

  /*
  * this technical to prevent connect timeout after long time connection on mongodb from golang session
  * Idea is simple: the session will be renew after certain time such as 1 hour
  */
  //ticker := time.NewTicker(time.Hour * 1)

  //Set 10 seconds for test
  ticker := time.NewTicker(time.Second * 10)

  go func() {

    for t := range ticker.C {
      fmt.Println("Tick at", t)
      new_session := current_session.Copy()
      fmt.Printf("Current session here %p\n", current_session)
      fmt.Printf("New session here %p\n", new_session)
      using_session = new_session
      //setTimeout 30 second before close old sesion, to make sure current instance use current connection isn't affect
      //time.AfterFunc(time.Second * 30, func() { 

      //Set 2 seconds for test
      time.AfterFunc(time.Second * 2, func() { 

        //close previous session

        current_session.Close()
        current_session = new_session

        //assign to new session

      })

    }
  }()

  i := 0
  for {
    time.Sleep(time.Duration(1) * time.Microsecond)
    i++
    go insert(using_session, bob)
  }

}
于 2016-03-28T05:36:54.617 回答