diff --git a/providers/resource/bin/block_device_resource.py b/providers/resource/bin/block_device_resource.py index ad6c5fb947..9a23a74947 100755 --- a/providers/resource/bin/block_device_resource.py +++ b/providers/resource/bin/block_device_resource.py @@ -3,10 +3,12 @@ import os import re import shlex -from glob import glob -from subprocess import Popen, PIPE, check_output, CalledProcessError +import textwrap +from pathlib import Path +from itertools import chain +from subprocess import check_output, CalledProcessError -rootdir_pattern = re.compile('^.*?/devices') +rootdir_pattern = re.compile("^.*?/devices") # NOTE: If raid_types changes, also change it in disk_smart script! raid_types = ["megaraid", "cciss", "3ware", "areca"] @@ -15,42 +17,44 @@ def device_state(name): """Follow pmount policy to find if a device is removable or internal.""" - path = rootdir_pattern.sub('', os.readlink('/sys/block/%s' % name)) + path = rootdir_pattern.sub("", os.readlink("/sys/block/%s" % name)) hotplug_buses = ("usb", "ieee1394", "mmc", "pcmcia", "firewire") for bus in hotplug_buses: - if os.path.exists('/sys/bus/%s' % bus): - for device_bus in os.listdir('/sys/bus/%s/devices' % bus): - device_link = rootdir_pattern.sub('', os.readlink( - '/sys/bus/%s/devices/%s' % (bus, device_bus))) + if os.path.exists("/sys/bus/%s" % bus): + for device_bus in os.listdir("/sys/bus/%s/devices" % bus): + device_link = rootdir_pattern.sub( + "", + os.readlink("/sys/bus/%s/devices/%s" % (bus, device_bus)), + ) if re.search(device_link, path): - return 'removable' + return "removable" - return 'internal' + return "internal" def usb_support(name, version): """Check the USB specification number for both hub port and device.""" - path = rootdir_pattern.sub('', os.readlink('/sys/block/%s' % name)) + path = rootdir_pattern.sub("", os.readlink("/sys/block/%s" % name)) # Remove the usb config.interface part of the path - m = re.match('((.*usb\d+).*\/)\d-[\d\.:\-]+\/.*', path) # noqa: W605 + m = re.match(r"((.*usb\d+).*\/)\d-[\d\.:\-]+\/.*", path) if m: device_path = m.group(1) hub_port_path = m.group(2) # Check the highest version of USB the device supports - with open('/sys/devices/%s/version' % device_path, "rt") as f: + with open("/sys/devices/%s/version" % device_path, "rt") as f: if float(f.readline()) < version: - return 'unsupported' + return "unsupported" # Check the highest version of USB the hub supports - with open('/sys/devices/%s/version' % hub_port_path, "rt") as f: + with open("/sys/devices/%s/version" % hub_port_path, "rt") as f: if float(f.readline()) < version: - return 'unsupported' + return "unsupported" - return 'supported' + return "supported" - return 'unsupported' + return "unsupported" def device_rotation(name): @@ -58,14 +62,29 @@ def device_rotation(name): Check the device queue/rotational parameter to determine if it's a spinning device or a non-spinning device, which indicates it's an SSD. """ - path = '/sys/block/{0}/device/block/{0}/queue/rotational'.format(name) + path = "/sys/block/{0}/device/block/{0}/queue/rotational".format(name) if not os.path.exists(path): - return 'no' + return "no" with open(path, "rt") as f: - if f.read(1) == '1': - return 'yes' + if f.read(1) == "1": + return "yes" - return 'no' + return "no" + + +def smart_supporting_diskinfo(diskinfo) -> bool: + # if a diskinfo line contains any of the following (all on one line) + # assume the disk supports SMART + # ex. SMART support is: Avaliable + indicators = [("SMART support is", "Available"), ("SMART", "test result")] + + def contains_indicator(line): + return any( + all(indicator_segment in line for indicator_segment in indicator) + for indicator in indicators + ) + + return any(contains_indicator(line) for line in diskinfo) def smart_support_raid(name, raid_type): @@ -80,31 +99,26 @@ def smart_support_raid(name, raid_type): :returns: 'True' or 'False' as string (for return to Checkbox) """ - supported = 'False' disk_num = 0 - disk_exists = True # Loop through all disks in array to verify that SMART is available on # at least one of them. Note that if there's a mix of supported and # unsupported, this test returns 'True', which will result in a failure # of disk_smart. This is by design, since such a mix is likely an assembly # error by the manufacturer. - while disk_exists: - command = 'smartctl -i /dev/{} -d {},{}'.format(name, raid_type, - disk_num) - # Return 'True' if the output (diskinfo) includes - # "SMART support is.*Available", and terminate check when a failure - # is found or when number of disks rises above level supported by - # smartctl (which likely indicates a bug). + while True: + command = "smartctl -x /dev/{} -d {},{}".format( + name, raid_type, disk_num + ) try: - diskinfo = (check_output(shlex.split(command)).decode('utf-8'). - splitlines()) - if any("SMART support is" in s and "Available" in s for - s in diskinfo): - supported = 'True' - disk_num += 1 + diskinfo = check_output( + shlex.split(command), universal_newlines=True + ).splitlines() + if smart_supporting_diskinfo(diskinfo): + return "True" + + disk_num += 1 except CalledProcessError: - disk_exists = False - return supported + return "False" def smart_support(name): @@ -116,45 +130,57 @@ def smart_support(name): :returns: 'True' or 'False' as string (for return to Checkbox) """ - supported = 'False' # Check with smartctl to see if SMART is available and enabled on the disk - command = 'smartctl -i /dev/%s' % name - diskinfo_bytes = (Popen(command, stdout=PIPE, shell=True) - .communicate()[0]) - diskinfo = (diskinfo_bytes.decode(encoding='utf-8', errors='ignore') - .splitlines()) - - # Return True if the output (diskinfo) includes - # "SMART support is.*Available" - if len(diskinfo) > 2: - if any("SMART support is" in s and "Available" in s - for s in diskinfo): - supported = 'True' - else: - for type in raid_types: - if any("-d {},N".format(type) in s for s in diskinfo): - supported = smart_support_raid(name, type) - break - return supported - - -for path in glob('/sys/block/*/device') + glob('/sys/block/*/dm'): - name = re.sub('.*/(.*?)/(device|dm)', '\g<1>', path) # noqa: W605 - state = device_state(name) - usb2 = usb_support(name, 2.00) - usb3 = usb_support(name, 3.00) - rotation = device_rotation(name) - smart = smart_support(name) - print("""\ -name: %(name)s -state: %(state)s -usb2: %(usb2)s -usb3: %(usb3)s -rotation: %(rotation)s -smart: %(smart)s -""" % {"name": name, - "state": state, - "usb2": usb2, - "usb3": usb3, - "rotation": rotation, - "smart": smart}) + command = "smartctl -x /dev/%s" % name + try: + diskinfo = check_output( + shlex.split(command), universal_newlines=True + ).splitlines() + except CalledProcessError: + return "False" + + # First check if the current name supports SMART + if smart_supporting_diskinfo(diskinfo): + return "True" + # Try to check if the disk is in a raid configuration + for type in raid_types: + if any("-d {},N".format(type) in s for s in diskinfo): + return smart_support_raid(name, type) + return "False" + + +def main(): + sys_block = Path("/sys/block") + # match the name any dir in sys block that has a subdirectory device or dm + disk_names = ( + path.parent.name + for path in chain(sys_block.glob("*/device"), sys_block.glob("*/dm")) + ) + for disk_name in disk_names: + state = device_state(disk_name) + usb2 = usb_support(disk_name, 2.00) + usb3 = usb_support(disk_name, 3.00) + rotation = device_rotation(disk_name) + smart = smart_support(disk_name) + resource_text = textwrap.dedent( + """ + name: {disk_name} + state: {state} + usb2: {usb2} + usb3: {usb3} + rotation: {rotation} + smart: {smart} + """.format( + disk_name=disk_name, + state=state, + usb2=usb2, + usb3=usb3, + rotation=rotation, + smart=smart, + ) + ).lstrip() + print(resource_text) + + +if __name__ == "__main__": + main() diff --git a/providers/resource/tests/test_block_device_resource.py b/providers/resource/tests/test_block_device_resource.py new file mode 100644 index 0000000000..f0f4d09129 --- /dev/null +++ b/providers/resource/tests/test_block_device_resource.py @@ -0,0 +1,208 @@ +import unittest +import textwrap +import block_device_resource +from subprocess import CalledProcessError +from unittest.mock import patch, mock_open, MagicMock + + +class TestDeviceState(unittest.TestCase): + @patch("os.path.exists") + @patch("os.listdir") + @patch("os.readlink") + def test_device_state_removable( + self, mock_readlink, mock_listdir, mock_exists + ): + mock_readlink.return_value = "/sys/bus/usb/devices/usb1" + mock_exists.return_value = True + mock_listdir.return_value = ["usb1"] + result = block_device_resource.device_state("sda") + self.assertEqual(result, "removable") + + @patch("os.path.exists") + @patch("os.listdir") + @patch("os.readlink") + def test_device_state_internal( + self, mock_readlink, mock_listdir, mock_exists + ): + mock_readlink.return_value = "/sys/bus/sata/devices/sata1" + mock_exists.return_value = False + result = block_device_resource.device_state("sdb") + self.assertEqual(result, "internal") + + +class TestUsbSupport(unittest.TestCase): + @patch("builtins.open", new_callable=mock_open, read_data="3.00") + @patch("os.readlink") + def test_usb_support_supported(self, mock_readlink, mock_file): + mock_readlink.return_value = "/dev/usb12ab/1-1.2/1-1.2:1.0" + result = block_device_resource.usb_support("sda", 2.00) + self.assertEqual(result, "supported") + + @patch("builtins.open", new_callable=mock_open, read_data="1.00") + @patch("os.readlink") + def test_usb_support_unsupported(self, mock_readlink, mock_file): + mock_readlink.return_value = "/dev/usb12ab/1-1.2/1-1.2:1.0" + result = block_device_resource.usb_support("sda", 3.00) + self.assertEqual(result, "unsupported") + + +class TestDeviceRotation(unittest.TestCase): + @patch("builtins.open", new_callable=mock_open, read_data="1") + @patch("os.path.exists") + def test_device_rotation_spinning(self, mock_path_exists, mock_file): + mock_path_exists.return_value = True + result = block_device_resource.device_rotation("sda") + self.assertEqual(result, "yes") + + @patch("builtins.open", new_callable=mock_open, read_data="0") + @patch("os.path.exists") + def test_device_rotation_non_spinning(self, mock_path_exists, mock_file): + mock_path_exists.return_value = True + result = block_device_resource.device_rotation("sdb") + self.assertEqual(result, "no") + + +class TestSmartSupportDiskInfo(unittest.TestCase): + def test_smart_supporting_diskinfo_supported(self): + diskinfo = ["SMART support is: Available", "other info"] + result = block_device_resource.smart_supporting_diskinfo(diskinfo) + self.assertTrue(result) + + def test_smart_supporting_diskinfo_unsupported(self): + diskinfo = ["Some other line", "more info"] + result = block_device_resource.smart_supporting_diskinfo(diskinfo) + self.assertFalse(result) + + @patch("block_device_resource.check_output") + def test_smart_support_enabled(self, mock_check_output): + mock_check_output.return_value = textwrap.dedent( + """ + Some intro information on the drive + === START OF SMART DATA SECTION === + SMART overall-health self-assessment test result: PASSED + """ + ) + + result = block_device_resource.smart_support("sda") + self.assertEqual(result, "True") + + @patch("block_device_resource.check_output") + def test_smart_support_disabled(self, mock_check_output): + mock_check_output.return_value = ( + "some output indicating no SMART support" + ) + + result = block_device_resource.smart_support("sdb") + self.assertEqual(result, "False") + + @patch("block_device_resource.check_output") + def test_smart_support_failed_to_fetch(self, mock_check_output): + mock_check_output.side_effect = CalledProcessError("cmd", 1) + result = block_device_resource.smart_support("sdb") + self.assertEqual(result, "False") + + @patch("block_device_resource.check_output") + def test_smart_support_enabled_raid(self, mock_check_output): + mock_check_output.side_effect = [ + textwrap.dedent( + """ + Some intro information of the drive in raid + Raid configuration: some -d 3ware,N + """ + ), + # here we are checking inside the raid checking function + textwrap.dedent( + """ + Some intro information of the drive in raid + """ + ), + # Note: at least one disk in the raid doesn't support SMART, + # we report true here as this will make the subsequent + # test fail as this is likely to be a mistake from the OEM + textwrap.dedent( + """ + Some intro information of the drive in raid + === START OF SMART DATA SECTION === + SMART overall-health self-assessment test result: PASSED + """ + ), + CalledProcessError("cmd", 1), + ] + + result = block_device_resource.smart_support("sdb") + self.assertEqual(result, "True") + + @patch("block_device_resource.check_output") + def test_smart_support_disabled_raid(self, mock_check_output): + mock_check_output.side_effect = [ + textwrap.dedent( + """ + Some intro information of the drive in raid + Raid configuration: some -d 3ware,N + """ + ), + # here we are checking inside the raid checking function + textwrap.dedent( + """ + Some intro information of the drive in raid + """ + ), + textwrap.dedent( + """ + Some intro information of the drive in raid + """ + ), + CalledProcessError("cmd", 1), + ] + + result = block_device_resource.smart_support("sdb") + self.assertEqual(result, "False") + + +class TestMainFunction(unittest.TestCase): + @patch("block_device_resource.Path.glob") + @patch("block_device_resource.device_state") + @patch("block_device_resource.usb_support") + @patch("block_device_resource.device_rotation") + @patch("block_device_resource.smart_support") + def test_block_device_resource_main( + self, + mock_smart_support, + mock_device_rotation, + mock_usb_support, + mock_device_state, + mock_path_glob, + ): + device = MagicMock() + device.name = "device" + device.parent.name = "sda" + # Mocking Path.glob to simulate disk names + mock_path_glob.return_value = [device] + + # Mocking other functions to return specific values + mock_device_state.return_value = "internal" + mock_usb_support.side_effect = ( + lambda name, version: "supported" + if version == 3.00 + else "unsupported" + ) + mock_device_rotation.return_value = "yes" + mock_smart_support.return_value = "True" + + # Capturing the output of print statements + with patch("block_device_resource.print") as mocked_print: + block_device_resource.main() + + # Verifying the output + expected_output = textwrap.dedent( + """ + name: sda + state: internal + usb2: unsupported + usb3: supported + rotation: yes + smart: True + """ + ).lstrip() + + mocked_print.assert_called_with(expected_output)