I am developing a dynamic Kubernetes informer to watch for events in my Kubernetes cluster and to discover all of its components.

However, when I try to access the cluster configuration via the InClusterConfig method, I get the following error:

// go run main.go
FATA[0000] could not get config                          error="unable to load in-cluster configuration, KUBERNETES_SERVICE_HOST and KUBERNETES_SERVICE_PORT must be defined"
exit status 1

I have found several similar issues on the Kubernetes GitHub repo as well as on Stack Overflow, but could not find any solution or workaround. [kubernetes issue, kubernetes issue, stackoverflow similar issue, stackoverflow similar issue]

The Go code and go.mod file are below.

Go code:

package main

import (
    "os"
    "os/signal"

    "github.com/sirupsen/logrus"
    v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/client-go/dynamic"
    "k8s.io/client-go/dynamic/dynamicinformer"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    cfg, err := restConfig()
    if err != nil {
        logrus.WithError(err).Fatal("could not get config")
    }

    // Grab a dynamic interface that we can create informers from
    dc, err := dynamic.NewForConfig(cfg)
    if err != nil {
        logrus.WithError(err).Fatal("could not generate dynamic client for config")
    }

    // Create a factory object that we can say "hey, I need to watch this resource"
    // and it will give us back an informer for it
    f := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dc, 0, v1.NamespaceAll, nil)

    // Retrieve a "GroupVersionResource" type that we need when generating our informer from our dynamic factory
    gvr, _ := schema.ParseResourceArg("deployments.v1.apps")

    // Finally, create our informer for deployments!
    i := f.ForResource(*gvr)

    stopCh := make(chan struct{})
    go startWatching(stopCh, i.Informer())

    sigCh := make(chan os.Signal, 1) // buffered, as required by signal.Notify
    signal.Notify(sigCh, os.Kill, os.Interrupt)

    <-sigCh
    close(stopCh)
}

func restConfig() (*rest.Config, error) {
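    // Prefer the in-cluster config; if KUBECONFIG is set in the environment,
    // build the config from that file instead (overwriting any in-cluster error).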
    kubeCfg, err := rest.InClusterConfig()
    if kubeconfig := os.Getenv("KUBECONFIG"); kubeconfig != "" {
        kubeCfg, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
    }

    if err != nil {
        return nil, err
    }

    return kubeCfg, nil
}

func startWatching(stopCh <-chan struct{}, s cache.SharedIndexInformer) {
    handlers := cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            u := obj.(*unstructured.Unstructured)

            logrus.WithFields(logrus.Fields{
                "name":      u.GetName(),
                "namespace": u.GetNamespace(),
                "labels":    u.GetLabels(),
            }).Info("received add event!")
        },
        UpdateFunc: func(oldObj, obj interface{}) {
            logrus.Info("received update event!")
        },
        DeleteFunc: func(obj interface{}) {
            logrus.Info("received update event!")
        },
    }

    s.AddEventHandler(handlers)
    s.Run(stopCh)
}
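
One detail in the code above is worth flagging: schema.ParseResourceArg returns a nil *GroupVersionResource when its argument is not fully qualified as resource.version.group, and the snippet dereferences the result without checking. A more defensive helper could be dropped into the program above, which already imports the schema package (a sketch, not the original code; resourceGVR and its defaultVersion parameter are hypothetical names):

// resourceGVR resolves an argument like "deployments.v1.apps" into a
// GroupVersionResource, falling back to a caller-supplied version when
// the argument omits one.
func resourceGVR(arg, defaultVersion string) schema.GroupVersionResource {
    gvr, gr := schema.ParseResourceArg(arg)
    if gvr != nil {
        return *gvr
    }
    // Only "resource.group" was given; assume a version for it.
    return gr.WithVersion(defaultVersion)
}

Usage in main would then be: i := f.ForResource(resourceGVR("deployments.v1.apps", "v1")).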

go.mod file:

module discovery-test

go 1.15

require (
    github.com/googleapis/gnostic v0.5.3 // indirect
    github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 // indirect
    github.com/imdario/mergo v0.3.11 // indirect
    github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
    github.com/sirupsen/logrus v1.7.0
    golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 // indirect
    golang.org/x/net v0.0.0-20201029055024-942e2f445f3c // indirect
    golang.org/x/oauth2 v0.0.0-20200902213428-5d25da1a8d43 // indirect
    golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e // indirect
    k8s.io/apimachinery v0.17.0
    k8s.io/client-go v0.17.0
    k8s.io/klog v1.0.0 // indirect
    k8s.io/utils v0.0.0-20201027101359-01387209bb0d // indirect
)

1 Answer


First of all, thanks to @ShudiptaSharma. His comment helped me figure out that I was trying to get the cluster config from outside of the cluster, which led the program to look for the cluster on my local machine (127.0.0.1), where I don't have a cluster running.
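
For the record, the error in the question comes from exactly this: rest.InClusterConfig reads the API server address from environment variables that the kubelet injects into every pod, so outside a pod they are simply unset. A minimal sketch of that detection logic (simplified; this is not the actual client-go source):

package main

import (
    "fmt"
    "os"
)

func main() {
    // Inside a pod the kubelet sets these automatically; on a local machine
    // they are empty, which is why InClusterConfig fails with
    // "KUBERNETES_SERVICE_HOST and KUBERNETES_SERVICE_PORT must be defined".
    host := os.Getenv("KUBERNETES_SERVICE_HOST")
    port := os.Getenv("KUBERNETES_SERVICE_PORT")
    if host == "" || port == "" {
        fmt.Println("not running inside a cluster; use a kubeconfig instead")
        return
    }
    fmt.Printf("in-cluster API server: https://%s:%s\n", host, port)
}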

While figuring out how to access the cluster from outside, I also learned that InClusterConfig is meant for the use case where the program runs inside the cluster; when running outside the cluster, something like the following can be used:

// go run main.go
package main

import (
    "flag"
    "os"
    "os/signal"
    "path/filepath"

    "github.com/sirupsen/logrus"
    v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/client-go/dynamic"
    "k8s.io/client-go/dynamic/dynamicinformer"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/util/homedir"
)

func main() {
    var kubeconfig *string
    if home := homedir.HomeDir(); home != "" {
        kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
    } else {
        kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
    }
    flag.Parse()
  
    cfg, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
    if err != nil {
        logrus.WithError(err).Fatal("could not get config")
    }

    // Grab a dynamic interface that we can create informers from
    dc, err := dynamic.NewForConfig(cfg)
    if err != nil {
        logrus.WithError(err).Fatal("could not generate dynamic client for config")
    }

    // Create a factory object that we can say "hey, I need to watch this resource"
    // and it will give us back an informer for it
    f := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dc, 0, v1.NamespaceAll, nil)

    // Retrieve a "GroupVersionResource" type that we need when generating our informer from our dynamic factory
    gvr, _ := schema.ParseResourceArg("deployments.v1.apps")

    // Finally, create our informer for deployments!
    i := f.ForResource(*gvr)

    stopCh := make(chan struct{})
    go startWatching(stopCh, i.Informer())

    sigCh := make(chan os.Signal, 1) // buffered, as required by signal.Notify
    signal.Notify(sigCh, os.Kill, os.Interrupt)

    <-sigCh
    close(stopCh)
}

func startWatching(stopCh <-chan struct{}, s cache.SharedIndexInformer) {
    handlers := cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            u := obj.(*unstructured.Unstructured)

            logrus.WithFields(logrus.Fields{
                "name":      u.GetName(),
                "namespace": u.GetNamespace(),
                "labels":    u.GetLabels(),
            }).Info("received add event!")
        },
        UpdateFunc: func(oldObj, obj interface{}) {
            logrus.Info("received update event!")
        },
        DeleteFunc: func(obj interface{}) {
            logrus.Info("received update event!")
        },
    }

    s.AddEventHandler(handlers)
    s.Run(stopCh)
}
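
For completeness, a common pattern is to try the in-cluster config first and fall back to a kubeconfig file, so the same binary works both inside a pod and on a workstation. A minimal sketch of that (my addition on top of the code above, not something the original program does):

package main

import (
    "fmt"
    "os"
    "path/filepath"

    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/util/homedir"
)

// restConfig prefers the in-cluster configuration and falls back to the
// kubeconfig from $KUBECONFIG or ~/.kube/config when running locally.
func restConfig() (*rest.Config, error) {
    if cfg, err := rest.InClusterConfig(); err == nil {
        return cfg, nil
    }
    kubeconfig := os.Getenv("KUBECONFIG")
    if kubeconfig == "" {
        if home := homedir.HomeDir(); home != "" {
            kubeconfig = filepath.Join(home, ".kube", "config")
        }
    }
    return clientcmd.BuildConfigFromFlags("", kubeconfig)
}

func main() {
    cfg, err := restConfig()
    if err != nil {
        fmt.Fprintln(os.Stderr, err)
        os.Exit(1)
    }
    fmt.Println("API server:", cfg.Host)
}

With a loader like this, the -kubeconfig flag handling above becomes optional.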
answered 2020-11-08T14:22:37.797