#!/usr/bin/python
# -*- coding: utf-8 -*-

# This file is part of Cockpit.
#
# Copyright (C) 2013 Red Hat, Inc.
#
# Cockpit is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# Cockpit is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Cockpit; If not, see <http://www.gnu.org/licenses/>.

"""Tests for Cockpit running from the cockpit/kubernetes container image."""

import os
import sys

base_dir = os.path.dirname(os.path.realpath(__file__))
test_dir = os.path.abspath(os.path.join(base_dir, ".."))

# Make the shared test libraries importable when this script is run from
# a location where they are not already on the module search path.
try:
    import testlib
except ImportError:
    sys.path.append(test_dir)

from common.testlib import *
from common.testvm import VirtMachine
from verify import kubelib


class KubernetesContainerMachine(VirtMachine):
    """Test machine that runs Cockpit inside the cockpit/kubernetes container."""

    def start_cockpit(self, *args, **kwargs):
        """Start docker and launch the cockpit/kubernetes container.

        Keyword arguments:
            invalid_kube_service: when true, the container is started with
                an empty KUBERNETES_SERVICE_HOST instead of 172.17.0.1.

        Any other positional or keyword arguments are accepted and ignored.
        Blocks until Cockpit answers on this machine's address.
        """
        host = "172.17.0.1"
        if kwargs.pop('invalid_kube_service', False):
            host = ""

        # The script text is sent to the machine as-is; the line breaks
        # matter because the "# HACK" line is a shell comment.
        self.execute("""#!/bin/sh
systemctl start docker
# HACK: Allow container to talk to the host
iptables -I INPUT 4 -i docker0 -j ACCEPT
/usr/bin/docker run -d -e COCKPIT_KUBE_INSECURE=true -e KUBERNETES_INSECURE=true -e KUBERNETES_SERVICE_PORT=6443 -e KUBERNETES_SERVICE_HOST={0} -p 9090:9090 cockpit/kubernetes
""".format(host))
        self.wait_for_cockpit_running(self.address, seconds=180)


class KubernetesContainerCase(kubelib.KubernetesCase):
    """Shared helpers for tests that drive the containerized Cockpit."""

    machine_class = KubernetesContainerMachine

    def get_bad_auth_code(self, port=6443):
        """Return the HTTP status code the API server gives bad credentials.

        Runs curl on the test machine with the user/password ``bad:bad``
        against ``https://<machine address>:<port>`` and returns the status
        code as a stripped string (for example "401" or "403").
        """
        command = 'curl -k -u bad:bad -s -o /dev/null -w "%{{http_code}}" https://{}:{}/api/namespaces/kubernetes'.format(self.machine.address, port)
        code = self.machine.execute(command)
        return code.strip()

    def restart_api_server(self, **kwargs):
        """Restart kubernetes and wait for the API server to come back.

        ``kwargs`` are forwarded to the parent class's ``wait_api_server``.
        The parent implementation is called directly, so the anonymous-auth
        handling in this class's ``wait_api_server`` override is skipped.
        """
        # Delete the kubernetes service so that it will need to be recreated.
        # Do this because RBAC takes a while to finish setup
        # and there is no good way to track progress,
        # but once the service is recreated we know it's good to go.
        self.machine.execute("kubectl delete svc/kubernetes")
        self.stop_kubernetes()
        self.start_kubernetes()
        super(KubernetesContainerCase, self).wait_api_server(**kwargs)

    def forbid_anon(self, **kwargs):
        """Add ``--anonymous-auth=false`` to the apiserver config and restart.

        ``kwargs`` are forwarded to ``restart_api_server``.
        """
        self.machine.execute("sed -i 's/--basic_auth_file=/--anonymous-auth=false --basic_auth_file=/g' /etc/kubernetes/apiserver")
        self.restart_api_server(**kwargs)

    def wait_api_server(self, port=8080, timeout=60, scheme='http', forbid_anon=True):
        """Wait for the API server, optionally turning anonymous auth off.

        After the parent class's wait completes, and if ``forbid_anon`` is
        true, bad credentials are probed on ``port``; a "403" answer leads
        to a ``forbid_anon`` call with the same port, timeout and scheme.
        """
        super(KubernetesContainerCase, self).wait_api_server(port=port, timeout=timeout, scheme=scheme)
        if forbid_anon and self.get_bad_auth_code(port=port) == "403":
            self.forbid_anon(timeout=timeout, port=port, scheme=scheme)


