0

我有一个 minio/s3 对象存储,它通过 lambda 通知把事件写入 cockroachdb(兼容 postgres 的数据库)。我正在尝试使用以下 golang 代码监听这些事件。

package main

import (
    "database/sql"
    "encoding/json"
    "fmt"
    "github.com/lib/pq"
    "time"
)

// dbDriver is the database/sql driver name passed to sql.Open.
const dbDriver = "postgres"

// crdbConnectStr is the connection URL used both for sql.Open and for the
// pq listener.
//
// Alternative keyword/value form, kept for reference (it additionally sets
// connect_timeout=5):
//
//	dbname=alerts user=crdbuser1 host=localhost port=26257 sslmode=disable connect_timeout=5
const crdbConnectStr = "postgres://crdbuser1@localhost:26257/alerts?sslmode=disable"

// monitorEvents opens a database handle, subscribes to the "monitor"
// notification channel through a pq.Listener and then processes events
// forever by calling waitForAlertEvents in a loop.
//
// It returns early (after printing the error) if the handle cannot be opened,
// if the server cannot be reached, or if the LISTEN subscription fails. On the
// success path it never returns.
//
// NOTE(review): LISTEN/NOTIFY is a PostgreSQL feature; a server that answers
// `syntax error at or near "listen"` does not implement it, and this function
// cannot work against such a server — confirm against the target database.
func monitorEvents() {
    db, err := sql.Open(dbDriver, crdbConnectStr)
    if err != nil {
        fmt.Printf("connection open to crdb failed - %v\n", err)
        return
    }
    // The handle is only used to validate connectivity; release it on exit.
    defer db.Close()

    // sql.Open may only validate its arguments without connecting, so ping
    // the server before reporting success.
    if err := db.Ping(); err != nil {
        fmt.Printf("connection ping to crdb failed - %v\n", err)
        return
    }

    fmt.Printf("sql open on crdb OK\n")

    // Callback invoked by the listener on connection state changes; only
    // events that carry an error are reported.
    reportProblem := func(ev pq.ListenerEventType, err error) {
        if err != nil {
            fmt.Printf("NewListener - event : %v, err - %v\n", ev, err)
        }
    }

    minReconnect := 2 * time.Second
    maxReconnect := 20 * time.Second
    listener := pq.NewListener(crdbConnectStr, minReconnect, maxReconnect, reportProblem)
    // Shut the listener down if we bail out below, so it does not keep
    // reconnecting in the background.
    defer listener.Close()

    if err := listener.Listen("monitor"); err != nil {
        fmt.Printf("Listen error - %v\n", err)
        return
    }

    fmt.Printf("begin monitoring events in CRDB\n")

    for {
        waitForAlertEvents(listener)
    }
}

// Record is the decoding target for a notification payload. It models only
// the part of the JSON document that the program reads:
//
//	data.value.Records[].s3.bucket.name
//	data.value.Records[].s3.object.key
//
// All other keys in the payload are ignored by encoding/json.
type Record struct {
    Data struct {
        Value struct {
            // Records may be empty or absent in the payload; callers must
            // check its length before indexing.
            Records []struct {
                S3 struct {
                    Bucket struct {
                        // Name is the value of the "name" key under "bucket".
                        Name string `json:"name"`
                    } `json:"bucket"`
                    Object struct {
                        // Key is the value of the "key" key under "object".
                        Key string `json:"key"`
                    } `json:"object"`
                } `json:"s3"`
            } `json:"Records"`
        } `json:"value"`
    } `json:"data"`
}

// waitForAlertEvents waits for a single notification on l.Notify or for the
// idle timeout to elapse, handles it, and returns. The caller is expected to
// invoke it repeatedly.
//
// On a notification the raw payload is printed, decoded as JSON into a Record,
// and the bucket name and object key of the first record are printed. Nil
// notifications, malformed JSON and payloads without records are reported and
// skipped instead of panicking. On timeout the connection is pinged in a
// separate goroutine and any ping error is printed.
func waitForAlertEvents(l *pq.Listener) {
    // Idle period after which the connection is checked with a ping.
    const idleTimeout = 60 * time.Second

    select {
    case n := <-l.Notify:
        // lib/pq delivers a nil notification after a connection has been
        // re-established; dereferencing it below would panic.
        if n == nil {
            fmt.Println("Received nil notification, skipping")
            return
        }

        fmt.Printf("Received data from channel [%v]\n", n.Channel)
        // Dump the raw payload before decoding it.
        fmt.Println(n.Extra)

        var record Record
        if err := json.Unmarshal([]byte(n.Extra), &record); err != nil {
            fmt.Println("Error processing JSON: ", err)
            return
        }

        // A syntactically valid payload may still carry no records.
        records := record.Data.Value.Records
        if len(records) == 0 {
            fmt.Println("Error processing JSON: payload contains no records")
            return
        }

        bucket := records[0].S3.Bucket.Name
        object := records[0].S3.Object.Key
        fmt.Printf("received event on bucket: %v, object: %v\n", bucket, object)

    case <-time.After(idleTimeout):
        // The message is derived from idleTimeout so it cannot drift from the
        // actual timer value.
        fmt.Printf("Received no events for %d seconds, checking connection\n", int(idleTimeout.Seconds()))
        // Ping in the background so the caller can resume waiting at once.
        go func() {
            if err := l.Ping(); err != nil {
                fmt.Printf("Ping error - %v\n", err)
            }
        }()
    }
}

func main() {
    monitorAlerts()
}

当我运行这个程序时,我看到以下错误并且卡住了。

[root]# ./alerts 
sql open on crdb OK
NewListener - event : 3, err - pq: syntax error at or near "listen"
NewListener - event : 3, err - pq: syntax error at or near "listen"
NewListener - event : 3, err - pq: syntax error at or near "listen"

手动连接到 cockroachdb 可以正常工作。

[root]# cockroach sql --insecure --user=crdbuser1

crdbuser1@:26257/defaultdb> show databases;                                                                                                               database_name  
+---------------+
  alerts         
(1 row)

Time: 1.22359ms

crdbuser1@:26257/defaultdb> set database=alerts;
SET

Time: 363.994µs

crdbuser1@:26257/alerts> show tables;
  table_name  
+------------+
  alertstab   
(1 row)

Time: 1.399014ms

crdbuser1@:26257/alerts> 

有谁知道为什么会出现 pq: syntax error at or near "listen" 这个错误吗?我也查看了 pq 的源码,该错误很可能与 notify.go#L756 有关。

4

1 回答 1

1

该错误表明 CockroachDB 不支持 LISTEN 和 NOTIFY 语句。

您需要找到一种不同的方法来实现这一点。CRDB 中最接近的功能是 Change Data Capture,但它更侧重于数据流,而不是自定义通知。

你可以在这个 issue 中找到一些关于 CRDB 支持 LISTEN/NOTIFY 的讨论,但目前还没有确定的计划。

于 2019-12-04T06:22:20.003 回答