0

我们正在尝试从 k8s 1.1 升级到 1.2.4,而我们用于在复制控制器上创建监视的代码似乎不再起作用:监视被创建但我们没有收到它们的“添加”或“修改”事件。

我已经创建了最小的可重现案例,并将其包括在下面。为了方便任何对此感兴趣的人,我还包括启动 K8s 1.1 和 k8s 1.2.4 的脚本。

请注意有关测试用例的以下内容:

0.  We use fabric8 1.3.91 to connect to k8s 1.2.4 and  fabric8 1.2.2 to connect to k8s 1.1.
1.  There are slight changes in the fabric8 API that will require you to 
    tweak the REPRODUCIBLE TEST CASE according to the version you run on.

        Please run the k8s 1.1 scenario first.  
        As-shipped, the test program will compile against that version.
        After that, please tweak as follows:

        comment out the following for new fabric8 
            import io.fabric8.kubernetes.client.DefaultKubernetesClient.ConfigBuilder   

        replace masterUrl with: withMasterUrl

        uncomment the onClose method of new Watcher { ... }   (it is shipped commented out; newer fabric8 needs it)


2.  You should start the appropriate single-node k8s script before you run the test program.

3.  make sure you create the namespace 'junk6', which you can do by saving 
    the lines below in a file ns.yaml, then typing    kubectl create -f ns.yaml

            kind: Namespace
            apiVersion: v1
            metadata:
              name: junk6
              labels:
                name: junk6

4.  For k8s 1.1 you will see a log message containing the string REPCONTROLLER, and the message
    future is done...     You will not see these when you run under k8s 1.2.4 because it seems 
    the watch message is never received.

5.  The name of the rep controller is spark master, but the image is redis. Please ignore that.

可重复的测试用例

package com.foo.blah

import com.fasterxml.jackson.databind.ObjectMapper
import com.typesafe.scalalogging.StrictLogging
import io.fabric8.kubernetes.api.model.ReplicationController
import io.fabric8.kubernetes.client.DefaultKubernetesClient.ConfigBuilder

//import io.fabric8.kubernetes.client.DefaultKubernetesClient.ConfigBuilder   /* comment this out for new fabric8 */
import io.fabric8.kubernetes.client.Watcher.Action
import io.fabric8.kubernetes.client._

import scala.collection.JavaConverters._
import scala.concurrent.{Future, Promise}
import scala.util.Try

//  To run:
//          make sure you have a namespace called  junk6   !

/**
  * Minimal reproducer for the "watch on a replication controller receives no events" report.
  *
  * The program creates a replication controller from an embedded JSON template, registers a
  * watch on it, and prints a line once the future tied to the first watch event completes.
  * As shipped it targets the older fabric8 client API; inline comments mark what to change
  * for the newer one.
  */
object BugReport extends App with StrictLogging {

  /**
    * Creates the replication controller `rc` in `namespace` and watches it.
    *
    * @param rc        the replication controller to create
    * @param namespace the namespace in which the controller is created and watched
    */
  class RepControllerAndWatchCreator(val rc: io.fabric8.kubernetes.api.model.ReplicationController,
                                     namespace: String) {

    /**
      * Creates the replication controller and then registers a watch on the controller `name`.
      *
      * @param kube client used for both the create and the watch call
      * @param name name of the replication controller to watch
      * @return a future that completes when the watcher receives its first event
      */
    def instantiate(kube: KubernetesClient, name: String): Future[Unit] = {
      val promise = Promise[Unit]()
      logger.debug(s"setting up  'create complete' watch for component $name ")
      // NOTE(review): the watch is registered only after the create call. This order is kept
      // unchanged because it is exactly what this reproducer exercises; events emitted before
      // the watch exists may never reach the watcher.
      kube.replicationControllers().inNamespace(namespace).create(rc) /*  create the rc ! */
      val rcWatcher = getRcWatch(name, namespace, promise)
      kube.replicationControllers().inNamespace(namespace).withName(name).watch(rcWatcher)
      logger.debug(s"after rc watch - name $name")
      promise.future
    }

