#!/usr/bin/python
# -*- coding: utf-8 -*-

# This file is part of Cockpit.
#
# Copyright (C) 2013 Red Hat, Inc.
#
# Cockpit is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# Cockpit is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Cockpit; If not, see <http://www.gnu.org/licenses/>.

# NOTE(review): no name from `parent` is referenced in this file, so it is
# presumably imported for its side effects -- confirm before reordering.
import parent
from testlib import *

import subprocess
import base64


def _check_output_no_stdin(args):
    """Run *args* and return its combined stdout/stderr output.

    stdin is connected to /dev/null, which is opened only for the duration of
    the call so that no file handle outlives the command.

    Raises subprocess.CalledProcessError if the command exits non-zero; the
    captured output is then available as the exception's ``output`` attribute.
    """
    with open("/dev/null") as null:
        return subprocess.check_output(args, stdin=null, stderr=subprocess.STDOUT)


# Connection-level tests: behaviour when cockpit-ws is stopped or firewalled,
# TLS negotiation, configured origins, alternative ports, command line
# invocation of cockpit-ws, and HTTP HEAD requests.
class TestConnection(MachineCase):
    def testBasic(self):
        # Stops cockpit / rejects port 9090 while on the login page and while
        # on the /system page, and checks that the UI reports it and recovers.
        b = self.browser
        m = self.machine
        m.start_cockpit()

        # take cockpit-ws down on the login page
        b.open("/system")
        b.wait_visible("#login")
        b.set_val("#login-user-input", "admin")
        b.set_val("#login-password-input", "foobar")
        m.stop_cockpit()
        b.click('#login-button')
        b.wait_text_not('#login-fatal-message', "")
        m.start_cockpit()
        b.reload()
        b.wait_visible("#login")
        b.set_val("#login-user-input", "admin")
        b.set_val("#login-password-input", "foobar")
        b.click('#login-button')
        b.expect_load()
        b.enter_page("/system")

        # take cockpit-ws down on the server page
        m.stop_cockpit()
        b.switch_to_top()
        b.wait_visible(".curtains-ct")
        b.wait_in_text(".curtains-ct h1", "Disconnected")

        m.start_cockpit()
        b.click("#machine-reconnect")
        b.expect_load()
        b.wait_visible("#login")
        b.set_val("#login-user-input", "admin")
        b.set_val("#login-password-input", "foobar")

        # sever the connection on the login page
        m.execute("iptables -w -I INPUT 1 -p tcp --dport 9090 -j REJECT")
        b.click('#login-button')
        # NOTE(review): the extent of this `with` body was reconstructed from
        # collapsed source; confirm only the wait belongs under the timeout.
        with b.wait_timeout(20):
            b.wait_text_not('#login-fatal-message', "")
        m.execute("iptables -w -D INPUT 1")
        b.reload()
        b.wait_visible("#login")
        b.set_val("#login-user-input", "admin")
        b.set_val("#login-password-input", "foobar")
        b.click('#login-button')
        b.expect_load()
        b.enter_page("/system")

        # sever the connection on the server page
        m.execute("iptables -w -I INPUT 1 -p tcp --dport 9090 -j REJECT")
        b.switch_to_top()
        # NOTE(review): the extent of this `with` body was reconstructed from
        # collapsed source; confirm whether the two text waits below also
        # belong under the 60 second timeout.
        with b.wait_timeout(60):
            b.wait_visible(".curtains-ct")
        b.wait_in_text(".curtains-ct h1", "Disconnected")
        b.wait_in_text('.curtains-ct p', "Connection has timed out.")
        m.execute("iptables -w -D INPUT 1")
        b.click("#machine-reconnect")
        b.expect_load()
        b.enter_page("/system")

        # Reauthorization can fail due to disconnects above
        self.allow_authorize_journal_messages()
        self.allow_restart_journal_messages()

        # Debian 8 doesn't have systemd with coredump support
        if m.image in ["debian-stable"]:
            return

        # Let's crash a systemd-controlled process and see if we get a proper backtrace in the logs
        # This helps with debugging failures in the tests elsewhere
        m.execute("systemctl start systemd-hostnamed; pkill -e -SEGV systemd-hostnam")
        wait(lambda: m.execute("journalctl -b | grep 'Process.*systemd-hostnam.*of user.*dumped core.'"))

        # Make sure the core dumps exist in the directory, so we can download them
        cores = m.execute("find /var/lib/systemd/coredump -type f")
        self.assertNotEqual(cores, "")

        self.allow_journal_messages(".*org.freedesktop.hostname1.*DBus.Error.NoReply.*")

    def testTls(self):
        # Checks with `openssl s_client` that TLS works, that SSLv3 and RC4
        # are refused, and that an installed certificate chain is served.
        m = self.machine

        # Start Cockpit with TLS
        m.start_cockpit(tls=True)

        args = ['openssl', 's_client', '-connect', m.address + ":9090"]

        # A normal TLS connection works
        m.message(repr(args))
        output = _check_output_no_stdin(args)
        m.message(output)
        self.assertIn("DONE", output)

        # SSLv3 should not work
        try:
            cmd = args + ['-ssl3']
            m.message(repr(cmd))
            output = _check_output_no_stdin(cmd)
        except subprocess.CalledProcessError as ex:
            m.message(ex.output)
            # Some operating systems fail SSL3 on the server side
            if m.image in ["debian-testing", "fedora-25", "fedora-26", "fedora-i386", "fedora-testing", "ubuntu-stable"]:
                self.assertIn("ssl handshake failure", ex.output)
            else:
                self.assertIn("wrong version number", ex.output)
        else:
            m.message(output)
            self.fail("SSL3 should not have been successful")

        # RC4 should not work
        try:
            cmd = args + ['-cipher', 'RC4']
            m.message(repr(cmd))
            output = _check_output_no_stdin(cmd)
        except subprocess.CalledProcessError as ex:
            m.message(ex.output)
            self.assertIn("ssl handshake failure", ex.output)
        else:
            m.message(output)
            self.fail("RC4 cipher should not have been usable")

        # Install a certificate chain, and give it an arbitrary bad file context
        m.upload(["verify/files/cert-chain.cert"], "/etc/cockpit/ws-certs.d")
        m.execute("! selinuxenabled || chcon --type svirt_sandbox_file_t /etc/cockpit/ws-certs.d/cert-chain.cert")

        # This should also reset the file context
        m.restart_cockpit()

        # Should use the new certificates and entire chain should show up
        m.message(repr(args))
        output = _check_output_no_stdin(args)
        m.message(output)
        self.assertIn("DONE", output)
        self.assertIn("s:/CN=localhost", output)
        self.assertIn("1 s:/OU=Intermediate", output)

        # login handler: correct password
        m.execute("curl -k -c cockpit.jar -s --head --header 'Authorization: Basic {}' https://{}:9090/cockpit/login".format(
            base64.b64encode("admin:foobar"), m.address))
        headers = m.execute("curl -k --head -b cockpit.jar -s https://{}:9090/".format(m.address))
        self.assertIn("default-src 'self' https://{}:9090; connect-src 'self' https://{}:9090 ws: wss".format(m.address, m.address), headers)
        self.assertIn("Access-Control-Allow-Origin: https://{}:9090".format(m.address), headers)

        self.allow_journal_messages(
            ".*Peer failed to perform TLS handshake",
            ".*Error performing TLS handshake: Could not negotiate a supported cipher suite.")

    def testConfigOrigins(self):
        # Writes an Origins setting to cockpit.conf and checks that a
        # websocket upgrade from the listed foreign origin gets a "no-session"
        # reply on both socket paths.
        m = self.machine
        m.execute('mkdir -p /etc/cockpit/ && echo "[WebService]\nOrigins = http://other-origin:9090 http://localhost:9090" > /etc/cockpit/cockpit.conf')
        m.start_cockpit()

        # Same websocket handshake for both requests; only the URL path differs.
        upgrade_request = (
            'curl -s -f -N -H "Connection: Upgrade" -H "Upgrade: websocket" '
            '-H "Origin: http://other-origin:9090" -H "Host: localhost:9090" '
            '-H "Sec-Websocket-Key: 3sc2c9IzwRUc3BlSIYwtSA==" -H "Sec-Websocket-Version: 13" '
            'http://localhost:9090{}')

        output = m.execute(upgrade_request.format('/cockpit/socket'))
        self.assertIn('"no-session"', output)

        # The socket should also answer at /socket
        output = m.execute(upgrade_request.format('/socket'))
        self.assertIn('"no-session"', output)

        self.allow_journal_messages('peer did not close io when expected')

    @skipImage("Atomic changes ports differently", "fedora-atomic", "rhel-atomic", "continuous-atomic")
    def testSocketPort(self):
        # Moves the listening socket to port 443 via a systemd drop-in and
        # checks that 443 answers while 9090 refuses connections.
        m = self.machine

        # Change port according to documentation: http://cockpit-project.org/guide/latest/listen.html
        m.execute('! selinuxenabled || semanage port -m -t websm_port_t -p tcp 443')
        m.execute('mkdir -p /etc/systemd/system/cockpit.socket.d/ && printf "[Socket]\nListenStream=\nListenStream=443" > /etc/systemd/system/cockpit.socket.d/listen.conf')

        m.start_cockpit(tls=True)

        output = m.execute('curl -k https://localhost 2>&1 || true')
        self.assertIn('Loading...', output)

        output = m.execute('curl -k https://localhost:9090 2>&1 || true')
        self.assertIn('Connection refused', output)

        self.allow_journal_messages(".*Peer failed to perform TLS handshake")

    @skipImage("Atomic doesn't have cockpit-ws", "fedora-atomic", "rhel-atomic", "continuous-atomic")
    def testCommandline(self):
        # Runs cockpit-ws by hand with XDG_CONFIG_DIRS/XDG_DATA_DIRS pointing
        # at /test and with --port/--address, then checks the custom title is
        # served on localhost:9000 only.
        m = self.machine
        m.execute('mkdir -p /test/cockpit/ws-certs.d && echo "[WebService]\nLoginTitle = A Custom Title" > /test/cockpit/cockpit.conf')
        # NOTE(review): the login.html payload looks like it lost its HTML
        # markup somewhere upstream of this file; only the surviving text is
        # reproduced here -- confirm against the project history.
        m.execute('mkdir -p /test/cockpit/static/ && echo "\n\nCustom Default Root\n\n" > /test/cockpit/static/login.html')

        m.execute("XDG_CONFIG_DIRS=/test XDG_DATA_DIRS=/test remotectl certificate --ensure")
        self.assertTrue(m.execute("ls /test/cockpit/ws-certs.d/*"))
        self.assertFalse(m.execute("ls /etc/cockpit/ws-certs.d/* || true"))

        executable = "/usr/libexec/cockpit-ws"
        if "debian" in m.image or "ubuntu" in m.image:
            executable = "/usr/lib/cockpit/cockpit-ws"
        m.execute("XDG_CONFIG_DIRS=/test XDG_DATA_DIRS=/test {} --port 9000 --address 127.0.0.1 0<&- &>/dev/null &".format(executable))

        # The port may not be available immediately, so wait for it
        wait(lambda: 'A Custom Title' in m.execute('curl -s -k https://localhost:9000/'))

        # cockpit-ws was bound to 127.0.0.1 only, so the machine's own address must refuse
        output = m.execute('curl -s -S -k https://{}:9000/ 2>&1 || true'.format(m.address))
        self.assertIn('Connection refused', output)

    def testHeadRequest(self):
        # Issues HEAD requests against the static, login, socket and external
        # channel handlers and checks status lines and selected headers.
        m = self.machine
        m.start_cockpit()

        # static handler
        headers = m.execute("curl -s --head http://localhost:9090/cockpit/static/login.min.html")
        self.assertIn("HTTP/1.1 200 OK\r\n", headers)
        self.assertIn("Content-Type: text/html\r\n", headers)
        # login.html is not always accessible as a file (e. g. in Atomic), so just assert a reasonable content length
        self.assertIn("Content-Length: ", headers)
        length = int(headers.split('Content-Length: ', 1)[1].split()[0])
        self.assertGreater(length, 10000)
        self.assertLess(length, 100000)

        # login handler: wrong password
        headers = m.execute("curl -s --head --header 'Authorization: Basic {}' http://localhost:9090/cockpit/login".format(
            base64.b64encode("admin:hahawrong")))
        self.assertIn("HTTP/1.1 401 Authentication failed\r\n", headers)
        self.assertNotIn("Set-Cookie:", headers)

        # login handler: correct password
        headers = m.execute("curl -s --head --header 'Authorization: Basic {}' http://localhost:9090/cockpit/login".format(
            base64.b64encode("admin:foobar")))
        self.assertIn("HTTP/1.1 200 OK\r\n", headers)
        self.assertIn("Set-Cookie: cockpit", headers)

        # socket handler; this should refuse HEAD (as it makes little sense on sockets), so 404
        headers = m.execute("curl -s --head http://localhost:9090/cockpit/socket")
        self.assertIn("HTTP/1.1 404 Not Found\r\n", headers)

        # external channel handler; unauthenticated, thus 404
        headers = m.execute("curl -s --head http://localhost:9090/cockpit+123/channel/foo")
        self.assertIn("HTTP/1.1 404 Not Found\r\n", headers)


if __name__ == '__main__':
    test_main()