diff options
-rwxr-xr-x | client.py | 68 | ||||
-rw-r--r-- | favicon.ico | bin | 0 -> 13094 bytes | |||
-rwxr-xr-x | interfaces/__init__.py | 0 | ||||
-rwxr-xr-x | interfaces/desktopEvent.py | 16 | ||||
-rwxr-xr-x | interfaces/endpoint.py | 112 | ||||
-rwxr-xr-x | interfaces/message.py | 14 | ||||
-rwxr-xr-x | macropad.pyw | 244 | ||||
-rwxr-xr-x | readme.rst | 110 | ||||
-rwxr-xr-x | serial_conn.py | 46 | ||||
-rwxr-xr-x | socket_conn.py | 46 | ||||
-rwxr-xr-x | socketserver.py | 138 | ||||
-rwxr-xr-x | win32.py | 104 | ||||
-rw-r--r-- | xlib.py | 62 |
13 files changed, 960 insertions, 0 deletions
diff --git a/client.py b/client.py new file mode 100755 index 0000000..24a8429 --- /dev/null +++ b/client.py @@ -0,0 +1,68 @@ +import sys
+from threading import Thread
+from queue import Queue
+import time
+
+#
+# This code provide an example for connecting to the socket server.
+#
+
+from zope import component
+
+from interfaces import endpoint
+component.provideAdapter(endpoint.EndPoint)
+
+from socket_conn import SocketConnection
+conn = SocketConnection()
+
+
+class Conn(Thread):
+
+ def __init__(self, queue):
+ Thread.__init__(self)
+ self.queue = queue
+ self.s = component.queryAdapter(conn, endpoint.IEndpoint)
+ self.s.connect()
+ self.in_queue = Queue()
+ self.s.queue = self.in_queue
+
+ def run(self):
+
+ while True:
+ while not self.queue.empty():
+ text = self.queue.get(False)
+ if text == -1:
+ sys.exit(0)
+ elif text == "":
+ continue
+ self.s.send([{"layout": text}])
+
+ if not self.s.isConnected():
+ time.sleep(2)
+ self.s.connect()
+ continue
+
+
+ self.s.fetch()
+
+ while not self.in_queue.empty():
+ msg = self.in_queue.get(False)
+ print(msg)
+ # Print the promot again
+ print("\n> ", end=u"", flush=True)
+
+ time.sleep(0.2)
+
+queue = Queue()
+c = Conn(queue)
+c.start()
+print("Press ^D to quit")
+while True:
+ try:
+ text = input("> ")
+ queue.put(text)
+ if text == "":
+ continue
+ except:
+ queue.put(-1)
+ break
diff --git a/favicon.ico b/favicon.ico Binary files differnew file mode 100644 index 0000000..39209c5 --- /dev/null +++ b/favicon.ico diff --git a/interfaces/__init__.py b/interfaces/__init__.py new file mode 100755 index 0000000..e69de29 --- /dev/null +++ b/interfaces/__init__.py diff --git a/interfaces/desktopEvent.py b/interfaces/desktopEvent.py new file mode 100755 index 0000000..e8a30a4 --- /dev/null +++ b/interfaces/desktopEvent.py @@ -0,0 +1,16 @@ +from typing import Optional
+from zope import interface
+from zope.interface import Attribute
+
+class IDesktop(interface.Interface):
+
+ queue = Attribute("""The queue to send the message""")
+ mapping = Attribute("""Correspondance between application and layer""")
+
+ def getForegroundWindowTitle(sel) -> Optional[str]:
+ """ Return the name of the selected window
+ """
+
+ def start(self) -> None:
+ """ Start listening the events
+ """
diff --git a/interfaces/endpoint.py b/interfaces/endpoint.py new file mode 100755 index 0000000..5aa369b --- /dev/null +++ b/interfaces/endpoint.py @@ -0,0 +1,112 @@ +from typing import Optional
+from zope import interface
+from zope.interface import Attribute
+
+
+class IEndpoint(interface.Interface):
+
+ queue = Attribute("""The queue to send the message""")
+ state = Attribute("""The connection status""")
+
+ def isConnected(self) -> bool:
+ """ Return True if the endpoint is connected. This function and
+ [connect] are supposed to work together : the main loop application
+ will call reconnect after a while when isConnected return False.
+ """
+
+ def connect(self):
+ """ Connect or reconnect to the endpoint
+ """
+
+ def fetch(self):
+ """ Check the elements from the connection
+ """
+
+ def send(self, data: list[dict[str, str]]):
+ """ Send element to the connection
+ """
+
+class IConnection(interface.Interface):
+ """ Define a connected element (serial, network…)
+ """
+
+ def connect(self) -> None:
+ """ Connect """
+
+ def read(self) -> str:
+ """ Read from the connection and return the bytes.
+ Return None if there is nothing to read (non-blocking)
+ Raise an exception if disconnected """
+
+ def write(self, content:str) -> None:
+ """ Write into the connection.
+ Raise an exception if disconnected """
+
+from interfaces import desktopEvent
+from zope import component
+import json
+
+from interfaces.message import IMessage, Debug
+
+@component.adapter(IConnection)
+@interface.implementer(IEndpoint)
+class EndPoint(object):
+
+ STATE_DISCONNECTED = 0
+ STATE_CONNECTING = 1
+ STATE_CONNECTED = 2
+
+ def __init__(self, connection):
+ self.connection = connection
+ self.queue = None
+ self.state = self.STATE_DISCONNECTED
+
+ def isConnected(self) -> bool:
+ return self.state != self.STATE_DISCONNECTED
+
+ def connect(self):
+ try:
+ self.connection.connect()
+ self.state = self.STATE_CONNECTED
+ component.handle(Debug("Connected"))
+ except Exception as e:
+ print(e)
+ self.state = self.STATE_DISCONNECTED
+
+ def fetch(self):
+ """ Read from the peripherical and put them in the queue if any data
+ are available.
+ """
+ if self.state != self.STATE_CONNECTED:
+ return
+ try:
+ received = self.connection.read()
+ except Exception as e:
+ print(e)
+ self.state = self.STATE_DISCONNECTED
+ return
+ else:
+ if received is None or received == b'':
+ # If we do not have any entry from the macropad, just return
+ return
+
+ print("recv", received)
+ layout = str(received, "utf-8").strip()
+ desktop = component.queryUtility(desktopEvent.IDesktop)
+ if desktop is not None:
+ title = desktop.getForegroundWindowTitle()
+ else:
+ title = None
+ self.queue.put((layout, title))
+
+ def send(self, data: list[dict[str, str]]):
+ """ Send the data to the macropad
+ """
+ if self.state != self.STATE_CONNECTED:
+ return
+ try:
+ j = json.dumps( data )
+ self.connection.write(bytes(j, "utf-8"))
+ except Exception as e:
+ print(e)
+ self.state = self.STATE_DISCONNECTED
diff --git a/interfaces/message.py b/interfaces/message.py new file mode 100755 index 0000000..23ce2ec --- /dev/null +++ b/interfaces/message.py @@ -0,0 +1,14 @@ +from zope import interface
+from zope.interface import Attribute
+
+class IMessage(interface.Interface):
+ content = Attribute("""The text message to log""")
+
+class ISocketMessage(interface.Interface):
+ content = Attribute("""Content to send""")
+
+@interface.implementer(IMessage)
+class Debug(object):
+
+ def __init__(self, message):
+ self.content = message
diff --git a/macropad.pyw b/macropad.pyw new file mode 100755 index 0000000..f35e141 --- /dev/null +++ b/macropad.pyw @@ -0,0 +1,244 @@ +#!/usr/bin/env python3 + +import serial + +import tkinter as tk +from tkinter import Text +import pystray +from pystray import MenuItem as item +from PIL import Image, ImageTk + +import threading +from os import path +import sys + +from zope import component +import interfaces +from interfaces import endpoint +from interfaces.message import IMessage, Debug + +import configparser + +script_path = path.dirname(path.realpath(__file__)) +config_file = path.join(script_path, "config.ini") + +config = configparser.ConfigParser(delimiters="=") +config.read(config_file) + + + +init_mapping = config["mapping"] +mapping = dict(init_mapping) + +from queue import Queue +q = Queue() + +component.provideAdapter(interfaces.endpoint.EndPoint) + +# +# Guess the platform and the load the corresponding event listener +# + +from sys import platform +if platform == "win32": + import win32 + window_listener = win32.Listener(mapping, q) + window_listener.start() + component.provideUtility(window_listener, interfaces.desktopEvent.IDesktop) + +elif platform == 'linux': + import xlib + xlib_listener = xlib.Listener(mapping, q) + xlib_listener.start() + component.provideUtility(xlib_listener, interfaces.desktopEvent.IDesktop) + +# +# How to connect to the peripherical +# + +if config.has_section("connection.serial"): + + from serial_conn import SerialConnection + endpoint = component.queryAdapter(SerialConnection(config["connection.serial"]), endpoint.IEndpoint) + endpoint.queue = q + endpoint.connect() + component.provideUtility(endpoint, interfaces.endpoint.IEndpoint) + +elif config.has_section("connection.socket"): + + from socket_conn import SocketConnection + endpoint = component.queryAdapter(SocketConnection(config["connection.socket"]), endpoint.IEndpoint) + endpoint.queue = q + endpoint.connect() + component.provideUtility(endpoint, interfaces.endpoint.IEndpoint) + +if config.has_section("socket.serve"): + + import socketserver + server = socketserver.Handler(config["socket.serve"], q) + +else: + + server = None + + +class Icon(object): + + def __init__(self, image): + menu=( + item('Quit', self.quit_window), + item('Show', self.show_window, default=True), + item('Reset',self.reset), + ) + self.icon=pystray.Icon("name", image, "Macropad companion", menu) + + self.stop = threading.Event() + self.show_hide = threading.Event() + + # Start the icon into a new thread in order to keep the main loop control + icon_thread = threading.Thread(target=self.icon.run) + self.icon_thread = icon_thread + + def start(self): + self.icon_thread.start() + + def quit(self): + self.icon.stop() + + def quit_window(self): + self.stop.set() + + def show_window(self): + self.show_hide.set() + + def reset(self): + # Create a copy of the dictonnary before updating the keys + tmp_mapping = dict(mapping) + for key in tmp_mapping.keys() : + if key not in init_mapping.keys(): + del mapping[key] + +import json +class Application(object): + + def __init__(self): + self.window = tk.Tk() + self.text = Text(self.window, height=8) + ## State of the #pplication + self.running = True + self.visible = False + self.focused_window = None + self.last_layout = None + + component.provideHandler(self.log) + + # Window property + self.window.withdraw() + self.window.title("Macropad companion") + icon = path.join(script_path, "favicon.ico") + try: + self.window.iconbitmap(icon) + except: + pass + self.text.pack() + + # When closing, return back to the iconified mode + self.window.protocol("WM_DELETE_WINDOW", self.hide) + # Start the application in iconified mode + image=Image.open(icon) + self.icon = Icon(image) + self.icon.start() + component.handle(Debug("Started")) + + def hide(self): + self.icon.show_hide.clear() + self.visible = False + self.window.withdraw() + self.update() + + + def update(self): + if self.icon.stop.is_set(): + self.icon.quit() + self.running = False + self.window.destroy() + component.queryUtility(interfaces.desktopEvent.IDesktop).stop() + return + + if self.icon.show_hide.is_set(): + if not self.visible: + self.window.deiconify() + else: + self.window.withdraw() + self.icon.show_hide.clear() + self.visible = not self.visible + + @component.adapter(IMessage) + def log(self, message : str): + print(message.content) + try: + f = open("/tmp/macropad.log", "a") + f.write(message.content) + f.write("\n") + f.close() + self.text.insert("1.0", "\n") + self.text.insert("1.0", message.content) + self.text.delete("200.0", "end") + except Exception as e: + print(e) + + def send(self, data: str): + if data == self.last_layout: + return + self.last_layout = data + + if data == "Windows": + j = [ {"layout": data} ] + else: + # Add the Windows layout in the bottom layer + j = [ {"layout": "Windows"}, {"stack": data} ] + + component.queryUtility(interfaces.endpoint.IEndpoint).send(j) + + def associate(self, layout: str, name: str): + mapping[name] = layout + component.handle(Debug("Associating %s with %s" % (name, layout))) + + def exec(self): + try: + self.update() + if server is not None: server.update() + conn = component.queryUtility(interfaces.endpoint.IEndpoint) + if not conn.isConnected(): + component.handle(Debug("Reconnecting…")) + + conn.state = conn.STATE_CONNECTING + self.window.after(5000, conn.connect) + else: + conn.fetch() + except Exception as e: + component.handle(Debug( str(e) )) + print(e) + # Got any error, stop the application properly + self.icon.stop.set() + + if app.running: + self.window.after(200, self.exec) + + while not q.empty(): + last_layout, app_ = q.get(False) + self.send(last_layout) + if app_ is not None: self.associate(last_layout, app_) + +if __name__ == '__main__': + app = Application() + + # Initialize the main loop + app.exec() + try: + app.window.mainloop() + except: + app.running = False + app.window.destroy() + app.icon.quit() + component.queryUtility(interfaces.desktopEvent.IDesktop).stop() diff --git a/readme.rst b/readme.rst new file mode 100755 index 0000000..b8ffca2 --- /dev/null +++ b/readme.rst @@ -0,0 +1,110 @@ +
+Requirements
+============
+
+python3 -m pip install pystray pyserial zope.component
+
+for debian you may need to install :
+
+sudo apt install python3-pil.imagetk
+
+Configuration
+=============
+
+The configuration lies in an ini file:
+
+Serial connection
+-----------------
+
+.. code:: ini
+
+ [connexion.serial]
+ port = \\.\COM12
+
+
+Initialize a connection directly with the macropad
+
+Socket connection
+-----------------
+
+.. code:: ini
+
+ [connexion.socket]
+ port = 9999
+ host = localhost
+
+Use a proxy to connect with the keyboard (as client)
+
+Socket server
+-------------
+
+.. code:: ini
+
+ [socket.serve]
+ port = 9999
+ host = localhost
+ name = The name of the application to associate with
+
+Use a proxy to connect with the keyboard (as server)
+
+The mapping
+-----------
+
+.. code:: ini
+
+ [mapping]
+ Mozilla Firefox = Firefox
+ Teams = Teams
+ irssi = Irssi
+ …
+
+Mapping list
+============
+
+When a new window is selected, the application will look in the table for a
+match, and send the corresponding layer to the endpoint.
+
+.. note::
+
+ When using XLib, the application will match the name with both the CLASS or
+ the NAME of the selected window
+
+If the application receive a notification for a new layer, it will also
+register it, and store it for the session. This allow to make the application
+"learn" about the layer you want to use.
+
+Serial connection
+=================
+
+Sending message
+---------------
+
+The application send a json string to the endpoint (network or serial connection):
+
+.. code:: json
+
+ {"layout": "Firefox"}
+
+This application does not handle the code in the keyboard. CircuitPython
+provide a native library in order `to read or store json`_, and firmware build
+upon it (like KMK) are easer to use with.
+
+.. _`to read or store json`: https://docs.circuitpython.org/en/latest/docs/library/json.html
+
+Reading message
+---------------
+
+The endpoint can also send a message to the application. For now, the message
+is a raw string with the name of the layer.
+
+When the application receive such message, it will look for the active window
+in the desktop, and register the application with this layer. If this
+application is selected again, the application will ask the endpoint to switch
+to the same layer again.
+
+Network connection
+==================
+
+You can relay the events to another one instance using the network. I'm using
+this when I'm connected over VNC in order to use the keyboard as if it was
+plugged directly in the host.
diff --git a/serial_conn.py b/serial_conn.py new file mode 100755 index 0000000..a08543a --- /dev/null +++ b/serial_conn.py @@ -0,0 +1,46 @@ +from zope import interface
+from interfaces.message import ISocketMessage
+
+@interface.implementer(ISocketMessage)
+class Layout(object):
+
+ def __init__(self, message):
+ self.content = message
+
+from zope import component
+from interfaces.endpoint import IConnection
+import serial
+from queue import Queue
+
+@interface.implementer(IConnection)
+class SerialConnection(object):
+ """ Define a connected element (serial, network…)
+ """
+
+ def __init__(self, configuration):
+ self.serial_port = configuration["port"]
+
+ def connect(self) -> None:
+ """ Connect """
+ self.s = serial.Serial(port=self.serial_port, timeout=0)
+
+ def read(self) -> str:
+ """ Read from the connection and return the bytes.
+ Return None if there is nothing to read (non-blocking)
+ Raise an exception if disconnected """
+
+
+ in_waiting = self.s.in_waiting != 0
+ if in_waiting == 0:
+ # If we do not have any entry from the macropad, just return
+ return None
+
+ line = self.s.readline()
+ layout = str(line, "utf-8").strip()
+ component.handle(Layout(layout))
+ return line
+
+ def write(self, content:str) -> None:
+ """ Write into the connection.
+ Raise an exception if disconnected """
+ self.s.write(content)
diff --git a/socket_conn.py b/socket_conn.py new file mode 100755 index 0000000..0ac7230 --- /dev/null +++ b/socket_conn.py @@ -0,0 +1,46 @@ +#
+# Describe a connection over a socket.
+#
+# This class is intended to be apdapted into IEndpoint interface
+
+import socket
+from zope import interface
+import errno
+
+from interfaces import endpoint
+
+from zope import component
+from interfaces.message import Debug
+
+@interface.implementer(endpoint.IConnection)
+class SocketConnection(object):
+ """ Define a connected element (serial, network…)
+ """
+
+ def __init__(self, configuration):
+ self.port = int(configuration["port"])
+ self.host = configuration["host"]
+
+ def connect(self) -> None:
+ """ Connect """
+ self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.s.connect((self.host, self.port))
+ self.s.settimeout(0.0)
+ component.handle(Debug("Connected to the socket"))
+
+ def read(self) -> str:
+ """ Read from the connection and return the bytes.
+ Return None if there is nothing to read (non-blocking)
+ Raise an exception if disconnected """
+ try:
+ return self.s.recv(1024)
+ except socket.error as e:
+ err = e.args[0]
+ if err == errno.EAGAIN or err == errno.EWOULDBLOCK:
+ return None
+ raise e
+
+ def write(self, content:str) -> None:
+ """ Write into the connection.
+ Raise an exception if disconnected """
+ self.s.sendall(content)
diff --git a/socketserver.py b/socketserver.py new file mode 100755 index 0000000..fb71d6d --- /dev/null +++ b/socketserver.py @@ -0,0 +1,138 @@ +""" The module manage the socket server which allow to send new commands to
+ apply to the macropad.
+
+ Any client connected to the server will also be informed from the manual
+ changes in the macropad by the user.
+
+ The module is not blocking, and run in the main thread.
+"""
+
+import socket
+import selectors
+
+from dataclasses import dataclass
+from typing import Callable
+
+from zope import component
+from interfaces import desktopEvent
+
+from interfaces.message import Debug, ISocketMessage
+
+@dataclass
+class waitEvent:
+ callback = Callable[[object, socket, int], None]
+
+@dataclass
+class sendEvent:
+ callback = Callable[[object, socket, int], None]
+ message: str
+
+actions = {
+ "stack" : lambda x : x,
+ "layout" : lambda x : x,
+ }
+
+class Handler(object):
+ """ Listen the incomming connexions and dispatch them into the connexion
+ object """
+
+ def __init__(self, configuration, layout_queue):
+ """ Initialize the the socket server
+
+ The layout_queue will be populated with all the elements received from the socket.
+ """
+ super().__init__()
+ self.sel = selectors.DefaultSelector()
+ component.provideHandler(self.sendMessage)
+
+ self.socket = socket.socket()
+ self.socket.bind((configuration["host"], int(configuration["port"])))
+ self.socket.listen(100)
+ self.socket.setblocking(False)
+ c = waitEvent()
+ c.callback = self.accept
+ self.sel.register(self.socket, selectors.EVENT_READ, c)
+ self.application_name = configuration["name"].lower()
+
+ self.connexions = []
+ self.layout_queue = layout_queue
+
+ @component.adapter(ISocketMessage)
+ def sendMessage(self, content:ISocketMessage) -> None:
+ """ Send a message to all the sockets connected to the server
+ """
+ c = sendEvent(message = content.content)
+ c.callback = self._send
+ map = self.sel.get_map()
+ for value in list(map.values())[:]:
+ if value.fileobj == self.socket:
+ # Ignore the main connexion, keep it only in accept mode.
+ continue
+ self.sel.modify(value.fileobj, selectors.EVENT_WRITE, c)
+
+ def update(self):
+ """ Read the status for all the sockets, if they are available.
+ """
+ events = self.sel.select(timeout=0)
+ for key, mask in events:
+ c = key.data
+
+ if isinstance(c, waitEvent):
+ c.callback(key.fileobj, mask)
+ elif isinstance(c, sendEvent):
+ c.callback(key.fileobj, mask, c.message)
+
+ def _switch_read(self, conn):
+ """ Send the socket back into read mode
+ """
+ c = waitEvent()
+ c.callback = self._read
+ try:
+ self.sel.register(conn, selectors.EVENT_READ, c)
+ except KeyError:
+ self.sel.modify(conn, selectors.EVENT_READ, c)
+
+ def accept(self, sock, mask):
+ conn, addr = sock.accept() # Should be ready
+ conn.setblocking(False)
+ self._switch_read(conn)
+ component.handle(Debug("Received new connection"))
+
+ def _read(self:object, conn:socket, mask:int):
+ """ Internal method used to retreive data from the socket
+ """
+ data = conn.recv(1024).strip()
+ if data == bytes("", "ascii"):
+ # A socket ready but sending garbage is a dead socket.
+ self.close(conn)
+ return
+ last_layout = str(data, "utf-8")
+ try:
+ js = json.loads(last_layout)[-1]
+ for key, value in js.items():
+ last_layout = actions[key](value)
+ except:
+ print(last_layout)
+
+ #title = component.queryUtility(desktopEvent.IDesktop).getForegroundWindowTitle()
+ # The application name is hardcoded in the code, and should be reported
+ # in the configuration or somewhere else.
+ # The name of the current application is not reliable because the
+ # message can be send with some lattency.
+ component.handle(Debug("Received %s from the socket" % (last_layout)))
+
+ # Associate the layout with the current window
+ self.layout_queue.put((last_layout, self.application_name))
+
+ def _send(self:object, conn:socket, mask:int, text) -> None:
+ """ Internal method used to dispatch the message to the socket. """
+ try:
+ conn.sendall(bytes(text , "utf-8"))
+ except:
+ self.close(conn)
+ return
+ self._switch_read(conn)
+
+ def close(self, conn):
+ self.sel.unregister(conn)
+ conn.close()
diff --git a/win32.py b/win32.py new file mode 100755 index 0000000..46d1efe --- /dev/null +++ b/win32.py @@ -0,0 +1,104 @@ +# Required for the window title name
+from ctypes import wintypes, windll, create_unicode_buffer, WINFUNCTYPE
+from typing import Self, Optional, Dict
+
+from zope import interface
+from interfaces import desktopEvent
+
+# This code tries to hook the Focus changes events with the callback method.
+
+# The types of events we want to listen for, and the names we'll use for
+# them in the log output. Pick from
+# http://msdn.microsoft.com/en-us/library/windows/desktop/dd318066(v=vs.85).aspx
+EVENT_OBJECT_FOCUS = 0x8005
+EVENT_OBJECT_NAMECHANGE = 0x800C
+WINEVENT_OUTOFCONTEXT = 0x0000
+
+eventTypes = {
+ #win32con.EVENT_SYSTEM_FOREGROUND: "Foreground",
+ EVENT_OBJECT_FOCUS: "Focus",
+ EVENT_OBJECT_NAMECHANGE: "NameChange",
+ #win32con.EVENT_OBJECT_SHOW: "Show",
+ #win32con.EVENT_SYSTEM_DIALOGSTART: "Dialog",
+ #win32con.EVENT_SYSTEM_CAPTURESTART: "Capture",
+ #win32con.EVENT_SYSTEM_MINIMIZEEND: "UnMinimize"
+}
+
+WinEventProcType = WINFUNCTYPE(
+ None,
+ wintypes.HANDLE,
+ wintypes.DWORD,
+ wintypes.HWND,
+ wintypes.LONG,
+ wintypes.LONG,
+ wintypes.DWORD,
+ wintypes.DWORD
+)
+
+def setHook(WinEventProc, eventType):
+ """ Register the hook foo being notified when the window change
+ """
+ return windll.user32.SetWinEventHook(
+ eventType,
+ eventType,
+ 0,
+ WinEventProc,
+ 0,
+ 0,
+ WINEVENT_OUTOFCONTEXT
+ )
+
+@interface.implementer(desktopEvent.IDesktop)
+class Listener(object):
+
+ def __init__(self: Self, mapping: Dict[str, str], queue):
+ self.WinEventProc = WinEventProcType(self.callback)
+ self.mapping = mapping
+ self.queue = queue
+
+ def getForegroundWindowTitle(self: Self) -> Optional[str]:
+ """ Get the window title name.
+ Example found from https://stackoverflow.com/a/58355052
+ See the function in the winuser librarry :
+ https://learn.microsoft.com/en-us/windows/win32/api/winuser/
+ """
+ hWnd = windll.user32.GetForegroundWindow()
+ length = windll.user32.GetWindowTextLengthW(hWnd)
+ buf = create_unicode_buffer(length + 1)
+ windll.user32.GetWindowTextW(hWnd, buf, length + 1)
+
+ # 1-liner alternative: return buf.value if buf.value else None
+ if buf.value:
+ return buf.value
+ else:
+ return None
+
+ def callback(self: Self, hWinEventHook, event, hwnd, idObject, idChild, dwEventThread,
+ dwmsEventTime) -> None:
+
+ if hwnd != windll.user32.GetForegroundWindow():
+ # Only check the active window, the events received from other
+ # windows are ignored.
+ return
+
+ length = windll.user32.GetWindowTextLengthW(hwnd)
+ title = create_unicode_buffer(length + 1)
+ windll.user32.GetWindowTextW(hwnd, title, length + 1)
+ if title.value is None:
+ return
+ title = str(title.value).lower()
+ for pattern, code in self.mapping.items():
+ if pattern in title:
+ self.queue.put ( (code, None) )
+ return
+ self.queue.put ( ("Windows", None) )
+
+ def start(self: Self) -> None:
+ self.hookIDs = [setHook(self.WinEventProc, et) for et in eventTypes.keys()]
+ if not any(self.hookIDs):
+ print('SetWinEventHook failed')
+ sys.exit(1)
+
+ def stop(self: Self) -> None:
+ for hook in self.hookIDs:
+ windll.user32.UnhookWinEvent(hook)
@@ -0,0 +1,62 @@ +#!/usr/bin/python3
+import Xlib
+import Xlib.display
+
+from zope import interface
+from interfaces import desktopEvent
+from threading import Thread
+
+from interfaces.message import IMessage, Debug
+from zope import component
+
+@interface.implementer(desktopEvent.IDesktop)
+class Listener(Thread):
+
+ def __init__(self, mapping, queue):
+ Thread.__init__(self)
+ self.queue = queue
+ self.mapping = mapping
+ self.active_window = None
+ self.last_code = None
+ self.running = False
+
+ def getForegroundWindowTitle(self):
+ """ Return the name of the selected window
+ """
+ return self.active_window
+
+ def run(self):
+ disp = Xlib.display.Display()
+ NET_WM_NAME = disp.intern_atom('_NET_WM_NAME')
+ WM_CLASS = disp.intern_atom('WM_CLASS')
+ NET_ACTIVE_WINDOW = disp.intern_atom('_NET_ACTIVE_WINDOW')
+ root = disp.screen().root
+ root.change_attributes(event_mask=Xlib.X.PropertyChangeMask)
+
+ component.handle(Debug("Waiting for xlib event"))
+ self.running = True
+ while self.running:
+ try:
+ window_id = root.get_full_property(NET_ACTIVE_WINDOW, Xlib.X.AnyPropertyType).value[0]
+ window = disp.create_resource_object('window', window_id)
+ window.change_attributes(event_mask=Xlib.X.PropertyChangeMask)
+ window_name = str(window.get_full_property(NET_WM_NAME, 0).value).lower()
+ class_name = str(window.get_full_property(WM_CLASS, 0).value).lower()
+ except Xlib.error.XError:
+ window_name = None
+ class_name = None
+
+ if window_name is not None and window_name != self.active_window:
+
+ self.active_window = window_name
+ for pattern, code in self.mapping.items():
+ if (pattern in window_name or pattern in class_name) and code != self.last_code:
+
+ component.handle(Debug("Switching to '%s' for '%s'" % (code, window_name)))
+ self.queue.put ( (code, None) )
+ self.last_code = code
+ break
+ event = disp.next_event()
+
+ def stop(self):
+ self.running = False
|