    /**
      * Builds the watcher that logs every event and completes `promise` on the first one.
      *
      * @param name      name of the watched replication controller (used for logging)
      * @param nameSpace namespace of the watched replication controller (used for logging)
      * @param promise   promise to complete when the first event is received
      */
    private[this] def getRcWatch(name: String,
                                 nameSpace: String,
                                 promise: Promise[Unit]): Watcher[ReplicationController] = {
      logger.debug(s"setting up  'create complete' watch for component $name ns=$nameSpace")

      new Watcher[ReplicationController]() {
        def eventReceived(action: Action, watchedRc: ReplicationController): Unit = {
          logger.debug(s"event recv'd for REPCONTROLLER  $name. action=$action [ $watchedRc ] ")
          // A watch can deliver several events; `success` would throw IllegalStateException
          // on the second one, so complete the promise at most once.
          val completedNow = promise.trySuccess(())
          if (!completedNow) {
            logger.debug(s"promise for $name was already completed; ignoring further event")
          }
        }
        /*      Uncomment this for newer version of fabric8 API.

        override def onClose(cause: KubernetesClientException): Unit = {
          if (!promise.isCompleted) {
            logger.trace("Watcher is close but promise is not completed.")
          }
        }

         */
      }
    }

    /**
      * Reports whether the number of replicas in the controller's status equals the number
      * requested in its spec. Currently not called by the rest of this program.
      */
    private[this] def isRcComplete(rc: ReplicationController): Boolean = {
      val retval = rc.getSpec.getReplicas == rc.getStatus.getReplicas
      logger.debug(s"isRcComplete [ ${rc.getMetadata.getName} ] = $retval")
      retval
    }
  }


  val k8sUrl = "http://localhost:8080"
  val namespaceName = "junk6"

  /**
    * Connects to the cluster, creates and watches the replication controller, prints the
    * outcome of the resulting future, and then keeps the main thread alive.
    */
  def go(): Unit = {
    import scala.concurrent.ExecutionContext.Implicits.global
    val kube: KubernetesClient = getConnection
    val rc: ReplicationController = getRc

    val result: Future[Unit] =
      new RepControllerAndWatchCreator(rc, namespaceName).instantiate(kube, "spark-master-rc")
    result.onComplete { (outcome: Try[Unit]) =>
      println(s"future is done: $outcome")
    }

    // Keep the process alive long enough for watch events to arrive.
    Thread.sleep(500 * 1000)
  }

  /**
    * Parses the embedded JSON template into a [[ReplicationController]] and echoes the JSON
    * to standard output.
    *
    * NOTE(review): the container's "resources" object contains a nested "resources" key next
    * to "limits"; this may have been meant as "requests" - confirm before relying on it.
    */
  def getRc: ReplicationController = {
    val jsonTemplate =
      """
    |{
    |  "kind": "ReplicationController",
    |  "apiVersion": "v1",
    |  "metadata": {
    |    "name": "spark-master-rc",
    |    "labels": {
    |      "name": "spark-master"
    |    }
    |  },
    |  "spec": {
    |    "replicas": 1,
    |    "selector": {
    |      "name": "spark-master"
    |    },
    |    "template": {
    |      "metadata": {
    |        "labels": {
    |          "name": "spark-master"
    |        }
    |      },
    |      "spec": {
    |        "containers": [
    |          {
    |            "name": "spark-master",
    |            "image": "redis",
    |            "imagePullPolicy": "Always",
    |            "ports": [
    |              {
    |                "containerPort": 7077
    |              },
    |              {
    |                "containerPort": 8080
    |              }
    |            ],
    |            "resources": {
    |              "resources": {
    |                "cpu": "2000m",
    |                "memory": "4Gi"
    |              },
    |              "limits": {
    |                "cpu": "2000m",
    |                "memory": "4Gi"
    |              }
    |            }
    |          }
    |        ]
    |      }
    |    }
    |  }
    |}
      """.stripMargin
    println("json:" + jsonTemplate)
    new ObjectMapper().readValue(jsonTemplate, classOf[ReplicationController])
  }

