0

这是我第一次尝试连接到托管在 AWS 服务器上但不使用任何 Amazon Managed Streaming 服务的 Confluent Kafka 集群。

使用此代码(减去 Config.Net.SASL 设置),我能够毫无问题地连接到旧的自托管集群。

我无法弄清楚我做错了什么。

连接时,我收到此错误:

[sarama] 2021/01/13 08:21:54 Initializing new client
[sarama] 2021/01/13 08:21:54 client/metadata fetching metadata for all topics from broker <redacted URL>
[sarama] 2021/01/13 08:21:54 Failed to connect to broker <redacted URL>: dial tcp <redacted IP>: connect: connection refused
[sarama] 2021/01/13 08:21:54 client/metadata got error from broker -1 while fetching metadata: dial tcp <redacted IP>: connect: connection refused
[sarama] 2021/01/13 08:21:54 client/metadata no available broker to send metadata request to
[sarama] 2021/01/13 08:21:54 client/brokers resurrecting 1 dead seed brokers

这是我的代码:

package main

/*
#define EXTSRV_IP
#define CENTRAL_IP
#define BILLING_IP
#define GO_ISP

#include <srvindx.h>
#include <rdb_util.c>
*/
import "C"

import (
    "context"
    "database/sql"
    "encoding/json"
    "errors"
    "flag"
    "fmt"
    "log"
    "os"
    "os/signal"
    "reflect"
    "runtime/debug"
    "strconv"
    "sync"
    "syscall"
    "time"
    "unsafe"

    "github.com/Shopify/sarama"
    // "github.com/aws/aws-sdk-go/aws/credentials"
    // v4 "github.com/aws/aws-sdk-go/aws/signer/v4"
    //
    _ "github.com/go-sql-driver/mysql"

    gl "gocode.prairiesys.com/lib/platform/genlog"
    tk "gocode.prairiesys.com/lib/platform/token"
)

// StatusNew Const
const StatusNew = 1

// StatusConnected Const
const StatusConnected = 2

// StatusFinished Const
const StatusFinished = 7

// StatusError Const
const StatusError = 8

// StatusTerminated Const
const StatusTerminated = 9

// ConditionalData struct
type ConditionalData struct {
    UserID     sql.NullInt64
    MailboxID  sql.NullInt64
    TemplateID sql.NullInt64
    Account1   sql.NullString
    Account2   sql.NullString
    Block      sql.NullString
    UpdateDate time.Time
}

// Consumer represents a Sarama consumer group consumer
type Consumer struct {
    ready chan bool
}

// KafkaConnectionData struct
type KafkaConnectionData struct {
    ID                int
    Group             string
    Assignor          string
    Brokers           []string
    Topics            []string
    Version           sarama.KafkaVersion
    Verbose           bool
    Running           bool
    Status            int
    StatusDescription string
    ExternalService   string
    APIKey            string
    APISecret         string
    // CommChannel       chan int
    // Logged            bool
    CallID string
}

// Sarama configuration options
var (
    wg1           sync.WaitGroup
    wg2           sync.WaitGroup
    done          = false
    appCallID     string
    logLevel      string
    qpickupErrors int
    template      string
    account       string
    condData      []ConditionalData
    kafkaConnData []KafkaConnectionData
    sleepCount    int
    sleepDuration time.Duration
    nextRefresh   time.Time
    dir           string
)

