#!/usr/bin/python # -*- coding: utf-8 -*- # This file is part of Cockpit. # # Copyright (C) 2013 Red Hat, Inc. # # Cockpit is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation; either version 2.1 of the License, or # (at your option) any later version. # # Cockpit is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with Cockpit; If not, see . import os import sys import tempfile import time base_dir = os.path.dirname(os.path.realpath(__file__)) test_dir = os.path.abspath(os.path.join(base_dir, "..")) try: import testlib except: sys.path.append(test_dir) from common.testlib import * from common.testvm import VirtMachine from verify import kubelib class OCBrowser(Browser): def login_and_go(self, path=None, user=None, host=None, authorized=None): if user is None: user = self.default_user href = path if not href: href = "/" if host: href = "/@" + host + href self.open(href) self.expect_load() self.wait_visible("#inputUsername") self.wait_visible("#inputPassword") self.set_val("#inputUsername", user) self.set_val("#inputPassword", "fubar") self.click('.btn-primary') self.expect_load() self.wait_present('#content') self.wait_visible('#content') if path: self.enter_page(path.split("#")[0], host=host) class OCMachine(VirtMachine): run_registry = False def wait_oc(self): i = 0 while True: try: self.execute("oc status 2>&1") break; except: if i > 20: raise i = i + 1 time.sleep(1) def start_cockpit(self, host=None): path = os.path.join(base_dir, "files", "cockpit-openshift-template.json") self.upload([path], "/tmp") self.wait_oc() # TODO: For now we only support cluster admins # This sometimes fails if openshift is still modifying the clusterpolicybinding cmd = "oadm policy add-cluster-role-to-user cluster-admin admin" try: self.execute(cmd) except: self.execute(cmd) args = [ "-v", "COCKPIT_KUBE_INSECURE=true", "-v", "COCKPIT_KUBE_URL=http://{0}:30000".format(self.address), "-v", "OPENSHIFT_OAUTH_PROVIDER_URL=https://{0}:8443".format(self.address) ] if self.run_registry: args += ["-v", "REGISTRY_ONLY=true"] self.execute("oc process -f /tmp/cockpit-openshift-template.json {0} > out.json".format(' '.join(args))) self.execute("oc create -f out.json") self.wait_for_cockpit_running(self.address, seconds=180, port=30000) class RegistryMachine(OCMachine): run_registry = True class OCCase(kubelib.KubernetesCase): def setUp(self, macaddr=None, memory_mb=None, cpus=None): self.machine = self.new_machine(machine_key='openshift', image='openshift') self.machine.start(macaddr=macaddr, memory_mb=memory_mb, cpus=cpus) self.machine.wait_boot() path = os.path.join(test_dir, "verify", "files", "mock-app-openshift.json") self.machine.upload([path], "/tmp") self.browser = OCBrowser(self.machine.address, self.label(), port=30000) self.addCleanup(lambda: self.browser.kill()) self.tmpdir = tempfile.mkdtemp() self.openshift = self.machine self.machine.wait_oc() # Expect the default container user limitations during testing self.openshift.execute("oc patch scc restricted -p '{ \"runAsUser\": { \"type\": \"MustRunAsRange\" } }'") class TestOpenshift(OCCase, kubelib.OpenshiftCommonTests): machine_class = OCMachine def testLogin(self): b = self.browser self.machine.start_cockpit() b.open("/") b.expect_load() b.wait_visible("#inputUsername") b.wait_visible("#inputPassword") b.wait_in_text("title", "OpenShift Origin") b.set_val("#inputUsername", "kuser") b.set_val("#inputPassword", "fubar") b.click('.btn-primary') b.expect_load() b.wait_present("#content") b.wait_text('#content-user-name', 'kuser') b.wait_in_text("title", "Openshift Cockpit") b.wait_in_text("#index-brand", "Openshift") b.enter_page("/kubernetes") b.wait_present("#node-list") b.logout() b.wait_in_text("#login-fatal-message", "Logout Successful") b.wait_not_visible('#login-button') b.open("/#error_description=AError") b.wait_not_visible('#login-button') b.wait_in_text("#login-fatal-message", "AError") b.click("#login-again") b.expect_load() b.set_val("#inputUsername", "kuser") b.set_val("#inputPassword", "foobar") b.click('.btn-primary') b.expect_load() b.wait_present("#content") class TestRegistry(OCCase): machine_class = RegistryMachine def testLogin(self): b = self.browser self.machine.start_cockpit() b.open("/") b.expect_load() b.wait_visible("#inputUsername") b.wait_visible("#inputPassword") b.wait_in_text("title", "OpenShift Origin") b.set_val("#inputUsername", "kuser") b.set_val("#inputPassword", "fubar") b.click('.btn-primary') b.expect_load() b.wait_present("#content") b.wait_text('#content-user-name', 'kuser') b.wait_in_text("title", "Image Registry") b.wait_in_text("#index-brand", "Atomic Registry") b.enter_page("/registry") b.wait_present(".dashboard-commands") b.switch_to_top() b.wait_not_visible("#content-navbar") b.logout() b.wait_in_text("#login-fatal-message", "Logout Successful") b.wait_not_visible('#login-button') b.open("/#error_description=AError") b.wait_not_visible('#login-button') b.wait_in_text("#login-fatal-message", "AError") b.click("#login-again") b.expect_load() b.set_val("#inputUsername", "kuser") b.set_val("#inputPassword", "foobar") b.click('.btn-primary') b.expect_load() b.wait_present("#content") if __name__ == '__main__': test_main()