diff --git a/api.go b/api.go index 09d6b47..772bf67 100644 --- a/api.go +++ b/api.go @@ -10,6 +10,7 @@ import ( "github.com/play-with-docker/play-with-docker/event" "github.com/play-with-docker/play-with-docker/handlers" "github.com/play-with-docker/play-with-docker/id" + "github.com/play-with-docker/play-with-docker/k8s" "github.com/play-with-docker/play-with-docker/provisioner" "github.com/play-with-docker/play-with-docker/pwd" "github.com/play-with-docker/play-with-docker/pwd/types" @@ -23,18 +24,21 @@ func main() { e := initEvent() s := initStorage() - f := initFactory(s) + df := initDockerFactory(s) + kf := initK8sFactory(s) - ipf := provisioner.NewInstanceProvisionerFactory(provisioner.NewWindowsASG(f, s), provisioner.NewDinD(id.XIDGenerator{}, f, s)) - sp := provisioner.NewOverlaySessionProvisioner(f) + ipf := provisioner.NewInstanceProvisionerFactory(provisioner.NewWindowsASG(df, s), provisioner.NewDinD(id.XIDGenerator{}, df, s)) + sp := provisioner.NewOverlaySessionProvisioner(df) - core := pwd.NewPWD(f, e, s, sp, ipf) + core := pwd.NewPWD(df, e, s, sp, ipf) tasks := []scheduler.Task{ - task.NewCheckPorts(e, f), - task.NewCheckSwarmPorts(e, f), - task.NewCheckSwarmStatus(e, f), - task.NewCollectStats(e, f, s), + task.NewCheckPorts(e, df), + task.NewCheckSwarmPorts(e, df), + task.NewCheckSwarmStatus(e, df), + task.NewCollectStats(e, df, s), + task.NewCheckK8sClusterStatus(e, kf), + task.NewCheckK8sClusterExposedPorts(e, kf), } sch, err := scheduler.NewScheduler(tasks, s, e, core) if err != nil { @@ -69,6 +73,10 @@ func initEvent() event.EventApi { return event.NewLocalBroker() } -func initFactory(s storage.StorageApi) docker.FactoryApi { +func initDockerFactory(s storage.StorageApi) docker.FactoryApi { return docker.NewLocalCachedFactory(s) } + +func initK8sFactory(s storage.StorageApi) k8s.FactoryApi { + return k8s.NewLocalCachedFactory(s) +} diff --git a/docker/factory.go b/docker/factory.go index 36e441c..95bd12e 100644 --- a/docker/factory.go +++ b/docker/factory.go @@ -8,8 +8,8 @@ import ( "net/url" "time" - "docker.io/go-docker/api" client "docker.io/go-docker" + "docker.io/go-docker/api" "github.com/docker/go-connections/tlsconfig" "github.com/play-with-docker/play-with-docker/pwd/types" "github.com/play-with-docker/play-with-docker/router" diff --git a/dockerfiles/k8s/Dockerfile b/dockerfiles/k8s/Dockerfile new file mode 100644 index 0000000..eb87dfc --- /dev/null +++ b/dockerfiles/k8s/Dockerfile @@ -0,0 +1,32 @@ +FROM centos:7 + +COPY ./systemctl /usr/bin/systemctl +COPY ./kubernetes.repo /etc/yum.repos.d/ + + + +RUN yum install -y kubelet kubeadm \ + && mv -f /etc/systemd/system/kubelet.service.d/10-kubeadm.conf /etc/systemd/system/kubelet.service \ + && yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo \ + && yum install -y docker-ce \ + && sed -i -e '4d;5d;8d' /lib/systemd/system/docker.service \ + && yum clean all + +RUN curl -Lf -o jq https://github.com/stedolan/jq/releases/download/jq-1.5/jq-linux64 \ + && chmod +x ./jq && mv jq /usr/bin + +COPY ./kube* /etc/systemd/system/ +COPY ./wrapkubeadm.sh /usr/local/bin/kubeadm +COPY ./tokens.csv /etc/pki/tokens.csv +COPY ./daemon.json /etc/docker/ + +COPY motd /etc/motd + +RUN echo $'cat /etc/motd \n\ +export PS1="[\h \W]$ "' >> /root/.bash_profile + +RUN mkdir -p /root/.kube && ln -s /etc/kubernetes/admin.conf /root/.kube/config \ + && rm -f /etc/machine-id + +CMD systemctl start docker && systemctl start kubelet \ + && while true; do bash -l; done diff --git a/dockerfiles/k8s/Dockerfile.final b/dockerfiles/k8s/Dockerfile.final new file mode 100644 index 0000000..188696c --- /dev/null +++ b/dockerfiles/k8s/Dockerfile.final @@ -0,0 +1,8 @@ +FROM franela/kind_builder + +COPY motd /etc/motd +RUN echo $'cat /etc/motd \n\ +export PS1="[\h \W]$ "' >> /root/.bash_profile + +CMD systemctl start docker && systemctl start kubelet \ + && while true; do bash -l; done diff --git a/dockerfiles/k8s/daemon.json b/dockerfiles/k8s/daemon.json new file mode 100644 index 0000000..2792f93 --- /dev/null +++ b/dockerfiles/k8s/daemon.json @@ -0,0 +1,7 @@ +{ + "experimental": true, + "debug": true, + "log-level": "info", + "insecure-registries": ["127.0.0.1"], + "hosts": ["unix:///var/run/docker.sock", "tcp://0.0.0.0:2375"] +} diff --git a/dockerfiles/k8s/kubelet.env b/dockerfiles/k8s/kubelet.env new file mode 100644 index 0000000..a49f308 --- /dev/null +++ b/dockerfiles/k8s/kubelet.env @@ -0,0 +1,8 @@ +KUBELET_KUBECONFIG_ARGS=" --bootstrap-kubeconfig=/etc/kubernetes/bootstrap-kubelet.conf --kubeconfig=/etc/kubernetes/kubelet.conf" +KUBELET_SYSTEM_PODS_ARGS="--pod-manifest-path=/etc/kubernetes/manifests --allow-privileged=true" +KUBELET_NETWORK_ARGS="--network-plugin=cni --cni-conf-dir=/etc/cni/net.d --cni-bin-dir=/opt/cni/bin" +KUBELET_DNS_ARGS="--cluster-dns=10.96.0.10 --cluster-domain=cluster.local" +KUBELET_AUTHZ_ARGS="--authorization-mode=Webhook --client-ca-file=/etc/kubernetes/pki/ca.crt" +KUBELET_CADVISOR_ARGS="--cadvisor-port=0" +KUBELET_CGROUP_ARGS="--cgroup-driver=cgroupfs" +KUBELET_EXTRA_ARGS="--fail-swap-on=false" diff --git a/dockerfiles/k8s/kubelet.service b/dockerfiles/k8s/kubelet.service new file mode 100755 index 0000000..3b852aa --- /dev/null +++ b/dockerfiles/k8s/kubelet.service @@ -0,0 +1,4 @@ +[Service] +Restart=always +EnvironmentFile=/etc/systemd/system/kubelet.env +ExecStart=/usr/bin/kubelet $KUBELET_KUBECONFIG_ARGS $KUBELET_SYSTEM_PODS_ARGS $KUBELET_NETWORK_ARGS $KUBELET_DNS_ARGS $KUBELET_AUTHZ_ARGS $KUBELET_CADVISOR_ARGS $KUBELET_CGROUP_ARGS $KUBELET_EXTRA_ARGS diff --git a/dockerfiles/k8s/kubernetes.repo b/dockerfiles/k8s/kubernetes.repo new file mode 100644 index 0000000..827d07a --- /dev/null +++ b/dockerfiles/k8s/kubernetes.repo @@ -0,0 +1,8 @@ +[kubernetes] +name=Kubernetes +baseurl=https://packages.cloud.google.com/yum/repos/kubernetes-el7-x86_64 +enabled=1 +gpgcheck=1 +repo_gpgcheck=1 +gpgkey=https://packages.cloud.google.com/yum/doc/yum-key.gpg + https://packages.cloud.google.com/yum/doc/rpm-package-key.gpg diff --git a/dockerfiles/k8s/motd b/dockerfiles/k8s/motd new file mode 100644 index 0000000..e7b9f82 --- /dev/null +++ b/dockerfiles/k8s/motd @@ -0,0 +1,30 @@ + + WARNING!!!! + + This is a sandbox environment. Using personal credentials + is HIGHLY! discouraged. Any consequences of doing so, are + completely the user's responsibilites. + + You can bootstrap a cluster as follows: + + 1. Initializes cluster master node: + + kubeadm init --apiserver-advertise-address $(hostname -i) + + + 2. Initialize cluster networking: + + kubectl apply -n kube-system -f \ + "https://cloud.weave.works/k8s/net?k8s-version=$(kubectl version | base64 | tr -d '\n')" + + + 3. (Optional) Initialize kube-dashboard: + + curl -L -s https://git.io/kube-dashboard | sed 's/targetPort: 9090/targetPort: 9090\n type: LoadBalancer/' | \ + kubectl apply -f - + + + The PWK team. + + + diff --git a/dockerfiles/k8s/systemctl b/dockerfiles/k8s/systemctl new file mode 100755 index 0000000..eb16052 --- /dev/null +++ b/dockerfiles/k8s/systemctl @@ -0,0 +1,281 @@ +#!/bin/bash + +function get_unit_file(){ + + local UNIT=$1 + + for DIR in ${UNIT_PATHS[@]} ; do + if [ -f "${DIR}${UNIT}" ] ; then + echo "${DIR}${UNIT}" + break + fi + done + +} + +function read_option(){ + local OPTION="$1" + local UNIT_FILE="$2" + local UNIT_INSTANCE="$3" + + local UNIT=`basename $UNIT_FILE` + local UNIT_FULL=`echo $UNIT | sed "s/@/@$UNIT_INSTANCE/"` + + VALUE="$(grep '^'$OPTION'[= ]' "$UNIT_FILE" | cut -d '=' -f2- | sed -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//')" + + VALUE="` + echo $VALUE | + sed -e "s/%[i]/$UNIT_INSTANCE/g" \ + -e "s/%[I]/\"$UNIT_INSTANCE\"/g" \ + -e "s/%[n]/$UNIT_FULL/g" \ + -e "s/%[N]/\"$UNIT_FULL\"/g" + `" + # TODO: Add more options from: + # https://www.freedesktop.org/software/systemd/man/systemd.unit.html#Specifiers + + echo $VALUE +} + +function get_unit_wants() { + + local UNIT_FILE=$1 + local UNIT=`basename $UNIT_FILE` + + sort -u <<< `( + # Print wants from UNIT_PATHS + for DIR in ${UNIT_PATHS[@]} ; do + if [ -d "${DIR}${UNIT}.wants" ] ; then + ls -1 "${DIR}${UNIT}.wants/" | tr '\n' ' ' + fi + done + + # Print wants from unit-file + read_option Wants $UNIT_FILE + )` +} + +function action_start(){ + + # Find depended services + local UNIT_FILE=$1 + local UNIT_WANTS=(`get_unit_wants $1`) + local UNIT_INSTANCE=$2 + + # Start depended services + for UNIT in ${UNIT_WANTS[@]}; do + exec_action start $UNIT + done + + # Load options + local User=`read_option User $UNIT_FILE $UNIT_INSTANCE` + local Type=`read_option Type $UNIT_FILE $UNIT_INSTANCE` + local EnvironmentFile=`read_option EnvironmentFile $UNIT_FILE $UNIT_INSTANCE` + local ExecStartPre=(`read_option ExecStartPre $UNIT_FILE $UNIT_INSTANCE`) + local ExecStart=`read_option ExecStart $UNIT_FILE $UNIT_INSTANCE` + local ExecStartPost=(`read_option ExecStartPost $UNIT_FILE $UNIT_INSTANCE`) + local Restart=(`read_option Restart $UNIT_FILE $UNIT_INSTANCE`) + local RestartSec=(`read_option RestartSec $UNIT_FILE $UNIT_INSTANCE`) + RestartSec=${RestartSec:=5} + + [ -f "$EnvironmentFile" ] && source "$EnvironmentFile" + + # Start service + if [ -z $Type ] || [[ "${Type,,}" == *"simple"* ]] ; then + if [ "$Restart" == "always" ]; then + COMMAND='nohup bash -c "while true ; do '"$ExecStart"'; sleep $RestartSec; done" &>/dev/null &' + else + COMMAND='nohup '"$ExecStart"' >>/dev/null 2>&1 &' + fi + elif [[ "${Type,,}" == *"forking"* ]] || [[ "${Type,,}" == *"oneshot"* ]] ; then + COMMAND="$ExecStart" + else + >&2 echo "Unknown service type $Type" + fi + + #[ -z $User ] || COMMAND="su $User -c \"$COMMAND\"" + + while IFS=$'\n' read -a i; do + eval $i + done <<< "${ExecStartPre[@]}" + + eval "$COMMAND" + + while IFS=$'\n' read -a i; do + eval $i + done <<< "${ExecStartPost[@]}" +} + +function action_stop(){ + + # Find depended services + local UNIT_FILE=$1 + local UNIT_WANTS=(`get_unit_wants $1`) + local UNIT_INSTANCE=$2 + + # Load options + local User=`read_option User $UNIT_FILE $UNIT_INSTANCE` + local Type=`read_option Type $UNIT_FILE $UNIT_INSTANCE` + local EnvironmentFile=`read_option EnvironmentFile $UNIT_FILE $UNIT_INSTANCE` + local ExecStopPre=(`read_option ExecStartPre $UNIT_FILE $UNIT_INSTANCE`) + local ExecStop=`read_option ExecStop $UNIT_FILE $UNIT_INSTANCE` + local ExecStopPost=(`read_option ExecStartPost $UNIT_FILE $UNIT_INSTANCE`) + local ExecStart=`read_option ExecStart $UNIT_FILE $UNIT_INSTANCE` + + [ -f "$EnvironmentFile" ] && source "$EnvironmentFile" + + # Stop service + if [ -z $ExecStop ] ; then + COMMAND="kill -TERM \$(pgrep -f \"$ExecStart\")" + else + COMMAND="$ExecStop" + fi + + #[ -z $User ] || COMMAND="su $User -c \"$COMMAND\"" + + while IFS=$'\n' read -a i; do + eval $i + done <<< "${ExecStopPre[@]}" + + eval "$COMMAND" + + while IFS=$'\n' read -a i; do + eval $i + done <<< "${ExecStopPost[@]}" +} + +function action_restart(){ + local UNIT_FILE=$1 + local UNIT_INSTANCE=$2 + + action_start $UNIT_FILE $UNIT_INSTANCE + action_stop $UNIT_FILE $UNIT_INSTANCE +} + + +function action_enable(){ + + local UNIT_FILE=$1 + local UNIT=`basename $UNIT_FILE` + local UNIT_INSTANCE=$2 + local UNIT_FULL=`echo $UNIT | sed "s/@/@$UNIT_INSTANCE/"` + + local WantedBy=`read_option WantedBy $UNIT_FILE` + + if [ -z $WantedBy ] ; then + >&2 echo "Unit $UNIT have no WantedBy option." + exit 1 + fi + + local WANTEDBY_DIR="/etc/systemd/system/$WantedBy.wants" + + if [ ! -f "$WANTEDBY_DIR/$UNIT_FULL" ] ; then + mkdir -p $WANTEDBY_DIR + echo Created symlink from $WANTEDBY_DIR/$UNIT_FULL to $UNIT_FILE. + ln -s $WANTEDBY_DIR/$UNIT_FULL $UNIT_FILE + fi + +} + +function action_disable(){ + + local UNIT_FILE=$1 + local UNIT=`basename $UNIT_FILE` + local UNIT_INSTANCE=$2 + local UNIT_FULL=`echo $UNIT | sed "s/@/@$UNIT_INSTANCE/"` + + local WantedBy=`read_option WantedBy $UNIT_FILE` + + if [ -z $WantedBy ] ; then + >&2 echo "Unit $UNIT have no WantedBy option." + exit 1 + fi + + local WANTEDBY_DIR="/etc/systemd/system/$WantedBy.wants" + + if [ -f "$WANTEDBY_DIR/$UNIT_FULL" ] ; then + echo Removed $WANTEDBY_DIR/$UNIT_FULL. + rm -f $WANTEDBY_DIR/$UNIT_FULL. + rmdir --ignore-fail-on-non-empty $WANTEDBY_DIR + fi + +} + +function action_status(){ + + # Find depended services + local UNIT_FILE=$1 + local UNIT_WANTS=(`get_unit_wants $1`) + local UNIT_INSTANCE=$2 + + local ExecStart=`read_option ExecStart $UNIT_FILE $UNIT_INSTANCE` + + + COMMAND="pgrep -f \"$ExecStart\" &>/dev/null" + + + if eval "$COMMAND"; then + exit 0 + fi + + >&2 echo "Loaded: not-found" + exit 1 +} + +function action_is_enabled(){ + exit 0 +} + +function action_is_active(){ + local UNIT=`basename $1` + if systemctl status $UNIT ; then + >&2 echo "active" + exit 0 + fi + + exit 1 + +} + +function exec_action(){ + + local ACTION=$1 + local UNIT=$2 + + [[ $UNIT =~ '.' ]] || UNIT="$UNIT.service" + + if [[ $UNIT =~ '@' ]] ; then + local UNIT_INSTANCE=`echo $UNIT | cut -d'@' -f2- | cut -d. -f1` + local UNIT=`echo $UNIT | sed "s/$UNIT_INSTANCE//"` + fi + + UNIT_FILE=`get_unit_file $UNIT` + + if [ -z $UNIT_FILE ] ; then + >&2 echo "Failed to $ACTION $UNIT: Unit $UNIT not found." + exit 1 + else + case "$ACTION" in + start ) action_start $UNIT_FILE $UNIT_INSTANCE ;; + stop ) action_stop $UNIT_FILE $UNIT_INSTANCE ;; + restart ) action_restart $UNIT_FILE $UNIT_INSTANCE ;; + enable ) action_enable $UNIT_FILE $UNIT_INSTANCE ;; + disable ) action_disable $UNIT_FILE $UNIT_INSTANCE ;; + status ) action_status $UNIT_FILE $UNIT_INSTANCE ;; + is-enabled ) action_is_enabled $UNIT_FILE $UNIT_INSTANCE ;; + is-active ) action_is_active $UNIT_FILE $UNIT_INSTANCE ;; + * ) >&2 echo "Unknown operation $ACTION." ; exit 1 ;; + esac + fi +} + +ACTION="$1" +UNITS="${@:2}" +UNIT_PATHS=( + /etc/systemd/system/ + /usr/lib/systemd/system/ +) + + +for UNIT in ${UNITS[@]}; do + exec_action $ACTION $UNIT +done diff --git a/dockerfiles/k8s/tokens.csv b/dockerfiles/k8s/tokens.csv new file mode 100644 index 0000000..e7a4f92 --- /dev/null +++ b/dockerfiles/k8s/tokens.csv @@ -0,0 +1 @@ +31ada4fd-adec-460c-809a-9e56ceb75269,pwd,pwd,"system:admin,system:masters" diff --git a/dockerfiles/k8s/wrapkubeadm.sh b/dockerfiles/k8s/wrapkubeadm.sh new file mode 100755 index 0000000..349a223 --- /dev/null +++ b/dockerfiles/k8s/wrapkubeadm.sh @@ -0,0 +1,141 @@ +#!/bin/bash +# Copyright 2017 Mirantis +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -o pipefail +set -o errtrace + +apiserver_static_pod="/etc/kubernetes/manifests/kube-apiserver" + +# jq filters follow. + +# TODO: think about more secure possibilities +apiserver_anonymous_auth='.spec.containers[0].command|=map(select(startswith("--token-auth-file")|not))+["--token-auth-file=/etc/pki/tokens.csv"]' +# Make apiserver accept insecure connections on port 8080 +# TODO: don't use insecure port +#apiserver_insecure_bind_port='.spec.containers[0].command|=map(select(startswith("--insecure-port=")|not))+["--insecure-port=2375"]' + + +# Update kube-proxy CIDR, enable --masquerade-all and disable conntrack (see dind::frob-proxy below) +function dind::proxy-cidr-and-no-conntrack { + cluster_cidr="$(ip addr show docker0 | grep -w inet | awk '{ print $2; }')" + echo ".items[0].spec.template.spec.containers[0].command |= .+ [\"--cluster-cidr=${cluster_cidr}\", \"--masquerade-all\", \"--conntrack-max=0\", \"--conntrack-max-per-core=0\"]" +} + + + +function dind::join-filters { + local IFS="|" + echo "$*" +} + +function dind::frob-apiserver { + local -a filters=("${apiserver_anonymous_auth}") + + dind::frob-file "${apiserver_static_pod}" "${filters[@]}" +} + +function dind::frob-file { + local path_base="$1" + shift + local filter="$(dind::join-filters "$@")" + local status=0 + if [[ -f ${path_base}.yaml ]]; then + dind::yq "${filter}" "${path_base}.yaml" || status=$? + else + echo "${path_base}.json or ${path_base}.yaml not found" >&2 + return 1 + fi + if [[ ${status} -ne 0 ]]; then + echo "Failed to frob ${path_base}.yaml" >&2 + return 1 + fi +} + +function dind::yq { + local filter="$1" + local path="$2" + # We need to use a temp file here because if you feed an object to + # 'kubectl convert' via stdin, you'll get a List object because + # multiple input objects are implied + tmp="$(mktemp tmp-XXXXXXXXXX.json)" + kubectl convert -f "${path}" --local -o json 2>/dev/null | + jq "${filter}" > "${tmp}" + kubectl convert -f "${tmp}" --local -o yaml 2>/dev/null >"${path}" + rm -f "${tmp}" +} + +function dind::frob-proxy { + # Trying to change conntrack settings fails even in priveleged containers, + # so we need to avoid it. Here's sample error message from kube-proxy: + # I1010 21:53:00.525940 1 conntrack.go:57] Setting conntrack hashsize to 49152 + # Error: write /sys/module/nf_conntrack/parameters/hashsize: operation not supported + # write /sys/module/nf_conntrack/parameters/hashsize: operation not supported + # + # Recipe by @errordeveloper: + # https://github.com/kubernetes/kubernetes/pull/34522#issuecomment-253248985 + local force_apply=--force + if ! kubectl version --short >&/dev/null; then + # kubectl 1.4 doesn't have version --short and also it doesn't support apply --force + force_apply= + fi + KUBECONFIG=/etc/kubernetes/admin.conf kubectl -n kube-system get ds -l k8s-app=kube-proxy -o json | + jq "$(dind::join-filters "$(dind::proxy-cidr-and-no-conntrack)")" | KUBECONFIG=/etc/kubernetes/admin.conf kubectl apply ${force_apply} -f - + + KUBECONFIG=/etc/kubernetes/admin.conf kubectl -n kube-system delete pods --now -l "k8s-app=kube-proxy" +} + + +function dind::wait-for-apiserver { + echo -n "Waiting for api server to startup" + local url="https://localhost:6443/api" + local n=60 + while true; do + if curl -k -s "${url}" >&/dev/null; then + break + fi + if ((--n == 0)); then + echo "Error: timed out waiting for apiserver to become available" >&2 + fi + echo -n "." + sleep 0.5 + done + echo "" +} + +function dind::frob-cluster { + dind::frob-apiserver + dind::wait-for-apiserver + dind::frob-proxy +} + +# Weave depends on /etc/machine-id being unique +if [[ ! -f /etc/machine-id ]]; then + rm -f /etc/machine-id + systemd-machine-id-setup +fi + +if [[ "$@" == "init"* || "$@" == "join"* ]]; then +# Call kubeadm with params and skip flag + /usr/bin/kubeadm "$@" --skip-preflight-checks +else +# Call kubeadm with params + /usr/bin/kubeadm "$@" +fi + +# Frob cluster +if [[ "$@" == "init"* && $? -eq 0 && ! "$@" == *"--help"* ]]; then + dind::frob-cluster +fi + diff --git a/haproxy/haproxy.cfg b/haproxy/haproxy.cfg index e92965b..9cd6ede 100644 --- a/haproxy/haproxy.cfg +++ b/haproxy/haproxy.cfg @@ -1,8 +1,6 @@ defaults mode http timeout connect 5000ms - timeout client 50000ms - timeout server 50000ms frontend http-in bind *:8080 diff --git a/k8s/factory.go b/k8s/factory.go new file mode 100644 index 0000000..9f26016 --- /dev/null +++ b/k8s/factory.go @@ -0,0 +1,140 @@ +package k8s + +import ( + "crypto/tls" + "encoding/json" + "fmt" + "net" + "net/http" + "net/url" + "time" + + "github.com/docker/go-connections/tlsconfig" + "github.com/play-with-docker/play-with-docker/pwd/types" + "github.com/play-with-docker/play-with-docker/router" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" +) + +type FactoryApi interface { + GetForInstance(instance *types.Instance) (*kubernetes.Clientset, error) + GetKubeletForInstance(instance *types.Instance) (*kubeletClient, error) +} + +func NewClient(instance *types.Instance, proxyHost string) (*kubernetes.Clientset, error) { + var durl string + + host := router.EncodeHost(instance.SessionId, instance.RoutableIP, router.HostOpts{EncodedPort: 6443}) + + var tlsConfig *tls.Config + tlsConfig = tlsconfig.ClientDefault() + tlsConfig.InsecureSkipVerify = true + tlsConfig.ServerName = host + + var transport http.RoundTripper + transport = &http.Transport{ + DialContext: (&net.Dialer{ + Timeout: 1 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, + TLSClientConfig: tlsConfig, + MaxIdleConnsPerHost: 5, + } + + durl = fmt.Sprintf("https://%s", proxyHost) + + cc := rest.ContentConfig{ + ContentType: "application/json", + GroupVersion: &schema.GroupVersion{Version: "v1"}, + NegotiatedSerializer: serializer.DirectCodecFactory{CodecFactory: scheme.Codecs}, + } + restConfig := &rest.Config{ + Host: durl, + APIPath: "/api/", + BearerToken: "31ada4fd-adec-460c-809a-9e56ceb75269", + ContentConfig: cc, + } + + transport, err := rest.HTTPWrappersForConfig(restConfig, transport) + if err != nil { + return nil, fmt.Errorf("Error wrapping transport %v", err) + } + cli := &http.Client{ + Transport: transport, + } + + rc, err := rest.RESTClientFor(restConfig) + rc.Client = cli + if err != nil { + return nil, fmt.Errorf("Error creating K8s client %v", err) + } + + return kubernetes.New(rc), nil +} + +func NewKubeletClient(instance *types.Instance, proxyHost string) (*kubeletClient, error) { + var durl string + + host := router.EncodeHost(instance.SessionId, instance.RoutableIP, router.HostOpts{EncodedPort: 10255}) + + transport := &http.Transport{ + DialContext: (&net.Dialer{ + Timeout: 1 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, + MaxIdleConnsPerHost: 5, + } + + durl = fmt.Sprintf("http://%s", host) + transport.Proxy = http.ProxyURL(&url.URL{Host: proxyHost}) + + cli := &http.Client{ + Transport: transport, + } + kc := &kubeletClient{client: cli, baseURL: durl} + return kc, nil +} + +type kubeletClient struct { + client *http.Client + baseURL string +} + +func (c *kubeletClient) Get(path string) (*http.Response, error) { + return c.client.Get(c.baseURL + path) +} + +type metadata struct { + Labels map[string]string +} + +type item struct { + Metadata metadata +} + +type kubeletPodsResponse struct { + Items []item +} + +func (c *kubeletClient) IsManager() (bool, error) { + res, err := c.client.Get(c.baseURL + "/pods") + if err != nil { + return false, err + } + podsData := &kubeletPodsResponse{} + + json.NewDecoder(res.Body).Decode(podsData) + + for _, i := range podsData.Items { + for _, v := range i.Metadata.Labels { + if v == "kube-apiserver" { + return true, nil + } + } + } + + return false, nil +} diff --git a/k8s/factory_mock.go b/k8s/factory_mock.go new file mode 100644 index 0000000..6a4b290 --- /dev/null +++ b/k8s/factory_mock.go @@ -0,0 +1,21 @@ +package k8s + +import ( + "github.com/play-with-docker/play-with-docker/pwd/types" + "github.com/stretchr/testify/mock" + "k8s.io/client-go/kubernetes" +) + +type FactoryMock struct { + mock.Mock +} + +func (m *FactoryMock) GetKubeletForInstance(i *types.Instance) (*kubeletClient, error) { + args := m.Called(i) + return args.Get(0).(*kubeletClient), args.Error(1) +} + +func (m *FactoryMock) GetForInstance(instance *types.Instance) (*kubernetes.Clientset, error) { + args := m.Called(instance) + return args.Get(0).(*kubernetes.Clientset), args.Error(1) +} diff --git a/k8s/local_cached_factory.go b/k8s/local_cached_factory.go new file mode 100644 index 0000000..10ccb12 --- /dev/null +++ b/k8s/local_cached_factory.go @@ -0,0 +1,124 @@ +package k8s + +import ( + "fmt" + "log" + "sync" + "time" + + "github.com/play-with-docker/play-with-docker/pwd/types" + "github.com/play-with-docker/play-with-docker/storage" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" +) + +type localCachedFactory struct { + rw sync.Mutex + irw sync.Mutex + sessionClient *kubernetes.Clientset + instanceClients map[string]*instanceEntry + storage storage.StorageApi +} + +type instanceEntry struct { + rw sync.Mutex + client *kubernetes.Clientset + kubeletClient *kubeletClient +} + +func (f *localCachedFactory) GetForInstance(instance *types.Instance) (*kubernetes.Clientset, error) { + key := instance.Name + + f.irw.Lock() + c, found := f.instanceClients[key] + if !found { + c := &instanceEntry{} + f.instanceClients[key] = c + } + c = f.instanceClients[key] + f.irw.Unlock() + + c.rw.Lock() + defer c.rw.Unlock() + + if c.client == nil { + kc, err := NewClient(instance, "l2:443") + if err != nil { + return nil, err + } + c.client = kc + } + + err := f.check(func() error { + _, err := c.client.CoreV1().Pods("").List(metav1.ListOptions{}) + return err + }) + if err != nil { + return nil, err + } + + return c.client, nil +} + +func (f *localCachedFactory) GetKubeletForInstance(instance *types.Instance) (*kubeletClient, error) { + key := instance.Name + + f.irw.Lock() + c, found := f.instanceClients[key] + if !found { + c := &instanceEntry{} + f.instanceClients[key] = c + } + c = f.instanceClients[key] + f.irw.Unlock() + + c.rw.Lock() + defer c.rw.Unlock() + + if c.kubeletClient == nil { + kc, err := NewKubeletClient(instance, "l2:443") + if err != nil { + return nil, err + } + c.kubeletClient = kc + } + + err := f.check(func() error { + r, err := c.kubeletClient.Get("/pods") + if err != nil { + return err + } + defer r.Body.Close() + return nil + }) + if err != nil { + return nil, err + } + + return c.kubeletClient, nil +} + +func (f *localCachedFactory) check(fn func() error) error { + ok := false + for i := 0; i < 5; i++ { + err := fn() + if err != nil { + log.Printf("Connection to k8s api has failed, maybe instance is not ready yet, sleeping and retrying in 1 second. Try #%d. Got: %v\n", i+1, err) + time.Sleep(time.Second) + continue + } + ok = true + break + } + if !ok { + return fmt.Errorf("Connection to k8s api was not established.") + } + return nil +} + +func NewLocalCachedFactory(s storage.StorageApi) *localCachedFactory { + return &localCachedFactory{ + instanceClients: make(map[string]*instanceEntry), + storage: s, + } +} diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 3eb3b88..75031e8 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -76,39 +76,30 @@ func (s *scheduler) processInstance(ctx context.Context, si *scheduledInstance) defer s.unscheduleInstance(si.instance) for { select { - case <-si.ticker.C: - // First check if instance still exists - _, err := s.storage.InstanceGet(si.instance.Name) - if err != nil { - if storage.NotFound(err) { - // Instance doesn't exists anymore. Unschedule. - log.Printf("Instance %s doesn't exists in storage.\n", si.instance.Name) - return - } - log.Printf("Error retrieving instance %s from storage. Got: %v\n", si.instance.Name, err) - continue - } - failed := false - for _, task := range s.tasks { - err := task.Run(ctx, si.instance) - if err != nil { - failed = true - log.Printf("Error running task %s on instance %s. Got: %v\n", task.Name(), si.instance.Name, err) - // Since one task failed, we just assume something might be wrong with the instance, so we don't try to process the rest of the tasks. - si.fails++ - if si.fails > 5 { - log.Printf("Instance %s has failed to execute tasks too many times. Giving up.\n", si.instance.Name) - return - } - break - } - } - if !failed { - si.fails = 0 - } case <-ctx.Done(): log.Printf("Processing tasks for instance %s has been canceled.\n", si.instance.Name) return + default: + select { + case <-si.ticker.C: + // First check if instance still exists + _, err := s.storage.InstanceGet(si.instance.Name) + if err != nil { + if storage.NotFound(err) { + // Instance doesn't exists anymore. Unschedule. + log.Printf("Instance %s doesn't exists in storage.\n", si.instance.Name) + return + } + log.Printf("Error retrieving instance %s from storage. Got: %v\n", si.instance.Name, err) + continue + } + for _, task := range s.tasks { + err := task.Run(ctx, si.instance) + if err != nil { + log.Printf("Error running task %s on instance %s. Got: %v\n", task.Name(), si.instance.Name, err) + } + } + } } } } diff --git a/scheduler/task/check_k8s_cluster_exposed_ports.go b/scheduler/task/check_k8s_cluster_exposed_ports.go new file mode 100644 index 0000000..8f22ca6 --- /dev/null +++ b/scheduler/task/check_k8s_cluster_exposed_ports.go @@ -0,0 +1,77 @@ +package task + +import ( + "context" + "log" + + "github.com/play-with-docker/play-with-docker/event" + "github.com/play-with-docker/play-with-docker/k8s" + "github.com/play-with-docker/play-with-docker/pwd/types" + meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +type checkK8sClusterExposedPortsTask struct { + event event.EventApi + factory k8s.FactoryApi +} + +var CheckK8sClusterExpoedPortsEvent event.EventType + +func init() { + CheckK8sClusterExpoedPortsEvent = event.EventType("instance k8s cluster ports") +} + +func (t *checkK8sClusterExposedPortsTask) Name() string { + return "CheckK8sClusterPorts" +} + +func NewCheckK8sClusterExposedPorts(e event.EventApi, f k8s.FactoryApi) *checkK8sClusterExposedPortsTask { + return &checkK8sClusterExposedPortsTask{event: e, factory: f} +} + +func (c checkK8sClusterExposedPortsTask) Run(ctx context.Context, i *types.Instance) error { + + kc, err := c.factory.GetKubeletForInstance(i) + if err != nil { + return err + } + + if isManager, err := kc.IsManager(); err != nil { + log.Println(err) + return err + } else if !isManager { + return nil + } + + k8s, err := c.factory.GetForInstance(i) + if err != nil { + log.Println(err) + return err + } + + list, err := k8s.CoreV1().Services("").List(meta_v1.ListOptions{}) + if err != nil { + return err + } + exposedPorts := []int{} + + for _, svc := range list.Items { + for _, p := range svc.Spec.Ports { + if p.NodePort > 0 { + exposedPorts = append(exposedPorts, int(p.NodePort)) + } + } + } + + nodeList, err := k8s.CoreV1().Nodes().List(meta_v1.ListOptions{}) + if err != nil { + return err + } + instances := []string{} + for _, node := range nodeList.Items { + instances = append(instances, node.Name) + } + + c.event.Emit(CheckSwarmPortsEvent, i.SessionId, ClusterPorts{Manager: i.Name, Instances: instances, Ports: exposedPorts}) + return nil +} diff --git a/scheduler/task/check_k8s_cluster_status_task.go b/scheduler/task/check_k8s_cluster_status_task.go new file mode 100644 index 0000000..a72eb22 --- /dev/null +++ b/scheduler/task/check_k8s_cluster_status_task.go @@ -0,0 +1,54 @@ +package task + +import ( + "context" + "log" + + "github.com/play-with-docker/play-with-docker/event" + "github.com/play-with-docker/play-with-docker/k8s" + "github.com/play-with-docker/play-with-docker/pwd/types" +) + +type checkK8sClusterStatusTask struct { + event event.EventApi + factory k8s.FactoryApi +} + +var CheckK8sStatusEvent event.EventType + +func init() { + CheckK8sStatusEvent = event.EventType("instance k8s status") +} + +func NewCheckK8sClusterStatus(e event.EventApi, f k8s.FactoryApi) *checkK8sClusterStatusTask { + return &checkK8sClusterStatusTask{event: e, factory: f} +} + +func (c *checkK8sClusterStatusTask) Name() string { + return "CheckK8sClusterStatus" +} + +func (c checkK8sClusterStatusTask) Run(ctx context.Context, i *types.Instance) error { + status := ClusterStatus{Instance: i.Name} + + kc, err := c.factory.GetKubeletForInstance(i) + if err != nil { + log.Println(err) + c.event.Emit(CheckSwarmStatusEvent, i.SessionId, status) + return err + } + + if isManager, err := kc.IsManager(); err != nil { + c.event.Emit(CheckSwarmStatusEvent, i.SessionId, status) + return err + } else if !isManager { + // Not a manager node, nothing to do for this task + status.IsWorker = true + } else { + status.IsManager = true + } + + c.event.Emit(CheckK8sStatusEvent, i.SessionId, status) + + return nil +} diff --git a/scheduler/task/check_swarm_ports.go b/scheduler/task/check_swarm_ports.go index 51254e4..63c73a9 100644 --- a/scheduler/task/check_swarm_ports.go +++ b/scheduler/task/check_swarm_ports.go @@ -9,12 +9,6 @@ import ( "github.com/play-with-docker/play-with-docker/pwd/types" ) -type DockerSwarmPorts struct { - Manager string `json:"manager"` - Instances []string `json:"instances"` - Ports []int `json:"ports"` -} - type checkSwarmPorts struct { event event.EventApi factory docker.FactoryApi @@ -57,7 +51,7 @@ func (t *checkSwarmPorts) Run(ctx context.Context, instance *types.Instance) err ports[i] = int(port) } - t.event.Emit(CheckSwarmPortsEvent, instance.SessionId, DockerSwarmPorts{Manager: instance.Name, Instances: hosts, Ports: ports}) + t.event.Emit(CheckSwarmPortsEvent, instance.SessionId, ClusterPorts{Manager: instance.Name, Instances: hosts, Ports: ports}) return nil } diff --git a/scheduler/task/check_swarm_status.go b/scheduler/task/check_swarm_status.go index a5fd9a2..7cd9396 100644 --- a/scheduler/task/check_swarm_status.go +++ b/scheduler/task/check_swarm_status.go @@ -10,12 +10,6 @@ import ( "github.com/play-with-docker/play-with-docker/pwd/types" ) -type DockerSwarmStatus struct { - IsManager bool `json:"is_manager"` - IsWorker bool `json:"is_worker"` - Instance string `json:"instance"` -} - type checkSwarmStatus struct { event event.EventApi factory docker.FactoryApi @@ -53,8 +47,8 @@ func NewCheckSwarmStatus(e event.EventApi, f docker.FactoryApi) *checkSwarmStatu return &checkSwarmStatus{event: e, factory: f} } -func getDockerSwarmStatus(ctx context.Context, client docker.DockerApi) (DockerSwarmStatus, error) { - status := DockerSwarmStatus{} +func getDockerSwarmStatus(ctx context.Context, client docker.DockerApi) (ClusterStatus, error) { + status := ClusterStatus{} info, err := client.GetDaemonInfo() if err != nil { return status, err diff --git a/scheduler/task/types.go b/scheduler/task/types.go new file mode 100644 index 0000000..6519207 --- /dev/null +++ b/scheduler/task/types.go @@ -0,0 +1,13 @@ +package task + +type ClusterStatus struct { + IsManager bool `json:"is_manager"` + IsWorker bool `json:"is_worker"` + Instance string `json:"instance"` +} + +type ClusterPorts struct { + Manager string `json:"manager"` + Instances []string `json:"instances"` + Ports []int `json:"ports"` +} diff --git a/www/assets/app.js b/www/assets/app.js index 4db6347..30e092a 100644 --- a/www/assets/app.js +++ b/www/assets/app.js @@ -330,6 +330,20 @@ $scope.$apply(); }); + socket.on('instance k8s status', function(status) { + if (!$scope.idx[status.instance]) { + return + } + if (status.is_manager) { + $scope.idx[status.instance].isK8sManager = true + } else if (status.is_worker) { + $scope.idx[status.instance].isK8sManager = false + } else { + $scope.idx[status.instance].isK8sManager = null + } + $scope.$apply(); + }); + socket.on('instance docker ports', function(status) { if (!$scope.idx[status.instance]) { return diff --git a/www/index.html b/www/index.html index 94c1a70..136f1d3 100644 --- a/www/index.html +++ b/www/index.html @@ -59,7 +59,7 @@ {{newInstanceBtnText}} - +