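I wrote a Go-based K8s client application that connects to a K8s cluster. To handle real-time notifications (add, delete, update) for Pods, Namespaces, and Nodes from the cluster, I wrote an informer. The code snippet is below.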

I want to draw particular attention to the "runtime.HandleCrash()" function, which (I guess) helps redirect runtime panics/errors to the panic file.

// Redirect stderr to the panic file.
panicFile, _ := os.OpenFile("/var/log/panicfile", os.O_WRONLY|os.O_CREATE|os.O_SYNC, 0644)
syscall.Dup2(int(panicFile.Fd()), int(os.Stderr.Fd()))

Further below are some of the errors that get reported/collected in the panic file.

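My question is: is there any way to write the informer so that it reports specific errors to my application instead of writing them to the panic file? That way, my application could handle this expected event more gracefully.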

Is there any way to register a callback function (similar to Informer.AddEventHandler())?

func (kcv *K8sWorker) armK8sPodListeners() error {

    // Kubernetes provides a utility to handle crashes (panics)
    defer runtime.HandleCrash()

    var sharedInformer = informers.NewSharedInformerFactory(kcv.kubeClient.K8sClient, 0)

    // Add watcher for the Pod.
    kcv.podInformer = sharedInformer.Core().V1().Pods().Informer()
    kcv.podInformerChan = make(chan struct{})

    // Pod informer state change handler
    kcv.podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs {
        // When a new pod gets created
        AddFunc: func(obj interface{}) {
            kcv.handleAddPod(obj)
        },
        // When a pod gets updated
        UpdateFunc: func(oldObj interface{}, newObj interface{}) {
            kcv.handleUpdatePod(oldObj, newObj)
        },
        // When a pod gets deleted
        DeleteFunc: func(obj interface{}) {
            kcv.handleDeletePod(obj)
        },
    })

    kcv.nsInformer = sharedInformer.Core().V1().Namespaces().Informer()
    kcv.nsInformerChan = make(chan struct{})

    // Namespace informer state change handler
    kcv.nsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs {
        // When a new namespace gets created
        AddFunc: func(obj interface{}) {
            kcv.handleAddNamespace(obj)
        },
        // When a namespace gets updated
        //UpdateFunc: func(oldObj interface{}, newObj interface{}) {
        //    kcv.handleUpdateNamespace(oldObj, newObj)
        //},
        // When a namespace gets deleted
        DeleteFunc: func(obj interface{}) {
            kcv.handleDeleteNamespace(obj)
        },
    })

    // Add watcher for the Node.
    kcv.nodeInformer = sharedInformer.Core().V1().Nodes().Informer()
    kcv.nodeInformerChan = make(chan struct{})

    // Node informer state change handler
    kcv.nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs {
        // When a new node gets created
        AddFunc: func(obj interface{}) {
            kcv.handleAddNode(obj)
        },
        // When a node gets updated
        UpdateFunc: func(oldObj interface{}, newObj interface{}) {
            kcv.handleUpdateNode(oldObj, newObj)
        },
        // When a node gets deleted
        DeleteFunc: func(obj interface{}) {
            kcv.handleDeleteNode(obj)
        },
    })

    // Start the shared informer.
    kcv.sharedInformerChan = make(chan struct{})
    sharedInformer.Start(kcv.sharedInformerChan)
    log.Debug("Shared informer started")

    return nil
}

In one particular use case, I shut down the K8s cluster, which caused the informer to write error messages to the panic file, as shown below.

The moment I brought the K8s cluster nodes back up, it stopped reporting these errors.

==== output from "/var/log/panicfile" ====== 

E0611 16:13:03.558214      10 reflector.go:125] k8s.io/client-go/informers/factory.go:133: Failed to list *v1.Pod: Get https://10.30.8.75:6443/api/v1/pods?limit=500&resourceVersion=0: dial tcp 10.30.8.75:6443: connect: no route to host                                                                                                                                     
E0611 16:13:03.558224      10 reflector.go:125] k8s.io/client-go/informers/factory.go:133: Failed to list *v1.Namespace: Get https://10.30.8.75:6443/api/v1/namespaces?limit=500&resourceVersion=0: dial tcp 10.30.8.75:6443: connect: no route to host                                                                                                                         
E0611 16:13:03.558246      10 reflector.go:125] k8s.io/client-go/informers/factory.go:133: Failed to list *v1.Node: Get https://10.30.8.75:6443/api/v1/nodes?limit=500&resourceVersion=0: dial tcp 10.30.8.75:6443: connect: no route to host                                                                                                                                   
1 Answer

Your question is:

Is there any way to register a callback function (similar to Informer.AddEventHandler())?

I believe what you are looking for is SetWatchErrorHandler().

From the source code:

type SharedInformer interface {
    ...

    // The WatchErrorHandler is called whenever ListAndWatch drops the
    // connection with an error. After calling this handler, the informer
    // will backoff and retry.
    //
    // The default implementation looks at the error type and tries to log
    // the error message at an appropriate level.
    //
    // There's only one handler, so if you call this multiple times, last one
    // wins; calling after the informer has been started returns an error.
    //
    // The handler is intended for visibility, not to e.g. pause the consumers.
    // The handler should return quickly - any expensive processing should be
    // offloaded.
    SetWatchErrorHandler(handler WatchErrorHandler) error
}

You call this function on the informer, before the informer is started:

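// Outside the cache package the Reflector type must be qualified.
if err := kcv.podInformer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
    // your code goes here
}); err != nil {
    // the informer was already started, so the handler was not installed
}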

For comparison, the default implementation is DefaultWatchErrorHandler.

Answered 2021-04-08T09:08:54.310