  /**
    * Builds a client pointed at [[k8sUrl]].
    *
    * Older fabric8 (as shipped): `new ConfigBuilder()` and `masterUrl(...)`.
    * Newer fabric8: replace them with `Config.builder()` and `withMasterUrl(...)`.
    */
  def getConnection: DefaultKubernetesClient = {
    val configBuilder = new ConfigBuilder()       /*   For newer fabric8, replace with:  Config.builder() */
    val config =
      configBuilder.
        masterUrl(k8sUrl).                    /* For newer fabric8, replace with:  withMasterUrl  */
        build()
    new DefaultKubernetesClient(config)
  }


  go()
}

K8s 1.1 的启动脚本

#!/usr/bin/env bash

# Set the SELinux context on the kubelet directory; this is required, see:
# http://stackoverflow.com/questions/34777111/cannot-create-a-shared-volume-mount-via-emptydir-on-single-node-kubernetes-on
sudo chcon -Rt svirt_sandbox_file_t /var/lib/kubelet


# Launch etcd 2.0.12 detached, on the host network, listening on port 4001.
docker run \
    --net=host \
    -d \
    gcr.io/google_containers/etcd:2.0.12 \
    /usr/local/bin/etcd \
    --addr=127.0.0.1:4001 \
    --bind-addr=0.0.0.0:4001 \
    --data-dir=/var/etcd/data


docker run \
    --volume=/:/rootfs:ro \
    --volume=/sys:/sys:ro \
    --volume=/dev:/dev \
    --vol

K8s 1.2.4 的启动脚本

#!/usr/bin/env bash

# Set the SELinux context on the kubelet directory; this is required, see:
# http://stackoverflow.com/questions/34777111/cannot-create-a-shared-volume-mount-via-emptydir-on-single-node-kubernetes-on
sudo chcon -Rt svirt_sandbox_file_t /var/lib/kubelet

# Separate etcd container from the 1.1 script, left disabled here:
#docker run --net=host -d gcr.io/google_containers/etcd:2.0.12 /usr/local/bin/etcd --addr=127.0.0.1:4001 --bind-addr=0.0.0.0:4001 --data-dir=/var/etcd/data

# Version pinning follows "Running Kubernetes Locally via Docker":
# http://kubernetes.io/docs/getting-started-guides/docker/
# To track the latest stable release instead of the pinned one:
# export K8S_VERSION=$(curl -sS https://storage.googleapis.com/kubernetes-release/release/stable.txt)
export K8S_VERSION=v1.2.4 ARCH=amd64

# Host paths mounted into the kubelet container.
volume_args=(
    --volume=/:/rootfs:ro
    --volume=/sys:/sys:ro
    --volume=/dev:/dev
    --volume=/var/lib/docker/:/var/lib/docker:ro
    --volume=/var/lib/kubelet/:/var/lib/kubelet:rw
    --volume=/var/run:/var/run:rw
)

# Arguments handed to the kubelet inside the hyperkube image.
kubelet_args=(
    --containerized
    --hostname-override=127.0.0.1
    --address=0.0.0.0
    --api-servers=http://localhost:8080
    --config=/etc/kubernetes/manifests
    --allow-privileged
    --v=2
)

docker run \
    "${volume_args[@]}" \
    --net=host \
    --pid=host \
    --privileged=true \
    -d \
    "gcr.io/google_containers/hyperkube-${ARCH}:${K8S_VERSION}" \
    /hyperkube kubelet \
    "${kubelet_args[@]}"

# Proxy container, left disabled here:
#docker run -d --net=host --privileged gcr.io/google_containers/hyperkube:v1.0.1 /hyperkube proxy --master=http://127.0.0.1:8080 --v=2

# Give everything time to launch.
sleep 5
4

1 回答 1

1

因此,我的同事 Vijay 的答案在下面的验证程序中得到了表达。

