我正在把一个运行中的 MongoDB 主节点(primary)实例的 oplog 以 JSON 格式保存下来,内容如下:
[{"Timestamp":6477723955623886852,"HistoryID":166676398345289971,"MongoVersion":2,"Operation":"i","NameSpace":"test.tests","Object":{"__v":0,"_id":"59e57f9e8489535b1848d32d","num":9795},"QueryObject":null},{"Timestamp":6477723955623886853,"HistoryID":334415645635509829,"MongoVersion":2,"Operation":"i","NameSpace":"test.tests","Object":{"__v":0,"_id":"59e57f9e8489535b1848d32e","num":6183},"QueryObject":null},{"Timestamp":6477723959918854145,"HistoryID":-8753067027179338671,"MongoVersion":2,"Operation":"i","NameSpace":"test.tests","Object":{"__v":0,"_id":"59e57f9f8489535b1848d32f","num":6439},"QueryObject":null},{"Timestamp":6477723959918854146,"HistoryID":4170487939316826112,"MongoVersion":2,"Operation":"i","NameSpace":"test.tests","Object":{"__v":0,"_id":"59e57f9f8489535b1848d330","num":5699},"QueryObject":null},{"Timestamp":6477723959918854147,"HistoryID":-869729083312840671,"MongoVersion":2,"Operation":"i","NameSpace":"test.tests","Object":{"__v":0,"_id":"59e57f9f8489535b1848d331","num":8779},"QueryObject":null}]
但是我找不到任何方法把这些 oplog 推送到另一个 MongoDB 主节点(primary)实例并在其上重放,从而让两个数据库保持同步。
下面是我为重放 oplog 编写的代码:
package operations
import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"os"
"reflect"
"time"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
)
// OlogQuery bundles the pieces needed to run a query: an mgo session,
// a query document, and a timeout.
type OlogQuery struct {
// Session is the mgo session handle.
Session *mgo.Session
// Query is the query document (a bson.M filter).
Query bson.M
// Timeout is a duration associated with the query.
// NOTE(review): nothing in this declaration shows how it is enforced.
Timeout time.Duration
}
// Olog describes a single oplog entry, tagged with the short bson field
// names ts, h, v, op, ns, o and o2.
//
// NOTE(review): every field is unexported. Reflection-based encoders such as
// bson.Marshal normally skip unexported fields, so these tags likely have no
// effect and a marshalled Olog would be an empty document — confirm, and
// export the fields (updating all users) if the struct is meant to be encoded.
type Olog struct {
// ts is the entry's timestamp.
ts bson.MongoTimestamp `bson:"ts"`
// h is tagged "h"; presumably the oplog history/hash id — verify. Stored as float64.
h float64 `bson:"h"`
// v is tagged "v"; presumably the oplog format version — verify.
v int `bson:"v"`
// op is tagged "op"; presumably the operation code (e.g. "i") — verify.
op string `bson:"op"`
// ns is tagged "ns"; presumably the "<database>.<collection>" namespace — verify.
ns string `bson:"ns"`
// o is the document tagged "o".
o bson.M `bson:"o"`
// o2 is the document tagged "o2".
o2 bson.M `bson:"o2"`
}
// Obj wraps a map from string keys to Olog values.
type Obj struct {
// data is unexported, so it is only reachable from inside this package.
data map[string]Olog
}
// exists reports whether path is present on disk, as determined by os.Stat.
//
// Results:
//   - (true, nil)  when the stat call succeeds;
//   - (false, nil) when the stat call fails with a "does not exist" error;
//   - (true, err)  for any other stat failure. Note that the boolean is true
//     in this case even though presence could not be confirmed, so callers
//     must look at the error as well.
func exists(path string) (bool, error) {
	switch _, statErr := os.Stat(path); {
	case statErr == nil:
		return true, nil
	case os.IsNotExist(statErr):
		return false, nil
	default:
		return true, statErr
	}
}
// Written is a package-level channel of bool with a buffer of one, so a
// single send completes without a waiting receiver.
// NOTE(review): no sender or receiver is visible in this part of the file;
// presumably it signals that an oplog file has been written — confirm.
var Written = make(chan bool, 1)
func ReplayOpLogs(dest *mgo.Session) bool {
dirExists, _ := exists("oplogs")
if !dirExists {
fmt.Println("There are no oplogs to replay")
return false
}
files, err := ioutil.ReadDir("./oplogs")
if err != nil {
log.Fatal(err)
}
for _, f := range files {
raw, err := ioutil.ReadFile("./oplogs/" + f.Name())
if err != nil {
fmt.Println(err.Error())
os.Exit(1)
}
var ol []map[string]interface{}
//var ol interface{}
if err := json.Unmarshal([]byte(raw), &ol); err != nil {
panic(err)
}
for x := 0; x < len(ol); x++ {
v := reflect.ValueOf(ol[x])
if v.Kind() == reflect.Map {
for key := range ol[x] {
var temp Olog
switch key {
case "Timestamp":
temp.ts, _ = ol[x][key].(bson.MongoTimestamp)
case "HistoryID":
temp.h, _ = ol[x][key].(float64)
case "MongoVersion":
temp.v, _ = ol[x][key].(int)
case "Operation":
temp.op, _ = ol[x][key].(string)
case "NameSpace":
temp.ns, _ = ol[x][key].(string)
case "Object":
temp.o, _ = ol[x][key].(bson.M)
case "QueryObject":
temp.o2, _ = ol[x][key].(bson.M)
}
var olq OlogQuery
olq.Session = dest //Destination Session
olq.Query = bson.M{"ts": bson.M{"$gt": LastTime(dest)}}
olq.Timeout = 1 * time.Second
db := olq.Session.DB("local")
collection := db.C("oplog.rs")
b, err := bson.Marshal(temp)
if err != nil {
panic(err)
}
collection.Insert(b)
collection.Find(olq.Query).LogReplay()
}
}
}
}
return true
}