from threading import Thread
from contextlib import contextmanager

import Xlib
import Xlib.display

from zope import interface
from zope import component
from interfaces.message import Debug
from interfaces import desktopEvent
from consumer import Mapping


@contextmanager
def window_obj(disp, win_id):
    """Yield a window resource object for *win_id*, or None.

    Simplifies dealing with BadWindow: callers receive either a usable
    window object or None and must check for None before using it.

    Args:
        disp: the Xlib display used to build the resource object.
        win_id: the X window id; a falsy id (0 / None) yields None.
    """
    window = None
    if win_id:
        try:
            window = disp.create_resource_object('window', win_id)
        except Xlib.error.XError:
            # Best effort: an invalid window is reported as None.
            pass
    yield window


@interface.implementer(desktopEvent.IDesktop)
class Listener(Thread):
    """Thread watching the active X11 window and emitting layer mappings.

    Each time the active window title changes, the title and class are
    compared with the patterns of ``mapping`` and the matching code (or the
    "default" one) is sent through ``component.handle`` as a ``Mapping``.
    """

    def __init__(self, mapping):
        """Store the pattern -> code mapping and reset the tracking state.

        Args:
            mapping: dict-like object associating a pattern with a code; the
                key "default" holds the fallback code.
        """
        super().__init__()
        self.mapping = mapping
        # Lower-cased title of the last active window seen.
        self.active_name = None
        # Id of the window currently subscribed to property changes.
        self.active_window_id = None
        # Last code sent, or the literal "default" after a fallback.
        self.last_code = None
        self.running = False
        self.daemon = True

    def getForegroundWindowTitle(self):
        """ Return the name of the selected window """
        return self.active_name

    def run(self):
        """Open the display and process X events until ``stop`` is called.

        Any exception raised while handling an event is reported (debug
        message and stdout) and the loop carries on with the next event.
        """
        self.disp = Xlib.display.Display()
        root = self.disp.screen().root
        # Changes of the active window are published as property changes
        # on the root window.
        root.change_attributes(event_mask=Xlib.X.PropertyChangeMask)

        self.NET_WM_NAME = self.disp.intern_atom('_NET_WM_NAME')
        self.WM_CLASS = self.disp.intern_atom('WM_CLASS')
        self.WM_NAME = self.disp.intern_atom('WM_NAME')
        self.NET_ACTIVE_WINDOW = self.disp.intern_atom('_NET_ACTIVE_WINDOW')

        component.handle(Debug("Waiting for xlib event"))
        self.running = True
        # NOTE(review): next_event() presumably blocks, so stop() would only
        # take effect once another event is received -- confirm.
        while self.running:
            try:
                self.handle_event(root, self.disp.next_event())
            except Exception as e:
                component.handle(Debug(str(e)))
                print(e)

    def get_property_value(self, window, property_):
        """ Return the requested property for the given window, or None if the
        property is not defined.

        The value is returned as a lower-cased string. Byte values are
        decoded (UTF-8 first, Latin-1 as a fallback) instead of being passed
        to ``str()``, which would produce the ``b'...'`` representation.
        """
        prop = window.get_full_property(property_, 0)
        if prop is None:
            return None

        value = prop.value
        if isinstance(value, bytes):
            try:
                text = value.decode("utf-8")
            except UnicodeDecodeError:
                # Latin-1 can decode any byte sequence.
                text = value.decode("latin-1")
        else:
            text = str(value)
        return text.lower()

    def handle_event(self, root, _event):
        """React to an X event by refreshing the active window state.

        Nothing happens when there is no active window, when an X error
        occurs while reading it, or when the window title is unchanged.

        Args:
            root: the root window holding the _NET_ACTIVE_WINDOW property.
            _event: the received event (unused).
        """
        try:
            names = self._read_active_window(root)
        except Xlib.error.XError:
            return
        if names is None:
            return

        window_name, class_name = names
        if window_name is None or window_name == self.active_name:
            return
        self.active_name = window_name
        self._apply_mapping(window_name, class_name)

    def _read_active_window(self, root):
        """Track the active window and return its (title, class), or None.

        Moves the property-change subscription from the previously active
        window to the new one. Returns None when no active window can be
        resolved.
        """
        active = root.get_full_property(
            self.NET_ACTIVE_WINDOW,
            Xlib.X.AnyPropertyType
        )
        if active is None or not active.value:
            return None
        window_id = active.value[0]

        previous_id = self.active_window_id
        if previous_id is not None and window_id != previous_id:
            # Stop listening to the window which lost the focus.
            with window_obj(self.disp, previous_id) as previous:
                if previous is not None:
                    previous.change_attributes(event_mask=Xlib.X.NoEventMask)

        with window_obj(self.disp, window_id) as window:
            if window is None:
                # No usable window (e.g. an id of 0): nothing to inspect.
                return None
            # Listen to the new window in order to catch title updates.
            window.change_attributes(event_mask=Xlib.X.PropertyChangeMask)
            self.active_window_id = window_id

            window_name = self.get_property_value(window, self.NET_WM_NAME) \
                or self.get_property_value(window, self.WM_NAME)
            class_name = self.get_property_value(window, self.WM_CLASS)
        return window_name, class_name

    def _apply_mapping(self, window_name, class_name):
        """Send the code of the first pattern matching the window.

        Falls back to the "default" entry when no pattern matches.
        """
        # The keys are walked in reverse order: with a regular dict (ordered
        # by insertion) the patterns added last are tried first.
        for pattern in reversed(self.mapping.keys()):
            # Ignore the default layout
            if pattern == "default":
                continue

            needle = pattern.lstrip()
            in_title = needle in window_name
            in_class = class_name is not None and needle in class_name
            if not (in_title or in_class):
                continue

            code = self.mapping[pattern]
            if code != self.last_code:
                component.handle(Debug(f"Switching to '{pattern}' for '{window_name}'"))
                component.handle(Mapping((code, None)))
                self.last_code = code
            # We found a matching configuration. Even if the match is the
            # same as the current one, we stop here in order to prevent
            # another layer (or the default one) from being applied.
            return

        print(f"Mapping '{window_name}' / '{class_name}' to default")
        if self.last_code == "default":
            return
        default = self.mapping.get("default", None)
        if default is None:
            return
        component.handle(Mapping((default, None)))
        self.last_code = "default"

    def stop(self) -> None:
        """ Stop the thread properly.
        """
        self.running = False