1

下面是我从需要刷新令牌的 Kafka 消费的代码。我有一个返回 jwt 令牌的函数。但是如何让 go 客户端刷新呢?当我运行时,我没有收到任何错误,但我也没有收到流。

Created Consumer rdkafka#consumer-1
Ignored OAuthBearerTokenRefresh

我试图从这个链接模仿https://github.com/confluentinc/confluent-kafka-go/blob/master/examples/oauthbearer_example/oauthbearer_example.go

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"os/signal"
"reflect"
"syscall"

"github.com/confluentinc/confluent-kafka-go/kafka"
"golang.org/x/oauth2/clientcredentials"
)

func handleOAuthBearerTokenRefreshEvent(client kafka.Handle, e kafka.OAuthBearerTokenRefresh) {
oauthBearerToken, retrieveErr := retrieveUnsecuredToken(e)
if retrieveErr != nil {
    fmt.Fprintf(os.Stderr, "%% Token retrieval error: %v\n", retrieveErr)
    client.SetOAuthBearerTokenFailure(retrieveErr.Error())
} else {
    setTokenError := client.SetOAuthBearerToken(oauthBearerToken)
    if setTokenError != nil {
        fmt.Fprintf(os.Stderr, "%% Error setting token and extensions: %v\n", setTokenError)
        client.SetOAuthBearerTokenFailure(setTokenError.Error())
    }
}
}





    func retrieveUnsecuredToken(e kafka.OAuthBearerTokenRefresh) (kafka.OAuthBearerToken, error) {

    //https://github.com/confluentinc/confluent-kafka-go/blob/master/examples/oauthbearer_example/oauthbearer_example.go

    conf := &clientcredentials.Config{
        ClientID:     "test",
        ClientSecret: "test",
        TokenURL:     "https://test.com/auth/realms/pro-realm/protocol/openid-connect/token",
    }
    token, _ := conf.Token(context.Background())
    extensions := map[string]string{}
    oauthBearerToken := kafka.OAuthBearerToken{
        TokenValue: token.AccessToken,
        Expiration: token.Expiry,
        Principal:  "principal=dude",
        Extensions: extensions,
    }
    fmt.Println(oauthBearerToken)
    return oauthBearerToken, nil
}
    
func main() {

    /*
        if len(os.Args) < 4 {
            fmt.Fprintf(os.Stderr, "Usage: %s <broker> <group> <topics..>\n",
                os.Args[0])
            os.Exit(1)
        }
        
    */

    

    broker := os.Args[1]
    group := os.Args[2]
    topics := os.Args[3:]
    sigchan := make(chan os.Signal, 1)
    signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

    oauthConf := "principal=dude"

    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers":                   broker,
        "security.protocol":                   "SASL_PLAINTEXT",
        "sasl.mechanisms":                     "OAUTHBEARER",
        "sasl.oauthbearer.config":             oauthConf,
        "broker.address.family":               "v4",
        "group.id":                            group,
        "session.timeout.ms":                  6000,
        "enable.ssl.certificate.verification": false,
        "auto.offset.reset":                   "earliest"})

    if err != nil {
        fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
        os.Exit(1)
    }
    fmt.Printf("Created Consumer %v\n", c)
    err = c.SubscribeTopics(topics, nil)
    run := true
    
     
go func(eventsChan chan kafka.Event) {
    for ev := range eventsChan {
        oart, ok := ev.(kafka.OAuthBearerTokenRefresh)
        if !ok {
            // Ignore other event types
            continue
        }

        handleOAuthBearerTokenRefreshEvent(c, oart)
    }
}(c.Events())


    for run == true {
        select {
        case sig := <-sigchan:
            fmt.Printf("Caught signal %v: terminating\n", sig)
            run = false
        default:
            ev := c.Poll(100)
            if ev == nil {
                continue
            }
            switch e := ev.(type) {
            case *kafka.Message:
                fmt.Printf("%% Message on %s:\n%s\n", e.TopicPartition, string(e.Value))
                if e.Headers != nil {
                    fmt.Printf("%% Headers: %v\n", e.Headers)
                }
            case kafka.Error:
                fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
                if e.Code() == kafka.ErrAllBrokersDown {
                    run = false
                }
            default:
                fmt.Printf("Ignored %v\n", e)
            }
        }
    }

    fmt.Printf("Closing consumer\n")
    c.Close()

}

更新:

如果我删除 go func(eventsChan chan kafka.Event) 并在投票中添加以下内容,如果有效:

      case kafka.OAuthBearerTokenRefresh:
            oart, ok := ev.(kafka.OAuthBearerTokenRefresh)
            fmt.Println(oart)
            if !ok {
                // Ignore other event types
                continue
            }
            handleOAuthBearerTokenRefreshEvent(c, oart)
4

0 回答 0