diff --git a/metabox/metabox/core/lxd_execute.py b/metabox/metabox/core/lxd_execute.py
index 86028632d8..ecae5f2bde 100644
--- a/metabox/metabox/core/lxd_execute.py
+++ b/metabox/metabox/core/lxd_execute.py
@@ -23,12 +23,15 @@
 import time
 
 from contextlib import suppress
+from subprocess import CalledProcessError
+from urllib import parse
 
 from loguru import logger
 import metabox.core.keys as keys
 from metabox.core.utils import ExecuteResult
 from ws4py.client.threadedclient import WebSocketClient
 from metabox.core.utils import _re
+from pylxd.models.operation import Operation
 
 
 base_env = {
@@ -40,6 +43,10 @@
 
 
 class SafeWebSocketClient(WebSocketClient):
+    def __init__(self, *args, resource="", **kwargs):
+        super().__init__(*args, **kwargs)
+        self.resource = resource
+
     def send(self, *args, **kwargs):
         """
         This makes it so send will raise ConnectionError when send fails
@@ -57,14 +64,31 @@ class InteractiveWebsocket(SafeWebSocketClient):
     # https://stackoverflow.com/a/14693789/1154487
     ansi_escape = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
 
-    def __init__(self, *args, **kwargs):
+    def __init__(
+        self,
+        cmd,
+        *args,
+        ctl=None,
+        verbose=False,
+        container=None,
+        operation_id=0,
+        **kwargs,
+    ):
         super().__init__(*args, **kwargs)
+        self.cmd = cmd
+        self.ctl = ctl
+        self.verbose = verbose
+        self.container = container
+        self.operation_id = operation_id
+
         self.stdout_data: str = ""
         self.stdout_data_full: str = ""
         self.stdout_lock = threading.Lock()
         self._new_data = False
         self._lookup_by_id = False
         self._connection_closed = False
+        self.result = None
+        self._result_lock = threading.Lock()
 
     def received_message(self, message):
         if len(message.data) == 0:
@@ -79,6 +103,57 @@ def received_message(self, message):
             self.stdout_data_full += message_data_str
             self._new_data = True
 
+    def result_fetching_daemon(self):
+        """
+        Poll the LXD operation until it exposes a return code, then store
+        it in self.result
+        """
+        while True:
+            operation = self.client.operations.get(self.operation_id)
+            try:
+                result = operation.metadata["return"]
+                # don't get the lock when the operation fails
+                with self._result_lock:
+                    self.result = result
+                return
+            except KeyError:
+                time.sleep(0.1)
+
+    def check(self, timeout=0):
+        """
+        Wait for the return code of the command and validate it
+
+        Returns True on return code 0, raises TimeoutError on return code
+        137 or when timeout (seconds, 0 disables it) expires with no
+        result available, raises CalledProcessError on any other code
+        """
+        deadline = time.time() + timeout
+        while True:
+            with self._result_lock:
+                result = self.result
+            if result == 0:
+                return True
+            elif result == 137:
+                raise TimeoutError(
+                    f"Timeout expired while waiting for cmd: '{self.cmd}'"
+                )
+            elif result is not None:
+                raise CalledProcessError(
+                    result, self.cmd, output=self.stdout_data_full
+                )
+            elif timeout and time.time() > deadline:
+                raise TimeoutError(
+                    "Check timeout has expired, unable to fetch result"
+                )
+            time.sleep(0.1)
+
+    def connect(self):
+        result_fetcher = threading.Thread(
+            target=self.result_fetching_daemon, daemon=True
+        )
+        result_fetcher.start()
+        super().connect()
+
     def get_search_split(self, search_pattern):
         if isinstance(search_pattern, _re):
             search = search_pattern.search
@@ -119,7 +194,6 @@ def expect(self, pattern, timeout=0):
                 msg = "'{}' not found! Timeout is reached (set to {})".format(
                     pattern, timeout
                 )
-                logger.warning(msg)
                 raise TimeoutError(msg)
             elif self._connection_closed:
                 # this could have been updated from the other thread, lets
@@ -198,30 +272,6 @@ def select_test_plan(self, data, timeout=0):
     def send_signal(self, signal):
         self.ctl.send(json.dumps({"command": "signal", "signal": signal}))
 
-    @property
-    def container(self):
-        return self._container
-
-    @container.setter
-    def container(self, container):
-        self._container = container
-
-    @property
-    def ctl(self):
-        return self._ctl
-
-    @ctl.setter
-    def ctl(self, ctl):
-        self._ctl = ctl
-
-    @property
-    def verbose(self):
-        return self._verbose
-
-    @verbose.setter
-    def verbose(self, verbose):
-        self._verbose = verbose
-
 
 def env_wrapper(env):
     env_cmd = ["env"]
@@ -243,19 +293,26 @@ def timeout_wrapper(timeout):
 def interactive_execute(container, cmd, env={}, verbose=False, timeout=0):
     if verbose:
         logger.trace(cmd)
-    ws_urls = container.raw_interactive_execute(
-        login_shell + env_wrapper(env) + shlex.split(cmd)
+    ws_urls = raw_interactive_execute(
+        container,
+        timeout_wrapper(timeout) + shlex.split(cmd),
+        environment=env,
+        user=1000,
     )
 
     base_websocket_url = container.client.websocket_url
-    ctl = SafeWebSocketClient(base_websocket_url)
-    ctl.resource = ws_urls["control"]
+    ctl = SafeWebSocketClient(base_websocket_url, resource=ws_urls["control"])
     ctl.connect()
-    pts = InteractiveWebsocket(base_websocket_url)
-    pts.resource = ws_urls["ws"]
-    pts.verbose = verbose
-    pts.container = container
-    pts.ctl = ctl
+    pts = InteractiveWebsocket(
+        cmd,
+        base_websocket_url,
+        ctl=ctl,
+        verbose=verbose,
+        container=container,
+        operation_id=ws_urls["operation_id"],
+        resource=ws_urls["ws"],
+    )
+    pts.client = container.client
     pts.connect()
     return pts
 
@@ -301,3 +358,42 @@ def on_stderr(msg):
         "".join(stderr_data),
         "".join(outdata_full),
     )
+
+
+# https://github.com/canonical/pylxd/blob/330f4a34cc0978eb41d809c713799bd88a20a43b/pylxd/models/instance.py#L526
+# Fork of the upstream raw_interactive_execute that also returns the
+# operation_id. We need it to query the outcome of a command in interactive
+# mode; we cannot use execute instead because it doesn't support sending
+# commands interactively, even the stdio parameter fd is read whole
+# before sending it
+def raw_interactive_execute(
+    container, commands, environment=None, user=None, group=None, cwd=None
+):
+    if environment is None:
+        environment = {}
+
+    response = container.api["exec"].post(
+        json={
+            "command": commands,
+            "environment": environment,
+            "wait-for-websocket": True,
+            "interactive": True,
+            "user": user,
+            "group": group,
+            "cwd": cwd,
+        }
+    )
+
+    # parse the response body once and reuse it
+    payload = response.json()
+    fds = payload["metadata"]["metadata"]["fds"]
+    operation_id = payload["operation"].split("/")[-1].split("?")[0]
+    parsed = parse.urlparse(
+        container.client.api.operations[operation_id].websocket._api_endpoint
+    )
+
+    return {
+        "ws": f"{parsed.path}?secret={fds['0']}",
+        "control": f"{parsed.path}?secret={fds['control']}",
+        "operation_id": Operation.extract_operation_id(operation_id),
+    }
diff --git a/metabox/metabox/core/scenario.py b/metabox/metabox/core/scenario.py
index a7dedd293a..be64b96d02 100644
--- a/metabox/metabox/core/scenario.py
+++ b/metabox/metabox/core/scenario.py
@@ -27,6 +27,7 @@
 import time
 import shlex
 
+from subprocess import CalledProcessError
 from pylxd.exceptions import NotFound
 from metabox.core.actions import Start, Expect, Send, SelectTestPlan
 
@@ -110,7 +111,12 @@ def run(self):
                 # step that fail explicitly return false or raise an exception
                 if not step(self):
                     self.failures.append(step)
-            except (TimeoutError, ConnectionError, NotFound):
+            except (
+                TimeoutError,
+                ConnectionError,
+                NotFound,
+                CalledProcessError,
+            ):
                 self.failures.append(step)
                 break
         if self._pts:
@@ -241,7 +247,33 @@ def select_test_plan(self, testplan_id, timeout=60):
         outcome = self._pts.select_test_plan(testplan_id, timeout)
         return outcome
 
-    def run_cmd(self, cmd, env={}, interactive=False, timeout=0, target="all"):
+    def check(self, result, timeout):
+        """
+        Validate the outcome of run_cmd, recursing into lists of results
+
+        Booleans are returned as they are, anything else is delegated to
+        its own check(timeout); a negative timeout is clamped to 0
+        """
+        if timeout < 0:
+            timeout = 0
+        if isinstance(result, list):
+            return all(self.check(x, timeout) for x in result)
+        elif isinstance(result, bool):
+            return result
+        return result.check(timeout)
+
+    def run_cmd(
+        self,
+        cmd,
+        env={},
+        interactive=False,
+        timeout=0,
+        target="all",
+        check=True,
+    ):
+        # interactive mode run_cmd is non-interactive, therefore we may need
+        # to wait till deadline to fetch the result
+        deadline = time.time() + timeout
         if self.mode == "remote":
             if target == "controller":
                 result = self.controller_machine.run_cmd(
@@ -252,14 +284,23 @@ def run_cmd(self, cmd, env={}, interactive=False, timeout=0, target="all"):
                     cmd, env, interactive, timeout
                 )
             else:
-                self.controller_machine.run_cmd(cmd, env, interactive, timeout)
-                result = self.agent_machine.run_cmd(
-                    cmd, env, interactive, timeout
-                )
+                result = [
+                    self.controller_machine.run_cmd(
+                        cmd, env, interactive, timeout
+                    ),
+                    self.agent_machine.run_cmd(cmd, env, interactive, timeout),
+                ]
         else:
             result = self.local_machine.run_cmd(cmd, env, interactive, timeout)
 
-        return result
+        if check:
+            # hand over only what is left of the budget (0 = no timeout);
+            # never pass a value <= 0 for an expired deadline because check
+            # would treat it as "wait forever"
+            remaining = max(deadline - time.time(), 0.1) if timeout else 0
+            return self.check(result, remaining)
+        else:
+            return result
 
     def reboot(self, timeout=0, target="all"):
         if self.mode == "remote":
@@ -323,7 +364,9 @@ def reboot_agent(self):
     def is_agent_active(self):
         return self.agent_machine.is_agent_active()
 
-    def mktree(self, path, privileged=False, timeout=0, target="all"):
+    def mktree(
+        self, path, privileged=False, timeout=0, target="all", check=False
+    ):
         """
         Creates a directory including any missing parent
         """
@@ -331,7 +374,9 @@ def mktree(self, path, privileged=False, timeout=0, target="all"):
         if privileged:
             cmd = ["sudo"] + cmd
         cmd_str = shlex.join(cmd)
-        return self.run_cmd(cmd_str, target=target, timeout=timeout)
+        return self.run_cmd(
+            cmd_str, target=target, timeout=timeout, check=check
+        )
 
     def run_manage(self, args, timeout=0, target="all"):
         """
@@ -351,6 +396,10 @@ def assert_in_file(self, pattern, path):
         if isinstance(path, Path):
             path = str(path)
 
-        result = self.run_cmd(f"cat {path}")
+        result = self.run_cmd(f"cat {path}", check=False)
+        if isinstance(result, list):
+            # remote mode with target="all" returns [controller, agent]:
+            # keep inspecting the agent output as it was done before
+            result = result[-1]
         regex = re.compile(pattern)
         return bool(regex.search(result.stdout))
diff --git a/metabox/metabox/core/utils.py b/metabox/metabox/core/utils.py
index 196e93c37c..88aaecbadd 100644
--- a/metabox/metabox/core/utils.py
+++ b/metabox/metabox/core/utils.py
@@ -18,6 +18,7 @@
 # along with Checkbox. If not, see .
 import re
 from typing import NamedTuple
+from subprocess import CalledProcessError
 
 
 __all__ = ("tag", "ExecuteResult")
@@ -38,6 +39,17 @@ class ExecuteResult(NamedTuple):
     stderr: str
     outstr_full: str
 
+    def check(self, timeout=0):
+        """
+        Return True when the command exited with 0, else raise
+        CalledProcessError; timeout is accepted but not used here
+        """
+        if self.exit_code == 0:
+            return True
+        raise CalledProcessError(
+            self.exit_code, "", output=self.stdout, stderr=self.stderr
+        )
+
 
 class _re:
     def __init__(self, pattern, flags=0):
diff --git a/metabox/metabox/tools/session_corruptor.py b/metabox/metabox/tools/session_corruptor.py
index 3a4ae96f12..3d6950dcbb 100644
--- a/metabox/metabox/tools/session_corruptor.py
+++ b/metabox/metabox/tools/session_corruptor.py
@@ -5,7 +5,7 @@
 
 
 def corrupt(path):
-    with gzip.open(str(path), "rb") as f:
+    with gzip.open(str(path), "rt") as f:
         session = json.load(f)
     session["session"]["desired_job_list"].append(
         "@ invalid id - intentionally corrupted session"