aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSébastien Dailly <sebastien@dailly.me>2023-07-15 14:44:41 +0200
committerSébastien Dailly <sebastien@dailly.me>2023-07-15 14:59:43 +0200
commit02d676bda89c2fb8469ea81f7429c19c1e29df7c (patch)
tree11dae86a561a4800661e6c174b47611a704f857e
Initial commit
-rwxr-xr-xclient.py68
-rw-r--r--favicon.icobin0 -> 13094 bytes
-rwxr-xr-xinterfaces/__init__.py0
-rwxr-xr-xinterfaces/desktopEvent.py16
-rwxr-xr-xinterfaces/endpoint.py112
-rwxr-xr-xinterfaces/message.py14
-rwxr-xr-xmacropad.pyw244
-rwxr-xr-xreadme.rst110
-rwxr-xr-xserial_conn.py46
-rwxr-xr-xsocket_conn.py46
-rwxr-xr-xsocketserver.py138
-rwxr-xr-xwin32.py104
-rw-r--r--xlib.py62
13 files changed, 960 insertions, 0 deletions
diff --git a/client.py b/client.py
new file mode 100755
index 0000000..24a8429
--- /dev/null
+++ b/client.py
@@ -0,0 +1,68 @@
+import sys
+from threading import Thread
+from queue import Queue
+import time
+
+#
+# This code provide an example for connecting to the socket server.
+#
+
+from zope import component
+
+from interfaces import endpoint
+component.provideAdapter(endpoint.EndPoint)
+
+from socket_conn import SocketConnection
+conn = SocketConnection()
+
+
+class Conn(Thread):
+
+ def __init__(self, queue):
+ Thread.__init__(self)
+ self.queue = queue
+ self.s = component.queryAdapter(conn, endpoint.IEndpoint)
+ self.s.connect()
+ self.in_queue = Queue()
+ self.s.queue = self.in_queue
+
+ def run(self):
+
+ while True:
+ while not self.queue.empty():
+ text = self.queue.get(False)
+ if text == -1:
+ sys.exit(0)
+ elif text == "":
+ continue
+ self.s.send([{"layout": text}])
+
+ if not self.s.isConnected():
+ time.sleep(2)
+ self.s.connect()
+ continue
+
+
+ self.s.fetch()
+
+ while not self.in_queue.empty():
+ msg = self.in_queue.get(False)
+ print(msg)
+ # Print the promot again
+ print("\n> ", end=u"", flush=True)
+
+ time.sleep(0.2)
+
+queue = Queue()
+c = Conn(queue)
+c.start()
+print("Press ^D to quit")
+while True:
+ try:
+ text = input("> ")
+ queue.put(text)
+ if text == "":
+ continue
+ except:
+ queue.put(-1)
+ break
diff --git a/favicon.ico b/favicon.ico
new file mode 100644
index 0000000..39209c5
--- /dev/null
+++ b/favicon.ico
Binary files differ
diff --git a/interfaces/__init__.py b/interfaces/__init__.py
new file mode 100755
index 0000000..e69de29
--- /dev/null
+++ b/interfaces/__init__.py
diff --git a/interfaces/desktopEvent.py b/interfaces/desktopEvent.py
new file mode 100755
index 0000000..e8a30a4
--- /dev/null
+++ b/interfaces/desktopEvent.py
@@ -0,0 +1,16 @@
+from typing import Optional
+from zope import interface
+from zope.interface import Attribute
+
+class IDesktop(interface.Interface):
+
+ queue = Attribute("""The queue to send the message""")
+ mapping = Attribute("""Correspondance between application and layer""")
+
+ def getForegroundWindowTitle(sel) -> Optional[str]:
+ """ Return the name of the selected window
+ """
+
+ def start(self) -> None:
+ """ Start listening the events
+ """
diff --git a/interfaces/endpoint.py b/interfaces/endpoint.py
new file mode 100755
index 0000000..5aa369b
--- /dev/null
+++ b/interfaces/endpoint.py
@@ -0,0 +1,112 @@
+from typing import Optional
+from zope import interface
+from zope.interface import Attribute
+
+
+class IEndpoint(interface.Interface):
+
+ queue = Attribute("""The queue to send the message""")
+ state = Attribute("""The connection status""")
+
+ def isConnected(self) -> bool:
+ """ Return True if the endpoint is connected. This function and
+ [connect] are supposed to work together : the main loop application
+ will call reconnect after a while when isConnected return False.
+ """
+
+ def connect(self):
+ """ Connect or reconnect to the endpoint
+ """
+
+ def fetch(self):
+ """ Check the elements from the connection
+ """
+
+ def send(self, data: list[dict[str, str]]):
+ """ Send element to the connection
+ """
+
+class IConnection(interface.Interface):
+ """ Define a connected element (serial, network…)
+ """
+
+ def connect(self) -> None:
+ """ Connect """
+
+ def read(self) -> str:
+ """ Read from the connection and return the bytes.
+ Return None if there is nothing to read (non-blocking)
+ Raise an exception if disconnected """
+
+ def write(self, content:str) -> None:
+ """ Write into the connection.
+ Raise an exception if disconnected """
+
+from interfaces import desktopEvent
+from zope import component
+import json
+
+from interfaces.message import IMessage, Debug
+
+@component.adapter(IConnection)
+@interface.implementer(IEndpoint)
+class EndPoint(object):
+
+ STATE_DISCONNECTED = 0
+ STATE_CONNECTING = 1
+ STATE_CONNECTED = 2
+
+ def __init__(self, connection):
+ self.connection = connection
+ self.queue = None
+ self.state = self.STATE_DISCONNECTED
+
+ def isConnected(self) -> bool:
+ return self.state != self.STATE_DISCONNECTED
+
+ def connect(self):
+ try:
+ self.connection.connect()
+ self.state = self.STATE_CONNECTED
+ component.handle(Debug("Connected"))
+ except Exception as e:
+ print(e)
+ self.state = self.STATE_DISCONNECTED
+
+ def fetch(self):
+ """ Read from the peripherical and put them in the queue if any data
+ are available.
+ """
+ if self.state != self.STATE_CONNECTED:
+ return
+ try:
+ received = self.connection.read()
+ except Exception as e:
+ print(e)
+ self.state = self.STATE_DISCONNECTED
+ return
+ else:
+ if received is None or received == b'':
+ # If we do not have any entry from the macropad, just return
+ return
+
+ print("recv", received)
+ layout = str(received, "utf-8").strip()
+ desktop = component.queryUtility(desktopEvent.IDesktop)
+ if desktop is not None:
+ title = desktop.getForegroundWindowTitle()
+ else:
+ title = None
+ self.queue.put((layout, title))
+
+ def send(self, data: list[dict[str, str]]):
+ """ Send the data to the macropad
+ """
+ if self.state != self.STATE_CONNECTED:
+ return
+ try:
+ j = json.dumps( data )
+ self.connection.write(bytes(j, "utf-8"))
+ except Exception as e:
+ print(e)
+ self.state = self.STATE_DISCONNECTED
diff --git a/interfaces/message.py b/interfaces/message.py
new file mode 100755
index 0000000..23ce2ec
--- /dev/null
+++ b/interfaces/message.py
@@ -0,0 +1,14 @@
+from zope import interface
+from zope.interface import Attribute
+
+class IMessage(interface.Interface):
+ content = Attribute("""The text message to log""")
+
+class ISocketMessage(interface.Interface):
+ content = Attribute("""Content to send""")
+
+@interface.implementer(IMessage)
+class Debug(object):
+
+ def __init__(self, message):
+ self.content = message
diff --git a/macropad.pyw b/macropad.pyw
new file mode 100755
index 0000000..f35e141
--- /dev/null
+++ b/macropad.pyw
@@ -0,0 +1,244 @@
+#!/usr/bin/env python3
+
+import serial
+
+import tkinter as tk
+from tkinter import Text
+import pystray
+from pystray import MenuItem as item
+from PIL import Image, ImageTk
+
+import threading
+from os import path
+import sys
+
+from zope import component
+import interfaces
+from interfaces import endpoint
+from interfaces.message import IMessage, Debug
+
+import configparser
+
+script_path = path.dirname(path.realpath(__file__))
+config_file = path.join(script_path, "config.ini")
+
+config = configparser.ConfigParser(delimiters="=")
+config.read(config_file)
+
+
+
+init_mapping = config["mapping"]
+mapping = dict(init_mapping)
+
+from queue import Queue
+q = Queue()
+
+component.provideAdapter(interfaces.endpoint.EndPoint)
+
+#
+# Guess the platform and the load the corresponding event listener
+#
+
+from sys import platform
+if platform == "win32":
+ import win32
+ window_listener = win32.Listener(mapping, q)
+ window_listener.start()
+ component.provideUtility(window_listener, interfaces.desktopEvent.IDesktop)
+
+elif platform == 'linux':
+ import xlib
+ xlib_listener = xlib.Listener(mapping, q)
+ xlib_listener.start()
+ component.provideUtility(xlib_listener, interfaces.desktopEvent.IDesktop)
+
+#
+# How to connect to the peripherical
+#
+
+if config.has_section("connection.serial"):
+
+ from serial_conn import SerialConnection
+ endpoint = component.queryAdapter(SerialConnection(config["connection.serial"]), endpoint.IEndpoint)
+ endpoint.queue = q
+ endpoint.connect()
+ component.provideUtility(endpoint, interfaces.endpoint.IEndpoint)
+
+elif config.has_section("connection.socket"):
+
+ from socket_conn import SocketConnection
+ endpoint = component.queryAdapter(SocketConnection(config["connection.socket"]), endpoint.IEndpoint)
+ endpoint.queue = q
+ endpoint.connect()
+ component.provideUtility(endpoint, interfaces.endpoint.IEndpoint)
+
+if config.has_section("socket.serve"):
+
+ import socketserver
+ server = socketserver.Handler(config["socket.serve"], q)
+
+else:
+
+ server = None
+
+
+class Icon(object):
+
+ def __init__(self, image):
+ menu=(
+ item('Quit', self.quit_window),
+ item('Show', self.show_window, default=True),
+ item('Reset',self.reset),
+ )
+ self.icon=pystray.Icon("name", image, "Macropad companion", menu)
+
+ self.stop = threading.Event()
+ self.show_hide = threading.Event()
+
+ # Start the icon into a new thread in order to keep the main loop control
+ icon_thread = threading.Thread(target=self.icon.run)
+ self.icon_thread = icon_thread
+
+ def start(self):
+ self.icon_thread.start()
+
+ def quit(self):
+ self.icon.stop()
+
+ def quit_window(self):
+ self.stop.set()
+
+ def show_window(self):
+ self.show_hide.set()
+
+ def reset(self):
+ # Create a copy of the dictonnary before updating the keys
+ tmp_mapping = dict(mapping)
+ for key in tmp_mapping.keys() :
+ if key not in init_mapping.keys():
+ del mapping[key]
+
+import json
+class Application(object):
+
+ def __init__(self):
+ self.window = tk.Tk()
+ self.text = Text(self.window, height=8)
+ ## State of the #pplication
+ self.running = True
+ self.visible = False
+ self.focused_window = None
+ self.last_layout = None
+
+ component.provideHandler(self.log)
+
+ # Window property
+ self.window.withdraw()
+ self.window.title("Macropad companion")
+ icon = path.join(script_path, "favicon.ico")
+ try:
+ self.window.iconbitmap(icon)
+ except:
+ pass
+ self.text.pack()
+
+ # When closing, return back to the iconified mode
+ self.window.protocol("WM_DELETE_WINDOW", self.hide)
+ # Start the application in iconified mode
+ image=Image.open(icon)
+ self.icon = Icon(image)
+ self.icon.start()
+ component.handle(Debug("Started"))
+
+ def hide(self):
+ self.icon.show_hide.clear()
+ self.visible = False
+ self.window.withdraw()
+ self.update()
+
+
+ def update(self):
+ if self.icon.stop.is_set():
+ self.icon.quit()
+ self.running = False
+ self.window.destroy()
+ component.queryUtility(interfaces.desktopEvent.IDesktop).stop()
+ return
+
+ if self.icon.show_hide.is_set():
+ if not self.visible:
+ self.window.deiconify()
+ else:
+ self.window.withdraw()
+ self.icon.show_hide.clear()
+ self.visible = not self.visible
+
+ @component.adapter(IMessage)
+ def log(self, message : str):
+ print(message.content)
+ try:
+ f = open("/tmp/macropad.log", "a")
+ f.write(message.content)
+ f.write("\n")
+ f.close()
+ self.text.insert("1.0", "\n")
+ self.text.insert("1.0", message.content)
+ self.text.delete("200.0", "end")
+ except Exception as e:
+ print(e)
+
+ def send(self, data: str):
+ if data == self.last_layout:
+ return
+ self.last_layout = data
+
+ if data == "Windows":
+ j = [ {"layout": data} ]
+ else:
+ # Add the Windows layout in the bottom layer
+ j = [ {"layout": "Windows"}, {"stack": data} ]
+
+ component.queryUtility(interfaces.endpoint.IEndpoint).send(j)
+
+ def associate(self, layout: str, name: str):
+ mapping[name] = layout
+ component.handle(Debug("Associating %s with %s" % (name, layout)))
+
+ def exec(self):
+ try:
+ self.update()
+ if server is not None: server.update()
+ conn = component.queryUtility(interfaces.endpoint.IEndpoint)
+ if not conn.isConnected():
+ component.handle(Debug("Reconnecting…"))
+
+ conn.state = conn.STATE_CONNECTING
+ self.window.after(5000, conn.connect)
+ else:
+ conn.fetch()
+ except Exception as e:
+ component.handle(Debug( str(e) ))
+ print(e)
+ # Got any error, stop the application properly
+ self.icon.stop.set()
+
+ if app.running:
+ self.window.after(200, self.exec)
+
+ while not q.empty():
+ last_layout, app_ = q.get(False)
+ self.send(last_layout)
+ if app_ is not None: self.associate(last_layout, app_)
+
+if __name__ == '__main__':
+ app = Application()
+
+ # Initialize the main loop
+ app.exec()
+ try:
+ app.window.mainloop()
+ except:
+ app.running = False
+ app.window.destroy()
+ app.icon.quit()
+ component.queryUtility(interfaces.desktopEvent.IDesktop).stop()
diff --git a/readme.rst b/readme.rst
new file mode 100755
index 0000000..b8ffca2
--- /dev/null
+++ b/readme.rst
@@ -0,0 +1,110 @@
+
+Requirements
+============
+
+python3 -m pip install pystray pyserial zope.component
+
+for debian you may need to install :
+
+sudo apt install python3-pil.imagetk
+
+Configuration
+=============
+
+The configuration lies in an ini file:
+
+Serial connection
+-----------------
+
+.. code:: ini
+
+ [connexion.serial]
+ port = \\.\COM12
+
+
+Initialize a connection directly with the macropad
+
+Socket connection
+-----------------
+
+.. code:: ini
+
+ [connexion.socket]
+ port = 9999
+ host = localhost
+
+Use a proxy to connect with the keyboard (as client)
+
+Socket server
+-------------
+
+.. code:: ini
+
+ [socket.serve]
+ port = 9999
+ host = localhost
+ name = The name of the application to associate with
+
+Use a proxy to connect with the keyboard (as server)
+
+The mapping
+-----------
+
+.. code:: ini
+
+ [mapping]
+ Mozilla Firefox = Firefox
+ Teams = Teams
+ irssi = Irssi
+ …
+
+Mapping list
+============
+
+When a new window is selected, the application will look in the table for a
+match, and send the corresponding layer to the endpoint.
+
+.. note::
+
+ When using XLib, the application will match the name with both the CLASS or
+ the NAME of the selected window
+
+If the application receive a notification for a new layer, it will also
+register it, and store it for the session. This allow to make the application
+"learn" about the layer you want to use.
+
+Serial connection
+=================
+
+Sending message
+---------------
+
+The application send a json string to the endpoint (network or serial connection):
+
+.. code:: json
+
+ {"layout": "Firefox"}
+
+This application does not handle the code in the keyboard. CircuitPython
+provide a native library in order `to read or store json`_, and firmware build
+upon it (like KMK) are easer to use with.
+
+.. _`to read or store json`: https://docs.circuitpython.org/en/latest/docs/library/json.html
+
+Reading message
+---------------
+
+The endpoint can also send a message to the application. For now, the message
+is a raw string with the name of the layer.
+
+When the application receive such message, it will look for the active window
+in the desktop, and register the application with this layer. If this
+application is selected again, the application will ask the endpoint to switch
+to the same layer again.
+
+Network connection
+==================
+
+You can relay the events to another one instance using the network. I'm using
+this when I'm connected over VNC in order to use the keyboard as if it was
+plugged directly in the host.
diff --git a/serial_conn.py b/serial_conn.py
new file mode 100755
index 0000000..a08543a
--- /dev/null
+++ b/serial_conn.py
@@ -0,0 +1,46 @@
+from zope import interface
+from interfaces.message import ISocketMessage
+
+@interface.implementer(ISocketMessage)
+class Layout(object):
+
+ def __init__(self, message):
+ self.content = message
+
+from zope import component
+from interfaces.endpoint import IConnection
+import serial
+from queue import Queue
+
+@interface.implementer(IConnection)
+class SerialConnection(object):
+ """ Define a connected element (serial, network…)
+ """
+
+ def __init__(self, configuration):
+ self.serial_port = configuration["port"]
+
+ def connect(self) -> None:
+ """ Connect """
+ self.s = serial.Serial(port=self.serial_port, timeout=0)
+
+ def read(self) -> str:
+ """ Read from the connection and return the bytes.
+ Return None if there is nothing to read (non-blocking)
+ Raise an exception if disconnected """
+
+
+ in_waiting = self.s.in_waiting != 0
+ if in_waiting == 0:
+ # If we do not have any entry from the macropad, just return
+ return None
+
+ line = self.s.readline()
+ layout = str(line, "utf-8").strip()
+ component.handle(Layout(layout))
+ return line
+
+ def write(self, content:str) -> None:
+ """ Write into the connection.
+ Raise an exception if disconnected """
+ self.s.write(content)
diff --git a/socket_conn.py b/socket_conn.py
new file mode 100755
index 0000000..0ac7230
--- /dev/null
+++ b/socket_conn.py
@@ -0,0 +1,46 @@
+#
+# Describe a connection over a socket.
+#
+# This class is intended to be apdapted into IEndpoint interface
+
+import socket
+from zope import interface
+import errno
+
+from interfaces import endpoint
+
+from zope import component
+from interfaces.message import Debug
+
+@interface.implementer(endpoint.IConnection)
+class SocketConnection(object):
+ """ Define a connected element (serial, network…)
+ """
+
+ def __init__(self, configuration):
+ self.port = int(configuration["port"])
+ self.host = configuration["host"]
+
+ def connect(self) -> None:
+ """ Connect """
+ self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.s.connect((self.host, self.port))
+ self.s.settimeout(0.0)
+ component.handle(Debug("Connected to the socket"))
+
+ def read(self) -> str:
+ """ Read from the connection and return the bytes.
+ Return None if there is nothing to read (non-blocking)
+ Raise an exception if disconnected """
+ try:
+ return self.s.recv(1024)
+ except socket.error as e:
+ err = e.args[0]
+ if err == errno.EAGAIN or err == errno.EWOULDBLOCK:
+ return None
+ raise e
+
+ def write(self, content:str) -> None:
+ """ Write into the connection.
+ Raise an exception if disconnected """
+ self.s.sendall(content)
diff --git a/socketserver.py b/socketserver.py
new file mode 100755
index 0000000..fb71d6d
--- /dev/null
+++ b/socketserver.py
@@ -0,0 +1,138 @@
+""" The module manage the socket server which allow to send new commands to
+ apply to the macropad.
+
+ Any client connected to the server will also be informed from the manual
+ changes in the macropad by the user.
+
+ The module is not blocking, and run in the main thread.
+"""
+
+import socket
+import selectors
+
+from dataclasses import dataclass
+from typing import Callable
+
+from zope import component
+from interfaces import desktopEvent
+
+from interfaces.message import Debug, ISocketMessage
+
+@dataclass
+class waitEvent:
+ callback = Callable[[object, socket, int], None]
+
+@dataclass
+class sendEvent:
+ callback = Callable[[object, socket, int], None]
+ message: str
+
+actions = {
+ "stack" : lambda x : x,
+ "layout" : lambda x : x,
+ }
+
+class Handler(object):
+ """ Listen the incomming connexions and dispatch them into the connexion
+ object """
+
+ def __init__(self, configuration, layout_queue):
+ """ Initialize the the socket server
+
+ The layout_queue will be populated with all the elements received from the socket.
+ """
+ super().__init__()
+ self.sel = selectors.DefaultSelector()
+ component.provideHandler(self.sendMessage)
+
+ self.socket = socket.socket()
+ self.socket.bind((configuration["host"], int(configuration["port"])))
+ self.socket.listen(100)
+ self.socket.setblocking(False)
+ c = waitEvent()
+ c.callback = self.accept
+ self.sel.register(self.socket, selectors.EVENT_READ, c)
+ self.application_name = configuration["name"].lower()
+
+ self.connexions = []
+ self.layout_queue = layout_queue
+
+ @component.adapter(ISocketMessage)
+ def sendMessage(self, content:ISocketMessage) -> None:
+ """ Send a message to all the sockets connected to the server
+ """
+ c = sendEvent(message = content.content)
+ c.callback = self._send
+ map = self.sel.get_map()
+ for value in list(map.values())[:]:
+ if value.fileobj == self.socket:
+ # Ignore the main connexion, keep it only in accept mode.
+ continue
+ self.sel.modify(value.fileobj, selectors.EVENT_WRITE, c)
+
+ def update(self):
+ """ Read the status for all the sockets, if they are available.
+ """
+ events = self.sel.select(timeout=0)
+ for key, mask in events:
+ c = key.data
+
+ if isinstance(c, waitEvent):
+ c.callback(key.fileobj, mask)
+ elif isinstance(c, sendEvent):
+ c.callback(key.fileobj, mask, c.message)
+
+ def _switch_read(self, conn):
+ """ Send the socket back into read mode
+ """
+ c = waitEvent()
+ c.callback = self._read
+ try:
+ self.sel.register(conn, selectors.EVENT_READ, c)
+ except KeyError:
+ self.sel.modify(conn, selectors.EVENT_READ, c)
+
+ def accept(self, sock, mask):
+ conn, addr = sock.accept() # Should be ready
+ conn.setblocking(False)
+ self._switch_read(conn)
+ component.handle(Debug("Received new connection"))
+
+ def _read(self:object, conn:socket, mask:int):
+ """ Internal method used to retreive data from the socket
+ """
+ data = conn.recv(1024).strip()
+ if data == bytes("", "ascii"):
+ # A socket ready but sending garbage is a dead socket.
+ self.close(conn)
+ return
+ last_layout = str(data, "utf-8")
+ try:
+ js = json.loads(last_layout)[-1]
+ for key, value in js.items():
+ last_layout = actions[key](value)
+ except:
+ print(last_layout)
+
+ #title = component.queryUtility(desktopEvent.IDesktop).getForegroundWindowTitle()
+ # The application name is hardcoded in the code, and should be reported
+ # in the configuration or somewhere else.
+ # The name of the current application is not reliable because the
+ # message can be send with some lattency.
+ component.handle(Debug("Received %s from the socket" % (last_layout)))
+
+ # Associate the layout with the current window
+ self.layout_queue.put((last_layout, self.application_name))
+
+ def _send(self:object, conn:socket, mask:int, text) -> None:
+ """ Internal method used to dispatch the message to the socket. """
+ try:
+ conn.sendall(bytes(text , "utf-8"))
+ except:
+ self.close(conn)
+ return
+ self._switch_read(conn)
+
+ def close(self, conn):
+ self.sel.unregister(conn)
+ conn.close()
diff --git a/win32.py b/win32.py
new file mode 100755
index 0000000..46d1efe
--- /dev/null
+++ b/win32.py
@@ -0,0 +1,104 @@
+# Required for the window title name
+from ctypes import wintypes, windll, create_unicode_buffer, WINFUNCTYPE
+from typing import Self, Optional, Dict
+
+from zope import interface
+from interfaces import desktopEvent
+
+# This code tries to hook the Focus changes events with the callback method.
+
+# The types of events we want to listen for, and the names we'll use for
+# them in the log output. Pick from
+# http://msdn.microsoft.com/en-us/library/windows/desktop/dd318066(v=vs.85).aspx
+EVENT_OBJECT_FOCUS = 0x8005
+EVENT_OBJECT_NAMECHANGE = 0x800C
+WINEVENT_OUTOFCONTEXT = 0x0000
+
+eventTypes = {
+ #win32con.EVENT_SYSTEM_FOREGROUND: "Foreground",
+ EVENT_OBJECT_FOCUS: "Focus",
+ EVENT_OBJECT_NAMECHANGE: "NameChange",
+ #win32con.EVENT_OBJECT_SHOW: "Show",
+ #win32con.EVENT_SYSTEM_DIALOGSTART: "Dialog",
+ #win32con.EVENT_SYSTEM_CAPTURESTART: "Capture",
+ #win32con.EVENT_SYSTEM_MINIMIZEEND: "UnMinimize"
+}
+
+WinEventProcType = WINFUNCTYPE(
+ None,
+ wintypes.HANDLE,
+ wintypes.DWORD,
+ wintypes.HWND,
+ wintypes.LONG,
+ wintypes.LONG,
+ wintypes.DWORD,
+ wintypes.DWORD
+)
+
+def setHook(WinEventProc, eventType):
+ """ Register the hook foo being notified when the window change
+ """
+ return windll.user32.SetWinEventHook(
+ eventType,
+ eventType,
+ 0,
+ WinEventProc,
+ 0,
+ 0,
+ WINEVENT_OUTOFCONTEXT
+ )
+
+@interface.implementer(desktopEvent.IDesktop)
+class Listener(object):
+
+ def __init__(self: Self, mapping: Dict[str, str], queue):
+ self.WinEventProc = WinEventProcType(self.callback)
+ self.mapping = mapping
+ self.queue = queue
+
+ def getForegroundWindowTitle(self: Self) -> Optional[str]:
+ """ Get the window title name.
+ Example found from https://stackoverflow.com/a/58355052
+ See the function in the winuser librarry :
+ https://learn.microsoft.com/en-us/windows/win32/api/winuser/
+ """
+ hWnd = windll.user32.GetForegroundWindow()
+ length = windll.user32.GetWindowTextLengthW(hWnd)
+ buf = create_unicode_buffer(length + 1)
+ windll.user32.GetWindowTextW(hWnd, buf, length + 1)
+
+ # 1-liner alternative: return buf.value if buf.value else None
+ if buf.value:
+ return buf.value
+ else:
+ return None
+
+ def callback(self: Self, hWinEventHook, event, hwnd, idObject, idChild, dwEventThread,
+ dwmsEventTime) -> None:
+
+ if hwnd != windll.user32.GetForegroundWindow():
+ # Only check the active window, the events received from other
+ # windows are ignored.
+ return
+
+ length = windll.user32.GetWindowTextLengthW(hwnd)
+ title = create_unicode_buffer(length + 1)
+ windll.user32.GetWindowTextW(hwnd, title, length + 1)
+ if title.value is None:
+ return
+ title = str(title.value).lower()
+ for pattern, code in self.mapping.items():
+ if pattern in title:
+ self.queue.put ( (code, None) )
+ return
+ self.queue.put ( ("Windows", None) )
+
+ def start(self: Self) -> None:
+ self.hookIDs = [setHook(self.WinEventProc, et) for et in eventTypes.keys()]
+ if not any(self.hookIDs):
+ print('SetWinEventHook failed')
+ sys.exit(1)
+
+ def stop(self: Self) -> None:
+ for hook in self.hookIDs:
+ windll.user32.UnhookWinEvent(hook)
diff --git a/xlib.py b/xlib.py
new file mode 100644
index 0000000..b86a8b1
--- /dev/null
+++ b/xlib.py
@@ -0,0 +1,62 @@
+#!/usr/bin/python3
+import Xlib
+import Xlib.display
+
+from zope import interface
+from interfaces import desktopEvent
+from threading import Thread
+
+from interfaces.message import IMessage, Debug
+from zope import component
+
+@interface.implementer(desktopEvent.IDesktop)
+class Listener(Thread):
+
+ def __init__(self, mapping, queue):
+ Thread.__init__(self)
+ self.queue = queue
+ self.mapping = mapping
+ self.active_window = None
+ self.last_code = None
+ self.running = False
+
+ def getForegroundWindowTitle(self):
+ """ Return the name of the selected window
+ """
+ return self.active_window
+
+ def run(self):
+ disp = Xlib.display.Display()
+ NET_WM_NAME = disp.intern_atom('_NET_WM_NAME')
+ WM_CLASS = disp.intern_atom('WM_CLASS')
+ NET_ACTIVE_WINDOW = disp.intern_atom('_NET_ACTIVE_WINDOW')
+ root = disp.screen().root
+ root.change_attributes(event_mask=Xlib.X.PropertyChangeMask)
+
+ component.handle(Debug("Waiting for xlib event"))
+ self.running = True
+ while self.running:
+ try:
+ window_id = root.get_full_property(NET_ACTIVE_WINDOW, Xlib.X.AnyPropertyType).value[0]
+ window = disp.create_resource_object('window', window_id)
+ window.change_attributes(event_mask=Xlib.X.PropertyChangeMask)
+ window_name = str(window.get_full_property(NET_WM_NAME, 0).value).lower()
+ class_name = str(window.get_full_property(WM_CLASS, 0).value).lower()
+ except Xlib.error.XError:
+ window_name = None
+ class_name = None
+
+ if window_name is not None and window_name != self.active_window:
+
+ self.active_window = window_name
+ for pattern, code in self.mapping.items():
+ if (pattern in window_name or pattern in class_name) and code != self.last_code:
+
+ component.handle(Debug("Switching to '%s' for '%s'" % (code, window_name)))
+ self.queue.put ( (code, None) )
+ self.last_code = code
+ break
+ event = disp.next_event()
+
+ def stop(self):
+ self.running = False