// Setup is run at the beginning of a new session, before ConsumeClaim
func (consumer *Consumer) Setup(session sarama.ConsumerGroupSession) error {
    // defer recovery("Setup")

    gl.Log(gl.LVL1, gl.Name, "Consumer Setup()...")

    gl.Log(gl.LVL1, gl.Name, "Setup: Claims [%v]", session.Claims())
    for key, value := range session.Claims() {
        gl.Log(gl.LVL1, gl.Name, "Setup: Claims topic[%s] partitions[%d]", key, value)
    }
    // Mark the consumer as ready
    close(consumer.ready)
    return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (consumer *Consumer) Cleanup(session sarama.ConsumerGroupSession) error {
    // defer recovery("Cleanup")

    gl.Log(gl.LVL1, gl.Name, "Cleanup: Claims [%v]", session.Claims())
    for key, value := range session.Claims() {
        gl.Log(gl.LVL1, gl.Name, "Setup: Claims topic[%s] partitions[%d]", key, value)
    }

    return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    // defer recovery("ConsumeClaim")

    gl.Log(gl.LVL1, gl.Name, "ConsumeClaim: Topic [%s], Partition [%d]", claim.Topic(), claim.Partition())
    // NOTE:
    // Do not move the code below to a goroutine.
    // The `ConsumeClaim` itself is called within a goroutine, see:
    // https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
    for message := range claim.Messages() {
        gl.Log(gl.LVL1, gl.Name, "Message: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
        callID := C.GoString(C.strange_name())
        buildBlockFromJSON(message, callID)
        session.MarkMessage(message, fmt.Sprintf("CSGIM:%s", callID))
    }

    return nil
}

func buildBlockFromJSON(message *sarama.ConsumerMessage, callID string) {
    defer recovery("buildBlockFromJSON")
    // defer gl.Leave(gl.Enter())

    var err error
    var block string
    var temp string

    block, err = tk.BtokInit(tk.BLK_TYP_UNKNOWN, 4096)
    if err != nil {
        gl.Log(gl.LVL1, gl.Name, "failed to initialize block, err[%s]", err.Error())

    } else {
        temp = fmt.Sprintf("%v", message.Timestamp)
        block, err = tk.BtokAdd(block, "MessageTimestamp", tk.FLD_STRING, temp)

        temp = fmt.Sprintf("%v", message.BlockTimestamp)
        block, err = tk.BtokAdd(block, "MessageBlockTimestamp", tk.FLD_STRING, temp)

        block, err = tk.BtokAdd(block, "MessageTopic", tk.FLD_STRING, message.Topic)
        block, err = tk.BtokAdd(block, "MessagePartition", tk.FLD_STRING, fmt.Sprintf("%d", message.Partition))
        block, err = tk.BtokAdd(block, "MessageOffset", tk.FLD_STRING, fmt.Sprintf("%d", message.Offset))

        var anyJSON map[string]interface{}

        if err = json.Unmarshal(message.Value, &anyJSON); err != nil {
            gl.Log(gl.LVL1, gl.Name, "FAILURE: json.Unmarshal error [%s]", err.Error())
        } else if block, err = rangeMap(block, anyJSON, ""); err != nil {
            gl.Log(gl.LVL1, gl.Name, "FAILURE: rangeMap error [%s]", err.Error())
        } else if err := sendToExternalService(block, callID); err != nil {
            gl.Log(gl.LVL1, gl.Name, "FAILURE: sendToExternalService error [%s]", err.Error())
        } else {
            gl.Log(gl.LVL1, gl.Name, "MWS DEBUG: anyJSON [%v]", anyJSON)
        }

    }
}

func findGroupInList(group string, list []KafkaConnectionData) int {
    defer recovery("findGroupInList")
    // defer gl.Leave(gl.Enter())

    response := -1

    for idx, kafConData := range list {
        if group == kafConData.Group {
            response = idx
            break
        }
    }

    return response
}

func findStringInList(item string, list []string) (bool, error) {
    defer recovery("findStringInList")
    // defer gl.Leave(gl.Enter())

    response := false
    var err error

    for _, str := range list {
        if item == str {
            response = true
            break
        }
    }

    if !response {
        err = errors.New("Group not found in Conditional Data")
    } else {
        err = nil
    }

    return response, err
}

func init() {
    defer recovery("init")

    if !initGenLog() {
        gl.Log(gl.All, 101, "FAILURE: genlog initialization")
        os.Exit(11)
    } else {
        gl.Log(gl.LVL1, gl.Name, "SUCCESS: genlog initialization")
    }

    if !initArgs() {
        gl.Log(gl.All, 101, "FAILURE: argument initialization")
        os.Exit(12)
    } else {
        gl.Log(gl.All, 101, "SUCCESS: argument initialization")
    }

    if !initConfigurations() {
        gl.Log(gl.All, 101, "FAILURE: configuration initialization")
        os.Exit(13)
    } else {
        gl.Log(gl.All, 101, "SUCCESS: configuration initialization")
    }

    if !initSrv() {
        gl.Log(gl.All, 101, "FAILURE: srv initialization")
        os.Exit(14)
    } else {
        gl.Log(gl.LVL1, gl.Name, "SUCCESS: srv initialization")
    }

    gl.Log(gl.All, 101, "SUCCESS: initialization complete")

}

func initArgs() bool {
    defer recovery("initArgs")
    // defer gl.Leave(gl.Enter())

    gl.Log(gl.LVL1, gl.Name, "Initialize Arguments Begin")

    flag.StringVar(&logLevel, "L", "", "Log level")

    flag.StringVar(&template, "template", "", "USER template to query on for connection data")
    flag.StringVar(&account, "account", "", "USER account to query on for connection data")

    gl.Log(gl.LVL2, gl.Name, "Begin flag.Parse()")
    flag.Parse()
    gl.Log(gl.LVL2, gl.Name, "End   flag.Parse()")

    if flag.Parsed() {
        gl.Log(gl.LVL1, gl.Name, "logLevel [%s]", logLevel)

        gl.Log(gl.LVL1, gl.Name, "template  [%s]", template)
        gl.Log(gl.LVL1, gl.Name, "account   [%s]", account)

    } else {
        gl.Log(gl.LVL1, gl.Name, "flags not parsed properly")

    }

    if len(template) == 0 {
        gl.Log(gl.LVL1, gl.Name, "required argument [-template] missing")
        panic("no user template defined, please set the -template flag")
    }

    if len(account) == 0 {
        gl.Log(gl.LVL1, gl.Name, "required argument [-account] missing")
        panic("no user account defined, please set the -account flag")
    }

    gl.Log(gl.LVL1, gl.Name, "Initialize Arguments Completed")

    return true
}

func initConfigurations() bool {
    defer recovery("initConfigurations")
    // defer gl.Leave(gl.Enter())

    // CODE REDACTED

    return response
}

func initGenLog() bool {

    if !gl.Init() {
        gl.Log(gl.LVL1, gl.Name, "Error: Failed to initialize logging")
        return false
    }

    appCallID = C.GoString(C.strange_name())
    gl.ID(gl.File2|gl.File3, nil, nil)
    gl.Conf(gl.File2, gl.NoChg, gl.NoChgPtr, gl.NoChg, gl.NoChg, gl.NoChg, gl.NoChg, gl.Date2Def, nil)
    gl.ID(gl.File2|gl.File3, gl.SToCS(appCallID), nil)
    gl.Log(gl.LVL3, gl.Name, "Genlog Init Completed")

    return true
}

func initSrv() bool {
    var vt C.int

    defer recovery("initSrv")
    // defer gl.Leave(gl.Enter())

    // Get vterm
    if vt = C.get_vterm_id(C.TRUE); vt < 0 {
        gl.Log(gl.LVL1, gl.Name, "Error: Failed to get vterm")
        return false
    }

    if C.srv_init(nil, 0) < 0 {
        gl.Log(gl.LVL1, gl.Name, "Error: Failed srv init")
        return false
    }

    return true
}

func logKafkaConnectionData(kcd KafkaConnectionData, desc string) {
    defer recovery("logKafkaConnectionData")
    // defer gl.Leave(gl.Enter())

    // CODE REDACTED
}

func main() {
    defer recovery("main")
    // defer gl.Leave(gl.Enter())

    var t1 time.Time
    var t2 time.Time
    t1 = time.Now()

    for done == false {
        t2 = time.Now()

        if time.Duration(t2.Sub(t1)) > (time.Duration(2) * time.Second) {
            for idx := range kafkaConnData {
                if kafkaConnData[idx].Running == false {
                    if kafkaConnData[idx].Status == StatusNew {
                        kafkaConnData[idx].Running = true
                        kafkaConnData[idx].Status = StatusConnected
                        wg1.Add(1)
                        go saramaConsumer(&kafkaConnData[idx])
                    }
                }
            }

            t1 = time.Now()
        }

    }

    wg1.Wait()

    gl.Log(gl.LVL1, gl.Name, "Exiting application normally")
    os.Exit(0)
}

func rangeMap(block string, myMap map[string]interface{}, prefix string) (string, error) {
    defer recovery("rangeMap")
    // defer gl.Leave(gl.Enter())

    // CODE REDACTED
    
    return block, nil
}

func recovery(input string) {
    // defer gl.Leave(gl.Enter())

    if err := recover(); err != nil {
        gl.Log(gl.All, gl.Name, "%s, Exception: %v\n%s\n", input, err, debug.Stack())
        os.Exit(31)
    }
}

func saramaConsumer(kcd *KafkaConnectionData) {
    id := kcd.ID
    defer recovery("saramaConsumer")
    defer wg1.Done()

    gl.Log(gl.LVL1, gl.Name, "ID[%d]. Starting a new Sarama consumer", id)

    if kcd.Verbose {
        sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
    }

    /**
     * Construct a new Sarama configuration.
     * The Kafka cluster version has to be defined before the consumer/producer is initialized.
     */
    config := sarama.NewConfig()
    config.Version = kcd.Version

    switch kcd.Assignor {
    case "sticky":
        config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky
    case "roundrobin":
        config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
    case "range":
        config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
    default:
        gl.Log(gl.LVL1, gl.Name, "ID[%d]. Unrecognized consumer group partition assignor [%s]", id, kcd.Assignor)
        log.Panicf("Unrecognized consumer group partition assignor: %s", kcd.Assignor)
    }

    // config.Net.TLS.Enable = false
    // config.Net.SASL.Handshake = false
    // config.Net.SASL.AuthIdentity = kcd.APIKey
    config.Net.SASL.Enable = true
    config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
    config.Net.SASL.User = kcd.APIKey
    config.Net.SASL.Password = kcd.APISecret
    config.ClientID = "CSGIM"
    config.Consumer.Offsets.Initial = sarama.OffsetOldest

    if err := config.Validate(); err != nil {
        gl.Log(gl.LVL1, gl.Name, "Configuration validation failed, err [%s]", err)
    } else {
        gl.Log(gl.LVL1, gl.Name, "Configuration validation good, config [%v]", config)
    }

    /**
     * Setup a new Sarama consumer group
     */
    consumer := Consumer{
        ready: make(chan bool),
    }

    ctx, cancel := context.WithCancel(context.Background())
    client, err := sarama.NewConsumerGroup(kcd.Brokers, kcd.Group, config)
    if err != nil {
        gl.Log(gl.LVL1, gl.Name, "ID[%d]. Error creating consumer group client [%s]", id, err)
        log.Panicf("Error creating consumer group client: %v", err)
    }

    wg2.Add(1)

    go func(kcd *KafkaConnectionData) {
        defer wg2.Done()

        for {
            // `Consume` should be called inside an infinite loop, when a
            // server-side rebalance happens, the consumer session will need to be
            // recreated to get the new claims
            dir = kcd.ExternalService
            if err := client.Consume(ctx, kcd.Topics, &consumer); err != nil {
                gl.Log(gl.LVL1, gl.Name, "Error from consumer: %s", err)
                log.Panicf("Error from consumer: %v", err)
            }
            // check if context was cancelled, signaling that the consumer should stop
            if ctx.Err() != nil {
                gl.Log(gl.LVL1, gl.Name, "ID[%d]. Error from context: %s", kcd.ID, ctx.Err())
                return
            }
            consumer.ready = make(chan bool)
        }
    }(kcd)

    <-consumer.ready // Await till the consumer has been set up
    gl.Log(gl.LVL1, gl.Name, "ID[%d]. Sarama consumer up and running!...", id)

    sigterm := make(chan os.Signal, 1)
    // signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
    signal.Notify(sigterm, syscall.SIGINT, syscall.SIGHUP, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGTSTP)
    select {
    case <-ctx.Done():
        done = true
        gl.Log(gl.LVL1, gl.Name, "ID[%d]. terminating: context cancelled", id)
    case <-sigterm:
        done = true
        gl.Log(gl.LVL1, gl.Name, "ID[%d]. terminating sarama: via signal", id)
    }

    cancel()
    wg2.Wait()

    if err = client.Close(); err != nil {
        gl.Log(gl.LVL1, gl.Name, "ID[%d]. Error closing client [%s]", err)
        log.Panicf("Error closing client: %s", err)
    }
}

func sendToExternalService(block string, callID string) error {
    defer recovery("sendToExternalService")
    // CODE REDACTED
    return err
}

// sToCS converts a Go string to a C string
func sToCS(s string) *C.char {
    defer recovery("sToCS")
    // make slice of bytes from string
    b := make([]byte, len(s)+1)
    copy(b[:], s)

    return (*C.char)(unsafe.Pointer(&b[0]))
}
4

0 回答 0