diff --git a/providers/base/bin/wol_check.py b/providers/base/bin/wol_check.py new file mode 100755 index 0000000000..bbae75e6ec --- /dev/null +++ b/providers/base/bin/wol_check.py @@ -0,0 +1,165 @@ +#!/usr/bin/env python3 + +# Copyright 2025 Canonical Ltd. +# Written by: +# Eugene Wu +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3, +# as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +import subprocess +import datetime +import re +import argparse +import logging +import sys + + +def get_timestamp(file): + with open(file, "r") as f: + saved_timestamp = float(f.read()) + readable_start_time = datetime.datetime.fromtimestamp(saved_timestamp) + logging.debug("Test started at: {}".format(readable_start_time)) + return saved_timestamp + + +def extract_timestamp(log_line): + pattern = r"(\d+\.\d+)" + match = re.search(pattern, log_line) + return float(match.group(1)) if match else None + + +def get_wakeup_timestamp(): + # Get the time stamp of the system resume from suspend for s3 + command = ["journalctl", "-b", "0", "--output=short-unix"] + result = subprocess.check_output( + command, shell=False, universal_newlines=True + ) + logs = result.splitlines() + + for log in reversed(logs): + if r"suspend exit" in log: + logging.debug(log) + latest_system_back_time = extract_timestamp(log) + readable_back_time = datetime.datetime.fromtimestamp( + latest_system_back_time + ) + logging.debug("System back time: {}".format(readable_back_time)) + return latest_system_back_time + + return None + + +def get_system_boot_time(): + """ + Read btime from /proc/stat and + return the system boot timestamp (Unix timestamp, in seconds). + """ + try: + with open("/proc/stat", "r") as f: + for line in f: + if line.startswith("btime"): + btime = float(line.split()[1]) + back_time = datetime.datetime.fromtimestamp(btime) + logging.debug("System back time: {}".format(back_time)) + return btime + logging.error("cannot find btime") + return None + except FileNotFoundError: + logging.error("cannot open /proc/stat.") + return None + except Exception as e: + logging.error("error while read btime: {}".format(e)) + return None + + +def parse_args(args=sys.argv[1:]): + """ + command line arguments parsing + + :param args: arguments from sys + :type args: sys.argv + """ + parser = argparse.ArgumentParser( + description="Parse command line arguments." + ) + + parser.add_argument("--powertype", type=str, help="Waked from s3 or s5.") + parser.add_argument( + "--timestamp_file", + type=str, + help="The file to store the timestamp of test start.", + ) + parser.add_argument( + "--delay", + type=int, + default=60, + help="Delay between attempts (in seconds).", + ) + parser.add_argument( + "--retry", type=int, default=3, help="Number of retry attempts." + ) + + return parser.parse_args(args) + + +def main(): + args = parse_args() + + logging.basicConfig( + level=logging.DEBUG, + stream=sys.stdout, + format="%(levelname)s: %(message)s", + ) + + logging.info("wake-on-LAN check test started.") + + powertype = args.powertype + timestamp_file = args.timestamp_file + delay = args.delay + max_retries = args.retry + + logging.info("PowerType: {}".format(powertype)) + + test_start_time = get_timestamp(timestamp_file) + if test_start_time is None: + raise SystemExit( + "Couldn't get the test start time from timestamp file." + ) + + system_back_time = ( + get_wakeup_timestamp() if powertype == "s3" else get_system_boot_time() + ) + if system_back_time is None: + raise SystemExit("Couldn't get system back time.") + + time_difference = system_back_time - test_start_time + logging.debug("time difference: {} seconds".format(time_difference)) + + # system_back_time - test_start_time > 1.5*max_retries*delay which meanse + # the system was bring up by rtc other than Wake-on-LAN + expect_time_range = 1.5 * max_retries * delay + if time_difference > expect_time_range: + raise SystemExit( + "The system took much longer than expected to wake up," + " and it wasn't awakened by wake-on-LAN." + ) + elif time_difference < 0: + raise SystemExit("System resumed earlier than expected.") + else: + logging.info("wake-on-LAN works well.") + return True + + +if __name__ == "__main__": + main() diff --git a/providers/base/bin/wol_client.py b/providers/base/bin/wol_client.py new file mode 100755 index 0000000000..f2d394fb95 --- /dev/null +++ b/providers/base/bin/wol_client.py @@ -0,0 +1,281 @@ +#!/usr/bin/env python3 + +# Copyright 2025 Canonical Ltd. +# Written by: +# Eugene Wu +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3, +# as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +import logging +import urllib +import urllib.request +import argparse +import subprocess +import sys +import time +import json +import socket +import fcntl +import struct + + +def send_request_to_wol_server(url, data=None, retry=3): + # Convert data to JSON format + data_encoded = json.dumps(data).encode("utf-8") + + # Construct request + headers = {"Content-Type": "application/json"} + req = urllib.request.Request(url, data=data_encoded, headers=headers) + + attempts = 0 + while attempts < retry: + try: + with urllib.request.urlopen(req) as response: + logging.info("in the urllib request.") + response_data = json.loads(response.read().decode("utf-8")) + logging.debug( + "Response message: {}".format(response_data["message"]) + ) + status_code = response.status + logging.debug("Status code: {}".format(status_code)) + if status_code == 200: + logging.info( + "Send request to Wake-on-lan server successful." + ) + return + else: + logging.error( + "Failded to send request to Wkae-on-lan server." + ) + except Exception as e: + logging.error("An unexpected error occurred: {}".format(e)) + + attempts += 1 + time.sleep(1) # Wait for a second before retrying + logging.debug("Retrying... ({}/{})".format(attempts, retry)) + + raise SystemExit( + "Failed to send request to WOL server. " + "Please ensure the WOL server setup correctlly." + ) + + +def check_wakeup(interface): + wakeup_file = "/sys/class/net/{}/device/power/wakeup".format(interface) + try: + with open(wakeup_file, "r") as f: + wakeup_status = f.read().strip() + + logging.info( + "Wakeup status for {}: {}".format(interface, wakeup_status) + ) + + if wakeup_status == "enabled": + return True + elif wakeup_status == "disabled": + return False + else: + raise ValueError( + "Unexpected wakeup status: {}".format(wakeup_status) + ) + + except FileNotFoundError: + raise FileNotFoundError( + "The network interface {} does not exist.".format(interface) + ) + + +def __get_ip_address(interface): + try: + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + ip_addr = fcntl.ioctl( + s.fileno(), + 0x8915, + struct.pack("256s", interface[:15].encode("utf-8")), + ) + return socket.inet_ntoa(ip_addr[20:24]) + except IOError: + return None + + +def __get_mac_address(interface): + try: + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + mac_addr = fcntl.ioctl( + s.fileno(), + 0x8927, + struct.pack("256s", interface[:15].encode("utf-8")), + ) + return ":".join("%02x" % b for b in mac_addr[18:24]) + except IOError: + raise SystemExit("Error: Unable to retrieve MAC address") + + +def get_ip_mac(interface): + ip_a = __get_ip_address(interface) + mac_a = __get_mac_address(interface) + + return ip_a, mac_a + + +# set the rtc wake time to bring up system in case the wake-on-lan failed +def set_rtc_wake(wake_time): + """ + Set the RTC (Real-Time Clock) to wake the system after a specified time. + Parameters: + wake_time (int): The time to wake up the system once wake on lan failed. + """ + command = ["rtcwake", "-m", "no", "-s", str(wake_time)] + + try: + subprocess.check_output(command, stderr=subprocess.STDOUT) + except subprocess.CalledProcessError as e: + raise SystemExit( + "Failed to set RTC wake: {}".format(e.output.decode().strip()) + ) + except Exception as e: + raise SystemExit("An unexpected error occurred: {}".format(e)) + + +# try to suspend(s3) or power off(s5) the system +def s3_or_s5_system(type): + """ + Suspends or powers off the system using systemctl. + Args: + type: String, either "s3" for suspend or "s5" for poweroff. + Raises: + RuntimeError: If the type is invalid or the command fails. + """ + commands = { + "s3": ["systemctl", "suspend"], + "s5": ["systemctl", "poweroff"], + } + + if type not in commands: + raise RuntimeError( + "Error: type should be s3 or s5(provided: {})".format(type) + ) + + try: + subprocess.check_output(commands[type], stderr=subprocess.STDOUT) + except subprocess.CalledProcessError as e: + raise RuntimeError("Try to enter {} failed: {}".format(type, e)) + + +# bring up the system by rtc or any other ways in case the wake-on-lan failed +def bring_up_system(way, time): + # try to wake up the system by rtc + if way == "rtc": + set_rtc_wake(time) + logging.debug("set the rtcwake time: {} seconds ".format(time)) + else: + # try to wake up the system other than RTC which not support + raise SystemExit( + "we don't have the way {} to bring up the system," + "Some error happened.".format(way) + ) + + +# write the time stamp to a file to record the test start time +def write_timestamp(timestamp_file): + with open(timestamp_file, "w") as f: + f.write(str(time.time())) + f.flush() + + +def parse_args(args=sys.argv[1:]): + parser = argparse.ArgumentParser( + description="Parse command line arguments." + ) + + parser.add_argument( + "--interface", required=True, help="The network interface to use." + ) + parser.add_argument( + "--target", required=True, help="The target IP address or hostname." + ) + parser.add_argument( + "--delay", + type=int, + default=60, + help="Delay between attempts (in seconds).", + ) + parser.add_argument( + "--retry", type=int, default=3, help="Number of retry attempts." + ) + parser.add_argument( + "--waketype", + default="g", + help="Type of wake operation.eg 'g' for magic packet", + ) + parser.add_argument("--powertype", type=str, help="Type of s3 or s5.") + parser.add_argument( + "--timestamp_file", + type=str, + help="The file to store the timestamp of test start.", + ) + + return parser.parse_args(args) + + +def main(): + args = parse_args() + + logging.basicConfig( + level=logging.DEBUG, + stream=sys.stdout, + format="%(levelname)s: %(message)s", + ) + + logging.info("wake-on-LAN test started.") + logging.info("Test network interface: {}".format(args.interface)) + + wakeup_enabled = check_wakeup(args.interface) + if not wakeup_enabled: + raise SystemExit( + "wake-on-LAN of {} is disabled!".format(args.interface) + ) + + delay = args.delay + retry = args.retry + + ip, mac = get_ip_mac(args.interface) + + logging.info("IP: {}, MAC: {}".format(ip, mac)) + + if ip is None: + raise SystemExit("Error: failed to get the ip address.") + + url = "http://{}".format(args.target) + req = { + "DUT_MAC": mac, + "DUT_IP": ip, + "delay": args.delay, + "retry_times": args.retry, + "wake_type": args.waketype, + } + + send_request_to_wol_server(url, data=req, retry=3) + + bring_up_system("rtc", delay * retry * 2) + + # write the time stamp + write_timestamp(args.timestamp_file) + + # s3 or s5 the system + s3_or_s5_system(args.powertype) + + +if __name__ == "__main__": + main() diff --git a/providers/base/tests/test_wol_check.py b/providers/base/tests/test_wol_check.py new file mode 100644 index 0000000000..33164fadb7 --- /dev/null +++ b/providers/base/tests/test_wol_check.py @@ -0,0 +1,241 @@ +#!/usr/bin/env python3 + +# Copyright 2025 Canonical Ltd. +# Written by: +# Eugene Wu +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3, +# as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +import unittest +from unittest.mock import patch, MagicMock, mock_open +from wol_check import ( + get_timestamp, + extract_timestamp, + get_wakeup_timestamp, + get_system_boot_time, + parse_args, + main, +) + + +class TestGetTimestamp(unittest.TestCase): + @patch("builtins.open") + def test_get_timestamp_success(self, mock_open): + mock_file = mock_open.return_value.__enter__.return_value + mock_file.read.return_value = "1622547800.0" + + result = get_timestamp("test_file.txt") + self.assertEqual(result, 1622547800.0) + + @patch("builtins.open") + def test_get_timestamp_file_not_found(self, mock_open): + mock_open.side_effect = FileNotFoundError + + with self.assertRaises(FileNotFoundError): + get_timestamp("nonexistent_file.txt") + + +class TestExtractTimeStamp(unittest.TestCase): + def test_extract_timestamp_with_timestamp(self): + log_line = r"1734472364.392919 M70s-Gen6-1 kernel: PM: suspend exit" + timestamp = extract_timestamp(log_line) + self.assertEqual(timestamp, 1734472364.392919) + + def test_extract_timestamp_without_timestamp(self): + log_line = "No timestamp here" + timestamp = extract_timestamp(log_line) + self.assertIsNone(timestamp) + + +class TestGetSystemBootTime(unittest.TestCase): + @patch("builtins.open") + def test_get_system_boot_time_success(self, mock_open): + mock_file = MagicMock() + mock_file.__enter__.return_value = mock_file + mock_file.__iter__.return_value = ["btime 1618912536\n"] + mock_open.return_value = mock_file + + boot_time = get_system_boot_time() + self.assertEqual(boot_time, 1618912536.0) + + @patch( + "builtins.open", new_callable=mock_open, read_data="some other data\n" + ) + def test_get_system_boot_time_no_btime(self, mock_file): + with self.assertLogs(level="ERROR") as log: + boot_time = get_system_boot_time() + self.assertIsNone(boot_time) + self.assertIn("cannot find btime", log.output[0]) + + @patch("builtins.open", side_effect=FileNotFoundError) + def test_get_system_boot_time_file_not_found(self, mock_file): + with self.assertLogs(level="ERROR") as log: + boot_time = get_system_boot_time() + self.assertIsNone(boot_time) + self.assertIn("cannot open /proc/stat.", log.output[0]) + + @patch("builtins.open", side_effect=Exception("some error")) + def test_get_system_boot_time_exception(self, mock_file): + with self.assertLogs(level="ERROR") as log: + boot_time = get_system_boot_time() + self.assertIsNone(boot_time) + self.assertIn("error while read btime: some error", log.output[0]) + + +class TestGetWakeupTimestamp(unittest.TestCase): + + @patch("subprocess.check_output") + def test_get_wakeup_timestamp(self, mock_check_output): + mock_check_output.return_value = ( + r"1734472364.392919 M70s-Gen6-1 kernel: PM: suspend exit" + ) + result = get_wakeup_timestamp() + + self.assertEqual(result, 1734472364.392919) + + @patch("subprocess.check_output") + def test_get_wakeup_timestamp_fail(self, mock_check_output): + mock_check_output.return_value = ( + r"1734472364.392919 M70s-Gen6-1 kernel: PM: no s3 key word" + ) + result = get_wakeup_timestamp() + + self.assertEqual(result, None) + + +class ParseArgsTests(unittest.TestCase): + def test_parse_args(self): + args = [ + "--delay", + "10", + "--retry", + "5", + "--powertype", + "s5", + "--timestamp_file", + "/tmp/time_file", + ] + rv = parse_args(args) + self.assertEqual(rv.powertype, "s5") + self.assertEqual(rv.timestamp_file, "/tmp/time_file") + self.assertEqual(rv.delay, 10) + self.assertEqual(rv.retry, 5) + + def test_parse_args_with_default_value(self): + args = ["--powertype", "s3"] + rv = parse_args(args) + self.assertEqual(rv.powertype, "s3") + self.assertIsNone(rv.timestamp_file) + self.assertEqual(rv.delay, 60) + self.assertEqual(rv.retry, 3) + + +class TestMain(unittest.TestCase): + def setUp(self): + self.args_mock = MagicMock() + self.args_mock.powertype = "s3" + self.args_mock.timestamp_file = "/tmp/test" + self.args_mock.delay = 60 + self.args_mock.retry = 3 + + @patch("wol_check.parse_args") + @patch("wol_check.get_timestamp") + @patch("wol_check.get_wakeup_timestamp") + def test_main_success( + self, mock_get_wakeup_timestamp, mock_get_timestamp, mock_parse_args + ): + mock_parse_args.return_value = self.args_mock + mock_get_timestamp.return_value = 100.0 + mock_get_wakeup_timestamp.return_value = 160.0 + + # Call main function + with self.assertLogs(level="INFO") as log_messages: + self.assertTrue(main()) + + # Verify logging messages + self.assertIn( + "wake-on-LAN check test started.", log_messages.output[0] + ) + self.assertIn("PowerType: s3", log_messages.output[1]) + self.assertIn("wake-on-LAN works well.", log_messages.output[2]) + + @patch("wol_check.parse_args") + @patch("wol_check.get_timestamp") + @patch("wol_check.get_wakeup_timestamp") + def test_main_wakeonlan_fail_too_large_difference( + self, mock_get_wakeup_timestamp, mock_get_timestamp, mock_parse_args + ): + mock_parse_args.return_value = self.args_mock + mock_get_timestamp.return_value = 100.0 + mock_get_wakeup_timestamp.return_value = 400.0 + + # Expect SystemExit exception with specific message + with self.assertRaises(SystemExit) as cm: + main() + self.assertEqual( + str(cm.exception), + "The system took much longer than expected to wake up," + " and it wasn't awakened by wake-on-LAN.", + ) + + @patch("wol_check.parse_args") + @patch("wol_check.get_timestamp") + @patch("wol_check.get_wakeup_timestamp") + def test_main_wakeonlan_fail_negative_difference( + self, mock_get_wakeup_timestamp, mock_get_timestamp, mock_parse_args + ): + mock_parse_args.return_value = self.args_mock + mock_get_timestamp.return_value = 150.0 + mock_get_wakeup_timestamp.return_value = 100.0 + + with self.assertRaises(SystemExit) as cm: + main() + self.assertEqual( + str(cm.exception), "System resumed earlier than expected." + ) + + @patch("wol_check.parse_args") + @patch("wol_check.get_timestamp") + @patch("wol_check.get_wakeup_timestamp") + def test_main_get_timestamp_none( + self, mock_get_wakeup_timestamp, mock_get_timestamp, mock_parse_args + ): + mock_parse_args.return_value = self.args_mock + mock_get_timestamp.return_value = None + mock_get_wakeup_timestamp.return_value = 100.0 + + with self.assertRaises(SystemExit) as cm: + main() + self.assertEqual( + str(cm.exception), + "Couldn't get the test start time from timestamp file.", + ) + + @patch("wol_check.parse_args") + @patch("wol_check.get_timestamp") + @patch("wol_check.get_wakeup_timestamp") + def test_main_get_systembacktime_none( + self, mock_get_wakeup_timestamp, mock_get_timestamp, mock_parse_args + ): + mock_parse_args.return_value = self.args_mock + mock_get_timestamp.return_value = 100.0 + mock_get_wakeup_timestamp.return_value = None + + with self.assertRaises(SystemExit) as cm: + main() + self.assertEqual(str(cm.exception), "Couldn't get system back time.") + + +if __name__ == "__main__": + unittest.main() diff --git a/providers/base/tests/test_wol_client.py b/providers/base/tests/test_wol_client.py new file mode 100644 index 0000000000..3eb7801cef --- /dev/null +++ b/providers/base/tests/test_wol_client.py @@ -0,0 +1,452 @@ +#!/usr/bin/env python3 + +# Copyright 2025 Canonical Ltd. +# Written by: +# Eugene Wu +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3, +# as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +import unittest +from unittest.mock import patch, MagicMock, mock_open, Mock +import subprocess +import json +import struct + +from wol_client import ( + send_request_to_wol_server, + check_wakeup, + get_ip_mac, + set_rtc_wake, + s3_or_s5_system, + bring_up_system, + write_timestamp, + parse_args, + main, +) + + +class TestSendRequestToWolServerFunction(unittest.TestCase): + + @patch("urllib.request.urlopen") + def test_send_request_success(self, mock_urlopen): + mock_response = MagicMock() + mock_response.read.return_value = json.dumps( + {"message": "success"} + ).encode("utf-8") + mock_response.status = 200 + mock_urlopen.return_value = mock_response + mock_urlopen.return_value.__enter__.return_value = mock_response + + url = "http://192.168.1.1" + data = {"key": "value"} + + result = send_request_to_wol_server(url, data) + + self.assertIsNone(result) + + @patch("urllib.request.urlopen") + def test_send_request_failed_status_not_200(self, mock_urlopen): + mock_response = MagicMock() + mock_response.read.return_value = json.dumps( + {"message": "failure"} + ).encode("utf-8") + mock_response.getcode.return_value = 400 + mock_urlopen.return_value = mock_response + mock_urlopen.return_value.__enter__.return_value = mock_response + + with self.assertRaises(SystemExit): + send_request_to_wol_server( + "http://192.168.1.1", data={"key": "value"} + ) + + @patch("urllib.request.urlopen") + def test_send_request_failed_response_not_success(self, mock_urlopen): + mock_response = MagicMock() + mock_response.read.return_value = json.dumps( + {"message": "failure"} + ).encode("utf-8") + mock_response.getcode.return_value = 500 + mock_urlopen.return_value = mock_response + mock_urlopen.return_value.__enter__.return_value = mock_response + + with self.assertRaises(SystemExit): + send_request_to_wol_server( + "http://192.168.1.1", data={"key": "value"} + ) + + @patch("urllib.request.urlopen") + def test_send_request_unexpected_exception(self, mock_urlopen): + # Mock an unexpected exception + mock_urlopen.side_effect = Exception("Unexpected error") + + with self.assertRaises(SystemExit): + send_request_to_wol_server( + "http://192.168.1.1", data={"key": "value"} + ) + + self.assertEqual(mock_urlopen.call_count, 3) + + +class TestCheckWakeup(unittest.TestCase): + @patch("builtins.open", new_callable=mock_open, read_data="enabled\n") + def test_wakeup_enabled(self, mock_file): + self.assertTrue(check_wakeup("eth0")) + mock_file.assert_called_with( + "/sys/class/net/eth0/device/power/wakeup", "r" + ) + + @patch("builtins.open", new_callable=mock_open, read_data="disabled\n") + def test_wakeup_disabled(self, mock_file): + self.assertFalse(check_wakeup("eth0")) + mock_file.assert_called_with( + "/sys/class/net/eth0/device/power/wakeup", "r" + ) + + @patch("builtins.open", new_callable=mock_open, read_data="unknown\n") + def test_wakeup_unexpected_status(self, mock_file): + with self.assertRaises(ValueError) as context: + check_wakeup("eth0") + self.assertEqual( + str(context.exception), "Unexpected wakeup status: unknown" + ) + + @patch("builtins.open", side_effect=FileNotFoundError) + def test_interface_not_found(self, mock_file): + with self.assertRaises(FileNotFoundError) as context: + check_wakeup("nonexistent") + self.assertEqual( + str(context.exception), + "The network interface nonexistent does not exist.", + ) + + @patch("builtins.open", side_effect=Exception("Unexpected error")) + def test_unexpected_error(self, mock_file): + with self.assertRaises(Exception) as context: + check_wakeup("eth0") + self.assertEqual(str(context.exception), "Unexpected error") + + +class TestGetIPMac(unittest.TestCase): + @patch("socket.socket") + @patch("fcntl.ioctl") + def test_get_ip_mac_success(self, mock_ioctl, mock_socket): + # Mock data + interface = "eth0" + mock_ip = b"\xc0\xa8\x00\x01" # 192.168.0.1 + mock_mac = b"\x00\x0c)\x85\xac\x0e" # 00:0c:29:85:ac:0e + + # Configure the mock objects + mock_socket_instance = MagicMock() + mock_socket.return_value = mock_socket_instance + + def ioctl_side_effect(fd, request, arg): + if request == 0x8915: + return b"\x00" * 20 + mock_ip + b"\x00" * (256 - 24) + elif request == 0x8927: + return b"\x00" * 18 + mock_mac + b"\x00" * (256 - 24) + # raise IOError("Invalid request") + + mock_ioctl.side_effect = ioctl_side_effect + + ip_address, mac_address = get_ip_mac(interface) + + self.assertEqual(ip_address, "192.168.0.1") + self.assertEqual(mac_address, "00:0c:29:85:ac:0e") + + @patch("socket.socket") + @patch("fcntl.ioctl") + def test_get_ip_address_failure(self, mock_ioctl, mock_socket): + # Mock data + interface = "eth0" + mock_mac = b"\x00\x0c)\x85\xac\x0e" # 00:0c:29:85:ac:0e + + mock_socket_instance = MagicMock() + mock_socket.return_value = mock_socket_instance + + def ioctl_side_effect(fd, request, arg): + if request == 0x8915: + raise IOError("IP address retrieval failed") + elif request == 0x8927: + # return struct.pack('256s', b'\x00' * 18) + mock_mac + return b"\x00" * 18 + mock_mac + b"\x00" * (256 - 24) + + mock_ioctl.side_effect = ioctl_side_effect + + ip_address, mac_address = get_ip_mac(interface) + + self.assertIsNone(ip_address) + self.assertEqual(mac_address, "00:0c:29:85:ac:0e") + + @patch("socket.socket") + @patch("fcntl.ioctl") + def test_get_mac_address_failure(self, mock_ioctl, mock_socket): + interface = "eth0" + mock_ip = b"\xc0\xa8\x00\x01" # 192.168.0.1 + + mock_socket_instance = MagicMock() + mock_socket.return_value = mock_socket_instance + + def ioctl_side_effect(fd, request, arg): + if request == 0x8915: + return struct.pack("256s", b"\x00" * 16) + mock_ip + elif request == 0x8927: + raise IOError("MAC address retrieval failed") + + mock_ioctl.side_effect = ioctl_side_effect + + with self.assertRaises(SystemExit): + get_ip_mac(interface) + + +class TestSetRTCWake(unittest.TestCase): + + @patch("wol_client.subprocess.check_output") + def test_set_rtc_wake_success(self, mock_check_output): + """Test successful RTC wake time setting.""" + expected_wake_time = 180 + mock_check_output.return_value = b"" # Simulate successful execution + set_rtc_wake(expected_wake_time) + mock_check_output.assert_called_once_with( + ["rtcwake", "-m", "no", "-s", str(expected_wake_time)], stderr=-2 + ) + + @patch("wol_client.subprocess.check_output") + def test_set_rtc_wake_failed(self, mock_check_output): + """Test handling of subprocess.CalledProcessError.""" + mock_check_output.side_effect = subprocess.CalledProcessError( + 1, "rtcwake", output=b"Error message" + ) + with self.assertRaises(SystemExit) as cm: + set_rtc_wake(60) + self.assertEqual( + str(cm.exception), "Failed to set RTC wake: Error message" + ) + + @patch("wol_client.subprocess.check_output") + def test_set_rtc_wake_unexpected_error(self, mock_check_output): + """Test handling of unexpected exceptions.""" + mock_check_output.side_effect = Exception("Unexpected error") + with self.assertRaises(SystemExit) as cm: + set_rtc_wake(60) + self.assertEqual( + str(cm.exception), "An unexpected error occurred: Unexpected error" + ) + + +class TestS3OrS5System(unittest.TestCase): + + @patch("wol_client.subprocess.check_output") + def test_s3_success(self, mock_check_output): + mock_check_output.return_value = b"" + s3_or_s5_system("s3") + mock_check_output.assert_called_once_with( + ["systemctl", "suspend"], stderr=subprocess.STDOUT + ) + + @patch("wol_client.subprocess.check_output") + def test_s5_success(self, mock_check_output): + mock_check_output.return_value = b"" + s3_or_s5_system("s5") + mock_check_output.assert_called_once_with( + ["systemctl", "poweroff"], stderr=subprocess.STDOUT + ) + + def test_invalid_type(self): + with self.assertRaises(RuntimeError) as cm: + s3_or_s5_system("invalid") + self.assertEqual( + str(cm.exception), + "Error: type should be s3 or s5(provided: invalid)", + ) + + @patch("wol_client.subprocess.check_output") + def test_subprocess_error(self, mock_check_output): + mock_check_output.side_effect = subprocess.CalledProcessError( + 1, "cmd", output="Failed" + ) + with self.assertRaises(RuntimeError) as cm: + s3_or_s5_system("s3") + self.assertIn("Try to enter s3 failed", str(cm.exception)) + + +class TestBringUpSystem(unittest.TestCase): + + @patch("wol_client.set_rtc_wake") + def test_bring_up_system_rtc(self, mock_set_rtc_wake): + bring_up_system("rtc", "10:00") + mock_set_rtc_wake.assert_called_once_with("10:00") + + def test_bring_up_system_invalid_way(self): + with self.assertRaises(SystemExit) as cm: + bring_up_system("invalid", "10:00") + self.assertEqual( + str(cm.exception), + "we don't have the way invalid to bring up the system," + "Some error happened.", + ) + + +class TestWriteTimestamp(unittest.TestCase): + @patch("builtins.open") + def test_write_timestamp(self, mock_file_open): + """Tests if the timestamp is correctly written to the file.""" + write_timestamp("/tmp/timestamp_file") + mock_file_open.assert_called_once_with("/tmp/timestamp_file", "w") + + +class TestParseArgs(unittest.TestCase): + def test_parse_all_arguments(self): + """Tests parsing all arguments.""" + args = [ + "--interface", + "enp0s31f6", + "--target", + "192.168.1.10", + "--delay", + "120", + "--retry", + "3", + "--waketype", + "m", + "--powertype", + "s5", + "--timestamp_file", + "/tmp/time_stamp", + ] + parsed_args = parse_args(args) + self.assertEqual(parsed_args.interface, "enp0s31f6") + self.assertEqual(parsed_args.target, "192.168.1.10") + self.assertEqual(parsed_args.delay, 120) + self.assertEqual(parsed_args.retry, 3) + self.assertEqual(parsed_args.waketype, "m") + self.assertEqual(parsed_args.powertype, "s5") + self.assertEqual(parsed_args.timestamp_file, "/tmp/time_stamp") + + def test_parse_required_arguments(self): + """Tests parsing required arguments.""" + args = ["--interface", "eth0", "--target", "192.168.1.10"] + parsed_args = parse_args(args) + self.assertEqual(parsed_args.interface, "eth0") + self.assertEqual(parsed_args.target, "192.168.1.10") + self.assertEqual(parsed_args.delay, 60) # Default value + self.assertEqual(parsed_args.retry, 3) # Default value + self.assertEqual(parsed_args.waketype, "g") # Default value + self.assertIsNone(parsed_args.powertype) + + +def create_mock_args(): + return MagicMock( + delay=10, + retry=3, + interface="eth0", + target="192.168.1.1", + waketype="magic_packet", + timestamp_file="/tmp/timestamp", + powertype="s3", + ) + + +def create_mock_response(status_code=200, result="success"): + mock_response = MagicMock() + mock_response.status_code = status_code + mock_response.json.return_value = {"result": result} + return mock_response + + +class TestMainFunction(unittest.TestCase): + + @patch("wol_client.s3_or_s5_system") + @patch("wol_client.write_timestamp") + @patch("wol_client.bring_up_system") + @patch("wol_client.send_request_to_wol_server") + @patch("wol_client.check_wakeup") + @patch("wol_client.get_ip_mac") + @patch("wol_client.parse_args") + def test_main_success( + self, + mock_parse_args, + mock_get_ip_mac, + mock_check_wakeup, + mock_send_request_to_wol_server, + mock_bring_up_system, + mock_write_timestamp, + mock_s3_or_s5_system, + ): + mock_parse_args.return_value = create_mock_args() + mock_get_ip_mac.return_value = ("192.168.1.100", "00:11:22:33:44:55") + mock_check_wakeup.return_value = True + mock_send_request_to_wol_server.return_value = create_mock_response() + + main() + + mock_get_ip_mac.assert_called_once_with("eth0") + mock_send_request_to_wol_server.assert_called_once_with( + "http://192.168.1.1", + data={ + "DUT_MAC": "00:11:22:33:44:55", + "DUT_IP": "192.168.1.100", + "delay": 10, + "retry_times": 3, + "wake_type": "magic_packet", + }, + retry=3, + ) + mock_bring_up_system.assert_called_once_with("rtc", 10 * 3 * 2) + mock_write_timestamp.assert_called_once_with("/tmp/timestamp") + mock_s3_or_s5_system.assert_called_once_with("s3") + + @patch("wol_client.send_request_to_wol_server") + @patch("wol_client.get_ip_mac") + @patch("wol_client.check_wakeup") + @patch("wol_client.parse_args") + def test_main_ip_none( + self, + mock_parse_args, + mock_check_wakeup, + mock_get_ip_mac, + mock_send_request_to_wol_server, + ): + mock_parse_args.return_value = create_mock_args() + mock_get_ip_mac.return_value = (None, "00:11:22:33:44:55") + mock_check_wakeup.return_value = True + + with self.assertRaises(SystemExit) as cm: + main() + self.assertEqual( + str(cm.exception), "Error: failed to get the ip address." + ) + + @patch("wol_client.send_request_to_wol_server") + @patch("wol_client.get_ip_mac") + @patch("wol_client.check_wakeup") + @patch("wol_client.parse_args") + def test_main_checkwakeup_disable( + self, + mock_parse_args, + mock_check_wakeup, + mock_get_ip_mac, + mock_send_request_to_wol_server, + ): + mock_parse_args.return_value = create_mock_args() + mock_check_wakeup.return_value = False + mock_get_ip_mac.return_value = ("192.168.1.100", "00:11:22:33:44:55") + mock_send_request_to_wol_server.return_value = create_mock_response() + + with self.assertRaises(SystemExit) as cm: + main() + self.assertIn("wake-on-LAN of eth0 is disabled!", str(cm.exception)) + + +if __name__ == "__main__": + unittest.main() diff --git a/providers/base/units/ethernet/jobs.pxu b/providers/base/units/ethernet/jobs.pxu index 092249cc3e..4561003428 100644 --- a/providers/base/units/ethernet/jobs.pxu +++ b/providers/base/units/ethernet/jobs.pxu @@ -439,3 +439,27 @@ command: network_reconnect_resume_test.py -t 10 -d wired _summary: Network reconnect resume test (wired) _purpose: Checks the length of time it takes to reconnect an existing wired connection after a suspend/resume cycle. + +id: ethernet/wol_auto_S3_{{ interface }} +template-id: ethernet/wol_auto_S3_interface +category_id: com.canonical.plainbox::ethernet +unit: template +template-resource: device +template-engine: jinja2 +template-filter: device.category == 'NETWORK' and device.mac != 'UNKNOWN' +_summary: Wake on LAN (WOL) automatic test from S3 - {{ interface }} +plugin: shell +environ: SERVER_WAKE_ON_LAN WAKE_ON_LAN_DELAY WAKE_ON_LAN_RETRY PLAINBOX_SESSION_SHARE +imports: from com.canonical.plainbox import manifest +requires: + manifest.has_ethernet_adapter == 'True' + manifest.has_ethernet_wake_on_lan_support == 'True' + manifest.has_wake_on_lan_server == 'True' +command: + set -e + wol_client.py --interface {{ interface }} --target "$SERVER_WAKE_ON_LAN" --delay "$WAKE_ON_LAN_DELAY" --retry "$WAKE_ON_LAN_RETRY" --waketype g --powertype s3 --timestamp_file "$PLAINBOX_SESSION_SHARE"/{{ interface }}_s3_timestamp + sleep 30 + wol_check.py --delay "$WAKE_ON_LAN_DELAY" --retry "$WAKE_ON_LAN_RETRY" --powertype s3 --timestamp_file "$PLAINBOX_SESSION_SHARE"/{{ interface }}_s3_timestamp +user: root +estimated_duration: 600 +flags: preserve-locale diff --git a/providers/base/units/ethernet/manifest.pxu b/providers/base/units/ethernet/manifest.pxu index a5e70dee9d..98a408bc24 100644 --- a/providers/base/units/ethernet/manifest.pxu +++ b/providers/base/units/ethernet/manifest.pxu @@ -8,6 +8,11 @@ id: has_ethernet_wake_on_lan_support _name: Wake-on-LAN support through Ethernet port value-type: bool +unit: manifest entry +id: has_wake_on_lan_server +_name: Has wake-on-Lan server for automation setup and running +value-type: bool + unit: manifest entry id: _ignore_disconnected_ethernet_interfaces _name: Ignore disconnected Ethernet interfaces diff --git a/providers/base/units/ethernet/test-plan.pxu b/providers/base/units/ethernet/test-plan.pxu index 19e29f775c..f80719d9fb 100644 --- a/providers/base/units/ethernet/test-plan.pxu +++ b/providers/base/units/ethernet/test-plan.pxu @@ -27,6 +27,15 @@ include: bootstrap_include: device +id: ethernet-wake-on-lan-cert-automated +unit: test plan +_name: Ethernet wake-on-LAN tests for S3 (automated) +_description: Ethernet wake-on-LAN tests for S3 (automated) +include: + ethernet/wol_auto_S3_interface +bootstrap_include: + device + id: ethernet-cert-automated unit: test plan _name: Ethernet tests (automated) diff --git a/providers/base/units/ethernet/wake-on-LAN-automatic-tests.md b/providers/base/units/ethernet/wake-on-LAN-automatic-tests.md new file mode 100644 index 0000000000..bc305fdccb --- /dev/null +++ b/providers/base/units/ethernet/wake-on-LAN-automatic-tests.md @@ -0,0 +1,93 @@ +# This is a file introducing Wake-on-LAN automatic test jobs + + To make the test of Wake-on-LAN automatic, we need: + The device under test (DUT) obtains its own network interface's MAC and IP address, retrieves the Wake-on-LAN server's IP and port from environment variables, sends the IP and MAC to the Wake-on-LAN server, it records the current timestamp and suspends itself after receiving a successful response from the server. + + A Wake-on-LAN HTTP server that receives requests from the device under test (DUT), extracts the DUT's MAC and IP addresses from the request, and then sends a Wake-on-LAN command to the DUT in an attempt to power it on. + + Once the DUT wakes up, it compares the previously recorded timestamp with the time when the system last exited suspend mode. If the system wakes up within a reasonable timeframe, it can be inferred that the wake-up was triggered by the Wake-on-LAN request, indicating a successful test. Otherwise, the system was woken up by the RTC, it implies that the Wake-on-LAN attempt failed. + +## id: ethernet/wol_auto_S3_{{ interface }} + +## Test Case enviroment +WOL server: + - apt install wakeonlan + - apt install python3-fastapi + - apt install uvicorn + - running wol_server.py + +DUT: + - manifest: + - has_ethernet_adapter + - has_ethernet_wake_on_lan_support + - has_wake_on_lan_server + + - enviroment variable: + - SERVER_WAKE_ON_LAN + - Specifies the address of the server responsible for handling Wake-on-LAN requests. + - Format: : + - Example: SERVER_WAKE_ON_LAN=192.168.0.1:8090 + - WAKE_ON_LAN_DELAY + - The time (in seconds) to wait between sending the Wake-on-LAN packet and checking for a response from the target device. + - Example: WAKE_ON_LAN_DELAY=60 + - WAKE_ON_LAN_RETRY + - The number of times to retry sending the Wake-on-LAN packet if the initial attempt fails. + - Example: WAKE_ON_LAN_RETRY=3 + +## Test scripts +### 1. wol_client.py +``` +usage: wol_client.py [-h] --interface INTERFACE --target TARGET [--delay DELAY] [--retry RETRY] [--waketype WAKETYPE] [--powertype POWERTYPE] [--timestamp_file TIMESTAMP_FILE] + + options: + -h, --help show this help message and exit + --interface INTERFACE + The network interface to use. + --target TARGET The target IP address or hostname. + --delay DELAY Delay between attempts (in seconds). + --retry RETRY Number of retry attempts. + --waketype WAKETYPE Type of wake operation.eg 'g' for magic packet + --powertype POWERTYPE + Type of s3 or s5. + --timestamp_file TIMESTAMP_FILE + The file to store the timestamp of test start. +``` +### 2. wol_check.py +``` +usage: wol_check.py [-h] --interface INTERFACE [--powertype POWERTYPE] [--timestamp_file TIMESTAMP_FILE] [--delay DELAY] [--retry RETRY] + + options: + -h, --help show this help message and exit + --interface INTERFACE + The network interface to use. + --powertype POWERTYPE + Waked from s3 or s5. + --timestamp_file TIMESTAMP_FILE + The file to store the timestamp of test start. + --delay DELAY Delay between attempts (in seconds). + --retry RETRY Number of retry attempts. +``` +### 3. wol_server.py + +Listen on the specified port to receive and handle the DUT's requests. + +``` +uvicorn wol_server:app --host 0.0.0.0 --port 8090 +``` + +## Work process of the Wake-on-LAN automatic test +1. The DUT gets its own NIC's MAC and IP, fetches WOL server info from environment variables, sends data to the server, receives a success response, records timestamp, sets rtcwake, and suspends. + +2. The WOL server receives DUT requests, extracts MAC, IP, delay, and retry count. After sending a success response, it waits, sends a WOL command, waits, and pings. If the ping fails, it retries up to the specified retry times. + +3. After system resume up, the DUT compares the resume time to the stored timestamp. If the elapsed time is between 0 and 1.5(delay*retry), WOL is assumed; otherwise, an RTC wake-up is inferred. + +## Limitation and Future work +The initial plan was to automate Wake-on-LAN testing for both S3 and S5 system states. The test would be split into two sub-test jobs: + +1. Pre-S3/S5 Job (wol_client.py): This job would run before entering either the S3 or S5 state. Its primary function would be to gather information, send requests, and record timestamps. +2. Post-Recovery Job (wol_check.py): This job, running on the S3 or S5 system itself after recovery, would perform log checks to determine if WoL triggered system wake-up. + +However, due to current limitations in Checkbox, we cannot guarantee a strict execution order for test jobs. This makes the initial approach infeasible. Consequently, with the current setup, we can only automate WoL testing for S3. + +We would like to keep the two scripts separately. This allows for future implementation of automated WoL testing for S5 if we can find a way to specify the strictly execution order of test jobs in the future. diff --git a/providers/certification-client/units/client-cert-desktop-24-04.pxu b/providers/certification-client/units/client-cert-desktop-24-04.pxu index 312133748d..eda8d46ad2 100644 --- a/providers/certification-client/units/client-cert-desktop-24-04.pxu +++ b/providers/certification-client/units/client-cert-desktop-24-04.pxu @@ -70,7 +70,6 @@ nested_part: after-suspend-usb3-cert-full after-suspend-usb-c-cert-full # after-suspend-wireless-cert-full # auto only - ethernet-wake-on-lan-cert-manual info-attachment-cert-manual exclude: keys/hibernate @@ -144,6 +143,7 @@ nested_part: # with. power-automated tpm-cert-automated + ethernet-wake-on-lan-cert-auto bootstrap_include: device graphics_card