Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 107 additions & 81 deletions providers/resource/bin/block_device_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
import os
import re
import shlex
from glob import glob
from subprocess import Popen, PIPE, check_output, CalledProcessError
import textwrap
from pathlib import Path
from itertools import chain
from subprocess import check_output, CalledProcessError

rootdir_pattern = re.compile('^.*?/devices')
rootdir_pattern = re.compile("^.*?/devices")

# NOTE: If raid_types changes, also change it in disk_smart script!
raid_types = ["megaraid", "cciss", "3ware", "areca"]
Expand All @@ -15,57 +17,74 @@
def device_state(name):
"""Follow pmount policy to find if a device is removable or internal."""

path = rootdir_pattern.sub('', os.readlink('/sys/block/%s' % name))
path = rootdir_pattern.sub("", os.readlink("/sys/block/%s" % name))
hotplug_buses = ("usb", "ieee1394", "mmc", "pcmcia", "firewire")
for bus in hotplug_buses:
if os.path.exists('/sys/bus/%s' % bus):
for device_bus in os.listdir('/sys/bus/%s/devices' % bus):
device_link = rootdir_pattern.sub('', os.readlink(
'/sys/bus/%s/devices/%s' % (bus, device_bus)))
if os.path.exists("/sys/bus/%s" % bus):
for device_bus in os.listdir("/sys/bus/%s/devices" % bus):
device_link = rootdir_pattern.sub(
"",
os.readlink("/sys/bus/%s/devices/%s" % (bus, device_bus)),
)
if re.search(device_link, path):
return 'removable'
return "removable"

return 'internal'
return "internal"


def usb_support(name, version):
"""Check the USB specification number for both hub port and device."""
path = rootdir_pattern.sub('', os.readlink('/sys/block/%s' % name))
path = rootdir_pattern.sub("", os.readlink("/sys/block/%s" % name))

# Remove the usb config.interface part of the path
m = re.match('((.*usb\d+).*\/)\d-[\d\.:\-]+\/.*', path) # noqa: W605
m = re.match(r"((.*usb\d+).*\/)\d-[\d\.:\-]+\/.*", path)
if m:
device_path = m.group(1)
hub_port_path = m.group(2)

# Check the highest version of USB the device supports
with open('/sys/devices/%s/version' % device_path, "rt") as f:
with open("/sys/devices/%s/version" % device_path, "rt") as f:
if float(f.readline()) < version:
return 'unsupported'
return "unsupported"

# Check the highest version of USB the hub supports
with open('/sys/devices/%s/version' % hub_port_path, "rt") as f:
with open("/sys/devices/%s/version" % hub_port_path, "rt") as f:
if float(f.readline()) < version:
return 'unsupported'
return "unsupported"

return 'supported'
return "supported"

return 'unsupported'
return "unsupported"


def device_rotation(name):
"""
Check the device queue/rotational parameter to determine if it's a spinning
device or a non-spinning device, which indicates it's an SSD.
"""
path = '/sys/block/{0}/device/block/{0}/queue/rotational'.format(name)
path = "/sys/block/{0}/device/block/{0}/queue/rotational".format(name)
if not os.path.exists(path):
return 'no'
return "no"
with open(path, "rt") as f:
if f.read(1) == '1':
return 'yes'
if f.read(1) == "1":
return "yes"

return 'no'
return "no"


def smart_supporting_diskinfo(diskinfo) -> bool:
# if a diskinfo line contains any of the following (all on one line)
# assume the disk supports SMART
# ex. SMART support is: Avaliable
indicators = [("SMART support is", "Available"), ("SMART", "test result")]

def contains_indicator(line):
return any(
all(indicator_segment in line for indicator_segment in indicator)
for indicator in indicators
)

return any(contains_indicator(line) for line in diskinfo)


