From 02d676bda89c2fb8469ea81f7429c19c1e29df7c Mon Sep 17 00:00:00 2001 From: Sébastien Dailly Date: Sat, 15 Jul 2023 14:44:41 +0200 Subject: Initial commit --- client.py | 68 +++++++++++++ favicon.ico | Bin 0 -> 13094 bytes interfaces/__init__.py | 0 interfaces/desktopEvent.py | 16 +++ interfaces/endpoint.py | 112 +++++++++++++++++++++ interfaces/message.py | 14 +++ macropad.pyw | 244 +++++++++++++++++++++++++++++++++++++++++++++ readme.rst | 110 ++++++++++++++++++++ serial_conn.py | 46 +++++++++ socket_conn.py | 46 +++++++++ socketserver.py | 138 +++++++++++++++++++++++++ win32.py | 104 +++++++++++++++++++ xlib.py | 62 ++++++++++++ 13 files changed, 960 insertions(+) create mode 100755 client.py create mode 100644 favicon.ico create mode 100755 interfaces/__init__.py create mode 100755 interfaces/desktopEvent.py create mode 100755 interfaces/endpoint.py create mode 100755 interfaces/message.py create mode 100755 macropad.pyw create mode 100755 readme.rst create mode 100755 serial_conn.py create mode 100755 socket_conn.py create mode 100755 socketserver.py create mode 100755 win32.py create mode 100644 xlib.py diff --git a/client.py b/client.py new file mode 100755 index 0000000..24a8429 --- /dev/null +++ b/client.py @@ -0,0 +1,68 @@ +import sys +from threading import Thread +from queue import Queue +import time + +# +# This code provide an example for connecting to the socket server. +# + +from zope import component + +from interfaces import endpoint +component.provideAdapter(endpoint.EndPoint) + +from socket_conn import SocketConnection +conn = SocketConnection() + + +class Conn(Thread): + + def __init__(self, queue): + Thread.__init__(self) + self.queue = queue + self.s = component.queryAdapter(conn, endpoint.IEndpoint) + self.s.connect() + self.in_queue = Queue() + self.s.queue = self.in_queue + + def run(self): + + while True: + while not self.queue.empty(): + text = self.queue.get(False) + if text == -1: + sys.exit(0) + elif text == "": + continue + self.s.send([{"layout": text}]) + + if not self.s.isConnected(): + time.sleep(2) + self.s.connect() + continue + + + self.s.fetch() + + while not self.in_queue.empty(): + msg = self.in_queue.get(False) + print(msg) + # Print the promot again + print("\n> ", end=u"", flush=True) + + time.sleep(0.2) + +queue = Queue() +c = Conn(queue) +c.start() +print("Press ^D to quit") +while True: + try: + text = input("> ") + queue.put(text) + if text == "": + continue + except: + queue.put(-1) + break diff --git a/favicon.ico b/favicon.ico new file mode 100644 index 0000000..39209c5 Binary files /dev/null and b/favicon.ico differ diff --git a/interfaces/__init__.py b/interfaces/__init__.py new file mode 100755 index 0000000..e69de29 diff --git a/interfaces/desktopEvent.py b/interfaces/desktopEvent.py new file mode 100755 index 0000000..e8a30a4 --- /dev/null +++ b/interfaces/desktopEvent.py @@ -0,0 +1,16 @@ +from typing import Optional +from zope import interface +from zope.interface import Attribute + +class IDesktop(interface.Interface): + + queue = Attribute("""The queue to send the message""") + mapping = Attribute("""Correspondance between application and layer""") + + def getForegroundWindowTitle(sel) -> Optional[str]: + """ Return the name of the selected window + """ + + def start(self) -> None: + """ Start listening the events + """ diff --git a/interfaces/endpoint.py b/interfaces/endpoint.py new file mode 100755 index 0000000..5aa369b --- /dev/null +++ b/interfaces/endpoint.py @@ -0,0 +1,112 @@ +from typing import Optional +from zope import interface +from zope.interface import Attribute + + +class IEndpoint(interface.Interface): + + queue = Attribute("""The queue to send the message""") + state = Attribute("""The connection status""") + + def isConnected(self) -> bool: + """ Return True if the endpoint is connected. This function and + [connect] are supposed to work together : the main loop application + will call reconnect after a while when isConnected return False. + """ + + def connect(self): + """ Connect or reconnect to the endpoint + """ + + def fetch(self): + """ Check the elements from the connection + """ + + def send(self, data: list[dict[str, str]]): + """ Send element to the connection + """ + +class IConnection(interface.Interface): + """ Define a connected element (serial, network…) + """ + + def connect(self) -> None: + """ Connect """ + + def read(self) -> str: + """ Read from the connection and return the bytes. + Return None if there is nothing to read (non-blocking) + Raise an exception if disconnected """ + + def write(self, content:str) -> None: + """ Write into the connection. + Raise an exception if disconnected """ + +from interfaces import desktopEvent +from zope import component +import json + +from interfaces.message import IMessage, Debug + +@component.adapter(IConnection) +@interface.implementer(IEndpoint) +class EndPoint(object): + + STATE_DISCONNECTED = 0 + STATE_CONNECTING = 1 + STATE_CONNECTED = 2 + + def __init__(self, connection): + self.connection = connection + self.queue = None + self.state = self.STATE_DISCONNECTED + + def isConnected(self) -> bool: + return self.state != self.STATE_DISCONNECTED + + def connect(self): + try: + self.connection.connect() + self.state = self.STATE_CONNECTED + component.handle(Debug("Connected")) + except Exception as e: + print(e) + self.state = self.STATE_DISCONNECTED + + def fetch(self): + """ Read from the peripherical and put them in the queue if any data + are available. + """ + if self.state != self.STATE_CONNECTED: + return + try: + received = self.connection.read() + except Exception as e: + print(e) + self.state = self.STATE_DISCONNECTED + return + else: + if received is None or received == b'': + # If we do not have any entry from the macropad, just return + return + + print("recv", received) + layout = str(received, "utf-8").strip() + desktop = component.queryUtility(desktopEvent.IDesktop) + if desktop is not None: + title = desktop.getForegroundWindowTitle() + else: + title = None + self.queue.put((layout, title)) + + def send(self, data: list[dict[str, str]]): + """ Send the data to the macropad + """ + if self.state != self.STATE_CONNECTED: + return + try: + j = json.dumps( data ) + self.connection.write(bytes(j, "utf-8")) + except Exception as e: + print(e) + self.state = self.STATE_DISCONNECTED diff --git a/interfaces/message.py b/interfaces/message.py new file mode 100755 index 0000000..23ce2ec --- /dev/null +++ b/interfaces/message.py @@ -0,0 +1,14 @@ +from zope import interface +from zope.interface import Attribute + +class IMessage(interface.Interface): + content = Attribute("""The text message to log""") + +class ISocketMessage(interface.Interface): + content = Attribute("""Content to send""") + +@interface.implementer(IMessage) +class Debug(object): + + def __init__(self, message): + self.content = message diff --git a/macropad.pyw b/macropad.pyw new file mode 100755 index 0000000..f35e141 --- /dev/null +++ b/macropad.pyw @@ -0,0 +1,244 @@ +#!/usr/bin/env python3 + +import serial + +import tkinter as tk +from tkinter import Text +import pystray +from pystray import MenuItem as item +from PIL import Image, ImageTk + +import threading +from os import path +import sys + +from zope import component +import interfaces +from interfaces import endpoint +from interfaces.message import IMessage, Debug + +import configparser + +script_path = path.dirname(path.realpath(__file__)) +config_file = path.join(script_path, "config.ini") + +config = configparser.ConfigParser(delimiters="=") +config.read(config_file) + + + +init_mapping = config["mapping"] +mapping = dict(init_mapping) + +from queue import Queue +q = Queue() + +component.provideAdapter(interfaces.endpoint.EndPoint) + +# +# Guess the platform and the load the corresponding event listener +# + +from sys import platform +if platform == "win32": + import win32 + window_listener = win32.Listener(mapping, q) + window_listener.start() + component.provideUtility(window_listener, interfaces.desktopEvent.IDesktop) + +elif platform == 'linux': + import xlib + xlib_listener = xlib.Listener(mapping, q) + xlib_listener.start() + component.provideUtility(xlib_listener, interfaces.desktopEvent.IDesktop) + +# +# How to connect to the peripherical +# + +if config.has_section("connection.serial"): + + from serial_conn import SerialConnection + endpoint = component.queryAdapter(SerialConnection(config["connection.serial"]), endpoint.IEndpoint) + endpoint.queue = q + endpoint.connect() + component.provideUtility(endpoint, interfaces.endpoint.IEndpoint) + +elif config.has_section("connection.socket"): + + from socket_conn import SocketConnection + endpoint = component.queryAdapter(SocketConnection(config["connection.socket"]), endpoint.IEndpoint) + endpoint.queue = q + endpoint.connect() + component.provideUtility(endpoint, interfaces.endpoint.IEndpoint) + +if config.has_section("socket.serve"): + + import socketserver + server = socketserver.Handler(config["socket.serve"], q) + +else: + + server = None + + +class Icon(object): + + def __init__(self, image): + menu=( + item('Quit', self.quit_window), + item('Show', self.show_window, default=True), + item('Reset',self.reset), + ) + self.icon=pystray.Icon("name", image, "Macropad companion", menu) + + self.stop = threading.Event() + self.show_hide = threading.Event() + + # Start the icon into a new thread in order to keep the main loop control + icon_thread = threading.Thread(target=self.icon.run) + self.icon_thread = icon_thread + + def start(self): + self.icon_thread.start() + + def quit(self): + self.icon.stop() + + def quit_window(self): + self.stop.set() + + def show_window(self): + self.show_hide.set() + + def reset(self): + # Create a copy of the dictonnary before updating the keys + tmp_mapping = dict(mapping) + for key in tmp_mapping.keys() : + if key not in init_mapping.keys(): + del mapping[key] + +import json +class Application(object): + + def __init__(self): + self.window = tk.Tk() + self.text = Text(self.window, height=8) + ## State of the #pplication + self.running = True + self.visible = False + self.focused_window = None + self.last_layout = None + + component.provideHandler(self.log) + + # Window property + self.window.withdraw() + self.window.title("Macropad companion") + icon = path.join(script_path, "favicon.ico") + try: + self.window.iconbitmap(icon) + except: + pass + self.text.pack() + + # When closing, return back to the iconified mode + self.window.protocol("WM_DELETE_WINDOW", self.hide) + # Start the application in iconified mode + image=Image.open(icon) + self.icon = Icon(image) + self.icon.start() + component.handle(Debug("Started")) + + def hide(self): + self.icon.show_hide.clear() + self.visible = False + self.window.withdraw() + self.update() + + + def update(self): + if self.icon.stop.is_set(): + self.icon.quit() + self.running = False + self.window.destroy() + component.queryUtility(interfaces.desktopEvent.IDesktop).stop() + return + + if self.icon.show_hide.is_set(): + if not self.visible: + self.window.deiconify() + else: + self.window.withdraw() + self.icon.show_hide.clear() + self.visible = not self.visible + + @component.adapter(IMessage) + def log(self, message : str): + print(message.content) + try: + f = open("/tmp/macropad.log", "a") + f.write(message.content) + f.write("\n") + f.close() + self.text.insert("1.0", "\n") + self.text.insert("1.0", message.content) + self.text.delete("200.0", "end") + except Exception as e: + print(e) + + def send(self, data: str): + if data == self.last_layout: + return + self.last_layout = data + + if data == "Windows": + j = [ {"layout": data} ] + else: + # Add the Windows layout in the bottom layer + j = [ {"layout": "Windows"}, {"stack": data} ] + + component.queryUtility(interfaces.endpoint.IEndpoint).send(j) + + def associate(self, layout: str, name: str): + mapping[name] = layout + component.handle(Debug("Associating %s with %s" % (name, layout))) + + def exec(self): + try: + self.update() + if server is not None: server.update() + conn = component.queryUtility(interfaces.endpoint.IEndpoint) + if not conn.isConnected(): + component.handle(Debug("Reconnecting…")) + + conn.state = conn.STATE_CONNECTING + self.window.after(5000, conn.connect) + else: + conn.fetch() + except Exception as e: + component.handle(Debug( str(e) )) + print(e) + # Got any error, stop the application properly + self.icon.stop.set() + + if app.running: + self.window.after(200, self.exec) + + while not q.empty(): + last_layout, app_ = q.get(False) + self.send(last_layout) + if app_ is not None: self.associate(last_layout, app_) + +if __name__ == '__main__': + app = Application() + + # Initialize the main loop + app.exec() + try: + app.window.mainloop() + except: + app.running = False + app.window.destroy() + app.icon.quit() + component.queryUtility(interfaces.desktopEvent.IDesktop).stop() diff --git a/readme.rst b/readme.rst new file mode 100755 index 0000000..b8ffca2 --- /dev/null +++ b/readme.rst @@ -0,0 +1,110 @@ + +Requirements +============ + +python3 -m pip install pystray pyserial zope.component + +for debian you may need to install : + +sudo apt install python3-pil.imagetk + +Configuration +============= + +The configuration lies in an ini file: + +Serial connection +----------------- + +.. code:: ini + + [connexion.serial] + port = \\.\COM12 + + +Initialize a connection directly with the macropad + +Socket connection +----------------- + +.. code:: ini + + [connexion.socket] + port = 9999 + host = localhost + +Use a proxy to connect with the keyboard (as client) + +Socket server +------------- + +.. code:: ini + + [socket.serve] + port = 9999 + host = localhost + name = The name of the application to associate with + +Use a proxy to connect with the keyboard (as server) + +The mapping +----------- + +.. code:: ini + + [mapping] + Mozilla Firefox = Firefox + Teams = Teams + irssi = Irssi + … + +Mapping list +============ + +When a new window is selected, the application will look in the table for a +match, and send the corresponding layer to the endpoint. + +.. note:: + + When using XLib, the application will match the name with both the CLASS or + the NAME of the selected window + +If the application receive a notification for a new layer, it will also +register it, and store it for the session. This allow to make the application +"learn" about the layer you want to use. + +Serial connection +================= + +Sending message +--------------- + +The application send a json string to the endpoint (network or serial connection): + +.. code:: json + + {"layout": "Firefox"} + +This application does not handle the code in the keyboard. CircuitPython +provide a native library in order `to read or store json`_, and firmware build +upon it (like KMK) are easer to use with. + +.. _`to read or store json`: https://docs.circuitpython.org/en/latest/docs/library/json.html + +Reading message +--------------- + +The endpoint can also send a message to the application. For now, the message +is a raw string with the name of the layer. + +When the application receive such message, it will look for the active window +in the desktop, and register the application with this layer. If this +application is selected again, the application will ask the endpoint to switch +to the same layer again. + +Network connection +================== + +You can relay the events to another one instance using the network. I'm using +this when I'm connected over VNC in order to use the keyboard as if it was +plugged directly in the host. diff --git a/serial_conn.py b/serial_conn.py new file mode 100755 index 0000000..a08543a --- /dev/null +++ b/serial_conn.py @@ -0,0 +1,46 @@ +from zope import interface +from interfaces.message import ISocketMessage + +@interface.implementer(ISocketMessage) +class Layout(object): + + def __init__(self, message): + self.content = message + +from zope import component +from interfaces.endpoint import IConnection +import serial +from queue import Queue + +@interface.implementer(IConnection) +class SerialConnection(object): + """ Define a connected element (serial, network…) + """ + + def __init__(self, configuration): + self.serial_port = configuration["port"] + + def connect(self) -> None: + """ Connect """ + self.s = serial.Serial(port=self.serial_port, timeout=0) + + def read(self) -> str: + """ Read from the connection and return the bytes. + Return None if there is nothing to read (non-blocking) + Raise an exception if disconnected """ + + + in_waiting = self.s.in_waiting != 0 + if in_waiting == 0: + # If we do not have any entry from the macropad, just return + return None + + line = self.s.readline() + layout = str(line, "utf-8").strip() + component.handle(Layout(layout)) + return line + + def write(self, content:str) -> None: + """ Write into the connection. + Raise an exception if disconnected """ + self.s.write(content) diff --git a/socket_conn.py b/socket_conn.py new file mode 100755 index 0000000..0ac7230 --- /dev/null +++ b/socket_conn.py @@ -0,0 +1,46 @@ +# +# Describe a connection over a socket. +# +# This class is intended to be apdapted into IEndpoint interface + +import socket +from zope import interface +import errno + +from interfaces import endpoint + +from zope import component +from interfaces.message import Debug + +@interface.implementer(endpoint.IConnection) +class SocketConnection(object): + """ Define a connected element (serial, network…) + """ + + def __init__(self, configuration): + self.port = int(configuration["port"]) + self.host = configuration["host"] + + def connect(self) -> None: + """ Connect """ + self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.s.connect((self.host, self.port)) + self.s.settimeout(0.0) + component.handle(Debug("Connected to the socket")) + + def read(self) -> str: + """ Read from the connection and return the bytes. + Return None if there is nothing to read (non-blocking) + Raise an exception if disconnected """ + try: + return self.s.recv(1024) + except socket.error as e: + err = e.args[0] + if err == errno.EAGAIN or err == errno.EWOULDBLOCK: + return None + raise e + + def write(self, content:str) -> None: + """ Write into the connection. + Raise an exception if disconnected """ + self.s.sendall(content) diff --git a/socketserver.py b/socketserver.py new file mode 100755 index 0000000..fb71d6d --- /dev/null +++ b/socketserver.py @@ -0,0 +1,138 @@ +""" The module manage the socket server which allow to send new commands to + apply to the macropad. + + Any client connected to the server will also be informed from the manual + changes in the macropad by the user. + + The module is not blocking, and run in the main thread. +""" + +import socket +import selectors + +from dataclasses import dataclass +from typing import Callable + +from zope import component +from interfaces import desktopEvent + +from interfaces.message import Debug, ISocketMessage + +@dataclass +class waitEvent: + callback = Callable[[object, socket, int], None] + +@dataclass +class sendEvent: + callback = Callable[[object, socket, int], None] + message: str + +actions = { + "stack" : lambda x : x, + "layout" : lambda x : x, + } + +class Handler(object): + """ Listen the incomming connexions and dispatch them into the connexion + object """ + + def __init__(self, configuration, layout_queue): + """ Initialize the the socket server + + The layout_queue will be populated with all the elements received from the socket. + """ + super().__init__() + self.sel = selectors.DefaultSelector() + component.provideHandler(self.sendMessage) + + self.socket = socket.socket() + self.socket.bind((configuration["host"], int(configuration["port"]))) + self.socket.listen(100) + self.socket.setblocking(False) + c = waitEvent() + c.callback = self.accept + self.sel.register(self.socket, selectors.EVENT_READ, c) + self.application_name = configuration["name"].lower() + + self.connexions = [] + self.layout_queue = layout_queue + + @component.adapter(ISocketMessage) + def sendMessage(self, content:ISocketMessage) -> None: + """ Send a message to all the sockets connected to the server + """ + c = sendEvent(message = content.content) + c.callback = self._send + map = self.sel.get_map() + for value in list(map.values())[:]: + if value.fileobj == self.socket: + # Ignore the main connexion, keep it only in accept mode. + continue + self.sel.modify(value.fileobj, selectors.EVENT_WRITE, c) + + def update(self): + """ Read the status for all the sockets, if they are available. + """ + events = self.sel.select(timeout=0) + for key, mask in events: + c = key.data + + if isinstance(c, waitEvent): + c.callback(key.fileobj, mask) + elif isinstance(c, sendEvent): + c.callback(key.fileobj, mask, c.message) + + def _switch_read(self, conn): + """ Send the socket back into read mode + """ + c = waitEvent() + c.callback = self._read + try: + self.sel.register(conn, selectors.EVENT_READ, c) + except KeyError: + self.sel.modify(conn, selectors.EVENT_READ, c) + + def accept(self, sock, mask): + conn, addr = sock.accept() # Should be ready + conn.setblocking(False) + self._switch_read(conn) + component.handle(Debug("Received new connection")) + + def _read(self:object, conn:socket, mask:int): + """ Internal method used to retreive data from the socket + """ + data = conn.recv(1024).strip() + if data == bytes("", "ascii"): + # A socket ready but sending garbage is a dead socket. + self.close(conn) + return + last_layout = str(data, "utf-8") + try: + js = json.loads(last_layout)[-1] + for key, value in js.items(): + last_layout = actions[key](value) + except: + print(last_layout) + + #title = component.queryUtility(desktopEvent.IDesktop).getForegroundWindowTitle() + # The application name is hardcoded in the code, and should be reported + # in the configuration or somewhere else. + # The name of the current application is not reliable because the + # message can be send with some lattency. + component.handle(Debug("Received %s from the socket" % (last_layout))) + + # Associate the layout with the current window + self.layout_queue.put((last_layout, self.application_name)) + + def _send(self:object, conn:socket, mask:int, text) -> None: + """ Internal method used to dispatch the message to the socket. """ + try: + conn.sendall(bytes(text , "utf-8")) + except: + self.close(conn) + return + self._switch_read(conn) + + def close(self, conn): + self.sel.unregister(conn) + conn.close() diff --git a/win32.py b/win32.py new file mode 100755 index 0000000..46d1efe --- /dev/null +++ b/win32.py @@ -0,0 +1,104 @@ +# Required for the window title name +from ctypes import wintypes, windll, create_unicode_buffer, WINFUNCTYPE +from typing import Self, Optional, Dict + +from zope import interface +from interfaces import desktopEvent + +# This code tries to hook the Focus changes events with the callback method. + +# The types of events we want to listen for, and the names we'll use for +# them in the log output. Pick from +# http://msdn.microsoft.com/en-us/library/windows/desktop/dd318066(v=vs.85).aspx +EVENT_OBJECT_FOCUS = 0x8005 +EVENT_OBJECT_NAMECHANGE = 0x800C +WINEVENT_OUTOFCONTEXT = 0x0000 + +eventTypes = { + #win32con.EVENT_SYSTEM_FOREGROUND: "Foreground", + EVENT_OBJECT_FOCUS: "Focus", + EVENT_OBJECT_NAMECHANGE: "NameChange", + #win32con.EVENT_OBJECT_SHOW: "Show", + #win32con.EVENT_SYSTEM_DIALOGSTART: "Dialog", + #win32con.EVENT_SYSTEM_CAPTURESTART: "Capture", + #win32con.EVENT_SYSTEM_MINIMIZEEND: "UnMinimize" +} + +WinEventProcType = WINFUNCTYPE( + None, + wintypes.HANDLE, + wintypes.DWORD, + wintypes.HWND, + wintypes.LONG, + wintypes.LONG, + wintypes.DWORD, + wintypes.DWORD +) + +def setHook(WinEventProc, eventType): + """ Register the hook foo being notified when the window change + """ + return windll.user32.SetWinEventHook( + eventType, + eventType, + 0, + WinEventProc, + 0, + 0, + WINEVENT_OUTOFCONTEXT + ) + +@interface.implementer(desktopEvent.IDesktop) +class Listener(object): + + def __init__(self: Self, mapping: Dict[str, str], queue): + self.WinEventProc = WinEventProcType(self.callback) + self.mapping = mapping + self.queue = queue + + def getForegroundWindowTitle(self: Self) -> Optional[str]: + """ Get the window title name. + Example found from https://stackoverflow.com/a/58355052 + See the function in the winuser librarry : + https://learn.microsoft.com/en-us/windows/win32/api/winuser/ + """ + hWnd = windll.user32.GetForegroundWindow() + length = windll.user32.GetWindowTextLengthW(hWnd) + buf = create_unicode_buffer(length + 1) + windll.user32.GetWindowTextW(hWnd, buf, length + 1) + + # 1-liner alternative: return buf.value if buf.value else None + if buf.value: + return buf.value + else: + return None + + def callback(self: Self, hWinEventHook, event, hwnd, idObject, idChild, dwEventThread, + dwmsEventTime) -> None: + + if hwnd != windll.user32.GetForegroundWindow(): + # Only check the active window, the events received from other + # windows are ignored. + return + + length = windll.user32.GetWindowTextLengthW(hwnd) + title = create_unicode_buffer(length + 1) + windll.user32.GetWindowTextW(hwnd, title, length + 1) + if title.value is None: + return + title = str(title.value).lower() + for pattern, code in self.mapping.items(): + if pattern in title: + self.queue.put ( (code, None) ) + return + self.queue.put ( ("Windows", None) ) + + def start(self: Self) -> None: + self.hookIDs = [setHook(self.WinEventProc, et) for et in eventTypes.keys()] + if not any(self.hookIDs): + print('SetWinEventHook failed') + sys.exit(1) + + def stop(self: Self) -> None: + for hook in self.hookIDs: + windll.user32.UnhookWinEvent(hook) diff --git a/xlib.py b/xlib.py new file mode 100644 index 0000000..b86a8b1 --- /dev/null +++ b/xlib.py @@ -0,0 +1,62 @@ +#!/usr/bin/python3 +import Xlib +import Xlib.display + +from zope import interface +from interfaces import desktopEvent +from threading import Thread + +from interfaces.message import IMessage, Debug +from zope import component + +@interface.implementer(desktopEvent.IDesktop) +class Listener(Thread): + + def __init__(self, mapping, queue): + Thread.__init__(self) + self.queue = queue + self.mapping = mapping + self.active_window = None + self.last_code = None + self.running = False + + def getForegroundWindowTitle(self): + """ Return the name of the selected window + """ + return self.active_window + + def run(self): + disp = Xlib.display.Display() + NET_WM_NAME = disp.intern_atom('_NET_WM_NAME') + WM_CLASS = disp.intern_atom('WM_CLASS') + NET_ACTIVE_WINDOW = disp.intern_atom('_NET_ACTIVE_WINDOW') + root = disp.screen().root + root.change_attributes(event_mask=Xlib.X.PropertyChangeMask) + + component.handle(Debug("Waiting for xlib event")) + self.running = True + while self.running: + try: + window_id = root.get_full_property(NET_ACTIVE_WINDOW, Xlib.X.AnyPropertyType).value[0] + window = disp.create_resource_object('window', window_id) + window.change_attributes(event_mask=Xlib.X.PropertyChangeMask) + window_name = str(window.get_full_property(NET_WM_NAME, 0).value).lower() + class_name = str(window.get_full_property(WM_CLASS, 0).value).lower() + except Xlib.error.XError: + window_name = None + class_name = None + + if window_name is not None and window_name != self.active_window: + + self.active_window = window_name + for pattern, code in self.mapping.items(): + if (pattern in window_name or pattern in class_name) and code != self.last_code: + + component.handle(Debug("Switching to '%s' for '%s'" % (code, window_name))) + self.queue.put ( (code, None) ) + self.last_code = code + break + event = disp.next_event() + + def stop(self): + self.running = False -- cgit v1.2.3