class TestKubernetesLogin(KubernetesContainerCase):
    """Login tests against the containerized Cockpit."""

    def login(self, user, password):
        """Fill in the login form with the given credentials and submit it."""
        self.browser.set_val('#login-user-input', user)
        self.browser.set_val('#login-password-input', password)
        self.browser.click('#login-button')

    def login_basic_broken(self, user, password):
        """Log in and expect the fatal "anonymous auth" message.

        Afterwards the page is reloaded and the login form is awaited so
        the browser is ready for another attempt.
        """
        self.login(user, password)
        self.browser.wait_not_visible('#login-button')
        self.browser.wait_in_text("#login-fatal-message", "Turn off anonymous auth or upgrade")
        self.browser.reload()
        self.browser.wait_visible("#login")
        self.browser.wait_visible('#login-button')

    def check_login(self, mode):
        """Run the login scenario for the given authorization mode.

        ``mode`` is compared against "RBAC"; any other value takes the
        other (ABAC) path. Only matters while bad credentials get a "403".
        """
        b = self.browser

        # A 403 for bad credentials is the case in which the login page is
        # expected to show the "Turn off anonymous auth" message; verify
        # that first and then disable anonymous auth.
        if self.get_bad_auth_code(port=6443) == "403":
            b.reload()
            b.wait_visible("#login")
            b.wait_visible('#login-button')
            if mode == "RBAC":
                self.login_basic_broken("admin", "rbac")
            else:
                b.reload()
                b.wait_visible("#login")
                self.login("admin", "abac")
                b.wait_text_not("#login-error-message", "")
                b.wait_visible('#login-button')
                self.login_basic_broken("admin", "fubar")

            self.forbid_anon(port=6443, scheme='https')

        # A wrong password must produce an error and leave the form usable
        b.reload()
        b.wait_visible("#login")
        self.login("admin", "bad")
        b.wait_text_not("#login-error-message", "")
        b.wait_visible('#login-button')

        # The right password must get us in
        self.login("admin", "fubar")
        with b.wait_timeout(10):
            b.expect_load()
        b.wait_present("#content")
        b.wait_text('#content-user-name', 'admin')

        # reload, which should log us in with the cookie
        b.reload()
        b.wait_present("#content")
        b.wait_text('#content-user-name', 'admin')
        b.wait_in_text("title", "Kubernetes Cockpit")
        b.enter_page("/kubernetes")
        b.wait_present("#node-list")

        b.logout()

    def testKubernetesLogin(self):
        """Exercise login without a kube service, without an API, and with
        ABAC, RBAC and finally no authentication configured."""
        self.invalid_kube_service = True
        m = self.machine
        b = self.browser

        m.start_cockpit(invalid_kube_service=True)

        # Try to login, no api var
        b.open("/")
        b.wait_visible("#login")
        b.wait_not_visible("#option-group")
        b.wait_text("#server-name", "Kubernetes Cockpit")
        self.login("admin", "fubar")
        b.wait_in_text("#login-fatal-message", "No kubernetes")
        b.wait_not_visible('#login-button')

        # Replace the container with one that has a service host set;
        # kubernetes itself has not been started yet at this point.
        m.execute('docker stop $(docker ps -q --filter ancestor="cockpit/kubernetes")')
        m.start_cockpit()
        b.open("/")
        b.wait_visible("#login")
        b.wait_text("#server-name", "Kubernetes Cockpit")
        self.login("admin", "fubar")
        b.wait_in_text("#login-fatal-message", "Couldn't connect to the api")
        b.wait_not_visible('#login-button')

        # Start kubernetes
        self.start_kubernetes()
        self.wait_api_server(port=6443, scheme='https', forbid_anon=False)
        self.check_login("ABAC")

        # Switch the apiserver configuration from ABAC to RBAC
        m.execute("sed -i 's/--anonymous-auth=false//g' /etc/kubernetes/apiserver")
        m.execute("sed -i 's#--authorization_policy_file=/etc/kubernetes/authorization##g' /etc/kubernetes/apiserver")
        m.execute("sed -i 's/--authorization_mode=ABAC/--authorization_mode=RBAC/g' /etc/kubernetes/apiserver")
        self.restart_api_server(port=6443, scheme='https')
        self.check_login("RBAC")

        # Remove authentication stuff
        m.execute("sed -i '/KUBE_API_ARGS=/d' /etc/kubernetes/apiserver")
        self.restart_api_server(port=6443, scheme='https')

        # https://github.com/kubernetes/kubernetes/pull/38706
        # broke having an open API. So if the api is closed,
        # use RBAC and make cluster bindings.
        if self.get_bad_auth_code(port=6443) == "401":
            m.execute("echo 'KUBE_API_ARGS=\"--authorization_mode=RBAC\"' >> /etc/kubernetes/apiserver")
            m.execute("kubectl create clusterrolebinding anon-cluster-admin-binding --clusterrole=cluster-admin --user=system:anonymous")
            self.restart_api_server(port=6443, scheme='https')

        b.reload()
        b.wait_present("#content")
        b.wait_text('#content-user-name', 'Unauthenticated')
        b.wait_in_text("title", "Kubernetes Cockpit")
        b.enter_page("/kubernetes")
        b.wait_present("#node-list")
        b.switch_to_top()
        b.logout()


class KubernetesCase(KubernetesContainerCase, kubelib.KubernetesCommonTests):
    """Runs the common kubernetes tests against the containerized Cockpit."""

    machine_class = KubernetesContainerMachine

    def setUp(self):
        """Start kubernetes, upload the mock app and detect websocket support."""
        super(KubernetesCase, self).setUp()
        self.start_kubernetes()
        self.browser.password = "fubar"
        self.wait_api_server(port=6443, scheme='https')

        config = os.path.join(test_dir, "verify/files/mock-k8s-tiny-app.json")
        self.machine.upload([config], "/tmp")
        self.wait_api_server(port=6443, scheme='https')

        # NOTE(review): the "<" below is a plain string comparison of the
        # rpm output, not a real version comparison -- confirm it still
        # gives the intended answer for the package versions in use.
        version = self.machine.execute("rpm -qa | grep kubernetes-master")
        too_old = "-1.1.0-0.17" in version or version < "kubernetes-master-1.1.0-0.17"
        self.supports_websocket = not too_old

    def check_logs(self, b):
        """Run the inherited log check unless websockets are unsupported."""
        if not self.supports_websocket:
            print("No websocket support on kubernetes yet")
        else:
            super(KubernetesCase, self).check_logs(b)

    def check_shell(self, b):
        """Run the inherited shell check unless websockets are unsupported."""
        if not self.supports_websocket:
            print("No websocket support on kubernetes yet")
        else:
            super(KubernetesCase, self).check_shell(b)


if __name__ == '__main__':
    test_main()