#!/usr/bin/python # -*- coding: utf-8 -*- # This file is part of Cockpit. # # Copyright (C) 2013 Red Hat, Inc. # # Cockpit is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation; either version 2.1 of the License, or # (at your option) any later version. # # Cockpit is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with Cockpit; If not, see . import parent from testlib import * import os import re RESOLV_SCRIPT = """ set -e # HACK: Racing with operating systems reading/updating resolv.conf and # the fact that resolv.conf can be a symbolic link. Avoid failures like: # chattr: Operation not supported while reading flags on /etc/resolv.conf for x in 1 2 3 4 5; do hostnamectl set-hostname x0.cockpit.lan printf 'domain cockpit.lan\nsearch cockpit.lan\nnameserver {address}\n' >/etc/resolv2.conf chcon -v unconfined_u:object_r:net_conf_t:s0 /etc/resolv2.conf || true mv /etc/resolv2.conf /etc/resolv.conf if chattr +i /etc/resolv.conf; then break else sleep $x fi done """ def prepare_resolv(machine, address): machine.execute(script=RESOLV_SCRIPT.format(address=address)) @skipImage("No realmd available", "continuous-atomic", "fedora-atomic", "rhel-atomic") @skipImage("No freeipa available", "debian-stable", "debian-testing") class TestRealms(MachineCase): additional_machines = { 'ipa': { 'machine': { 'image': 'ipa' }, 'start': { 'memory_mb': 2048 } } } def testIpa(self): m = self.machine b = self.browser self.login_and_go("/system") # Use the FreeIPA instance as the DNS server prepare_resolv(m, self.machines['ipa'].address) # Wait for DNS to work as expected. # https://bugzilla.redhat.com/show_bug.cgi?id=1071356#c11 # wait(lambda: m.execute("nslookup -type=SRV _ldap._tcp.cockpit.lan")) def wait_number_domains(n): if n == 0: b.wait_text("#system-info-domain a", "Join Domain") else: b.wait_text_not("#system-info-domain a", "Join Domain") b.wait_not_attr("#system-info-domain a", "disabled", "disabled") wait_number_domains(0) # Join cockpit.lan b.click("#system-info-domain a") b.wait_popup("realms-op") with b.wait_timeout(180): b.set_val(".realms-op-address", "cockpit.lan") b.wait_attr(".realms-op-admin", "placeholder", 'e.g. "admin"') b.set_val(".realms-op-admin", "admin") b.set_val(".realms-op-admin-password", "foobarfoo") b.wait_not_visible(".realms-op-leave-only-row") b.click(".realms-op-apply") b.wait_popdown("realms-op") # Check that this has worked wait_number_domains(1) # Leave the domain b.click("#system-info-domain a") b.wait_popup("realms-op") b.wait_visible(".realms-op-leave-only-row") b.wait_in_text(".realms-op-leave-only-row", "cockpit.lan") b.click(".realms-op-apply") b.wait_popdown("realms-op") wait_number_domains(0) # Send a wrong password b.click("#system-info-domain a") b.wait_popup("realms-op") b.set_val(".realms-op-address", "cockpit.lan") b.wait_attr(".realms-op-admin", "placeholder", 'e.g. "admin"') b.set_val(".realms-op-admin", "admin") b.set_val(".realms-op-admin-password", "foo") b.click(".realms-op-apply") b.wait_text_not(".realms-op-error", "") b.wait_not_visible(".realms-op-leave-only-row") b.click(".realms-op-cancel") b.wait_popdown("realms-op") # Try to join a non-existing domain b.click("#system-info-domain a") b.wait_popup("realms-op") b.set_val(".realms-op-address", "NOPE") b.wait_js_cond("$('.realms-op-address-error').attr('title') != ''") b.click(".realms-op-cancel") b.wait_popdown("realms-op") # Cancel a join b.click("#system-info-domain a") b.wait_popup("realms-op") b.set_val(".realms-op-address", "cockpit.lan") b.wait_attr(".realms-op-admin", "placeholder", 'e.g. "admin"') b.set_val(".realms-op-admin", "admin") b.set_val(".realms-op-admin-password", "foobarfoo") b.click(".realms-op-apply") b.wait_visible(".realms-op-spinner") b.click(".realms-op-cancel") b.wait_popdown("realms-op") # HACK sssd regression # https://github.com/cockpit-project/cockpit/issues/5142 # https://bugzilla.redhat.com/show_bug.cgi?id=1380953 self.allow_journal_messages('.*denied.*{ getattr } for.*comm="sssd" name="/".*') def testNotSupported(self): m = self.machine b = self.browser # Disable sssd support in realmd realmd_distro_conf = "/usr/lib/realmd/realmd-distro.conf" if "rhel" in m.image or "centos" in m.image: realmd_distro_conf = "/usr/lib64/realmd/realmd-distro.conf" m.execute("echo -e '[providers]\nsssd = no\n' >>%s" % realmd_distro_conf) self.login_and_go("/system") # Use the FreeIPA instance as the DNS server prepare_resolv(m, self.machines['ipa'].address) # Wait for DNS to work as expected. # https://bugzilla.redhat.com/show_bug.cgi?id=1071356#c11 # wait(lambda: m.execute("nslookup -type=SRV _ldap._tcp.cockpit.lan")) # Join cockpit.lan b.click("#system-info-domain a") b.wait_popup("realms-op") b.set_val(".realms-op-address", "cockpit.lan") b.wait_js_cond("$('.realms-op-address-error').attr('title') != ''") b.set_val(".realms-op-admin", "admin") b.set_val(".realms-op-admin-password", "foobarfoo") b.click(".realms-op-apply") b.wait_text(".realms-op-error", "Joining this domain is not supported") JOIN_SCRIPT = """ set -ex # HACK: https://bugzilla.redhat.com/show_bug.cgi?id=1071356#c11 for x in $(seq 1 20); do if nslookup -type=SRV _ldap._tcp.cockpit.lan; then break else sleep $x fi done if ! echo '%(password)s' | realm join -U admin cockpit.lan; then journalctl -u realmd.service exit 1 fi echo '%(password)s' | kinit -f admin@COCKPIT.LAN # HACK: https://bugzilla.redhat.com/show_bug.cgi?id=1144292 curl --insecure -s --negotiate -u : https://f0.cockpit.lan/ipa/json --header 'Referer: https://f0.cockpit.lan/ipa' --header "Content-Type: application/json" --header "Accept: application/json" --data '{"params": [["HTTP/x0.cockpit.lan@COCKPIT.LAN"], {"raw": false, "all": false, "version": "2.101", "force": true, "no_members": false, "ipakrbokasdelegate": true}], "method": "service_add", "id": 0}' ipa-getkeytab -q -s f0.cockpit.lan -p HTTP/x0.cockpit.lan -k /etc/krb5.keytab # HACK: https://bugs.freedesktop.org/show_bug.cgi?id=98479 if ! grep -q 'services.*nss' /etc/sssd/sssd.conf; then sed -i 's/^services = sudo, ssh$/services = sudo, ssh, nss, pam/' /etc/sssd/sssd.conf systemctl restart sssd fi # HACK: This needs to work, but may take a minute for x in $(seq 1 60); do if getent passwd admin@cockpit.lan; then break fi sleep $x done # This directory should be owned by the domain user getent passwd admin@cockpit.lan chown -R admin@cockpit.lan /home/admin # HACK: This needs to work but may take a minute for x in $(seq 1 60); do if ssh -oStrictHostKeyChecking=no -oBatchMode=yes -l admin@cockpit.lan x0.cockpit.lan true; then break fi sleep $x done """ # This is here because our test framework can't run ipa VM's twice @skipImage("No realmd available", "continuous-atomic", "fedora-atomic", "rhel-atomic") @skipImage("No freeipa available", "debian-stable", "debian-testing") class TestKerberos(MachineCase): additional_machines = { 'ipa': { 'machine': { 'image': 'ipa' }, 'start': { 'memory_mb': 2048 } } } def configure_kerberos(self): # Setup a place for kerberos caches args = { "addr": self.machines['ipa'].address, "password": "foobarfoo" } prepare_resolv(self.machine, self.machines['ipa'].address) self.machine.execute(script=JOIN_SCRIPT % args) def tearDown(self): if 'KRB5CCNAME' in os.environ: del os.environ['KRB5CCNAME'] if 'KRB_CONFIG' in os.environ: del os.environ['KRB5_CONFIG'] MachineCase.tearDown(self) def testNegotiate(self): self.allow_authorize_journal_messages() self.allow_hostkey_messages() b = self.browser # HACK: There is no operating system where the domain admins are admins by default # This is something that needs to be worked on at an OS level. We use admin level # features below, such as adding a machine to the dashboard self.machine.execute("echo 'admin@cockpit.lan ALL=(ALL) NOPASSWD: ALL' >> /etc/sudoers") # Make sure negotiate auth is not offered first self.machine.start_cockpit() output = self.machine.execute(['/usr/bin/curl -v -s', '--resolve', 'x0.cockpit.lan:9090:%s' % self.machine.address, 'http://x0.cockpit.lan:9090/cockpit/login', '2>&1']) self.assertIn("HTTP/1.1 401", output) self.assertNotIn("WWW-Authenticate: Negotiate", output) self.configure_kerberos() self.machine.restart_cockpit() output = self.machine.execute(['/usr/bin/curl', '-s', '--negotiate', '--delegation', 'always', '-u', ':', "-D", "-", '--resolve', 'x0.cockpit.lan:9090:%s' % self.machine.address, 'http://x0.cockpit.lan:9090/cockpit/login']) self.assertIn("HTTP/1.1 200 OK", output) self.assertIn('"csrf-token"', output) cookie = re.search("Set-Cookie: cockpit=([^ ;]+)", output).group(1) b.open("/system/terminal", cookie={ "name": "cockpit", "value": cookie, "domain": self.machine.address, "path": "/" }) b.wait_present('#content') b.wait_visible('#content') b.enter_page("/system/terminal") b.wait_present(".terminal") b.focus(".terminal") def line_sel(i): return '.terminal > div:nth-child(%d)' % i def wait_prompt(b, line): b.wait_js_func("(function (sel) { return ph_text(sel).trim() != ''})", line_sel(line)) wait_prompt(b, 1) # run kinit and see if got forwarded the kerberos ticket into the session b.key_press(list("klist") + [ "Return" ]) wait_prompt(b, 2) text = b.text(line_sel(2)).replace(u"\xa0", " ").strip() self.assertIn("Ticket cache", text) # Now connect to another machine self.assertNotIn("admin@cockpit.lan", self.machine.execute("ps -xa | grep sshd")) b.switch_to_top() b.go("/@x0.cockpit.lan/system/terminal") b.wait_visible("#machine-troubleshoot") b.click("#machine-troubleshoot") b.wait_popup('troubleshoot-dialog') b.wait_present("#troubleshoot-dialog .btn-primary:not([disabled])") b.wait_text('#troubleshoot-dialog .btn-primary', "Add") b.click('#troubleshoot-dialog .btn-primary') b.wait_in_text('#troubleshoot-dialog', "Fingerprint") b.click('#troubleshoot-dialog .btn-primary') b.wait_popdown('troubleshoot-dialog') b.enter_page("/system/terminal", host="x0.cockpit.lan") b.wait_present(".terminal") b.wait_visible(".terminal") # Make sure we connected via SSH self.assertIn("admin@cockpit.lan", self.machine.execute("ps -xa | grep sshd")) if __name__ == '__main__': test_main()