From 4bf1c1ffe795a1d28a543a20cc3771a30a090b89 Mon Sep 17 00:00:00 2001 From: Sébastien Dailly Date: Tue, 6 May 2025 19:33:30 +0200 Subject: Added a dedicated thread for handling events sent to the endpoint --- client.py | 3 +- consumer.py | 143 ++++++++++++++++++++++++++++++++++++++++++++ interfaces/configuration.py | 14 +++++ interfaces/endpoint.py | 5 +- macropad.pyw | 64 ++++---------------- socketserver.py | 11 ++-- win32.py | 8 +-- xlib.py | 9 +-- 8 files changed, 186 insertions(+), 71 deletions(-) create mode 100644 consumer.py create mode 100644 interfaces/configuration.py diff --git a/client.py b/client.py index 9b59617..20dddce 100755 --- a/client.py +++ b/client.py @@ -40,7 +40,6 @@ s = component.queryAdapter(conn, endpoint.IEndpoint) if args.layer is not None: print(args.layer) - s.queue = Queue() s.connect() with open(args.layer, "r") as json_file: json_data = json_file.read() @@ -57,7 +56,7 @@ class Conn(Thread): Thread.__init__(self) self.queue = queue self.in_queue = Queue() - s.queue = self.in_queue + #s.queue = self.in_queue s.connect() def run(self): diff --git a/consumer.py b/consumer.py new file mode 100644 index 0000000..9e36dd0 --- /dev/null +++ b/consumer.py @@ -0,0 +1,143 @@ +from typing import Dict + +from zope import component, interface + +from interfaces.message import ISocketMessage, Debug + +@interface.implementer(ISocketMessage) +class Mapping(object): + """ Send an event requesting a layer change to the endpoint. + """ + + def __init__(self, message): + self.content = {"layer": message} + +import abc +from threading import Thread +from queue import Queue, Empty, Full + +class EventConsummer(Thread, metaclass=abc.ABCMeta): + """ Thread processing messages. This class does nothing and is intended to + be inherited. + + The method `process` need to be overriden. + """ + + def __init__(self, timeout = None): + super().__init__() + self._queue = Queue() + self.timeout = timeout + self.daemon = True + + def run(self): + """ Read and process the messages from the queue, and call the + handler. + """ + + while True: + # Block the thread for at most timeout seconds + try: + message = self._queue.get(timeout=self.timeout) + self._process(message) + self._queue.task_done() + except Empty: + self._process(None) + + @abc.abstractmethod + def _process(self, message) -> None: + raise NotImplementedError() + + def handle(self, message) -> None: + """ Register a new event in the queue. + This method is intended to be public. + """ + + # Try to add the message in the queue, if the queue is full, discard + # the firsts elements and try again. + try: + self._queue.put(message) + except Full: + while self._queue.full(): + _ = self._queue.get() + # Discard another one more, just for sure. + try: + self._queue.put(message) + except Full: + component.handle(Debug("Ignoring event: queue is full")) + +import time +from interfaces import endpoint, configuration +from interfaces.configuration import IConfiguration + +class SocketMessageConsumer(EventConsummer): + """ Provide a handler consuming events and sending them to the endpoint. + It’s not an adapter as it’s not transforming the endpoint into + something else; instead, it requires an existing endpoint to + be declared in the registry. + + The class register a handler_ for SocketEvents. They are queued and + processed when as the endpoint is connected. Events can be send as + usual using the component registry: + + component.handle(Mapping(…)) + + .. _handler: https://zopecomponent.readthedocs.io/en/latest/narr.html#handlers + """ + + def __init__(self): + super().__init__(timeout = 2) + self._endpoint = component.getUtility(endpoint.IEndpoint) + self._last_mapping = None + component.provideHandler(self.handle, [ISocketMessage]) + + # Register the functions to associate for each kind of event. + self.function_table = { + "layer": self._process_layer, + } + + def _process(self, message) -> None: + + # We are here either because we have a message to process, or because + # we didn’t got any message before the timeout. We check if we are + # still connected to the endpoint. + while not self._endpoint.isConnected(): + # Loop until we get the connection back. During this time, the + # message will stack in the queue. + component.handle(Debug("Reconnecting …")) + self._endpoint.state = self._endpoint.STATE_CONNECTING + self._endpoint.connect() + time.sleep(2) + # Also check if we have something to read from the server. + self._endpoint.fetch() + + # The endpoint is connected and we have no message the process. + if message is None: + return + + # Dispatch the event to the appropriate function. + for key, value in message.content.items(): + self.function_table[key](value) + + def _process_layer(self, message) -> None: + + mapping, name = message + if mapping == self._last_mapping: + return + self._last_mapping = mapping + + configuration = component.getUtility(IConfiguration) + if isinstance(mapping, dict): + self._endpoint.send(mapping) + elif isinstance(data, str): + # TODO Query the json from the configuration + layer = configuration.get(mapping, None) + if layer is not None: + self._endpoint.send(layer) + + if name is not None: + # We received a name to associate the configuration with. + # Register the element in the configuration + # TODO use an event + configuration[name] = mapping + for key in mapping.keys(): + component.handle(Debug("Associating %s with %s" % (name, key))) diff --git a/interfaces/configuration.py b/interfaces/configuration.py new file mode 100644 index 0000000..9449f41 --- /dev/null +++ b/interfaces/configuration.py @@ -0,0 +1,14 @@ +from zope import interface +from zope.interface import Attribute + +from typing import Dict + +class IConfiguration(interface.Interface): + + def get(self, key : str, default) -> Dict: + """ Load the mapping for a given key + """ + + def __setitem__(self, key : str, value : Dict) -> None: + """ Set a value + """ diff --git a/interfaces/endpoint.py b/interfaces/endpoint.py index 2d77b66..4da7c57 100755 --- a/interfaces/endpoint.py +++ b/interfaces/endpoint.py @@ -47,6 +47,7 @@ from zope import component import json from interfaces.message import IMessage, Debug +from consumer import Mapping @component.adapter(IConnection) @interface.implementer(IEndpoint) @@ -74,7 +75,7 @@ class EndPoint(object): # This is the first connection # Otherwise the state should be STATE_CONNECTING # Initialize with the default layer - self.queue.put ( ("default", None) ) + component.handle(Mapping(("default", None))) except Exception as e: print(e) @@ -103,7 +104,7 @@ class EndPoint(object): title = desktop.getForegroundWindowTitle() else: title = None - self.queue.put((layout, title)) + component.handle(Mapping((layout, title))) def send(self, data: list[dict[str, str]]): """ Send the data to the endpoint. The data must be the representation diff --git a/macropad.pyw b/macropad.pyw index f73cd1c..c7e8770 100755 --- a/macropad.pyw +++ b/macropad.pyw @@ -14,7 +14,7 @@ import sys from zope import component import interfaces -from interfaces import endpoint +from interfaces import endpoint, configuration from interfaces.message import IMessage, Debug import configparser @@ -36,6 +36,7 @@ from queue import Queue q = Queue() component.provideAdapter(interfaces.endpoint.EndPoint) +component.provideUtility(mapping, interfaces.configuration.IConfiguration) # # Guess the platform and the load the corresponding event listener @@ -46,28 +47,26 @@ component.provideAdapter(interfaces.endpoint.EndPoint) # if config.has_section("connection.serial"): - from serial_conn import SerialConnection endpoint = component.queryAdapter(SerialConnection(config["connection.serial"]), endpoint.IEndpoint) - endpoint.queue = q - endpoint.connect() - component.provideUtility(endpoint, interfaces.endpoint.IEndpoint) elif config.has_section("connection.socket"): - from socket_conn import SocketConnection endpoint = component.queryAdapter(SocketConnection(config["connection.socket"]), endpoint.IEndpoint) - endpoint.queue = q - component.provideUtility(endpoint, interfaces.endpoint.IEndpoint) - endpoint.connect() + +component.provideUtility(endpoint, interfaces.endpoint.IEndpoint) +endpoint.connect() if config.has_section("socket.serve"): import socketserver - server = socketserver.Handler(config["socket.serve"], q) + server = socketserver.Handler(config["socket.serve"]) else: server = None +import consumer +handler = consumer.SocketMessageConsumer() +handler.start() class Icon(object): @@ -147,10 +146,10 @@ class Application(object): component.handle(Debug(platform)) if platform == "win32": import win32 - listener = win32.Listener(mapping, q) + listener = win32.Listener(mapping) elif platform == 'linux': import xlib - listener = xlib.Listener(mapping, q) + listener = xlib.Listener(mapping) component.handle(Debug("Starting xlib")) component.provideUtility(listener, interfaces.desktopEvent.IDesktop) listener.start() @@ -202,46 +201,10 @@ class Application(object): except Exception as e: print(e) - - def send(self, data: str): - """ Send the configuration to the device. - The configuration can be either - - a dictionnary, and will be send as is - - a string, and will be load in a file - If the content is the same, ignore the message and return. - """ - if data == self.last_layout: - return - # Merge the new layout with the previous one, ignoring all the null. - self.last_layout = data - - conn = component.queryUtility(interfaces.endpoint.IEndpoint) - if isinstance(data, dict): - conn.send(data) - elif isinstance(data, str): - layer = mapping.get(data, None) - if layer is not None: - conn.send(layer) - - def associate(self, layout: Dict, name: str): - mapping[name] = layout - for key in layout.keys(): - component.handle(Debug("Associating %s with %s" % (name, key))) - def exec(self): try: self.update() if server is not None: server.update() - conn = component.queryUtility(interfaces.endpoint.IEndpoint) - if not conn.isConnected(): - component.handle(Debug("Reconnecting…")) - - conn.state = conn.STATE_CONNECTING - self.window.after(1000, conn.connect) - else: - # Check if we have something to read from the server, and by - # the by if the server is still active. - conn.fetch() except BaseException as e: component.handle(Debug( str(e) )) print(e) @@ -251,11 +214,6 @@ class Application(object): if app.running: self.window.after(200, self.exec) - while not q.empty(): - last_layout, app_ = q.get(False) - self.send(last_layout) - if app_ is not None: self.associate(last_layout, app_) - if __name__ == '__main__': # Start the main application, Initializing the message listener before # listening desktop events diff --git a/socketserver.py b/socketserver.py index 74f0940..eb7d4fc 100644 --- a/socketserver.py +++ b/socketserver.py @@ -34,14 +34,14 @@ actions = { "layout" : lambda x : x, } +from consumer import Mapping # Event used to send the mapping change request + class Handler(object): """ Listen the incomming connexions and dispatch them into the connexion object """ - def __init__(self, configuration, layout_queue): + def __init__(self, configuration): """ Initialize the the socket server - - The layout_queue will be populated with all the elements received from the socket. """ super().__init__() self.sel = selectors.DefaultSelector() @@ -66,7 +66,6 @@ class Handler(object): self.connexions = [] - self.layout_queue = layout_queue @component.adapter(ISocketMessage) def sendMessage(self, content:ISocketMessage) -> None: @@ -130,11 +129,11 @@ class Handler(object): # As we have a name in the configuration to associate with, use # this name as a reference for title in self.application_name: - self.layout_queue.put((js, title)) + component.handle(Mapping((js, title))) else: # Associate the layout with the current window title = component.queryUtility(desktopEvent.IDesktop).getForegroundWindowTitle() - self.layout_queue.put((js, title)) + component.handle(Mapping((js, title))) def _send(self:object, conn:socket, mask:int, text) -> None: """ Internal method used to dispatch the message to the socket. """ diff --git a/win32.py b/win32.py index 5220cde..1b45a95 100644 --- a/win32.py +++ b/win32.py @@ -9,6 +9,7 @@ import win32gui import win32con from zope import component from interfaces.message import IMessage, Debug +from consumer import Mapping # This code tries to hook the Focus changes events with the callback method. @@ -54,10 +55,9 @@ def setHook(WinEventProc, eventType): @interface.implementer(desktopEvent.IDesktop) class Listener(object): - def __init__(self, mapping: Dict[str, str], queue): + def __init__(self, mapping: Dict[str, str]): self.WinEventProc = WinEventProcType(self.callback) self.mapping = mapping - self.queue = queue def getForegroundWindowTitle(self) -> Optional[str]: """ Get the window title name. @@ -100,7 +100,7 @@ class Listener(object): if pattern == "default": continue if pattern in title: - self.queue.put ( (code, None) ) + component.handle(Mapping((code, None))) return # Get the default mapping to apply if there is any defined # This only applies when the window raising the event is the main @@ -114,7 +114,7 @@ class Listener(object): # called with the GIL held, but the GIL is released (the current # Python thread state is NULL) print("Mapping '%s' to default" % title) - self.queue.put ( ("default", None) ) + component.handle(Mapping((default, None))) def start(self) -> None: self.hookIDs = [setHook(self.WinEventProc, et) for et in eventTypes.keys()] diff --git a/xlib.py b/xlib.py index 051b21f..20476aa 100644 --- a/xlib.py +++ b/xlib.py @@ -8,12 +8,13 @@ from threading import Thread from interfaces.message import IMessage, Debug from zope import component +from consumer import Mapping + @interface.implementer(desktopEvent.IDesktop) class Listener(Thread): - def __init__(self, mapping, queue): + def __init__(self, mapping): Thread.__init__(self) - self.queue = queue self.mapping = mapping self.active_window = None self.last_code = None @@ -80,7 +81,7 @@ class Listener(Thread): if code != self.last_code: component.handle(Debug("Switching to '%s' for '%s'" % (pattern, window_name))) - self.queue.put ( (code, None) ) + component.handle(Mapping((code, None))) self.last_code = code # We found a matching configuration. Even if the match is the # same as the current one, we break the loop in order to @@ -91,7 +92,7 @@ class Listener(Thread): if default is None: return - self.queue.put ( (default, None) ) + component.handle(Mapping((default, None))) self.last_code = "default" -- cgit v1.2.3