Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
229 changes: 167 additions & 62 deletions providers/base/bin/wifi_client_test_netplan.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,44 +49,85 @@ def print_cmd(cmd):
NETPLAN_TEST_CFG = "/etc/netplan/99-CREATED-BY-CHECKBOX.yaml"


def netplan_renderer():
"""
Check the renderer used by netplan on the system.
This function looks for the renderer used in the yaml files located in the
NETPLAN_CFG_PATHS directories, and returns the first renderer found. If the
renderer is not found, it defaults to "networkd".
"""
for basedir in NETPLAN_CFG_PATHS:
if os.path.exists(basedir):
files = glob.glob(os.path.join(basedir, "*.yaml"))
for f in files:
with open(f, "r") as file:
data = yaml.safe_load(file)
if "renderer" in data["network"]:
return data["network"]["renderer"]
return "networkd"


def check_and_get_renderer(renderer):
"""
Check if the renderer provided matches the one used by netplan. If the
renderer is set to "AutoDetect", it will return the detected renderer.
"""
machine_renderer = netplan_renderer()

if renderer == "AutoDetect":
return machine_renderer
elif renderer != machine_renderer:
raise SystemExit(
"ERROR: Renderer mismatch, expected: {}, got: {}".format(
machine_renderer, renderer
)
)
return renderer


def get_netplan_config_files():
config_files = []
for basedir in NETPLAN_CFG_PATHS:
if os.path.exists(basedir):
files = glob.glob(os.path.join(basedir, "*.yaml"))
config_files.extend(files)
return config_files


def netplan_config_backup():
print_head("Backup any existing netplan configuration files")
if os.path.exists(TMP_PATH):
print("Clear backup location")
shutil.rmtree(TMP_PATH)
for basedir in NETPLAN_CFG_PATHS:
print("Checking in {}".format(basedir))
files = glob.glob(os.path.join(basedir, "*.yaml"))
if files:
backup_loc = os.path.join(TMP_PATH, *basedir.split("/"))
os.makedirs(backup_loc)
for f in files:
print(" ", f)
shutil.copy(f, backup_loc)

config_files = get_netplan_config_files()

for f in config_files:
basedir = os.path.dirname(f)
print("Backing up from {}".format(basedir))
backup_loc = os.path.join(TMP_PATH, *basedir.split("/"))
os.makedirs(backup_loc, exist_ok=True)
print(" ", f)
shutil.copy(f, backup_loc)
print()


def netplan_config_wipe():
print_head("Delete any existing netplan configuration files")
# NOTE: this removes not just configs for wifis, but for all device types
# (ethernets, bridges) which could be dangerous
for basedir in NETPLAN_CFG_PATHS:
print("Wiping {}".format(basedir))
files = glob.glob(os.path.join(basedir, "*.yaml"))
for f in files:
print(" ", f)
os.remove(f)
config_files = get_netplan_config_files()
for f in config_files:
print(" ", f)
os.remove(f)

# If there's any file left in configuration folder then there's something
# not expected, stop the test
for basedir in NETPLAN_CFG_PATHS:
files = glob.glob(os.path.join(basedir, "*.yaml"))
if files:
print("ERROR: Failed to wipe netplan config files:")
for f in files:
print(" ", f)
netplan_config_restore()
raise SystemExit("Configuration file restored, exiting...")
remaining_files = get_netplan_config_files()
if remaining_files:
print("ERROR: Failed to wipe netplan config files:")
for f in remaining_files:
print(" ", f)
netplan_config_restore()
raise SystemExit("Configuration file restored, exiting...")
print()


Expand All @@ -104,14 +145,15 @@ def netplan_config_restore():
raise SystemExit("Failed to restore {}".format(f))


