我正在尝试使用 Golang 和 Gocqlx 在 Cassandra 中执行多分区 BATCH。
我的问题是我无法让我的 BATCH 语句正常工作。
我的目标如下:
1. Perform an INSERT to second_table.
2. Perform an UPDATE to first_table.
3. Batch the above 2 operations because they need to be all-or-nothing.
我在 Cassandra 中有以下表结构:
keyspace: mykeyspace
first_table:
uuid
second_table_uuid <-- serves as a link to second_table
create_timestamp
update_timestamp
second_table:
uuid
first_table_uuid <-- serves as a link to first_table
create_timestamp
update_timestamp
下面是我的代码:
func InsertToSecondTable(payload payload.SecondTable, session *gocql.Session) (string, error) {
// Make sure payload UUID exists on `first_table`.
firstTableSelectStmt, firstTableSelectNames := qb.Select("mykeyspace.first_table").
Where(qb.Eq("uuid")).
ToCql()
var firstTableRow table.FirstTable
q := gocqlx.Query(session.Query(firstTableSelectStmt), firstTableSelectNames).BindMap(qb.M{
"uuid": payload.UUID,
})
// Will return a `not found` error if no matches are found.
if err := q.GetRelease(&firstTableRow); err != nil {
return "", err
}
// Prepare INSERT to `second_table` query
now := time.Now()
uuid, _ := gocql.RandomUUID()
var secondTableRow table.SecondTable
secondTableRow.UUID = uuid
secondTableRow.CreateTimestamp = now
secondTableRow.UpdateTimestamp = now
// Handle UUIDs.
secondTableRow.FirstTableUUID, _ = getUUIDFromString(payload.UUID)
secondTableInsertStmt, secondTableInsertNames := qb.Insert("mykeyspace.second_table").
Columns("uuid", "first_table_uuid", "create_timestamp", "update_timestamp").
ToCql()
// Prepare UPDATE on `first_table` query.
firstTableUpdateStmt, firstTableUpdateNames := qb.Update("mykeyspace.first_table").
Set("second_table_uuid", "update_timestamp").
Where(qb.Eq("uuid")).
ToCql()
// Start a Batch.
// This is because we want the INSERT and UPDATE to be all-or-nothing.
finalStmt, finalNames := qb.Batch().AddStmt(secondTableInsertStmt, secondTableInsertNames).
AddStmt(firstTableUpdateStmt, firstTableUpdateNames).
ToCql()
// Had to change the anonymous struct field names so they don't conflict.
// Use DB tags to match the INSERT and UPDATE statements.
batchStruct := struct {
// second_table
SecondTableUUIDPK gocql.UUID `db:"uuid"`
FirstTableUUID gocql.UUID `db:"first_table_uuid"`
SecondTableCreateTimestamp time.Time `db:"create_timestamp"`
SecondTableUpdateTimestamp time.Time `db:"update_timestamp"`
// first_table
FirstTableUUIDPK gocql.UUID `db:"uuid"`
SecondTableUUID gocql.UUID `db:"second_table_uuid"`
FirstTableTimestamp time.Time `db:"create_timestamp"`
FirstTableTimestamp time.Time `db:"update_timestamp"`
}{
// second_table
SecondTableUUIDPK: secondTableRow.UUID,
FirstTableUUID: secondTableRow.FirstTableUUID,
SecondTableCreateTimestamp: secondTableRow.CreateTimestamp,
SecondTableUpdateTimestamp: secondTableRow.UpdateTimestamp,
// first_table
MainUUIDPK: firstTableRow.UUID,
AdditionalDataUUID: firstTableRow.SecondTableUUID,
MainCreateTimestamp: firstTableRow.CreateTimestamp,
MainUpdateTimestamp: firstTableRow.UpdateTimestamp,
}
err = gocqlx.Query(session.Query(finalStmt), finalNames).
BindStruct(&batchStruct).
ExecRelease()
if err != nil {
return "", err
}
return uuid.String(), nil
}
这可能是非常明显的事情,但遗憾的是我对 Cassandra 和 Gocqlx 还是陌生的。