#!/usr/bin/python
# -*- coding: utf-8 -*-
# This file is part of Cockpit.
#
# Copyright (C) 2013 Red Hat, Inc.
#
# Cockpit is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# Cockpit is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Cockpit; If not, see <http://www.gnu.org/licenses/>.
import parent
from testlib import *
import subprocess
import base64
class TestConnection(MachineCase):
    # Connection-level checks against cockpit-ws: behaviour when the service
    # or the network goes away, TLS negotiation, configured origins, a
    # non-default listening port, command line/XDG overrides and HEAD requests.

    def _fill_login_form(self):
        # Wait for the login page and type the admin credentials into it.
        # Deliberately does not submit, so callers can break the connection first.
        b = self.browser
        b.wait_visible("#login")
        b.set_val("#login-user-input", "admin")
        b.set_val("#login-password-input", "foobar")

    def _run_openssl(self, args):
        # Log the command line, then run it locally with stdin fed from
        # /dev/null and stderr folded into stdout. Returns the combined output;
        # subprocess.CalledProcessError propagates on a non-zero exit status.
        # The /dev/null handle is scoped to the call so it is always closed.
        self.machine.message(repr(args))
        with open("/dev/null") as null:
            return subprocess.check_output(args, stdin=null, stderr=subprocess.STDOUT)

    def testBasic(self):
        b = self.browser
        m = self.machine

        m.start_cockpit()

        # take cockpit-ws down on the login page
        b.open("/system")
        self._fill_login_form()
        m.stop_cockpit()
        b.click('#login-button')
        b.wait_text_not('#login-fatal-message', "")

        m.start_cockpit()
        b.reload()
        self._fill_login_form()
        b.click('#login-button')
        b.expect_load()
        b.enter_page("/system")

        # take cockpit-ws down on the server page
        m.stop_cockpit()
        b.switch_to_top()
        b.wait_visible(".curtains-ct")
        b.wait_in_text(".curtains-ct h1", "Disconnected")

        m.start_cockpit()
        b.click("#machine-reconnect")
        b.expect_load()
        self._fill_login_form()

        # sever the connection on the login page
        m.execute("iptables -w -I INPUT 1 -p tcp --dport 9090 -j REJECT")
        b.click('#login-button')
        with b.wait_timeout(20):
            b.wait_text_not('#login-fatal-message', "")
        m.execute("iptables -w -D INPUT 1")

        b.reload()
        self._fill_login_form()
        b.click('#login-button')
        b.expect_load()
        b.enter_page("/system")

        # sever the connection on the server page
        m.execute("iptables -w -I INPUT 1 -p tcp --dport 9090 -j REJECT")
        b.switch_to_top()
        # NOTE(review): only the wait for the curtain gets the longer timeout
        # here; confirm that the text checks below were not meant to share it.
        with b.wait_timeout(60):
            b.wait_visible(".curtains-ct")
        b.wait_in_text(".curtains-ct h1", "Disconnected")
        b.wait_in_text('.curtains-ct p', "Connection has timed out.")
        m.execute("iptables -w -D INPUT 1")

        b.click("#machine-reconnect")
        b.expect_load()
        b.enter_page("/system")

        # Reauthorization can fail due to disconnects above
        self.allow_authorize_journal_messages()
        self.allow_restart_journal_messages()

        # Debian 8 doesn't have systemd with coredump support
        if m.image in ["debian-stable"]:
            return

        # Let's crash a systemd-controlled process and see if we get a proper backtrace in the logs
        # This helps with debugging failures in the tests elsewhere
        m.execute("systemctl start systemd-hostnamed; pkill -e -SEGV systemd-hostnam")
        wait(lambda: m.execute("journalctl -b | grep 'Process.*systemd-hostnam.*of user.*dumped core.'"))

        # Make sure the core dumps exist in the directory, so we can download them
        cores = m.execute("find /var/lib/systemd/coredump -type f")
        self.assertNotEqual(cores, "")

        self.allow_journal_messages(".*org.freedesktop.hostname1.*DBus.Error.NoReply.*")

    def testTls(self):
        m = self.machine

        # Start Cockpit with TLS
        m.start_cockpit(tls=True)

        args = ['openssl', 's_client', '-connect', m.address + ":9090"]

        # A normal TLS connection works
        output = self._run_openssl(args)
        m.message(output)
        self.assertIn("DONE", output)

        # SSLv3 should not work
        try:
            output = self._run_openssl(args + ['-ssl3'])
        except subprocess.CalledProcessError as ex:
            m.message(ex.output)
            # Some operating systems fail SSL3 on the server side
            if m.image in ["debian-testing", "fedora-25", "fedora-26", "fedora-i386", "fedora-testing", "ubuntu-stable"]:
                self.assertIn("ssl handshake failure", ex.output)
            else:
                self.assertIn("wrong version number", ex.output)
        else:
            # openssl exited successfully, i.e. the handshake went through
            m.message(output)
            self.fail("SSL3 should not have been successful")

        # RC4 should not work
        try:
            output = self._run_openssl(args + ['-cipher', 'RC4'])
        except subprocess.CalledProcessError as ex:
            m.message(ex.output)
            self.assertIn("ssl handshake failure", ex.output)
        else:
            m.message(output)
            self.fail("RC4 cipher should not have been usable")

        # Install a certificate chain, and give it an arbitrary bad file context
        m.upload(["verify/files/cert-chain.cert"], "/etc/cockpit/ws-certs.d")
        m.execute("! selinuxenabled || chcon --type svirt_sandbox_file_t /etc/cockpit/ws-certs.d/cert-chain.cert")

        # This should also reset the file context
        m.restart_cockpit()

        # Should use the new certificates and entire chain should show up
        output = self._run_openssl(args)
        m.message(output)
        self.assertIn("DONE", output)
        self.assertIn("s:/CN=localhost", output)
        self.assertIn("1 s:/OU=Intermediate", output)

        # login handler: correct password
        # The first request stores the session cookie in cockpit.jar, the
        # second one replays it and inspects the response headers.
        m.execute("curl -k -c cockpit.jar -s --head --header 'Authorization: Basic {}' https://{}:9090/cockpit/login".format(base64.b64encode("admin:foobar"), m.address))
        headers = m.execute("curl -k --head -b cockpit.jar -s https://{}:9090/".format(m.address))
        self.assertIn("default-src 'self' https://{}:9090; connect-src 'self' https://{}:9090 ws: wss".format(m.address, m.address), headers)
        self.assertIn("Access-Control-Allow-Origin: https://{}:9090".format(m.address), headers)

        self.allow_journal_messages(
            ".*Peer failed to perform TLS handshake",
            ".*Error performing TLS handshake: Could not negotiate a supported cipher suite.")

    def testConfigOrigins(self):
        m = self.machine

        m.execute('mkdir -p /etc/cockpit/ && echo "[WebService]\nOrigins = http://other-origin:9090 http://localhost:9090" > /etc/cockpit/cockpit.conf')
        m.start_cockpit()

        # WebSocket upgrade request sent with one of the configured origins;
        # only the request path differs between the two checks.
        upgrade_request = ('curl -s -f -N -H "Connection: Upgrade" -H "Upgrade: websocket" '
                           '-H "Origin: http://other-origin:9090" -H "Host: localhost:9090" '
                           '-H "Sec-Websocket-Key: 3sc2c9IzwRUc3BlSIYwtSA==" -H "Sec-Websocket-Version: 13" '
                           'http://localhost:9090{}')

        # The socket should answer at /cockpit/socket and also at /socket
        for path in ('/cockpit/socket', '/socket'):
            output = m.execute(upgrade_request.format(path))
            self.assertIn('"no-session"', output)

        self.allow_journal_messages('peer did not close io when expected')

    @skipImage("Atomic changes ports differently", "fedora-atomic", "rhel-atomic", "continuous-atomic")
    def testSocketPort(self):
        m = self.machine

        # Change port according to documentation: http://cockpit-project.org/guide/latest/listen.html
        m.execute('! selinuxenabled || semanage port -m -t websm_port_t -p tcp 443')
        m.execute('mkdir -p /etc/systemd/system/cockpit.socket.d/ && printf "[Socket]\nListenStream=\nListenStream=443" > /etc/systemd/system/cockpit.socket.d/listen.conf')

        m.start_cockpit(tls=True)

        # The new port serves the page, the default port no longer accepts connections
        output = m.execute('curl -k https://localhost 2>&1 || true')
        self.assertIn('Loading...', output)

        output = m.execute('curl -k https://localhost:9090 2>&1 || true')
        self.assertIn('Connection refused', output)

        self.allow_journal_messages(".*Peer failed to perform TLS handshake")

    @skipImage("Atomic doesn't have cockpit-ws", "fedora-atomic", "rhel-atomic", "continuous-atomic")
    def testCommandline(self):
        m = self.machine

        m.execute('mkdir -p /test/cockpit/ws-certs.d && echo "[WebService]\nLoginTitle = A Custom Title" > /test/cockpit/cockpit.conf')
        # NOTE(review): the login.html payload below looks like it lost its HTML
        # markup somewhere (only the bare title text is left); the 'A Custom Title'
        # wait further down presumably needs a real page - TODO confirm and restore.
        m.execute('mkdir -p /test/cockpit/static/ && echo "\nCustom Default Root\n" > /test/cockpit/static/login.html')

        # The certificate must be created below the overridden config dir only
        m.execute("XDG_CONFIG_DIRS=/test XDG_DATA_DIRS=/test remotectl certificate --ensure")
        self.assertTrue(m.execute("ls /test/cockpit/ws-certs.d/*"))
        self.assertFalse(m.execute("ls /etc/cockpit/ws-certs.d/* || true"))

        executable = "/usr/libexec/cockpit-ws"
        if "debian" in m.image or "ubuntu" in m.image:
            executable = "/usr/lib/cockpit/cockpit-ws"

        # Run cockpit-ws in the background, detached from stdin/stdout/stderr
        m.execute("XDG_CONFIG_DIRS=/test XDG_DATA_DIRS=/test {} --port 9000 --address 127.0.0.1 0<&- &>/dev/null &".format(executable))

        # The port may not be available immediately, so wait for it
        wait(lambda: 'A Custom Title' in m.execute('curl -s -k https://localhost:9000/'))

        # Bound to 127.0.0.1 only, so the machine's own address must be refused
        output = m.execute('curl -s -S -k https://{}:9000/ 2>&1 || true'.format(m.address))
        self.assertIn('Connection refused', output)

    def testHeadRequest(self):
        m = self.machine

        m.start_cockpit()

        # static handler
        headers = m.execute("curl -s --head http://localhost:9090/cockpit/static/login.min.html")
        self.assertIn("HTTP/1.1 200 OK\r\n", headers)
        self.assertIn("Content-Type: text/html\r\n", headers)
        # login.html is not always accessible as a file (e. g. in Atomic), so just assert a reasonable content length
        self.assertIn("Content-Length: ", headers)
        content_length = int(headers.split('Content-Length: ', 1)[1].split()[0])
        self.assertGreater(content_length, 10000)
        self.assertLess(content_length, 100000)

        # login handler: wrong password
        headers = m.execute("curl -s --head --header 'Authorization: Basic {}' http://localhost:9090/cockpit/login".format(
            base64.b64encode("admin:hahawrong")))
        self.assertIn("HTTP/1.1 401 Authentication failed\r\n", headers)
        self.assertNotIn("Set-Cookie:", headers)

        # login handler: correct password
        headers = m.execute("curl -s --head --header 'Authorization: Basic {}' http://localhost:9090/cockpit/login".format(
            base64.b64encode("admin:foobar")))
        self.assertIn("HTTP/1.1 200 OK\r\n", headers)
        self.assertIn("Set-Cookie: cockpit", headers)

        # socket handler; this should refuse HEAD (as it makes little sense on sockets), so 404
        headers = m.execute("curl -s --head http://localhost:9090/cockpit/socket")
        self.assertIn("HTTP/1.1 404 Not Found\r\n", headers)

        # external channel handler; unauthenticated, thus 404
        headers = m.execute("curl -s --head http://localhost:9090/cockpit+123/channel/foo")
        self.assertIn("HTTP/1.1 404 Not Found\r\n", headers)
# Script entry point: hand over to the test runner pulled in from testlib.
if __name__ == "__main__":
    test_main()