def generate_test_config(interface, ssid, psk, address, dhcp, wpa3):
def generate_test_config(interface, ssid, psk, address, dhcp, wpa3, renderer):
"""
Produce valid netplan yaml from arguments provided

Typical open ap with dhcp:
# This is the network config written by checkbox
network:
version: 2
renderer: networkd
wifis:
eth0:
access-points:
Expand Down Expand Up @@ -146,7 +188,11 @@ def generate_test_config(interface, ssid, psk, address, dhcp, wpa3):
interface_info["addresses"] = [address]

network_config = {
"network": {"version": 2, "wifis": {interface: interface_info}}
"network": {
"version": 2,
"renderer": renderer,
"wifis": {interface: interface_info},
}
}

# Serialize the dictionary to a YAML string using pyyaml
Expand Down Expand Up @@ -187,26 +233,63 @@ def netplan_apply_config():
return True


def _get_networkctl_state(interface):
cmd = "networkctl status --no-pager --no-legend {}".format(interface)
output = sp.check_output(cmd, shell=True)
for line in output.decode(sys.stdout.encoding).splitlines():
key, val = line.strip().split(":", maxsplit=1)
if key == "State":
return val
def get_interface_info(interface, renderer):
if renderer == "networkd":
cmd = "networkctl status --no-pager --no-legend {}".format(interface)
key_map = {"State": "state", "Gateway": "gateway"}
elif renderer == "NetworkManager":
cmd = "nmcli device show {}".format(interface)
key_map = {"GENERAL.STATE": "state", "IP4.GATEWAY": "gateway"}
else:
raise ValueError("Unknown renderer: {}".format(renderer))

return _get_cmd_info(cmd, key_map, renderer)


def _get_cmd_info(cmd, key_map, renderer):
info = {}
try:
output = sp.check_output(cmd, shell=True)
for line in output.decode(sys.stdout.encoding).splitlines():
# Skip lines that don't have a "key: value" format
if ":" not in line:
continue
key, val = line.strip().split(":", maxsplit=1)
key = key.strip()
val = val.strip()
if key in key_map:
info[key_map[key]] = val
except sp.CalledProcessError as e:
print("Error running {} command: {}".format(renderer, e))
return info


def _check_routable_state(interface, renderer):
"""
Check if the interface is in a routable state depending on the renderer
"""
routable = False
state = ""
info = get_interface_info(interface, renderer)
state = info.get("state", "")
if renderer == "networkd":
routable = "routable" in state
elif renderer == "NetworkManager":
routable = "connected" in state and "disconnected" not in state
else:
raise ValueError("Unknown renderer: {}".format(renderer))
return (routable, state)


def wait_for_routable(interface, max_wait=30):
routable = False
def wait_for_routable(interface, renderer, max_wait=30):
attempts = 0
routable = False
state = ""
while not routable and attempts < max_wait:
state = _get_networkctl_state(interface)
print(state)
if "routable" in state:
routable = True
break
(routable, state) = _check_routable_state(interface, renderer)
time.sleep(1)
attempts += 1

if routable:
print("Reached routable state")
else:
Expand All @@ -232,35 +315,43 @@ def print_route_info():
print()


def perform_ping_test(interface):
target = None
cmd = "networkctl status --no-pager --no-legend {}".format(interface)
print_cmd(cmd)
output = sp.check_output(cmd, shell=True)
for line in output.decode(sys.stdout.encoding).splitlines():
vals = line.strip().split(" ")
if len(vals) >= 2:
if vals[0] == "Gateway:":
target = vals[1]
print("Got gateway address: {}".format(target))
def get_gateway(interface, renderer):
gateway = None
info = get_interface_info(interface, renderer)
gateway = info.get("gateway", None)
print("Got gateway address: {}".format(gateway))
return gateway


def perform_ping_test(interface, renderer):
target = get_gateway(interface, renderer)

if target:
count = 5
result = ping(target, interface, count, 10)
print("Ping result: {}".format(result))
if result["received"] == count:
return True

return False


def print_journal_entries(start):
def print_journal_entries(start, renderer):
if renderer == "networkd":
render_service = "systemd-networkd.service"
elif renderer == "NetworkManager":
render_service = "NetworkManager.service"
else:
raise ValueError("Unknown renderer: {}".format(renderer))
print_head("Journal Entries")
cmd = (
"journalctl -q --no-pager "
"-u systemd-networkd.service "
"-u {} "
"-u wpa_supplicant.service "
" -u netplan-* "
'--since "{}" '.format(start.strftime("%Y-%m-%d %H:%M:%S"))
"-u netplan-* "
'--since "{}" '.format(
render_service, start.strftime("%Y-%m-%d %H:%M:%S")
)
)
print_cmd(cmd)
sp.call(cmd, shell=True)
Expand Down Expand Up @@ -322,6 +413,18 @@ def parse_args():
default=False,
)

parser.add_argument(
"--renderer",
choices=["networkd", "NetworkManager", "AutoDetect"],
help=(
"Set the backend daemon to use for netplan configuration. "
"If 'AutoDetect' is set, the script will try to determine the "
"renderer based on the system configuration. "
"(default: %(default)s)"
),
default="AutoDetect",
)

return parser.parse_args()


Expand All @@ -330,6 +433,8 @@ def main():

start_time = datetime.datetime.now()

renderer = check_and_get_renderer(args.renderer)
args.renderer = renderer
netplan_config_backup()
netplan_config_wipe()

Expand All @@ -346,12 +451,12 @@ def main():
if not netplan_apply_config():
delete_test_config()
netplan_config_restore()
print_journal_entries(start_time)
print_journal_entries(start_time, renderer)
raise SystemExit(1)
time.sleep(20)

print_head("Wait for interface to be routable")
reached_routable = wait_for_routable(args.interface)
reached_routable = wait_for_routable(args.interface, renderer)

test_result = False
if reached_routable:
Expand All @@ -363,7 +468,7 @@ def main():

# Check connection by ping or link status
print_head("Perform a ping test")
test_result = perform_ping_test(args.interface)
test_result = perform_ping_test(args.interface, renderer)
if test_result:
print("Connection test passed\n")
else:
Expand All @@ -373,14 +478,14 @@ def main():
netplan_config_restore()

if not netplan_apply_config():
print_journal_entries(start_time)
print_journal_entries(start_time, renderer)
raise SystemExit("ERROR: failed to apply restored config")

if not test_result:
print_journal_entries(start_time)
print_journal_entries(start_time, renderer)
raise SystemExit(1)

print_journal_entries(start_time)
print_journal_entries(start_time, renderer)


if __name__ == "__main__":
Expand Down
Loading