def smart_support_raid(name, raid_type):
Expand All @@ -80,31 +99,26 @@ def smart_support_raid(name, raid_type):
:returns:
'True' or 'False' as string (for return to Checkbox)
"""
supported = 'False'
disk_num = 0
disk_exists = True
# Loop through all disks in array to verify that SMART is available on
# at least one of them. Note that if there's a mix of supported and
# unsupported, this test returns 'True', which will result in a failure
# of disk_smart. This is by design, since such a mix is likely an assembly
# error by the manufacturer.
while disk_exists:
command = 'smartctl -i /dev/{} -d {},{}'.format(name, raid_type,
disk_num)
# Return 'True' if the output (diskinfo) includes
# "SMART support is.*Available", and terminate check when a failure
# is found or when number of disks rises above level supported by
# smartctl (which likely indicates a bug).
while True:
command = "smartctl -x /dev/{} -d {},{}".format(
name, raid_type, disk_num
)
try:
diskinfo = (check_output(shlex.split(command)).decode('utf-8').
splitlines())
if any("SMART support is" in s and "Available" in s for
s in diskinfo):
supported = 'True'
disk_num += 1
diskinfo = check_output(
shlex.split(command), universal_newlines=True
).splitlines()
if smart_supporting_diskinfo(diskinfo):
return "True"

disk_num += 1
except CalledProcessError:
disk_exists = False
return supported
return "False"


def smart_support(name):
Expand All @@ -116,45 +130,57 @@ def smart_support(name):
:returns:
'True' or 'False' as string (for return to Checkbox)
"""
supported = 'False'
# Check with smartctl to see if SMART is available and enabled on the disk
command = 'smartctl -i /dev/%s' % name
diskinfo_bytes = (Popen(command, stdout=PIPE, shell=True)
.communicate()[0])
diskinfo = (diskinfo_bytes.decode(encoding='utf-8', errors='ignore')
.splitlines())

# Return True if the output (diskinfo) includes
# "SMART support is.*Available"
if len(diskinfo) > 2:
if any("SMART support is" in s and "Available" in s
for s in diskinfo):
supported = 'True'
else:
for type in raid_types:
if any("-d {},N".format(type) in s for s in diskinfo):
supported = smart_support_raid(name, type)
break
return supported


for path in glob('/sys/block/*/device') + glob('/sys/block/*/dm'):
name = re.sub('.*/(.*?)/(device|dm)', '\g<1>', path) # noqa: W605
state = device_state(name)
usb2 = usb_support(name, 2.00)
usb3 = usb_support(name, 3.00)
rotation = device_rotation(name)
smart = smart_support(name)
print("""\
name: %(name)s
state: %(state)s
usb2: %(usb2)s
usb3: %(usb3)s
rotation: %(rotation)s
smart: %(smart)s
""" % {"name": name,
"state": state,
"usb2": usb2,
"usb3": usb3,
"rotation": rotation,
"smart": smart})
command = "smartctl -x /dev/%s" % name
try:
diskinfo = check_output(
shlex.split(command), universal_newlines=True
).splitlines()
except CalledProcessError:
return "False"

# First check if the current name supports SMART
if smart_supporting_diskinfo(diskinfo):
return "True"
# Try to check if the disk is in a raid configuration
for type in raid_types:
if any("-d {},N".format(type) in s for s in diskinfo):
return smart_support_raid(name, type)
return "False"


def main():
sys_block = Path("/sys/block")
# match the name any dir in sys block that has a subdirectory device or dm
disk_names = (
path.parent.name
for path in chain(sys_block.glob("*/device"), sys_block.glob("*/dm"))
)
for disk_name in disk_names:
state = device_state(disk_name)
usb2 = usb_support(disk_name, 2.00)
usb3 = usb_support(disk_name, 3.00)
rotation = device_rotation(disk_name)
smart = smart_support(disk_name)
resource_text = textwrap.dedent(
"""
name: {disk_name}
state: {state}
usb2: {usb2}
usb3: {usb3}
rotation: {rotation}
smart: {smart}
""".format(
disk_name=disk_name,
state=state,
usb2=usb2,
usb3=usb3,
rotation=rotation,
smart=smart,
)
).lstrip()
print(resource_text)


if __name__ == "__main__":
main()
Loading