Skip to content
153 changes: 118 additions & 35 deletions metabox/metabox/core/lxd_execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@
import time

from contextlib import suppress
from subprocess import CalledProcessError

from urllib import parse
from loguru import logger
import metabox.core.keys as keys
from metabox.core.utils import ExecuteResult
from ws4py.client.threadedclient import WebSocketClient
from metabox.core.utils import _re
from pylxd.models.operation import Operation


base_env = {
Expand All @@ -40,6 +43,10 @@


class SafeWebSocketClient(WebSocketClient):
def __init__(self, *args, resource="", **kwargs):
super().__init__(*args, **kwargs)
self.resource = resource

def send(self, *args, **kwargs):
"""
This makes it so send will raise ConnectionError when send fails
Expand All @@ -57,14 +64,31 @@ class InteractiveWebsocket(SafeWebSocketClient):
# https://stackoverflow.com/a/14693789/1154487
ansi_escape = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")

def __init__(self, *args, **kwargs):
def __init__(
self,
cmd,
*args,
ctl=None,
verbose=False,
container=None,
operation_id=0,
**kwargs,
):
super().__init__(*args, **kwargs)
self.cmd = cmd
self.ctl = ctl
self.verbose = verbose
self.container = container
self.operation_id = operation_id

self.stdout_data: str = ""
self.stdout_data_full: str = ""
self.stdout_lock = threading.Lock()
self._new_data = False
self._lookup_by_id = False
self._connection_closed = False
self.result = None
self._result_lock = threading.Lock()

def received_message(self, message):
if len(message.data) == 0:
Expand All @@ -79,6 +103,46 @@ def received_message(self, message):
self.stdout_data_full += message_data_str
self._new_data = True

def result_fetching_daemon(self):
while True:
operation = self.client.operations.get(self.operation_id)
try:
result = operation.metadata["return"]
# don't get the lock when the operation fails
with self._result_lock:
self.result = result
return
except KeyError:
time.sleep(0.1)

def check(self, timeout=0):
deadline = time.time() + timeout
while True:
with self._result_lock:
result = self.result
if result == 0:
return True
elif result == 137:
raise TimeoutError(
f"Timeout expired while waiting for cmd: '{self.cmd}'"
)
elif result is not None:
raise CalledProcessError(
result, self.cmd, output=self.stdout_data_full
)
elif timeout and time.time() > deadline:
raise TimeoutError(
"Check timeout has expired, unable to fetch result"
)
time.sleep(0.1)

def connect(self):
result_fetcher = threading.Thread(
target=self.result_fetching_daemon, daemon=True
)
result_fetcher.start()
super().connect()

def get_search_split(self, search_pattern):
if isinstance(search_pattern, _re):
search = search_pattern.search
Expand Down Expand Up @@ -119,7 +183,6 @@ def expect(self, pattern, timeout=0):
msg = "'{}' not found! Timeout is reached (set to {})".format(
pattern, timeout
)
logger.warning(msg)
raise TimeoutError(msg)
elif self._connection_closed:
# this could have been updated from the other thread, lets
Expand Down Expand Up @@ -198,30 +261,6 @@ def select_test_plan(self, data, timeout=0):
def send_signal(self, signal):
self.ctl.send(json.dumps({"command": "signal", "signal": signal}))

@property
def container(self):
return self._container

@container.setter
def container(self, container):
self._container = container

@property
def ctl(self):
return self._ctl

@ctl.setter
def ctl(self, ctl):
self._ctl = ctl

@property
def verbose(self):
return self._verbose

@verbose.setter
def verbose(self, verbose):
self._verbose = verbose


def env_wrapper(env):
env_cmd = ["env"]
Expand All @@ -243,19 +282,26 @@ def timeout_wrapper(timeout):
def interactive_execute(container, cmd, env={}, verbose=False, timeout=0):
if verbose:
logger.trace(cmd)
ws_urls = container.raw_interactive_execute(
login_shell + env_wrapper(env) + shlex.split(cmd)
ws_urls = raw_interactive_execute(
container,
timeout_wrapper(timeout) + shlex.split(cmd),
environment=env,
user=1000,
)

base_websocket_url = container.client.websocket_url
ctl = SafeWebSocketClient(base_websocket_url)
ctl.resource = ws_urls["control"]
ctl = SafeWebSocketClient(base_websocket_url, resource=ws_urls["control"])
ctl.connect()
pts = InteractiveWebsocket(base_websocket_url)
pts.resource = ws_urls["ws"]
pts.verbose = verbose
pts.container = container
pts.ctl = ctl
pts = InteractiveWebsocket(
cmd,
base_websocket_url,
ctl=ctl,
verbose=verbose,
container=container,
operation_id=ws_urls["operation_id"],
resource=ws_urls["ws"],
)
pts.client = container.client
pts.connect()
return pts

Expand Down Expand Up @@ -301,3 +347,40 @@ def on_stderr(msg):
"".join(stderr_data),
"".join(outdata_full),
)


# https://github.com/canonical/pylxd/blob/330f4a34cc0978eb41d809c713799bd88a20a43b/pylxd/models/instance.py#L526
# Fork of the upstream raw_interactive_execute that also returns the
# operation_id, we need this to query the outcome of a command in interactive
# mode, we have to do this instead of use execute because it doesn't support to
# interactively send commands, even the stdio parameter fd is read whole
# before sending it
def raw_interactive_execute(
container, commands, environment=None, user=None, group=None, cwd=None
):
if environment is None:
environment = {}

response = container.api["exec"].post(
json={
"command": commands,
"environment": environment,
"wait-for-websocket": True,
"interactive": True,
"user": user,
"group": group,
"cwd": cwd,
}
)

fds = response.json()["metadata"]["metadata"]["fds"]
operation_id = response.json()["operation"].split("/")[-1].split("?")[0]
parsed = parse.urlparse(
container.client.api.operations[operation_id].websocket._api_endpoint
)

return {
"ws": f"{parsed.path}?secret={fds['0']}",
"control": f"{parsed.path}?secret={fds['control']}",
"operation_id": Operation.extract_operation_id(operation_id),
}
55 changes: 45 additions & 10 deletions metabox/metabox/core/scenario.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import time
import shlex

from subprocess import CalledProcessError
from pylxd.exceptions import NotFound

from metabox.core.actions import Start, Expect, Send, SelectTestPlan
Expand Down Expand Up @@ -110,7 +111,12 @@ def run(self):
# step that fail explicitly return false or raise an exception
if not step(self):
self.failures.append(step)
except (TimeoutError, ConnectionError, NotFound):
except (
TimeoutError,
ConnectionError,
NotFound,
CalledProcessError,
):
self.failures.append(step)
break
if self._pts:
Expand Down Expand Up @@ -241,7 +247,27 @@ def select_test_plan(self, testplan_id, timeout=60):
outcome = self._pts.select_test_plan(testplan_id, timeout)
return outcome

def run_cmd(self, cmd, env={}, interactive=False, timeout=0, target="all"):
def check(self, result, timeout):
if timeout < 0:
timeout = 0
if isinstance(result, list):
return all(self.check(x, timeout) for x in result)
elif isinstance(result, bool):
return result
return result.check(timeout)

def run_cmd(
self,
cmd,
env={},
interactive=False,
timeout=0,
target="all",
check=True,
):
# interactive mode run_cmd is non-interactive, therefore we may need
# to wait till deadline to fetch the result
deadline = time.time() + timeout
if self.mode == "remote":
if target == "controller":
result = self.controller_machine.run_cmd(
Expand All @@ -252,14 +278,19 @@ def run_cmd(self, cmd, env={}, interactive=False, timeout=0, target="all"):
cmd, env, interactive, timeout
)
else:
self.controller_machine.run_cmd(cmd, env, interactive, timeout)
result = self.agent_machine.run_cmd(
cmd, env, interactive, timeout
)
result = [
self.controller_machine.run_cmd(
cmd, env, interactive, timeout
),
self.agent_machine.run_cmd(cmd, env, interactive, timeout),
]
else:
result = self.local_machine.run_cmd(cmd, env, interactive, timeout)

return result
if check:
return self.check(result, time.time() - deadline)
else:
return result

def reboot(self, timeout=0, target="all"):
if self.mode == "remote":
Expand Down Expand Up @@ -323,15 +354,19 @@ def reboot_agent(self):
def is_agent_active(self):
return self.agent_machine.is_agent_active()

def mktree(self, path, privileged=False, timeout=0, target="all"):
def mktree(
self, path, privileged=False, timeout=0, target="all", check=False
):
"""
Creates a directory including any missing parent
"""
cmd = ["mkdir", "-p", path]
if privileged:
cmd = ["sudo"] + cmd
cmd_str = shlex.join(cmd)
return self.run_cmd(cmd_str, target=target, timeout=timeout)
return self.run_cmd(
cmd_str, target=target, timeout=timeout, check=check
)

def run_manage(self, args, timeout=0, target="all"):
"""
Expand All @@ -351,6 +386,6 @@ def assert_in_file(self, pattern, path):
if isinstance(path, Path):
path = str(path)

result = self.run_cmd(f"cat {path}")
result = self.run_cmd(f"cat {path}", check=False)
regex = re.compile(pattern)
return bool(regex.search(result.stdout))
8 changes: 8 additions & 0 deletions metabox/metabox/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
# along with Checkbox. If not, see <http://www.gnu.org/licenses/>.
import re
from typing import NamedTuple
from subprocess import CalledProcessError

__all__ = ("tag", "ExecuteResult")

Expand All @@ -38,6 +39,13 @@ class ExecuteResult(NamedTuple):
stderr: str
outstr_full: str

def check(self, timeout=0):
if self.exit_code == 0:
return True
raise CalledProcessError(
self.exit_code, "", output=self.stdout, stderr=self.stderr
)


class _re:
def __init__(self, pattern, flags=0):
Expand Down
2 changes: 1 addition & 1 deletion metabox/metabox/tools/session_corruptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@


def corrupt(path):
with gzip.open(str(path), "rb") as f:
with gzip.open(str(path), "rt") as f:
session = json.load(f)
session["session"]["desired_job_list"].append(
"@ invalid id - intentionally corrupted session"
Expand Down