关键问题(在提交给 fabric8 的错误报告中也提到过)在于:“创建对象”与“对该对象建立监视”这两步的先后顺序很重要。谢谢,Vijay!

总结错误报告:

如果您切换了这些语句的顺序

client.replicationControllers().inNamespace(namespace).withLabel("l", "v").watch(watcher);
createRc(client, namespace, podName, image);

对此:

createRc(client, namespace, podName, image);
client.replicationControllers().inNamespace(namespace).withLabel("l", "v").watch(watcher);


程序将不再正常工作(收不到监视事件)。就我所做的测试来看,在 fabric8 1.2.2 中调换这两条语句的顺序并不会有问题。

**Vijay 的解决方案**

import com.fasterxml.jackson.databind.ObjectMapper;
import com.typesafe.scalalogging.StrictLogging;
import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.api.model.ReplicationController;
import io.fabric8.kubernetes.client.Watcher.Action;
import io.fabric8.kubernetes.client.*;

import java.util.HashMap;
import java.util.Map;


/**
 * Verification program: registers a watch on replication controllers labelled {@code l=v}
 * in a namespace and then creates such a replication controller, printing every event the
 * watcher receives.
 */
public class Vijay {


  /**
   * Builds a client that talks to the API server at {@code http://localhost:8080}.
   *
   * @return a new client instance
   */
  public static DefaultKubernetesClient getConnection() {
    Config config = Config.builder()
        .withMasterUrl("http://localhost:8080")
        .build();
    return new DefaultKubernetesClient(config);
  }


  /**
   * Registers the watch first and only then creates the replication controller.
   *
   * @param args unused
   * @throws Exception if the client or the watch cannot be set up
   */
  public static void main(String[] args) throws Exception {
    DefaultKubernetesClient client = getConnection();

    String namespace = "junk6";
    String podName = "prom";
    String image = "nginx";

    Watcher<ReplicationController> watcher = new Watcher<ReplicationController>() {

      @Override
      public void onClose(KubernetesClientException cause) {
        // Report why the watch ended instead of dropping the information silently.
        // NOTE(review): a null cause is assumed to mean an ordinary close - confirm
        // against the fabric8 client version in use.
        if (cause != null) {
          cause.printStackTrace();
        } else {
          System.out.println("watch closed");
        }
      }

      @Override
      public void eventReceived(Action action, ReplicationController resource) {
        System.out.println(action + ":" + resource);
      }
    };

    // Order matters: the watch must be in place before the object is created,
    // otherwise the events for the creation are not received.
    client.replicationControllers().inNamespace(namespace).withLabel("l", "v").watch(watcher);

    createRc(client, namespace, podName, image);

  }

  /**
   * Creates a single-replica replication controller labelled {@code l=v} whose pod template
   * runs one container with the given image.
   *
   * @param client    client used to create the controller
   * @param namespace namespace to create the controller in
   * @param podName   name used for both the controller and its container
   * @param image     container image to run
   */
  private static void createRc(DefaultKubernetesClient client, String namespace, String podName, String image) {
    try {
      Map<String, String> labels = new HashMap<>();
      labels.put("l", "v");
      client.replicationControllers().inNamespace(namespace)
          .createNew()
          .withNewMetadata()
          .withName(podName)
          .addToLabels(labels)
          .endMetadata()
          .withNewSpec()
          .withReplicas(1)
          .withSelector(labels)
          .withNewTemplate()
          .withNewMetadata()
          .addToLabels(labels)
          .endMetadata()
          .withNewSpec()
          .addNewContainer()
          .withName(podName)
          .withImage(image)
          .withImagePullPolicy("Always")
          .withNewResources()
          .addToLimits("cpu", new Quantity("100m"))
          .addToLimits("memory", new Quantity("100Mi"))
          .endResources()
          .endContainer()
          .endSpec()
          .endTemplate()
          .endSpec()
          .done();
    } catch (Exception e) {
      // Best-effort demo: report the failure and carry on so the watch keeps running.
      e.printStackTrace();
    }
  }

}
于 2016-05-27T19:49:26.757 回答