diff --git a/checkbox-support/checkbox_support/dbus/gnome_monitor.py b/checkbox-support/checkbox_support/dbus/gnome_monitor.py index 4a5b6c3c7c..e83a04c911 100644 --- a/checkbox-support/checkbox_support/dbus/gnome_monitor.py +++ b/checkbox-support/checkbox_support/dbus/gnome_monitor.py @@ -1,6 +1,7 @@ # Copyright 2024 Canonical Ltd. # Written by: # Paolo Gentili +# Zhongning Li # # This is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License version 3, @@ -21,14 +22,211 @@ - https://gitlab.gnome.org/GNOME/mutter/-/blob/main/tools/get-state.py """ -from collections import namedtuple -from typing import Dict, List, Tuple, Set, Callable, Any -from gi.repository import GLib, Gio import itertools - +from collections import OrderedDict +from enum import IntEnum +from time import sleep +from typing import ( + Any, + Callable, + Dict, + List, + Mapping, + NamedTuple, + Optional, + Set, + Tuple, +) from checkbox_support.monitor_config import MonitorConfig +from gi.repository import Gio, GLib # type: ignore + + +class Transform(IntEnum): + NORMAL_0 = 0 # landscape + NORMAL_90 = 1 # portrait right + NORMAL_180 = 2 # landscape flipped + NORMAL_270 = 3 # portrait left + + # The following are listed in the xml file + # but they aren't available in gnome control center + # maybe it's only intended for devices with an accelerometer? + FLIPPED_0 = 4 + FLIPPED_90 = 5 + FLIPPED_180 = 6 + FLIPPED_270 = 7 + + +# A plain 4-tuple with some basic info about the monitor +MonitorInfo = NamedTuple( + "MonitorInfo", + [ + ("connector", str), # HDMI-1, eDP-1, ... + ("vendor", str), # vendor string like BOE, Asus, etc. + ("product", str), + ("serial", str), + ], +) + + +# py3.5 can't use inline type annotations, +# otherwise the _*T types should be merged with their non-underscore versions +_MutterDisplayModeT = NamedTuple( + "_MutterDisplayModeT", + [ + ("id", str), + ("width", int), + ("height", int), + ("refresh_rate", float), + ("preferred_scale", float), + ("supported_scales", List[float]), + ("properties", Mapping[str, Any]), + ], +) + + +class MutterDisplayMode(_MutterDisplayModeT): + @property + def is_current(self) -> bool: + return self.properties.get("is-current", False) + + @property + def is_preferred(self) -> bool: + return self.properties.get("is-preferred", False) + + @property + def resolution(self) -> str: + """ + Resolution string, makes this class compatible with the Mode type + !! WARNING: This property does not exist on the original dbus object + !! This is only here for code that expects a string, new code should + !! use the width and height numbers + """ + return "{}x{}".format(self.width, self.height) + + +_PhysicalMonitorT = NamedTuple( + "_PhysicalMonitorT", + [ + ("info", MonitorInfo), + ("modes", List[MutterDisplayMode]), + # See: https://gitlab.gnome.org/GNOME/mutter/-/blob/main/data/ + # dbus-interfaces/org.gnome.Mutter.DisplayConfig.xml#L414 + ("properties", Mapping[str, Any]), + ], +) + + +class PhysicalMonitor(_PhysicalMonitorT): + + @classmethod + def from_variant(cls, v: GLib.Variant): + # not going to do extensive checks here + # since get_current_state already checked + assert len(v) == 3 + return cls( + MonitorInfo(*v[0]), [MutterDisplayMode(*raw) for raw in v[1]], v[2] + ) + + @property + def is_builtin(self) -> bool: + return self.properties.get("is-builtin", False) + + +_LogicalMonitorT = NamedTuple( + "_LogicalMonitorT", + [ + ("x", int), + ("y", int), + ("scale", float), + ("transform", Transform), + ("is_primary", bool), + ("monitors", List[MonitorInfo]), + ("properties", Mapping[str, Any]), + ], +) + + +class LogicalMonitor(_LogicalMonitorT): + @classmethod + def from_variant(cls, v: GLib.Variant): + assert len(v) == 7 + return cls( + *v[0:5], # the first 5 elements are flat, so just spread them + [MonitorInfo(*m) for m in v[5]], # type: ignore + v[6], # type: ignore + ) + + +_MutterDisplayConfigT = NamedTuple( + "_MutterDisplayConfigT", + [ + ("serial", int), + ("physical_monitors", List[PhysicalMonitor]), + ("logical_monitors", List[LogicalMonitor]), + # technically value type is GLib.Variant + # but it acts like a readonly map in this case + ("properties", Mapping[str, Any]), + ], +) + + +class MutterDisplayConfig(_MutterDisplayConfigT): + """The top level object that represents + the return value of the GetCurrentState dbus call + """ + + @classmethod + def from_variant(cls, v: GLib.Variant): + return cls( + v[0], + [PhysicalMonitor.from_variant(physical) for physical in v[1]], + [LogicalMonitor.from_variant(logical) for logical in v[2]], + v[3], + ) + + @property + def supports_mirroring(self) -> bool: + return self.properties.get("supports-mirroring", False) + + @property + def layout_mode(self) -> Optional[int]: + # only 2 possible layouts + # layout-mode = 2 => physical, 1 => logical + # If the key doesn't exist, then layout mode can't be changed + return self.properties.get("layout-mode", None) + + @property + def supports_changing_layout_mode(self) -> bool: + return self.properties.get("supports-changing-layout-mode", False) -Mode = namedtuple("Mode", ["id", "resolution", "is_preferred", "is_current"]) + @property + def global_scale_required(self) -> bool: + return self.properties.get("global-scale-required", False) + + +ResolutionFilter = Callable[[List[MutterDisplayMode]], List[MutterDisplayMode]] +# this only appears in apply_monitors_config +# it's very similar to LogicalMonitor but the last list element is different +LogicalMonitorConfig = Tuple[ + int, # x offset + int, # y offset + float, # scale, 1.0 for 100% + Transform, # transformation + bool, # is primary + List[ + Tuple[ + str, # connector name, same as .connector + str, # monitor mode id, same as .id + Dict[ + # only 2 possible keys: + # underscanning: bool + # color-mode: uint32 + str, + "bool|int", + ], + ] + ], +] class MonitorConfigGnome(MonitorConfig): @@ -43,6 +241,9 @@ class MonitorConfigGnome(MonitorConfig): NAME = "org.gnome.Mutter.DisplayConfig" INTERFACE = "org.gnome.Mutter.DisplayConfig" OBJECT_PATH = "/org/gnome/Mutter/DisplayConfig" + CONFIG_VARIANT_TYPE = GLib.VariantType.new( + "(ua((ssss)a(siiddada{sv})a{sv})a(iiduba(ssss)a{sv})a{sv})" + ) def __init__(self): self._proxy = Gio.DBusProxy.new_for_bus_sync( @@ -56,175 +257,251 @@ def __init__(self): ) def get_connected_monitors(self) -> Set[str]: - """Get list of connected monitors, even if inactive.""" - state = self._get_current_state() - return {monitor for monitor in state[1]} + """ + Get the connector names of each connected monitor, even if the monitor + is inactive + - To see if a monitor is active, iterate its modes and check if the + .is_current property is true for any of them + """ + state = self.get_current_state() + return {monitor.info.connector for monitor in state.physical_monitors} def get_current_resolutions(self) -> Dict[str, str]: - """Get current active resolutions for each monitor.""" + """ + Get current active resolutions for each monitor. + - Key is connector name like "eDP-1", + value is resolution string like 800x600 + - This method is only here to implement the one from the + parent abstract class, new code should directly access the resolution + integers from get_current_state + """ - state = self._get_current_state() + state = self.get_current_state() return { - monitor: mode.resolution - for monitor, modes in state[1].items() - for mode in modes - if mode.is_current + monitor.info.connector: mode.resolution + for monitor in state.physical_monitors + for mode in monitor.modes } def set_extended_mode(self) -> Dict[str, str]: """ Set to extend mode so that each monitor can be displayed at preferred, or if missing, maximum resolution. + - This always arranges the displays in a line - :return configuration: ordered list of applied Configuration + :return configuration: ordered dict of applied Configuration """ - state = self._get_current_state() + state = self.get_current_state() - extended_logical_monitors = [] - configuration = {} + extended_logical_monitors = [] # type: list[LogicalMonitorConfig] + # key is connector name, value is resolution string + configuration = OrderedDict() # type: OrderedDict[str, str] + # the x offset of the current monitor + # this will accumulate the width of each monitor as the iteration runs + # so that all monitors are arranged in a straight line position_x = 0 - for monitor, modes in state[1].items(): + for physical_monitor in state.physical_monitors: try: - target_mode = next(mode for mode in modes if mode.is_preferred) + target_mode = next( + mode + for mode in physical_monitor.modes + if mode.is_preferred + ) except StopIteration: - target_mode = self._get_mode_at_max(modes) + target_mode = self._get_mode_at_max(physical_monitor.modes) + + if type(target_mode) is not MutterDisplayMode: + # _get_mode_at_max should only be a filter + # and not change any of the items + raise TypeError("Unexpected mode:", target_mode) + extended_logical_monitors.append( ( - position_x, - 0, - 1.0, - 0, + position_x, # x + 0, # y + 1.0, # scale + Transform.NORMAL_0, position_x == 0, # first monitor is primary - [(monitor, target_mode.id, {})], + # .id is specific to MutterDisplayMode + # doesn't exist on plain Mode + [(physical_monitor.info.connector, target_mode.id, {})], ) ) - position_x += int(target_mode.resolution.split("x")[0]) - configuration[monitor] = target_mode.resolution + position_x += int(target_mode.width) + configuration[physical_monitor.info.connector] = ( + target_mode.resolution + ) - self._apply_monitors_config(state[0], extended_logical_monitors) + self._apply_monitors_config(state.serial, extended_logical_monitors) return configuration def cycle( self, - resolution: bool = True, - transform: bool = False, - resolution_filter: Callable[[List[Mode]], List[Mode]] = None, - action: Callable[..., Any] = None, - **kwargs + cycle_resolutions: bool = True, + cycle_transforms: bool = False, + resolution_filter: Optional[ResolutionFilter] = None, + post_cycle_action: Callable[..., Any] = lambda *a, **k: sleep(5), + **post_cycle_action_kwargs: Any ): - """ - Automatically cycle through the supported monitor configurations. + """Automatically cycle through the supported monitor configurations. + + :param cycle_resolutions: cycle through all resolutions if True + + :param cycle_transforms: cycle through all transforms/rotations if True + + :param resolution_filter: + A function that selects the resolutions to cycle + See the ResolutionFilter type for the input/output types - Args: - resolution: Cycling the resolution or not. + :param post_cycle_action: + A function to call after a cycle has finished. The first argument + to this function is always a string of the form - transform: Cycling the transform or not. + [monitor name]_[resolution]_[transform]_ - resolution_filter: For filtering resolution then returning needed, - it will take List[Mode] as parameter and return - the same data type + !! If specified, a delay MUST be introduced needed inside this + callback to wait for the monitors to respond !! + + :param post_cycle_action_kwargs: + The keyword args for post_cycle_action - action: For extra steps for each cycle, - the string is constructed by - [monitor name]_[resolution]_[transform]_. - Please note that the delay is needed inside this - callback to wait the monitors to response """ - monitors = [] - modes_list = [] - # ["normal": 0, "left": 1, "inverted": 6, "right": 3] - trans_list = [0, 1, 6, 3] if transform else [0] + connectors = [] # type: list[str] + modes_list = [] # type: list[list[MutterDisplayMode]] + transform_list = ( + ( + Transform.NORMAL_0, + Transform.NORMAL_90, + # preserving original behavior in case something depends on it + Transform.FLIPPED_180, + Transform.NORMAL_270, + ) + if cycle_transforms + else (Transform.NORMAL_0,) + ) + transformation_name_map = { + Transform.NORMAL_0: "normal", + Transform.NORMAL_270: "left", + Transform.FLIPPED_180: "inverted", + Transform.NORMAL_90: "right", + } # for multiple monitors, we need to create resolution combination - state = self._get_current_state() - for monitor, modes in state[1].items(): - monitors.append(monitor) - if resolution_filter: - modes_list.append(resolution_filter(modes)) + # modes_list[N] is a list of modes of monitor N + state = self.get_current_state() + for monitor in state.physical_monitors: + connectors.append(monitor.info.connector) + if resolution_filter is not None: + modes_list.append(resolution_filter(monitor.modes)) else: - modes_list.append(modes) - mode_combination = list(itertools.product(*modes_list)) + modes_list.append(monitor.modes) - for mode in mode_combination: - for trans in trans_list: - logical_monitors = [] + for combined_mode in itertools.product(*modes_list): + for trans in transform_list: + logical_monitors = [] # type: list[LogicalMonitorConfig] position_x = 0 - uni_string = "" - for monitor, m in zip(monitors, mode): - uni_string += "{}_{}_{}_".format( - monitor, - m.resolution, - { - 0: "normal", - 1: "left", - 3: "right", - 6: "inverted", - }.get(trans), + unique_str = "" # unique string for the current monitor state + + for connector, mode in zip(connectors, combined_mode): + transformation_str = transformation_name_map[trans] + unique_str += "{}_{}_{}_".format( + connector, mode.resolution, transformation_str ) logical_monitors.append( ( - position_x, - 0, - 1.0, - trans, - position_x == 0, # first monitor is primary - [(monitor, m.id, {})], + position_x, # x + 0, # y + 1.0, # scale + trans, # rotation + position_x == 0, # make the first monitor primary + # specify target connector name and mode + [(connector, mode.id, {})], ) ) - # left and right should convert x and y - xy = 1 if (trans == 1 or trans == 3) else 0 - position_x += int(m.resolution.split("x")[xy]) + + print( + "Setting {} to mode: {} transform: {}".format( + connector, mode.id, transformation_str + ), + # checkbox runtime might buffer this, + # force a flush here so it doesn't look frozen + flush=True, + ) + + x_offset = ( + mode.height + if trans in (Transform.NORMAL_90, Transform.NORMAL_270) + else mode.width + ) # left and right should convert x and y + position_x += x_offset # Sometimes the NVIDIA driver won't update the state. # Get the state before applying to avoid this issue. - state = self._get_current_state() - self._apply_monitors_config(state[0], logical_monitors) - if action: - action(uni_string, **kwargs) - if not resolution: + state = self.get_current_state() + self._apply_monitors_config(state.serial, logical_monitors) + + if post_cycle_action is not None: + post_cycle_action(unique_str, **post_cycle_action_kwargs) + + print("-" * 80, flush=True) # just a divider + + if not cycle_resolutions: break # change back to preferred monitor configuration + print("Finished cycling! Going back to extended mode") self.set_extended_mode() - def _get_current_state(self) -> Tuple[str, Dict[str, List[Mode]]]: + def get_current_state(self) -> MutterDisplayConfig: """ - Using DBus signal 'GetCurrentState' to get the available monitors + Calls the DBus signal 'GetCurrentState' to get the available monitors and related modes. - Check the related DBus XML definition for details over the expected - output data format. + The return type wraps the dbus object specified here: + https://gitlab.gnome.org/GNOME/mutter/-/blob/main/data/dbus-interfaces/ + org.gnome.Mutter.DisplayConfig.xml + + Use this method as the entry point for getting any kind of monitor info + + :raises TypeError: If the return type of GetCurrentState doesn't match + self.CONFIG_VARIANT_TYPE. This only happens if GNOME made a change + to this dbus signal """ - state = self._proxy.call_sync( + + raw = self._proxy.call_sync( method_name="GetCurrentState", - parameters=None, + parameters=None, # doesn't take any args + # don't auto start dbus "recipient"'s process if it's not running + # so if gnome is somehow dead, don't automatically fix it flags=Gio.DBusCallFlags.NO_AUTO_START, - timeout_msec=-1, + timeout_msec=-1, # don't timeout cancellable=None, ) - return ( - state[0], - { - monitor[0][0]: [ - Mode( - mode[0], - "{}x{}".format(mode[1], mode[2]), - mode[6].get("is-preferred", False), - mode[6].get("is-current", False), - ) - for mode in monitor[1] - ] - for monitor in state[1] - }, - ) + if not raw.get_type().equal(self.CONFIG_VARIANT_TYPE): + raise TypeError( + "DBus GetCurrentState returned unexpected type: " + + str(raw.get_type()) + ) + + return MutterDisplayConfig.from_variant(raw) - def _apply_monitors_config(self, serial: str, logical_monitors: List): + def _apply_monitors_config( + self, serial: int, logical_monitors: List[LogicalMonitorConfig] + ): """ - Using DBus signal 'ApplyMonitorsConfig' to apply the given monitor - configuration. + Call the DBus signal 'ApplyMonitorsConfig' to apply the config in + logical_monitors + + Original specification: + https://gitlab.gnome.org/GNOME/mutter/-/blob/main/data/dbus-interfaces/ + org.gnome.Mutter.DisplayConfig.xml#L477 - Check the related DBus XML definition for details over the expected - input data format. + :param serial: The .serial integer + from get_current_state + :param logical_monitors: The actual logical monitor configuration + - Use the LogicalMonitorConfig type to check if the config object + has the correct shape at dev time. Runtime check is done by the + GLib.Variant constructor """ self._proxy.call_sync( method_name="ApplyMonitorsConfig", diff --git a/checkbox-support/checkbox_support/dbus/tests/test_gnome_monitor.py b/checkbox-support/checkbox_support/dbus/tests/test_gnome_monitor.py index 10526ec291..ce229ead5f 100644 --- a/checkbox-support/checkbox_support/dbus/tests/test_gnome_monitor.py +++ b/checkbox-support/checkbox_support/dbus/tests/test_gnome_monitor.py @@ -8,13 +8,32 @@ sys.modules["gi"] = MagicMock() sys.modules["gi.repository"] = MagicMock() -from gi.repository import GLib, Gio +from gi.repository import GLib, Gio # type: ignore from checkbox_support.dbus.gnome_monitor import MonitorConfigGnome class MonitorConfigGnomeTests(unittest.TestCase): """This class provides test cases for the MonitorConfig DBus class.""" + class MockGetCurrentStateReturnValue: + + class MockGLibType: + def __init__(self, equal_check_return_value): + self.rv = equal_check_return_value + + def equal(self, other): + return self.rv + + def __init__(self, type_val, type_check_return_value=True): + self.type_val = type_val + self.type_check_return_value = type_check_return_value + + def __getitem__(self, key): + return self.type_val[key] + + def get_type(self): + return self.MockGLibType(self.type_check_return_value) + @patch("checkbox_support.dbus.gnome_monitor.Gio.DBusProxy") def test_get_connected_monitors(self, mock_dbus_proxy): """ @@ -26,7 +45,7 @@ def test_get_connected_monitors(self, mock_dbus_proxy): mock_dbus_proxy.new_for_bus_sync.return_value = mock_proxy gnome_monitor = MonitorConfigGnome() - mock_proxy.call_sync.return_value = ( + raw = ( 1, [ ( @@ -75,6 +94,9 @@ def test_get_connected_monitors(self, mock_dbus_proxy): [], {}, ) + mock_proxy.call_sync.return_value = ( + self.MockGetCurrentStateReturnValue(raw) + ) monitors = gnome_monitor.get_connected_monitors() self.assertSetEqual(monitors, {"eDP-1", "HDMI-1"}) @@ -89,7 +111,8 @@ def test_get_current_resolution(self, mock_dbus_proxy): mock_dbus_proxy.new_for_bus_sync.return_value = mock_proxy gnome_monitor = MonitorConfigGnome() - mock_proxy.call_sync.return_value = ( + + raw = ( 1, [ ( @@ -138,11 +161,25 @@ def test_get_current_resolution(self, mock_dbus_proxy): [], {}, ) + mock_proxy.call_sync.return_value = ( + self.MockGetCurrentStateReturnValue(raw) + ) resolutions = gnome_monitor.get_current_resolutions() self.assertEqual( resolutions, {"eDP-1": "1920x1200", "HDMI-1": "2560x1440"} ) + @patch("checkbox_support.dbus.gnome_monitor.Gio.DBusProxy") + def test_bad_input_type(self, mock_dbus_proxy): + mock_proxy = Mock() + mock_dbus_proxy.new_for_bus_sync.return_value = mock_proxy + + gnome_monitor = MonitorConfigGnome() + mock_proxy.call_sync.return_value = ( + self.MockGetCurrentStateReturnValue(tuple(), False) + ) + self.assertRaises(TypeError, gnome_monitor.get_current_state) + @patch("checkbox_support.dbus.gnome_monitor.Gio.DBusProxy") def test_set_extended_mode(self, mock_dbus_proxy): """ @@ -155,7 +192,7 @@ def test_set_extended_mode(self, mock_dbus_proxy): mock_dbus_proxy.new_for_bus_sync.return_value = mock_proxy gnome_monitor = MonitorConfigGnome() - mock_proxy.call_sync.return_value = ( + raw = ( 1, [ ( @@ -204,6 +241,9 @@ def test_set_extended_mode(self, mock_dbus_proxy): [], {}, ) + mock_proxy.call_sync.return_value = ( + self.MockGetCurrentStateReturnValue(raw) + ) configuration = gnome_monitor.set_extended_mode() logical_monitors = [ @@ -232,8 +272,9 @@ def test_set_extended_mode(self, mock_dbus_proxy): } self.assertDictEqual(configuration, expected) + @patch("checkbox_support.dbus.gnome_monitor.sleep") @patch("checkbox_support.dbus.gnome_monitor.Gio.DBusProxy") - def test_cycle(self, mock_dbus_proxy): + def test_cycle(self, mock_dbus_proxy: MagicMock, _): """ Test the cycle could get the right monitors configuration and send to ApplyMonitorsConfig. @@ -243,7 +284,8 @@ def test_cycle(self, mock_dbus_proxy): mock_dbus_proxy.new_for_bus_sync.return_value = mock_proxy gnome_monitor = MonitorConfigGnome() - mock_proxy.call_sync.return_value = ( + + raw = ( 1, [ ( @@ -292,6 +334,9 @@ def test_cycle(self, mock_dbus_proxy): [], {}, ) + mock_proxy.call_sync.return_value = ( + self.MockGetCurrentStateReturnValue(raw) + ) gnome_monitor.cycle() logical_monitors = [ @@ -317,8 +362,9 @@ def test_cycle(self, mock_dbus_proxy): cancellable=None, ) + @patch("checkbox_support.dbus.gnome_monitor.sleep") @patch("checkbox_support.dbus.gnome_monitor.Gio.DBusProxy") - def test_cycle_no_cycling(self, mock_dbus_proxy): + def test_cycle_no_cycling(self, mock_dbus_proxy: MagicMock, _): """ Test the cycle could get the right monitors configuration (without res and transform change) and send to ApplyMonitorsConfig. @@ -328,7 +374,7 @@ def test_cycle_no_cycling(self, mock_dbus_proxy): mock_dbus_proxy.new_for_bus_sync.return_value = mock_proxy gnome_monitor = MonitorConfigGnome() - mock_proxy.call_sync.return_value = ( + raw = ( 1, [ ( @@ -377,13 +423,20 @@ def test_cycle_no_cycling(self, mock_dbus_proxy): [], {}, ) + mock_proxy.call_sync.return_value = ( + self.MockGetCurrentStateReturnValue(raw) + ) # mock callback - mock_callback = MagicMock() + mock_resolution_filter = MagicMock() + mock_resolution_filter.side_effect = ( + lambda x: x + ) # keep the real mode values + mock_post_cycle_action = MagicMock() gnome_monitor.cycle( - resolution=False, - transform=False, - resoultion_filter=mock_callback, - action=mock_callback, + cycle_resolutions=False, + cycle_transforms=False, + resolution_filter=mock_resolution_filter, + post_cycle_action=mock_post_cycle_action, ) logical_monitors = [ @@ -408,8 +461,12 @@ def test_cycle_no_cycling(self, mock_dbus_proxy): timeout_msec=-1, cancellable=None, ) - argument_string = mock_callback.call_args[0][0] + argument_string = mock_post_cycle_action.call_args[0][0] p1 = "HDMI-1_2560x1440_normal_" p2 = "eDP-1_1920x1200_normal_" pattern = re.compile("{}{}|{}{}".format(p1, p2, p2, p1)) assert pattern.match(argument_string) + + +if __name__ == "__main__": + unittest.main()