6

I want to listen for add events that are deployments within my cluster,I use the client-go Watch Api,It works well at first, but after a short period of time, it will report an error.

I have tried two API ways to listen

the one way:

func startWatchDeployment(clientSet *kubernetes.Clientset){
    defer func() {
        err := recover()
        if err != nil {
            fmt.Println(err)
        }
    }()

    Log.Info("正在监听deployment...")
    count := 0
    deploymentsClient := clientSet.AppsV1beta1().Deployments(metav1.NamespaceAll)
    list,_ := deploymentsClient.List(metav1.ListOptions{})
    items := list.Items
    w, _ := deploymentsClient.Watch(metav1.ListOptions{})
    for {
        select {
            case e, _ := <-w.ResultChan():
                Log.Infof("162: %s",e)
                if e.Type == watch.Added || e.Type == watch.Deleted{
                    if count != len(items){
                        count += 1
                    }else{
                        // go的reflect获取运行时的struct
                        nname := e.Object.(*v1beta1.Deployment).Namespace
                        if r, _ := regexp.Compile("^(p|u|user)-");nname != "default" && nname != "cattle-system" &&
                            nname != "kube-system" && nname != "dsky-system" &&
                            nname != "kube-public" && nname != "local" && nname != "tools" && !r.MatchString(nname) {
                            data := make(map[string]interface{},1)
                            data["type"] = e.Type
                            data["name"] = e.Object.(*v1beta1.Deployment).Name
                            data["namespace"] = e.Object.(*v1beta1.Deployment).Namespace
                            watchChannel <- data
                        }
                    }
                }
        }
    }
}

the other way:

func startWatchDp(clientSet *kubernetes.Clientset){
    watchlist := cache.NewListWatchFromClient(
        clientSet.AppsV1().RESTClient(),
        "deployments",
        metav1.NamespaceAll,
        fields.Everything())

    _, controller := cache.NewInformer(
        watchlist,
        &v13.Deployment{},
        time.Millisecond*100,
        cache.ResourceEventHandlerFuncs{
            AddFunc: func(obj interface{}) {
                watchChannel <- 1
                //fmt.Println(obj)
            },
            DeleteFunc: func(obj interface{}) {
                watchChannel <- 1
            },
        },
    )

    stop := make(chan struct{})
    go controller.Run(stop)

    for {
        time.Sleep(10 * time.Second)
    }
}

Here is my result:

E0219 15:20:02.274210   19272 streamwatcher.go:109] Unable to decode an event from the watch stream: stream error: stream ID 3; INTERNAL_ERROR
time="2019-02-19T15:20:02+08:00" level=info msg="162: { <nil>}"
time="2019-02-19T15:20:02+08:00" level=info msg="162: { <nil>}"
time="2019-02-19T15:20:02+08:00" level=info msg="162: { <nil>}"
time="2019-02-19T15:20:02+08:00" level=info msg="162: { <nil>}"
time="2019-02-19T15:20:02+08:00" level=info msg="162: { <nil>}"

How can I fix it?Please help!Thanks

4

0 回答 0