#!/usr/bin/python # -*- coding: utf-8 -*- # This file is part of Cockpit. # # Copyright (C) 2013 Red Hat, Inc. # # Cockpit is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation; either version 2.1 of the License, or # (at your option) any later version. # # Cockpit is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with Cockpit; If not, see . import parent from testlib import * import subprocess import re def kill_user_admin(machine): # logind from systemd 208 is buggy, so we use systemd directly if it fails # https://bugs.freedesktop.org/show_bug.cgi?id=71092 machine.execute("loginctl terminate-user admin || systemctl kill user-1000.slice") def authorize_user(m, user): m.execute("mkdir -p /home/{0}/.ssh".format(user)) m.upload(["verify/files/ssh/id_rsa.pub"], "/home/{0}/.ssh/authorized_keys".format(user)) m.execute("chown -R {0}:{0} /home/{0}/.ssh/".format(user)) m.execute("chmod 600 /home/{0}/.ssh/authorized_keys".format(user)) m.execute("chmod 700 /home/{0}/.ssh".format(user)) LOAD_KEYS = [ "id_rsa", # password: foobar "id_ecdsa", # no password "id_dsa", # password: badbad "id_ed25519", # password: foobar ] KEY_IDS = [ "2048 SHA256:SRvBhCmkCEVnJ6ascVH0AoVEbS3nPbowZkNevJnXtgw /home/admin/.ssh/id_rsa (RSA)", "256 SHA256:dyHF4jiKz6RolQqORIATqhbZ4kil5cyiMQWizbQWU8k /home/admin/.ssh/id_ecdsa (ECDSA)", "256 SHA256:Wd028KYmG3OVLp7dBmdx0gMR7VcarJVIfaTtKqYCmak /home/admin/.ssh/id_ed25519 (ED25519)" ] KEY_IDS_MD5 = [ "2048 93:40:9e:67:82:78:a8:99:89:39:d5:ba:e0:50:70:e1 /home/admin/.ssh/id_rsa (RSA)", "256 bd:56:df:c3:ff:e4:1d:9d:f5:c4:b9:cc:64:00:d5:93 /home/admin/.ssh/id_ecdsa (ECDSA)", "256 b5:80:1b:f5:98:89:2a:39:f3:78:b3:64:5c:64:33:17 /home/admin/.ssh/id_ed25519 (ED25519)" ] @skipImage("No cockpit-ssh on Atomic", "continuous-atomic", "fedora-atomic", "rhel-atomic") class TestMultiMachineKeyAuth(MachineCase): additional_machines = { 'machine2': { } } def load_key(self, name, password): self.browser.switch_to_top() self.browser.eval_js("loaded = false") self.browser.eval_js(""" load = function (user) {{ var proc = cockpit.spawn([ 'ssh-add', '{0}' ], {{ pty: true, directory: user.home + '/.ssh' }}) .stream(function(data) {{ if (data.indexOf('passphrase') !== -1) proc.input('{1}\\n', true); console.log(data); }}) .done(function() {{ loaded = true; }}) .fail(function(ex) {{ console.log(ex); }}); }} """.format(name, password)) self.browser.eval_js("cockpit.user().done(load)") def check_keys(self, keys_md5, keys): def normalize(k): return re.sub("/home/admin/\\.ssh/[^ ]*|test@test|ecdsa w/o comment", "", k) self.assertIn(normalize(self.browser.eval_js("cockpit.spawn([ 'ssh-add', '-l' ])")), [normalize("\n".join(keys_md5) + "\n"), normalize("\n".join(keys) + "\n")]) def setUp(self): MachineCase.setUp(self) self.machine2 = self.machines['machine2'] # Add user self.machine2.disconnect() self.machine2.execute("useradd user -c 'User' || true", direct=True) self.machine2.execute("sed -i 's/.*PasswordAuthentication.*/PasswordAuthentication no/' /etc/ssh/sshd_config", direct=True) self.machine2.execute("( ! systemctl is-active sshd.socket || systemctl stop sshd.socket) && systemctl restart sshd.service", direct=True) self.machine2.wait_execute() def tearDown(self): if self.runner and self.runner.wasSuccessful(): self.check_journal_messages(self.machine2) MachineCase.tearDown(self) def testBasic(self): b = self.browser m1 = self.machine m2 = self.machine2 # Load keys m1.execute("mkdir -p /home/admin/.ssh") m1.upload(["verify/files/ssh/{0}".format(k) for k in LOAD_KEYS], "/home/admin/.ssh/") m1.upload(["verify/files/ssh/{0}.pub".format(k) for k in LOAD_KEYS], "/home/admin/.ssh/") m1.execute("chmod 400 /home/admin/.ssh/*") m1.execute("chown -R admin:admin /home/admin/.ssh") self.login_and_go("/system") try: m1.execute("ps xa | grep ssh-agent | grep -v grep") except subprocess.CalledProcessError: assert False, "No running ssh-agent found" # pam-ssh-add isn't used on atomic if m1.atomic_image: self.load_key('id_rsa', 'foobar') b.wait_js_cond('loaded === true'); self.check_keys(["2048 93:40:9e:67:82:78:a8:99:89:39:d5:ba:e0:50:70:e1 id_rsa (RSA)"], ["2048 SHA256:SRvBhCmkCEVnJ6ascVH0AoVEbS3nPbowZkNevJnXtgw id_rsa (RSA)"]) # Check our keys were loaded. self.check_keys(KEY_IDS_MD5, KEY_IDS) # Add machine b.switch_to_top() b.go("/@%s" % m2.address) b.wait_visible("#machine-troubleshoot") b.click('#machine-troubleshoot') b.wait_popup('troubleshoot-dialog') b.wait_text('#troubleshoot-dialog .btn-primary', "Add") b.wait_present("#troubleshoot-dialog .btn-primary:not([disabled])") b.click('#troubleshoot-dialog .btn-primary') b.wait_in_text('#troubleshoot-dialog', "Fingerprint") b.click('#troubleshoot-dialog .btn-primary') b.wait_in_text('#troubleshoot-dialog h4', "Log in to") b.wait_present("#troubleshoot-dialog .btn-primary:not([disabled])") b.wait_visible("#login-available") b.wait_not_present("#login-diff-password") b.wait_present("#login-custom-user") self.assertEqual(b.val("#login-custom-user"), "") b.set_val("#login-custom-user", "user") self.assertEqual(b.text("#login-type button span"), "Using available credentials") b.wait_not_present("#login-type li[value=password] a"); b.wait_visible("#login-available") b.wait_not_in_text("#login-available", "Login Password") b.wait_in_text("#login-available", "id_rsa") if not m1.atomic_image: b.wait_in_text("#login-available", "id_ecdsa") b.wait_in_text("#login-available", "id_ed25519") # add key authorize_user(m2, "user") # Login b.click("#troubleshoot-dialog .btn-primary") b.wait_popdown('troubleshoot-dialog') b.enter_page("/system", host="user@{0}".format(m2.address)) # Logout b.logout() b.wait_visible("#login") # Make sure ssh-agent exits wait(lambda: "ssh-agent" not in m1.execute("ps xa | grep ss[h]-agent || true")) # Remove authorized keys m2.execute("rm /home/user/.ssh/authorized_keys") authorize_user(m2, "admin") self.login_and_go("/system") b.switch_to_top() # pam-ssh-add isn't used on atomic if m1.atomic_image: self.load_key('id_rsa', 'foobar') b.go("/@%s" % m2.address) b.wait_visible("#machine-troubleshoot") b.click('#machine-troubleshoot') b.wait_popup('troubleshoot-dialog') b.wait_present('#troubleshoot-dialog h4') b.wait_in_text('#troubleshoot-dialog h4', "Log in to") b.wait_present("#troubleshoot-dialog .btn-primary:not([disabled])") b.wait_visible("#login-available") b.wait_visible("#login-custom-user") self.assertEqual(b.val("#login-custom-user"), "user") b.set_val("#login-custom-user", "") b.click("#troubleshoot-dialog .btn-primary") b.wait_popdown('troubleshoot-dialog') b.enter_page("/system", host=m2.address) b.wait_not_present("iframe.container-frame[name='cockpit1:user@{0}/system']".format(m2.address)) # Change user authorize_user(m2, "user") self.allow_restart_journal_messages() self.allow_hostkey_messages() # Might happen when killing the bridge. self.allow_journal_messages("localhost: dropping message while waiting for child to exit", "Received message for unknown channel: .*", ".*: error reading from ssh", ".*: bridge program failed: Child process exited with code .*", # Since there is not password, # reauthorize doesn't work on m2 "received authorize command for wrong user: user", ".*: user admin reauthorization failed", "Error executing command as another user: Not authorized", "This incident has been reported.", "sudo: a password is required") # No dashboard on Atomic Host if m1.image in [ "continuous-atomic", "fedora-atomic", "rhel-atomic" ]: return b.go("/dashboard") b.enter_page("/dashboard") b.click('#dashboard-enable-edit') b.wait_present("#dashboard-hosts .list-group-item[data-address='{0}']".format(m2.address)) b.wait_visible("#dashboard-hosts .list-group-item[data-address='{0}'] button.pficon-edit".format(m2.address)) b.click("#dashboard-hosts .list-group-item[data-address='{0}'] button.pficon-edit".format(m2.address)) b.wait_popup('host-edit-dialog') b.wait_visible('#host-edit-user') self.assertEqual(b.val("#host-edit-user"), "") b.set_val('#host-edit-user', 'user') b.click('#host-edit-apply') b.wait_popdown('host-edit-dialog') b.wait_not_present("iframe.container-frame[name='cockpit1:{0}/system']".format(m2.address)) b.click("#dashboard-hosts .list-group-item[data-address='{0}']".format(m2.address)) b.enter_page("/system", host="user@{0}".format(m2.address)) if __name__ == '__main__': test_main()