#!/usr/bin/python
# -*- coding: utf-8 -*-

# This file is part of Cockpit.
#
# Copyright (C) 2015 Red Hat, Inc.
#
# Cockpit is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# Cockpit is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Cockpit; If not, see http://www.gnu.org/licenses/.

import parent
from testlib import *
from storagelib import *


def _disk_by_id(serial):
    # Stable device path for a test disk with the given serial.  Using the
    # by-id symlink instead of /dev/sdX keeps the tests independent of the
    # order in which the kernel enumerates the disks.
    return "/dev/disk/by-id/scsi-0QEMU_QEMU_HARDDISK_%s" % serial


@skipImage("UDisks doesn't have support for LVM",
           "debian-stable", "debian-testing", "ubuntu-1604", "ubuntu-stable")
class TestStorage(StorageCase):
    """Storage page tests for LVM2 volume groups and logical volumes.

    The test methods are documented with comments rather than docstrings
    so that the test descriptions reported by unittest stay unchanged.
    """

    def testLvm(self):
        # First half: manipulate a volume group from the command line and
        # check that the storage page follows along.  Second half: do the
        # equivalent operations through the UI dialogs.
        m = self.machine
        b = self.browser

        mount_point_one = "/run/one"
        mount_point_thin = "/run/thin"

        self.login_and_go("/storage")
        b.wait_in_text("#drives", "VirtIO")

        m.add_disk("50M", serial="DISK1")
        m.add_disk("50M", serial="DISK2")
        m.add_disk("50M", serial="DISK3")
        b.wait_in_text("#drives", "DISK1")
        b.wait_in_text("#drives", "DISK2")
        b.wait_in_text("#drives", "DISK3")

        dev_1 = _disk_by_id("DISK1")
        dev_2 = _disk_by_id("DISK2")

        # Create a volume group out of two disks
        m.execute("vgcreate TEST1 %s %s" % (dev_1, dev_2))
        b.wait_in_text("#vgroups", "TEST1")
        b.click('tr:contains("TEST1")')
        b.wait_visible("#storage-detail")
        b.wait_in_text("#detail-sidebar", "DISK1")
        b.wait_in_text("#detail-sidebar", "DISK2")

        # Both of them should be empty and removable
        b.wait_not_present('#detail-sidebar tr:nth-child(1) button.disabled')
        b.wait_not_present('#detail-sidebar tr:nth-child(2) button.disabled')

        # Create two logical volumes
        m.execute("lvcreate TEST1 -n one -L 20m")
        self.content_row_wait_in_col(1, 2, "/dev/TEST1/one")
        m.execute("lvcreate TEST1 -n two -L 20m")
        self.content_row_wait_in_col(2, 2, "/dev/TEST1/two")

        # Deactivate one
        m.execute("lvchange TEST1/one -a n")
        self.content_row_wait_in_col(1, 1, "Inactive volume")
        self.content_row_wait_in_col(1, 2, "one")

        # and remove it
        m.execute("lvremove TEST1/one")
        b.wait_not_in_text("#detail-content", "one")

        # remove a disk from the volume group
        m.execute("pvmove %s &>/dev/null || true" % dev_2)
        m.execute("vgreduce TEST1 %s" % dev_2)
        b.wait_not_in_text("#detail-sidebar", "DISK2")

        # The remaining lone disk is not removable
        b.wait_present('#detail-sidebar tr:nth-child(1) button.disabled')

        # Wipe the disk and make sure lvmetad forgets about it.  This
        # might help with reusing it in the second half of this test.
        #
        # HACK - the pvscan is necessary because of
        # https://bugzilla.redhat.com/show_bug.cgi?id=1063813
        #
        m.execute("wipefs -a %s" % dev_2)
        m.execute("pvscan --cache %s" % dev_2)

        # Thin volumes
        m.execute("lvcreate TEST1 --thinpool pool -L 20m")
        self.content_row_wait_in_col(1, 2, "pool")
        m.execute("lvcreate -T TEST1/pool -n thin -V 100m")
        self.content_row_wait_in_col(2, 2, "/dev/TEST1/thin")
        # Writing 10 MiB into the 20 MiB pool should show up as 50% used
        m.execute("dd if=/dev/urandom of=/dev/mapper/TEST1-thin bs=1M count=10 status=none")
        self.content_tab_wait_in_info(1, 1, "Data Used", "50%")
        m.execute("lvremove -f TEST1/thin")
        self.content_tab_wait_in_info(1, 1, "Data Used", "0%")

        # remove the volume group
        b.go("#/")
        b.wait_visible("#storage")
        m.execute("vgremove -f TEST1")
        b.wait_not_in_text("#vgroups", "TEST1")

        # create volume group in the UI
        b.wait_visible('#create-volume-group')
        self.dialog_with_retry(
            trigger=lambda: b.click('#create-volume-group'),
            expect=lambda: (self.dialog_is_present('disks', "DISK1") and
                            self.dialog_is_present('disks', "DISK2") and
                            self.dialog_check({"name": "vgroup0"})),
            values={"disks": {"DISK1": True, "DISK2": True}})
        b.wait_in_text("#vgroups", "vgroup0")

        # Check that the next name is "vgroup1"
        b.click('#create-volume-group')
        self.dialog_wait_open()
        self.dialog_wait_val("name", "vgroup1")
        self.dialog_cancel()
        self.dialog_wait_close()

        b.click('tr:contains("vgroup0")')
        b.wait_visible('#storage-detail')

        # create a logical volume
        b.click("a:contains(Create new Logical Volume)")
        self.dialog(expect={"name": "lvol0"},
                    values={"purpose": "block", "size": 20})
        self.content_row_wait_in_col(1, 2, "/dev/vgroup0/lvol0")

        # check that the next default name is "lvol1"
        b.click("a:contains(Create new Logical Volume)")
        self.dialog_wait_open()
        self.dialog_wait_val("name", "lvol1")
        self.dialog_cancel()
        self.dialog_wait_close()

        # try to shrink it without a filesystem.
        self.content_tab_info_action(1, 1, "Size")
        self.dialog_wait_open()
        self.dialog_wait_val("size", 20)
        self.dialog_set_val("size", 10)
        self.dialog_apply()
        self.dialog_wait_error(None, "This logical volume cannot be made smaller.")
        self.dialog_cancel()

        # grow it
        self.content_tab_info_action(1, 1, "Size")
        self.dialog({"size": 30})

        # format and mount it
        self.content_tab_action(1, 2, "Format")
        self.dialog({"type": "ext4",
                     "mounting": "custom",
                     "mount_point": mount_point_one})
        self.content_row_wait_in_col(1, 1, "ext4 File System")
        self.wait_in_storaged_configuration(mount_point_one)
        self.content_tab_action(1, 2, "Mount")
        self.content_tab_wait_in_info(1, 2, "Mounted At", mount_point_one)

        # unmount it
        self.content_tab_action(1, 2, "Unmount")
        b.wait_not_present(self.content_tab_info_row(1, 2, "Mounted At"))

        # shrink it, this time with a filesystem.
        self.content_tab_info_action(1, 1, "Size")
        self.dialog({"size": 10})

        # delete it
        self.content_head_action(1, "Delete")
        self.confirm()
        b.wait_not_in_text("#detail-content", "/dev/vgroup0/lvol0")
        # Deleting the volume must also have removed its fstab entry
        self.assertEqual(m.execute("grep %s /etc/fstab || true" % mount_point_one), "")

        # remove disk
        b.wait_present('#detail-sidebar tr:nth-child(2) button')
        b.click('#detail-sidebar tr:nth-child(2) button')
        b.wait_not_in_text("#detail-sidebar", "DISK2")
        b.wait_in_text("#detail-header .info-table-ct", "48 MiB")

        # create thin pool and volume
        # the pool will be maximum size, 48 MiB
        b.click("a:contains(Create new Logical Volume)")
        self.dialog(expect={"size": 48},
                    values={"purpose": "pool", "name": "pool", "size": 38})
        self.content_row_wait_in_col(1, 2, "pool")

        self.content_row_action(1, "Create Thin Volume")
        self.dialog(expect={"name": "lvol0"},
                    values={"name": "thin", "size": 50})
        self.content_row_wait_in_col(2, 2, "/dev/vgroup0/thin")

        # add a disk and resize the pool
        b.click('#detail-sidebar .panel-heading button')
        self.dialog_wait_open()
        self.dialog_select('disks', "DISK2", True)
        self.dialog_apply()
        self.dialog_wait_close()
        b.wait_in_text("#detail-sidebar", "DISK2")
        b.wait_in_text("#detail-header .info-table-ct", "96 MiB")

        self.content_tab_info_action(1, 1, "Size")
        self.dialog({"size": 70})

        # There is not enough free space to remove any of the disks.
        b.wait_present('#detail-sidebar tr:nth-child(1) button.disabled')
        b.wait_present('#detail-sidebar tr:nth-child(2) button.disabled')

        # use almost all of the pool by erasing the thin volume
        self.content_tab_action(2, 2, "Format")
        self.dialog({"erase": "zero",
                     "type": "ext4",
                     "mounting": "custom",
                     "mount_point": mount_point_thin})
        self.content_row_wait_in_col(2, 1, "ext4 File System")
        self.wait_in_storaged_configuration(mount_point_thin)

        # remove volume group
        b.click_action_btn('.panel-heading:contains("Volume Group") .btn-group', "Delete")
        self.confirm()
        b.wait_visible("#storage")
        # Deleting the group must also have removed the thin volume's fstab entry
        self.assertEqual(m.execute("grep %s /etc/fstab || true" % mount_point_thin), "")

    def testUnpartitionedSpace(self):
        # Check that the unpartitioned space of a disk that already carries
        # a partition table is offered in the volume group dialogs, and that
        # choosing it results in a partition being used as physical volume.
        m = self.machine
        b = self.browser

        self.login_and_go("/storage")
        b.wait_in_text("#drives", "VirtIO")

        m.add_disk("50M", serial="DISK1")
        m.add_disk("50M", serial="DISK2")
        b.wait_in_text("#drives", "DISK1")
        b.wait_in_text("#drives", "DISK2")

        # Put a partition table on DISK1 and DISK2.  The disks are addressed
        # by serial, as in testLvm, rather than by /dev/sdX name.
        m.execute('parted -s %s mktable gpt' % _disk_by_id("DISK1"))
        m.execute('parted -s %s mktable gpt' % _disk_by_id("DISK2"))

        # Create a volume group out of DISK1
        b.wait_visible('#create-volume-group')
        self.dialog_with_retry(
            trigger=lambda: b.click('#create-volume-group'),
            expect=lambda: self.dialog_is_present(
                'disks', "unpartitioned space on QEMU QEMU HARDDISK (DISK1)"),
            values={"disks": {"DISK1": True}})
        b.wait_in_text("#vgroups", "vgroup0")
        b.click('tr:contains("vgroup0")')
        b.wait_visible('#storage-detail')

        # Check that we are really using a partition on DISK1 now
        b.wait_in_text("#detail-sidebar", "Partition of QEMU QEMU HARDDISK (DISK1)")

        # Add the unused space of DISK2
        self.dialog_with_retry(
            trigger=lambda: b.click('#detail-sidebar .panel-heading button'),
            expect=lambda: self.dialog_is_present(
                'disks', "unpartitioned space on QEMU QEMU HARDDISK (DISK2)"),
            values={"disks": {"DISK2": True}})
        b.wait_in_text("#detail-sidebar", "Partition of QEMU QEMU HARDDISK (DISK2)")


if __name__ == '__main__':
    test_main()