这是我第一次尝试连接到托管在 AWS 服务器上但不使用任何 Amazon Managed Streaming 服务的 Confluent Kafka 集群。
使用同样的代码(去掉 config.Net.SASL 相关设置),我可以毫无问题地连接到旧的自托管集群。
我无法弄清楚我做错了什么。
连接时,我收到以下错误:
[sarama] 2021/01/13 08:21:54 Initializing new client
[sarama] 2021/01/13 08:21:54 client/metadata fetching metadata for all topics from broker <redacted URL>
[sarama] 2021/01/13 08:21:54 Failed to connect to broker <redacted URL>: dial tcp <redacted IP>: connect: connection refused
[sarama] 2021/01/13 08:21:54 client/metadata got error from broker -1 while fetching metadata: dial tcp <redacted IP>: connect: connection refused
[sarama] 2021/01/13 08:21:54 client/metadata no available broker to send metadata request to
[sarama] 2021/01/13 08:21:54 client/brokers resurrecting 1 dead seed brokers
这是我的代码:
package main
/*
#define EXTSRV_IP
#define CENTRAL_IP
#define BILLING_IP
#define GO_ISP
#include <srvindx.h>
#include <rdb_util.c>
*/
import "C"
import (
"context"
"database/sql"
"encoding/json"
"errors"
"flag"
"fmt"
"log"
"os"
"os/signal"
"reflect"
"runtime/debug"
"strconv"
"sync"
"syscall"
"time"
"unsafe"
"github.com/Shopify/sarama"
// "github.com/aws/aws-sdk-go/aws/credentials"
// v4 "github.com/aws/aws-sdk-go/aws/signer/v4"
//
_ "github.com/go-sql-driver/mysql"
gl "gocode.prairiesys.com/lib/platform/genlog"
tk "gocode.prairiesys.com/lib/platform/token"
)
// Status codes stored in KafkaConnectionData.Status.
const (
	// StatusNew marks a connection for which no consumer has been started
	// yet; main launches a consumer for entries in this state.
	StatusNew = 1
	// StatusConnected is assigned by main when it launches the consumer.
	StatusConnected = 2
	// StatusFinished is not referenced in the visible code.
	StatusFinished = 7
	// StatusError is not referenced in the visible code.
	StatusError = 8
	// StatusTerminated is not referenced in the visible code.
	StatusTerminated = 9
)
// ConditionalData is one record of the package-level condData slice.
//
// NOTE(review): the sql.Null* field types suggest the values are scanned from
// nullable database columns, but the code that populates them is not visible
// here — confirm.
type ConditionalData struct {
	UserID, MailboxID, TemplateID sql.NullInt64
	Account1, Account2, Block     sql.NullString
	UpdateDate                    time.Time
}
// Consumer represents a Sarama consumer group consumer. Its Setup, Cleanup and
// ConsumeClaim methods receive the sarama session and claim for each session.
type Consumer struct {
// ready is closed by Setup once a session has been set up; saramaConsumer
// blocks on it before reporting the consumer as up and running.
ready chan bool
}
// KafkaConnectionData describes one Kafka consumer-group connection together
// with its runtime state. main starts one saramaConsumer goroutine per entry
// of the package-level kafkaConnData slice.
type KafkaConnectionData struct {
	// ID identifies the connection in log messages.
	ID int

	// Group is the consumer group name passed to sarama.NewConsumerGroup.
	// Assignor selects the rebalance strategy: "sticky", "roundrobin" or
	// "range"; any other value makes saramaConsumer panic.
	Group, Assignor string

	// Brokers is the broker address list passed to sarama.NewConsumerGroup.
	// Topics is the topic list passed to Consume.
	Brokers, Topics []string

	// Version is copied into the sarama configuration's Version field.
	Version sarama.KafkaVersion

	// Verbose makes saramaConsumer install a stdout logger as sarama.Logger.
	// Running is set to true by main when it launches the consumer.
	Verbose, Running bool

	// Status holds one of the Status* constants.
	Status int

	// StatusDescription is not referenced in the visible code.
	// ExternalService is copied into the package-level dir variable before
	// each Consume call.
	StatusDescription, ExternalService string

	// APIKey and APISecret are used as the SASL user and password.
	APIKey, APISecret string

	// CommChannel chan int
	// Logged bool

	// CallID is not referenced in the visible code.
	CallID string
}
// Package-level application state shared by the initialization code, main and
// the consumer goroutines.
var (
	// wg1 counts running saramaConsumer goroutines; main waits on it.
	// wg2 counts the goroutines running the Consume loop; saramaConsumer
	// waits on it before closing its client.
	wg1, wg2 sync.WaitGroup

	// done is set to true by saramaConsumer on shutdown and polled by main.
	done bool

	// appCallID is set by initGenLog; logLevel by the -L flag.
	appCallID, logLevel string

	// template and account are set by the -template and -account flags.
	template, account string

	// condData and kafkaConnData are not populated in the visible code;
	// main iterates kafkaConnData to start consumers.
	condData      []ConditionalData
	kafkaConnData []KafkaConnectionData

	// The variables below are not referenced in the visible code.
	qpickupErrors int
	sleepCount    int
	sleepDuration time.Duration
	nextRefresh   time.Time

	// dir receives KafkaConnectionData.ExternalService before each Consume.
	dir string
)
// Setup is run at the beginning of a new session, before ConsumeClaim. It logs
// the session's claims and then closes consumer.ready to signal readiness.
// It always returns nil.
func (consumer *Consumer) Setup(session sarama.ConsumerGroupSession) error {
	// defer recovery("Setup")
	gl.Log(gl.LVL1, gl.Name, "Consumer Setup()...")

	claims := session.Claims()
	gl.Log(gl.LVL1, gl.Name, "Setup: Claims [%v]", claims)
	for topic, partitions := range claims {
		gl.Log(gl.LVL1, gl.Name, "Setup: Claims topic[%s] partitions[%d]", topic, partitions)
	}

	// Closing the channel releases whoever is waiting for the consumer to
	// become ready.
	close(consumer.ready)
	return nil
}
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines
// have exited. It only logs the session's claims and always returns nil.
func (consumer *Consumer) Cleanup(session sarama.ConsumerGroupSession) error {
	// defer recovery("Cleanup")
	claims := session.Claims()
	gl.Log(gl.LVL1, gl.Name, "Cleanup: Claims [%v]", claims)
	for topic, partitions := range claims {
		// Prefixed "Cleanup:" (it used to read "Setup:") so the log shows
		// which callback actually produced the line.
		gl.Log(gl.LVL1, gl.Name, "Cleanup: Claims topic[%s] partitions[%d]", topic, partitions)
	}
	return nil
}
// ConsumeClaim runs the consumer loop over the claim's Messages() channel.
// For every message it logs the payload, obtains a call ID from the C helper
// strange_name, hands the message to buildBlockFromJSON and marks it on the
// session. It returns nil when the message channel is closed.
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	// defer recovery("ConsumeClaim")
	gl.Log(gl.LVL1, gl.Name, "ConsumeClaim: Topic [%s], Partition [%d]", claim.Topic(), claim.Partition())

	// NOTE:
	// Do not move the code below to a goroutine.
	// The `ConsumeClaim` itself is called within a goroutine, see:
	// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
	incoming := claim.Messages()
	for msg := range incoming {
		gl.Log(gl.LVL1, gl.Name, "Message: value = %s, timestamp = %v, topic = %s", string(msg.Value), msg.Timestamp, msg.Topic)

		msgCallID := C.GoString(C.strange_name())
		buildBlockFromJSON(msg, msgCallID)

		metadata := fmt.Sprintf("CSGIM:%s", msgCallID)
		session.MarkMessage(msg, metadata)
	}
	return nil
}
// buildBlockFromJSON converts one Kafka message into a token block and sends
// it to the external service.
//
// The block is initialized with tk.BtokInit, then receives the message's
// timestamp, block timestamp, topic, partition and offset as string fields.
// The message value is decoded as a JSON object, its contents are added
// through rangeMap, and the block is passed to sendToExternalService with
// callID.
//
// The function returns nothing: every failure is logged and ends processing
// of this message. Errors from tk.BtokAdd are now checked (they used to be
// silently overwritten), so a block whose metadata could not be added is no
// longer forwarded.
func buildBlockFromJSON(message *sarama.ConsumerMessage, callID string) {
	defer recovery("buildBlockFromJSON")
	// defer gl.Leave(gl.Enter())

	block, err := tk.BtokInit(tk.BLK_TYP_UNKNOWN, 4096)
	if err != nil {
		gl.Log(gl.LVL1, gl.Name, "failed to initialize block, err[%s]", err.Error())
		return
	}

	// Message metadata, added in the same order as before.
	metadata := []struct{ name, value string }{
		{"MessageTimestamp", fmt.Sprintf("%v", message.Timestamp)},
		{"MessageBlockTimestamp", fmt.Sprintf("%v", message.BlockTimestamp)},
		{"MessageTopic", message.Topic},
		{"MessagePartition", fmt.Sprintf("%d", message.Partition)},
		{"MessageOffset", fmt.Sprintf("%d", message.Offset)},
	}
	for _, field := range metadata {
		block, err = tk.BtokAdd(block, field.name, tk.FLD_STRING, field.value)
		if err != nil {
			gl.Log(gl.LVL1, gl.Name, "FAILURE: BtokAdd [%s] error [%s]", field.name, err.Error())
			return
		}
	}

	var anyJSON map[string]interface{}
	if err = json.Unmarshal(message.Value, &anyJSON); err != nil {
		gl.Log(gl.LVL1, gl.Name, "FAILURE: json.Unmarshal error [%s]", err.Error())
		return
	}
	if block, err = rangeMap(block, anyJSON, ""); err != nil {
		gl.Log(gl.LVL1, gl.Name, "FAILURE: rangeMap error [%s]", err.Error())
		return
	}
	if err = sendToExternalService(block, callID); err != nil {
		gl.Log(gl.LVL1, gl.Name, "FAILURE: sendToExternalService error [%s]", err.Error())
		return
	}
	gl.Log(gl.LVL1, gl.Name, "MWS DEBUG: anyJSON [%v]", anyJSON)
}
// findGroupInList returns the index of the first entry in list whose Group
// equals group, or -1 when no entry matches.
func findGroupInList(group string, list []KafkaConnectionData) int {
	defer recovery("findGroupInList")
	// defer gl.Leave(gl.Enter())
	for i := range list {
		if list[i].Group == group {
			return i
		}
	}
	return -1
}
// findStringInList reports whether item occurs in list. It returns
// (true, nil) on a match and (false, error) otherwise; the error text is the
// fixed message "Group not found in Conditional Data".
func findStringInList(item string, list []string) (bool, error) {
	defer recovery("findStringInList")
	// defer gl.Leave(gl.Enter())
	for i := range list {
		if list[i] == item {
			return true, nil
		}
	}
	return false, errors.New("Group not found in Conditional Data")
}
// init runs the start-up steps in order: genlog, command-line arguments,
// configurations, srv. Each step logs its outcome; the first failing step
// terminates the process with its own exit status (11, 12, 13 or 14).
func init() {
	defer recovery("init")

	if !initGenLog() {
		gl.Log(gl.All, 101, "FAILURE: genlog initialization")
		os.Exit(11)
	}
	gl.Log(gl.LVL1, gl.Name, "SUCCESS: genlog initialization")

	if !initArgs() {
		gl.Log(gl.All, 101, "FAILURE: argument initialization")
		os.Exit(12)
	}
	gl.Log(gl.All, 101, "SUCCESS: argument initialization")

	if !initConfigurations() {
		gl.Log(gl.All, 101, "FAILURE: configuration initialization")
		os.Exit(13)
	}
	gl.Log(gl.All, 101, "SUCCESS: configuration initialization")

	if !initSrv() {
		gl.Log(gl.All, 101, "FAILURE: srv initialization")
		os.Exit(14)
	}
	gl.Log(gl.LVL1, gl.Name, "SUCCESS: srv initialization")

	gl.Log(gl.All, 101, "SUCCESS: initialization complete")
}
// initArgs registers and parses the command-line flags -L (log level),
// -template and -account into the package-level logLevel, template and
// account variables, and logs the parsed values.
//
// It panics when -template or -account is empty; the deferred recovery
// handler logs the panic and exits the process. Otherwise it returns true.
func initArgs() bool {
	defer recovery("initArgs")
	// defer gl.Leave(gl.Enter())
	gl.Log(gl.LVL1, gl.Name, "Initialize Arguments Begin")

	flag.StringVar(&logLevel, "L", "", "Log level")
	flag.StringVar(&template, "template", "", "USER template to query on for connection data")
	flag.StringVar(&account, "account", "", "USER account to query on for connection data")

	gl.Log(gl.LVL2, gl.Name, "Begin flag.Parse()")
	flag.Parse()
	gl.Log(gl.LVL2, gl.Name, "End flag.Parse()")

	// flag.Parsed() is always true once flag.Parse() has been called, so the
	// former "flags not parsed properly" branch could never run and has been
	// removed.
	gl.Log(gl.LVL1, gl.Name, "logLevel [%s]", logLevel)
	gl.Log(gl.LVL1, gl.Name, "template [%s]", template)
	gl.Log(gl.LVL1, gl.Name, "account [%s]", account)

	if len(template) == 0 {
		gl.Log(gl.LVL1, gl.Name, "required argument [-template] missing")
		panic("no user template defined, please set the -template flag")
	}
	if len(account) == 0 {
		gl.Log(gl.LVL1, gl.Name, "required argument [-account] missing")
		panic("no user account defined, please set the -account flag")
	}

	gl.Log(gl.LVL1, gl.Name, "Initialize Arguments Completed")
	return true
}
// initConfigurations is the configuration step of init. Its body is redacted
// in this listing, so what it loads and how `response` is computed are not
// visible here. init treats a false return as fatal and exits with status 13.
func initConfigurations() bool {
defer recovery("initConfigurations")
// defer gl.Leave(gl.Enter())
// CODE REDACTED
return response
}
// initGenLog initializes the genlog (gl) logging package and records the
// application call ID.
//
// It returns false when gl.Init reports failure and true otherwise. As a side
// effect it stores the string returned by the C function strange_name in the
// package-level appCallID.
func initGenLog() bool {
if !gl.Init() {
gl.Log(gl.LVL1, gl.Name, "Error: Failed to initialize logging")
return false
}
// strange_name is a C function reached through cgo; presumably it is provided
// by the files included in the cgo preamble — confirm.
appCallID = C.GoString(C.strange_name())
// gl.ID is called once with nil arguments before gl.Conf and once afterwards
// with the call ID converted to a C string. What these calls configure is
// defined by the gl package and is not visible in this file.
gl.ID(gl.File2|gl.File3, nil, nil)
gl.Conf(gl.File2, gl.NoChg, gl.NoChgPtr, gl.NoChg, gl.NoChg, gl.NoChg, gl.NoChg, gl.Date2Def, nil)
gl.ID(gl.File2|gl.File3, gl.SToCS(appCallID), nil)
gl.Log(gl.LVL3, gl.Name, "Genlog Init Completed")
return true
}
func initSrv() bool {
var vt C.int
defer recovery("initSrv")
// defer gl.Leave(gl.Enter())
// Get vterm
if vt = C.get_vterm_id(C.TRUE); vt < 0 {
gl.Log(gl.LVL1, gl.Name, "Error: Failed to get vterm")
return false
}
if C.srv_init(nil, 0) < 0 {
gl.Log(gl.LVL1, gl.Name, "Error: Failed srv init")
return false
}
return true
}
// logKafkaConnectionData takes a KafkaConnectionData value and a description
// string. Its body is redacted in this listing; judging by the name it logs
// the connection data together with desc — confirm against the full source.
func logKafkaConnectionData(kcd KafkaConnectionData, desc string) {
defer recovery("logKafkaConnectionData")
// defer gl.Leave(gl.Enter())
// CODE REDACTED
}
// main supervises the Kafka consumers. Every two seconds it scans
// kafkaConnData and starts a saramaConsumer goroutine for each entry that is
// not running and still has StatusNew, marking it Running/StatusConnected.
// When the package-level done flag becomes true it waits for all consumer
// goroutines to finish, logs, and exits with status 0.
func main() {
	defer recovery("main")
	// defer gl.Leave(gl.Enter())

	const (
		// scanInterval is how often kafkaConnData is checked for new entries.
		scanInterval = 2 * time.Second
		// pollInterval bounds how long a shutdown request can go unnoticed.
		// Sleeping between checks replaces the former busy-wait loop, which
		// spun continuously and kept one CPU core fully loaded.
		pollInterval = 100 * time.Millisecond
	)

	lastScan := time.Now()
	// NOTE(review): done is written by saramaConsumer goroutines without
	// synchronization; making it an atomic or a channel requires changing
	// those writers as well.
	for !done {
		if time.Since(lastScan) > scanInterval {
			for idx := range kafkaConnData {
				kcd := &kafkaConnData[idx]
				if kcd.Running || kcd.Status != StatusNew {
					continue
				}
				kcd.Running = true
				kcd.Status = StatusConnected
				wg1.Add(1)
				go saramaConsumer(kcd)
			}
			lastScan = time.Now()
		}
		time.Sleep(pollInterval)
	}

	wg1.Wait()
	gl.Log(gl.LVL1, gl.Name, "Exiting application normally")
	os.Exit(0)
}
// rangeMap receives a token block, a decoded JSON object and a key prefix and
// returns the (possibly updated) block and an error. Its body is redacted in
// this listing; buildBlockFromJSON calls it with an empty prefix to add the
// message's JSON content to the block. How nested values and the prefix are
// handled is not visible here.
func rangeMap(block string, myMap map[string]interface{}, prefix string) (string, error) {
defer recovery("rangeMap")
// defer gl.Leave(gl.Enter())
// CODE REDACTED
return block, nil
}
// recovery is meant to be deferred directly by other functions. If the
// deferring function is panicking, it logs input (the caller's name), the
// panic value and a stack trace, then exits the process with status 31.
// When there is no panic it does nothing.
func recovery(input string) {
	// defer gl.Leave(gl.Enter())
	panicValue := recover()
	if panicValue == nil {
		return
	}
	stack := debug.Stack()
	gl.Log(gl.All, gl.Name, "%s, Exception: %v\n%s\n", input, panicValue, stack)
	os.Exit(31)
}
// saramaConsumer runs one Sarama consumer group for the connection described
// by kcd and blocks until it is told to stop.
//
// It builds a sarama configuration from kcd (version, rebalance strategy,
// SASL/PLAIN credentials), creates the consumer group client, runs the
// Consume loop in a separate goroutine, waits until Setup has signalled
// readiness, and then waits for either context cancellation or one of the
// signals SIGINT, SIGHUP, SIGTERM, SIGQUIT, SIGTSTP. On shutdown it sets the
// package-level done flag, cancels the context, waits for the Consume
// goroutine and closes the client.
//
// It panics on an unrecognized assignor, on client creation failure and on
// client close failure; the deferred recovery handler logs the panic and
// exits the process. wg1.Done is called when the function returns.
func saramaConsumer(kcd *KafkaConnectionData) {
	id := kcd.ID
	defer recovery("saramaConsumer")
	defer wg1.Done()
	gl.Log(gl.LVL1, gl.Name, "ID[%d]. Starting a new Sarama consumer", id)

	if kcd.Verbose {
		sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
	}

	// Construct a new Sarama configuration.
	// The Kafka cluster version has to be defined before the
	// consumer/producer is initialized.
	config := sarama.NewConfig()
	config.Version = kcd.Version

	switch kcd.Assignor {
	case "sticky":
		config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky
	case "roundrobin":
		config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
	case "range":
		config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
	default:
		gl.Log(gl.LVL1, gl.Name, "ID[%d]. Unrecognized consumer group partition assignor [%s]", id, kcd.Assignor)
		log.Panicf("Unrecognized consumer group partition assignor: %s", kcd.Assignor)
	}

	// NOTE(review): SASL/PLAIN is configured below while TLS stays disabled.
	// If the target cluster's listener expects SASL over TLS,
	// config.Net.TLS.Enable must be set to true — confirm against the
	// cluster's listener configuration and the broker address/port in use.
	// config.Net.TLS.Enable = false
	// config.Net.SASL.Handshake = false
	// config.Net.SASL.AuthIdentity = kcd.APIKey
	config.Net.SASL.Enable = true
	config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
	config.Net.SASL.User = kcd.APIKey
	config.Net.SASL.Password = kcd.APISecret
	config.ClientID = "CSGIM"
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	if err := config.Validate(); err != nil {
		gl.Log(gl.LVL1, gl.Name, "Configuration validation failed, err [%s]", err)
	} else {
		// Only non-sensitive settings are logged. Formatting the whole
		// config with %v would write Net.SASL.Password (the API secret)
		// into the log.
		gl.Log(gl.LVL1, gl.Name, "Configuration validation good, clientID [%s] version [%v] group [%s] brokers [%v]",
			config.ClientID, config.Version, kcd.Group, kcd.Brokers)
	}

	// Setup a new Sarama consumer group.
	consumer := Consumer{
		ready: make(chan bool),
	}

	ctx, cancel := context.WithCancel(context.Background())
	client, err := sarama.NewConsumerGroup(kcd.Brokers, kcd.Group, config)
	if err != nil {
		gl.Log(gl.LVL1, gl.Name, "ID[%d]. Error creating consumer group client [%s]", id, err)
		log.Panicf("Error creating consumer group client: %v", err)
	}

	wg2.Add(1)
	go func(kcd *KafkaConnectionData) {
		defer wg2.Done()
		for {
			// `Consume` should be called inside an infinite loop, when a
			// server-side rebalance happens, the consumer session will need to be
			// recreated to get the new claims
			dir = kcd.ExternalService
			if err := client.Consume(ctx, kcd.Topics, &consumer); err != nil {
				gl.Log(gl.LVL1, gl.Name, "Error from consumer: %s", err)
				log.Panicf("Error from consumer: %v", err)
			}
			// check if context was cancelled, signaling that the consumer should stop
			if ctx.Err() != nil {
				gl.Log(gl.LVL1, gl.Name, "ID[%d]. Error from context: %s", kcd.ID, ctx.Err())
				return
			}
			// Setup closes the channel on every session, so a fresh one is
			// needed before the next Consume call.
			consumer.ready = make(chan bool)
		}
	}(kcd)

	<-consumer.ready // Await till the consumer has been set up
	gl.Log(gl.LVL1, gl.Name, "ID[%d]. Sarama consumer up and running!...", id)

	sigterm := make(chan os.Signal, 1)
	// signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGHUP, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGTSTP)

	select {
	case <-ctx.Done():
		done = true
		gl.Log(gl.LVL1, gl.Name, "ID[%d]. terminating: context cancelled", id)
	case <-sigterm:
		done = true
		gl.Log(gl.LVL1, gl.Name, "ID[%d]. terminating sarama: via signal", id)
	}

	// Stop the Consume loop and wait for its goroutine before closing.
	cancel()
	wg2.Wait()

	if err = client.Close(); err != nil {
		// The id argument was missing here, so the message was formatted
		// with err in the ID[%d] slot and nothing for [%s].
		gl.Log(gl.LVL1, gl.Name, "ID[%d]. Error closing client [%s]", id, err)
		log.Panicf("Error closing client: %s", err)
	}
}
// sendToExternalService receives a token block and a call ID and returns an
// error. Its body is redacted in this listing, so the transport and the origin
// of the returned `err` are not visible here. buildBlockFromJSON logs a
// non-nil return as a failure.
func sendToExternalService(block string, callID string) error {
defer recovery("sendToExternalService")
// CODE REDACTED
return err
}
// sToCS converts a Go string to a NUL-terminated C string.
//
// The returned pointer refers to a Go-allocated byte slice (not memory from
// C.malloc), so it must not be released with C.free.
func sToCS(s string) *C.char {
	defer recovery("sToCS")
	// Copy the string bytes and append the terminating zero byte.
	terminated := append([]byte(s), 0)
	return (*C.char)(unsafe.Pointer(&terminated[0]))
}