Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 77 additions & 91 deletions checkbox-support/checkbox_support/scripts/run_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import argparse
import logging
import os
import pathlib
import re
import select
import sys
Expand All @@ -21,11 +20,17 @@

from checkbox_support.helpers.timeout import timeout
from checkbox_support.scripts.zapper_proxy import zapper_run
from checkbox_support.scripts.usb_read_write import (
mount_usb_storage,
gen_random_file,
write_test,
read_test,
)


logger = logging.getLogger(__file__)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler(sys.stdout))


ACTION_TIMEOUT = 30

Expand Down Expand Up @@ -67,10 +72,12 @@ class StorageWatcher(StorageInterface):
function to detect the insertion and removal of storage.
"""

def __init__(self, testcase, storage_type, zapper_usb_address):
self.testcase = testcase
def __init__(self, storage_type, zapper_usb_address):
self.storage_type = storage_type
self.zapper_usb_address = zapper_usb_address
self.testcase = None
self.test_passed = False
self.mounted_partition = None

def run(self):
j = journal.Reader()
Expand Down Expand Up @@ -106,6 +113,8 @@ def run(self):
self._process_lines(
[e["MESSAGE"] for e in j if e and "MESSAGE" in e]
)
if self.test_passed:
return

def _process_lines(self, lines):
"""
Expand All @@ -121,52 +130,37 @@ def _process_lines(self, lines):
elif self.testcase == "removal":
self._parse_journal_line(line_str)
self._validate_removal()

def _store_storage_info(self, mounted_partition=""):
"""
Store the mounted partition info to the shared directory.
"""

plainbox_session_share = os.environ.get("PLAINBOX_SESSION_SHARE")
# TODO: Should name the file by the value of storage_type variable as
# prefix. e.g. thunderbolt_insert_info, mediacard_insert_info.
# Since usb_insert_info is used by usb_read_write script, we
# should refactor usb_read_write script to adopt different files
file_name = "usb_insert_info"

if not plainbox_session_share:
logger.error("no env var PLAINBOX_SESSION_SHARE")
sys.exit(1)

# backup the storage partition info
if mounted_partition:
logger.info(
"cache file {} is at: {}".format(
file_name, plainbox_session_share
)
)
file_path = pathlib.Path(plainbox_session_share, file_name)
with open(file_path, "w") as file_to_share:
file_to_share.write(mounted_partition + "\n")

def _remove_storage_info(self):
"""Remove the file containing the storage info from the shared
directory.
"""

plainbox_session_share = os.environ.get("PLAINBOX_SESSION_SHARE")
file_name = "usb_insert_info"

if not plainbox_session_share:
logger.error("no env var PLAINBOX_SESSION_SHARE")
sys.exit(1)

file_path = pathlib.Path(plainbox_session_share, file_name)
if pathlib.Path(file_path).exists():
os.remove(file_path)
logger.info("cache file {} removed".format(file_name))
else:
logger.error("cache file {} not found".format(file_name))
if self.test_passed:
return

@timeout(ACTION_TIMEOUT) # 30 seconds timeout
def run_insertion(self):
print("\n--------- Testing insertion ---------")
self.testcase = "insertion"
self.run()
print("\n------- Insertion test passed -------")
return self.mounted_partition

@timeout(ACTION_TIMEOUT) # 30 seconds timeout
def run_removal(self, mounted_partition):
print("\n---------- Testing removal ----------")
self.testcase = "removal"
self.mounted_partition = mounted_partition
self.run()
print("\n-------- Removal test passed --------")

def run_storage(self, mounted_partition):
print("\n--------- Testing read/write --------")
with gen_random_file() as random_file:
# initialize the necessary tasks before performing read/write test
print("Mounting the USB storage")
print(mounted_partition)
with mount_usb_storage(mounted_partition):
# write test
write_test(random_file)
# read test
read_test(random_file)
print("\n------- Read/Write test passed -------")


class USBStorage(StorageWatcher):
Expand Down Expand Up @@ -202,17 +196,12 @@ def _validate_insertion(self):
else:
sys.exit("Wrong USB type detected.")

# backup the storage info
self._store_storage_info(self.mounted_partition)
sys.exit()
self.test_passed = True

def _validate_removal(self):
if self.action == "removal":
logger.info("Removal test passed.")

# remove the storage info
self._remove_storage_info()
sys.exit()
self.test_passed = True

def _parse_journal_line(self, line_str):
"""
Expand Down Expand Up @@ -284,18 +273,12 @@ def _validate_insertion(self):
)
)
logger.info("Mediacard insertion test passed.")

# backup the storage info
self._store_storage_info(self.mounted_partition)
sys.exit()
self.test_passed = True

def _validate_removal(self):
if self.action == "removal":
logger.info("Mediacard removal test passed.")

# remove the storage info
self._remove_storage_info()
sys.exit()
self.test_passed = True

def _parse_journal_line(self, line_str):
"""
Expand Down Expand Up @@ -344,18 +327,12 @@ def _validate_insertion(self):
if self.action == "insertion" and self.mounted_partition:
logger.info("usable partition: {}".format(self.mounted_partition))
logger.info("Thunderbolt insertion test passed.")

# backup the storage info
self._store_storage_info(self.mounted_partition)
sys.exit()
self.test_passed = True

def _validate_removal(self):
if self.action == "removal":
logger.info("Thunderbolt removal test passed.")

# remove the storage info
self._remove_storage_info()
sys.exit()
self.test_passed = True

def _parse_journal_line(self, line_str):

Expand All @@ -379,12 +356,15 @@ def _parse_journal_line(self, line_str):
self.action = "removal"


def launch_watcher():
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument(
"testcase",
choices=["insertion", "removal"],
help=("insertion or removal"),
choices=["insertion", "storage"],
help=(
"insertion: Tests insertion and removal of storage\n"
"storage: Tests insertion, read and write, and removal\n"
),
)
parser.add_argument(
"storage_type",
Expand All @@ -396,27 +376,33 @@ def launch_watcher():
type=str,
help="Zapper's USB switch address to use",
)
args = parser.parse_args()
return parser.parse_args()


def main():
args = parse_args()

watcher = None
if args.storage_type == "thunderbolt":
watcher = ThunderboltStorage(
args.testcase, args.storage_type, args.zapper_usb_address
args.storage_type, args.zapper_usb_address
)
elif args.storage_type == "mediacard":
watcher = MediacardStorage(
args.testcase, args.storage_type, args.zapper_usb_address
)
watcher = MediacardStorage(args.storage_type, args.zapper_usb_address)
else:
watcher = USBStorage(
args.testcase, args.storage_type, args.zapper_usb_address
)
watcher.run()


@timeout(ACTION_TIMEOUT) # 30 seconds timeout
def main():
launch_watcher()
watcher = USBStorage(args.storage_type, args.zapper_usb_address)

if args.testcase == "insertion":
mounted_partition = watcher.run_insertion()
watcher.run_removal(mounted_partition)
elif args.testcase == "storage":
mounted_partition = watcher.run_insertion()
watcher.run_storage(mounted_partition)
print("Press Enter to start removal", flush=True)
input()
watcher.run_removal(mounted_partition)
else:
raise SystemExit("Invalid test case")


if __name__ == "__main__":
Expand Down
Loading