#!/usr/bin/python
# -*- coding: utf-8 -*-
# This file is part of Cockpit.
#
# Copyright (C) 2017 Red Hat, Inc.
#
# Cockpit is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# Cockpit is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Cockpit; If not, see .
import parent
from testlib import *
@skipImage("Image uses OSTree", "continuous-atomic", "fedora-atomic", "rhel-atomic")
@skipImage("PackageKit crashes, https://launchpad.net/bugs/1689820", "ubuntu-1604")
class TestUpdates(MachineCase):
def setUp(self):
MachineCase.setUp(self)
self.isApt = "debian" in self.machine.image or "ubuntu" in self.machine.image
# disable all existing repositories to avoid hitting the network
if self.isApt:
self.machine.execute("rm -f /etc/apt/sources.list.d/*; echo > /etc/apt/sources.list; apt-get update")
else:
self.machine.execute("rm -f /etc/yum.repos.d/* /var/cache/yum/*")
# have PackageKit start from a clean slate
self.machine.execute("systemctl stop packagekit; rm -rf /var/cache/PackageKit")
if self.machine.image in ["ubuntu-1604", "debian-stable"]:
# old PackageKit+NM on Debian/Ubuntu misdetect online status with ifupdown; work around
# https://launchpad.net/bugs/1694438; this doesn't affect pure ifupdown systems (servers) and
# pure NM systems (desktops)
self.machine.execute("sed -i '/managed=/ s/false/true/' /etc/NetworkManager/NetworkManager.conf; systemctl restart NetworkManager")
self.updateInfo = {}
def testBasic(self):
# no security updates, no changelogs
b = self.browser
m = self.machine
m.start_cockpit()
b.login_and_go("/updates")
# no repositories at all, thus no updates
b.wait_present(".content-header-extra td button")
b.wait_in_text("#state", "No updates pending")
b.wait_present(".content-header-extra td.text-right span")
# PK starts from a blank state, thus should force refresh and set the "time since" to 0
self.assertEqual(b.text(".content-header-extra td.text-right span"), "Last checked: a few seconds ago")
# empty state visible in main area
b.wait_present(".container-fluid div.blank-slate-pf")
# create two updates
self.createPackage("vanilla", "1.0", "1", install=True)
self.createPackage("vanilla", "1.0", "2")
self.createPackage("chocolate", "2.0", "1", install=True)
self.createPackage("chocolate", "2.0", "2")
self.enableRepo()
# check again
b.wait_in_text(".content-header-extra td button", "Check for updates")
b.click(".content-header-extra td button")
b.wait_present(".content-header-extra td.text-right span")
self.assertEqual(b.text(".content-header-extra td.text-right span"), "Last checked: a few seconds ago")
b.wait_present(".container-fluid h2")
b.wait_in_text(".container-fluid h2", "Available Updates")
self.assertEqual(b.text("#state"), "2 updates")
b.wait_present("table.listing-ct")
b.wait_in_text("table.listing-ct", "vanilla")
# chocolate update to 2.0-2
self.assertEqual(b.text("#app .listing-ct tbody:nth-of-type(1) th span"), "chocolate")
self.assertEqual(b.text("#app .listing-ct tbody:nth-of-type(1) th .tooltip-inner"), "dummy chocolate")
self.assertEqual(b.text("#app .listing-ct tbody:nth-of-type(1) td:nth-of-type(1)"), "2.0-2")
self.assertEqual(b.text("#app .listing-ct tbody:nth-of-type(1) td:nth-of-type(2)"), "") # no bugs
# vanilla update to 1.0-2
self.assertEqual(b.text("#app .listing-ct tbody:nth-of-type(2) th span"), "vanilla")
self.assertEqual(b.text("#app .listing-ct tbody:nth-of-type(2) th .tooltip-inner"), "dummy vanilla")
self.assertEqual(b.text("#app .listing-ct tbody:nth-of-type(2) td:nth-of-type(1)"), "1.0-2")
self.assertEqual(b.text("#app .listing-ct tbody:nth-of-type(2) td:nth-of-type(2)"), "") # no bugs
# old versions are still installed
m.execute("test -f /stamp-vanilla-1.0-1 && test -f /stamp-chocolate-2.0-1")
# should only have one button (no security updates)
self.assertEqual(b.text("#app .container-fluid button"), "Install all updates")
b.click("#app .container-fluid button")
b.wait_in_text("#state", "Applying updates")
b.wait_present("#app div.progress-bar")
# no refresh button or "last checked", but Cancel button
b.wait_present(".content-header-extra td.text-right button")
self.assertFalse(b.is_present(".content-header-extra td.text-right span"))
b.wait_in_text(".content-header-extra td button", "Cancel")
# Cancel button should eventually get disabled
b.wait_present(".content-header-extra td button:disabled")
# should have succeeded and show restart page; cancel
b.wait_present("#app .container-fluid h1")
b.wait_in_text("#app .container-fluid h1", "Restart Recommended")
b.wait_present("#app .container-fluid button.btn-primary")
self.assertEqual(b.text("#app .container-fluid button.btn-primary"), "Restart Now")
b.wait_present("#app .container-fluid button.btn-default")
self.assertEqual(b.text("#app .container-fluid button.btn-default"), "Ignore")
b.click("#app .container-fluid button.btn-default")
# should go back to updates overview, nothing pending any more
b.wait_present("#state")
b.wait_in_text("#state", "No updates pending")
b.wait_present(".content-header-extra td.text-right span")
b.wait_in_text(".content-header-extra td.text-right span", "Last checked:")
# empty state visible in main area
b.wait_present(".container-fluid div.blank-slate-pf")
# new versions are now installed
m.execute("test -f /stamp-vanilla-1.0-2 && test -f /stamp-chocolate-2.0-2")
@skipImage("apt on Debian 8 does not yet support custom changelog servers", "debian-stable")
def testInfoSecurity(self):
b = self.browser
m = self.machine
# just changelog
self.createPackage("norefs-bin", "1", "1", install=True)
self.createPackage("norefs-bin", "2", "1", severity="enhancement", changes="Now 10% more unicorns")
# binary from same source
self.createPackage("norefs-doc", "1", "1", install=True)
self.createPackage("norefs-doc", "2", "1", severity="enhancement", changes="Now 10% more unicorns")
# bug fixes
self.createPackage("buggy", "2", "1", install=True)
self.createPackage("buggy", "2", "2", changes="Fixit", bugs=[123, 456])
# security fix with proper CVE list and severity
self.createPackage("secdeclare", "3", "4.a1", install=True)
self.createPackage("secdeclare", "3", "4.b1", severity="security",
changes="stop kittens from dying", cves=['CVE-2016-0001'])
# security fix with parsing from changes
self.createPackage("secparse", "4", "1", install=True)
self.createPackage("secparse", "4", "2", changes="Fix CVE-2017-0001 and CVE-2017-0002.")
self.enableRepo()
m.execute("pkcon refresh")
m.start_cockpit()
b.login_and_go("/updates")
b.wait_present(".container-fluid h2")
b.wait_in_text(".container-fluid h2", "Available Updates")
self.assertEqual(b.text("#state"), "5 updates, including 2 security fixes")
b.wait_present("table.listing-ct")
b.wait_in_text("table.listing-ct", "secparse")
# security updates should get sorted on top and then alphabetically, so start with "secdeclare"
self.assertEqual(b.text("#app .listing-ct tbody:nth-of-type(1) th span"), "secdeclare")
self.assertEqual(b.text("#app .listing-ct tbody:nth-of-type(1) td:nth-of-type(1)"), "3-4.b1")
self.assertEqual(b.text("#app .listing-ct tbody:nth-of-type(1) td:nth-of-type(2)"), "")
self.assertEqual(b.text("#app .listing-ct tbody:nth-of-type(1) td:nth-of-type(3) span.security-label-text"),
"Security Update: ")
desc = b.text("#app .listing-ct tbody:nth-of-type(1) td:nth-of-type(3)")
self.assertIn("stop kittens from dying", desc)
self.assertIn("CVE-2016-0001", desc)
# secparse should also be considered a security update as the changelog mentions CVEs
self.assertEqual(b.text("#app .listing-ct tbody:nth-of-type(2) th span"), "secparse")
self.assertEqual(b.text("#app .listing-ct tbody:nth-of-type(2) td:nth-of-type(1)"), "4-2")
self.assertEqual(b.text("#app .listing-ct tbody:nth-of-type(2) td:nth-of-type(2)"), "")
self.assertEqual(b.text("#app .listing-ct tbody:nth-of-type(2) td:nth-of-type(3) span.security-label-text"),
"Security Update: ")
desc = b.text("#app .listing-ct tbody:nth-of-type(2) td:nth-of-type(3)")
self.assertIn("Fix CVE-2017-0001 and CVE-2017-0002.", desc)
# buggy: bug refs, no security
self.assertEqual(b.text("#app .listing-ct tbody:nth-of-type(3) th span"), "buggy")
self.assertEqual(b.text("#app .listing-ct tbody:nth-of-type(3) td:nth-of-type(1)"), "2-2")
self.assertEqual(b.text("#app .listing-ct tbody:nth-of-type(3) td:nth-of-type(2)"), "123, 456")
self.assertIn("Fixit", b.text("#app .listing-ct tbody:nth-of-type(3) td:nth-of-type(3)"))
# norefs: just changelog, show both binary packages
self.assertEqual(b.text("#app .listing-ct tbody:nth-of-type(4) th > div:nth-of-type(1) span"), "norefs-bin")
self.assertEqual(b.text("#app .listing-ct tbody:nth-of-type(4) th > div:nth-of-type(2) span"), "norefs-doc")
self.assertEqual(b.text("#app .listing-ct tbody:nth-of-type(4) td:nth-of-type(1)"), "2-1")
self.assertEqual(b.text("#app .listing-ct tbody:nth-of-type(4) td:nth-of-type(2)"), "") # no bugs
self.assertIn("Now 10% more unicorns", b.text("#app .listing-ct tbody:nth-of-type(4) td:nth-of-type(3)"))
# install only security updates
self.assertEqual(b.text("#app .container-fluid button.btn-default"), "Install security updates")
b.click("#app .container-fluid button.btn-default")
b.wait_in_text("#state", "Applying updates")
b.wait_present("#app div.progress-bar")
# should have succeeded and show restart page; cancel
b.wait_present("#app .container-fluid h1")
b.wait_in_text("#app .container-fluid h1", "Restart Recommended")
b.wait_present("#app .container-fluid button.btn-default")
b.click("#app .container-fluid button.btn-default")
# should have succeeded; 3 non-security updates left
b.wait_present("#state")
b.wait_in_text("#state", "3 updates")
b.wait_in_text(".container-fluid h2", "Available Updates")
b.wait_in_text("table.listing-ct", "norefs-doc")
self.assertIn("buggy", b.text("table.listing-ct"))
self.assertNotIn("secdeclare", b.text("table.listing-ct"))
self.assertNotIn("secparse", b.text("table.listing-ct"))
# new security versions are now installed
m.execute("test -f /stamp-secdeclare-3-4.b1 && test -f /stamp-secparse-4-2")
# but the three others are untouched
m.execute("test -f /stamp-buggy-2-1 && test -f /stamp-norefs-bin-1-1 && test -f /stamp-norefs-doc-1-1")
# should now only have one button (no security updates left)
self.assertEqual(b.text("#app .container-fluid button"), "Install all updates")
b.click("#app .container-fluid button")
b.wait_in_text("#state", "Applying updates")
b.wait_present("#app div.progress-bar")
# should have succeeded and show restart
b.wait_present("#app .container-fluid h1")
b.wait_in_text("#app .container-fluid h1", "Restart Recommended")
b.wait_present("#app .container-fluid button.btn-default")
# new versions are now installed
m.execute("test -f /stamp-norefs-bin-2-1 && test -f /stamp-norefs-doc-2-1")
# do the reboot; this will disconnect the web UI
m.reset_reboot_flag()
b.click("#app .container-fluid button.btn-primary")
b.switch_to_top()
b.wait_present(".curtains-ct")
b.wait_visible(".curtains-ct")
b.wait_in_text(".curtains-ct h1", "Disconnected")
# ensure that rebooting actually worked
m.wait_reboot()
m.start_cockpit()
b.reload()
b.login_and_go("/updates")
# no further updates
b.wait_present("#state")
b.wait_in_text("#state", "No updates pending")
# empty state visible in main area
b.wait_present(".container-fluid div.blank-slate-pf")
self.allow_restart_journal_messages()
def testUpdateError(self):
b = self.browser
m = self.machine
self.createPackage("vapor", "1", "1", install=True)
self.createPackage("vapor", "1", "2")
self.enableRepo()
m.execute("pkcon refresh")
# break the upgrade by removing the generated packages from the repo
m.execute("rm -f /tmp/repo/vapor*.deb /tmp/repo/vapor*.rpm")
m.start_cockpit()
b.login_and_go("/updates")
b.wait_present(".container-fluid h2")
b.wait_in_text(".container-fluid h2", "Available Updates")
self.assertEqual(b.text("#state"), "1 update")
b.wait_present("#app .container-fluid button")
b.click("#app .container-fluid button")
b.wait_in_text("#state", "Applying updates failed")
# expecting one error message, so this should be unique
b.wait_present("#app .container-fluid pre")
self.assertRegexpMatches(b.text("#app .container-fluid pre"), "missing|downloading|not.*available|No such file or directory")
# not expecting any buttons
self.assertFalse(b.is_present("#app button"))
def testRunningUpdate(self):
# The main case for this is that cockpit-ws itself gets upgraded, which
# restarts the service and terminates the connection. As we can't
# (efficiently) build a newer working cockpit-ws package, test the two
# parts (reconnect and warning about disconnect) separately.
# no security updates, no changelogs
b = self.browser
m = self.machine
# updating this package takes longer than a cockpit start and building the page
self.createPackage("slow", "1", "1", install=True)
self.createPackage("slow", "1", "2", postinst='sleep 10')
self.enableRepo()
m.execute("pkcon refresh")
m.start_cockpit()
b.login_and_go("/updates")
b.wait_present("#app .container-fluid button")
b.click("#app .container-fluid button")
b.wait_in_text("#state", "Applying updates")
# restarting should pick up that install progress
m.restart_cockpit()
b.login_and_go("/updates")
b.wait_present("#state")
b.wait_in_text("#state", "Applying updates")
b.wait_present("#app div.progress-bar")
# should have succeeded and show restart page; cancel
b.wait_present("#app .container-fluid h1")
b.wait_in_text("#app .container-fluid h1", "Restart Recommended")
b.wait_present("#app .container-fluid button.btn-default")
b.click("#app .container-fluid button.btn-default")
b.wait_present("#state")
b.wait_in_text("#state", "No updates pending")
# now pretend that there is a newer cockpit-ws available, warn about disconnect
self.createPackage("cockpit-ws", "999", "1")
self.createPackage("cockpit", "999", "1") # as that depends on same version of ws
self.enableRepo()
b.wait_in_text(".content-header-extra td button", "Check for updates")
b.click(".content-header-extra td button")
b.wait_present(".container-fluid h2")
b.wait_in_text(".container-fluid h2", "Available Updates")
self.assertEqual(b.text("#state"), "2 updates")
b.wait_present("table.listing-ct")
b.wait_in_text("table.listing-ct", "cockpit-ws")
b.wait_present("#app div.alert-warning")
b.wait_in_text("#app div.alert-warning", "Cockpit itself will be updated")
def testPackageKitCrash(self):
b = self.browser
m = self.machine
# make sure we have enough time to crash PK
self.createPackage("slow", "1", "1", install=True)
self.createPackage("slow", "1", "2", postinst='sleep 10')
self.enableRepo()
m.execute("pkcon refresh")
m.start_cockpit()
b.login_and_go("/updates")
b.wait_present("#app .container-fluid button")
b.click("#app .container-fluid button")
# let updates start and zap PackageKit
b.wait_present("#app div.progress-bar")
m.execute("systemctl kill --signal=SEGV packagekit.service")
b.wait_in_text("#state", "Applying updates failed")
b.wait_present("#app .container-fluid pre")
self.assertEqual(b.text("#app .container-fluid pre"), "PackageKit crashed")
def testNoPackageKit(self):
b = self.browser
m = self.machine
m.execute('''systemctl stop packagekit.service
rm `systemctl show -p FragmentPath packagekit.service | cut -f2 -d=`
rm /usr/share/dbus-1/system-services/org.freedesktop.PackageKit.service
systemctl daemon-reload''')
m.start_cockpit()
b.login_and_go("/updates")
b.wait_present("#state")
b.wait_in_text("#state", "Loading available updates failed")
b.wait_present("#app pre")
b.wait_in_text("#app pre", "PackageKit is not installed")
#
# Helper functions for creating packages/repository
#
def createPackage(self, name, version, release, install=False, postinst=None, **updateinfo):
'''Create a dummy package in /tmp/repo on self.machine
If install is True, install the package. Otherwise, update the package
index in /tmp/repo.
'''
if self.isApt:
self.createDeb(name, version + '-' + release, postinst, install)
else:
self.createRpm(name, version, release, postinst, install)
if updateinfo:
self.updateInfo[(name, version, release)] = updateinfo
def createDeb(self, name, version, postinst, install):
'''Create a dummy deb in /tmp/repo on self.machine
If install is True, install the package. Otherwise, update the package
index in /tmp/repo.
'''
deb = "/tmp/repo/{0}_{1}_all.deb".format(name, version)
if postinst:
postinstcode = "printf '#!/bin/sh\n{0}' > /tmp/b/DEBIAN/postinst; chmod 755 /tmp/b/DEBIAN/postinst".format(postinst)
else:
postinstcode = ''
cmd = '''mkdir -p /tmp/b/DEBIAN /tmp/repo
printf "Package: {0}\nVersion: {1}\nPriority: optional\nSection: test\nMaintainer: foo\nArchitecture: all\nDescription: dummy {0}\n" > /tmp/b/DEBIAN/control
{3}
touch /tmp/b/stamp-{0}-{1}
dpkg -b /tmp/b {2}
rm -r /tmp/b
'''.format(name, version, deb, postinstcode)
if install:
cmd += "dpkg -i " + deb
self.machine.execute(cmd)
def createRpm(self, name, version, release, post, install):
'''Create a dummy rpm in /tmp/repo on self.machine
If install is True, install the package. Otherwise, update the package
index in /tmp/repo.
'''
if post:
postcode = '\n%%post\n' + post
else:
postcode = ''
cmd = '''printf 'Summary: dummy {0}\nName: {0}\nVersion: {1}\nRelease: {2}\nLicense: BSD\nBuildArch: noarch\n
%%install\ntouch $RPM_BUILD_ROOT/stamp-{0}-{1}-{2}\n
%%description\nTest package.\n
%%files\n/stamp-*\n
{3}' > /tmp/spec
rpmbuild -bb /tmp/spec
mkdir -p /tmp/repo
cp ~/rpmbuild/RPMS/noarch/*.rpm /tmp/repo
rm -rf ~/rpmbuild
'''.format(name, version, release, postcode)
if install:
cmd += "rpm -i /tmp/repo/{0}-{1}-{2}.*.rpm".format(name, version, release)
self.machine.execute(cmd)
def createAptChangelogs(self):
# apt metadata has no formal field for bugs/CVEs, they are parsed from the changelog
for ((pkg, ver, rel), info) in self.updateInfo.items():
changes = info.get("changes", "some changes")
if info.get("bugs"):
changes += " (Closes: {0})".format(", ".join(["#" + str(b) for b in info["bugs"]]))
if info.get("cves"):
changes += "\n * " + ", ".join(info["cves"])
path = "/tmp/repo/changelogs/{0}/{1}/{1}_{2}-{3}".format(pkg[0], pkg, ver, rel)
contents = '''{0} ({1}-{2}) unstable; urgency=medium
* {3}
-- Joe Developer Wed, 31 May 2017 14:52:25 +0200
'''.format(pkg, ver, rel, changes)
self.machine.execute("mkdir -p $(dirname {0}); echo '{1}' > {0}".format(path, contents))
def createYumUpdateInfo(self):
xml = '\n\n'
for ((pkg, ver, rel), info) in self.updateInfo.items():
refs = ""
for b in info.get("bugs", []):
refs += ' \n'.format(b)
for c in info.get("cves", []):
refs += ' \n'.format(c)
xml += '''
UPDATE-{pkg}-{ver}-{rel}
{pkg} {ver}-{rel} update
{desc}
{refs}
{pkg}-{ver}-{rel}.noarch.rpm
'''.format(pkg=pkg, ver=ver, rel=rel, refs=refs,
desc=info.get("changes", ""), severity=info.get("severity", "bugfix"))
xml += '\n'
return xml
def enableRepo(self):
if self.isApt:
self.createAptChangelogs()
# HACK: on Debian jessie, apt has an error propagation bug that causes "Err file: Packages" for each absent
# compression format with file:// sources, which breaks PackageKit; work around by providing all formats
self.machine.execute('''echo 'deb [trusted=yes] file:///tmp/repo /' > /etc/apt/sources.list.d/test.list
cd /tmp/repo; apt-ftparchive packages . > Packages
gzip -c Packages > Packages.gz; bzip2 -c Packages > Packages.bz2; xz -c Packages > Packages.xz
O=$(apt-ftparchive -o APT::FTPArchive::Release::Origin=cockpittest release .); echo "$O" > Release
echo 'Changelogs: http://localhost:12345/changelogs/@CHANGEPATH@' >> Release
setsid python -m SimpleHTTPServer 12345 >/dev/null 2>&1 < /dev/null &
''')
self.machine.wait_for_cockpit_running(port=12345) # wait for changelog HTTP server to start up
else:
self.machine.execute('''printf '[updates]\nname=cockpittest\nbaseurl=file:///tmp/repo\nenabled=1\ngpgcheck=0\n' > /etc/yum.repos.d/cockpittest.repo
echo '{0}' > /tmp/updateinfo.xml
createrepo_c /tmp/repo
modifyrepo_c /tmp/updateinfo.xml /tmp/repo/repodata
$(which dnf 2>/dev/null|| which yum) clean all'''.format(self.createYumUpdateInfo()))
if __name__ == '__main__':
test_main()