From 4bf1c1ffe795a1d28a543a20cc3771a30a090b89 Mon Sep 17 00:00:00 2001 From: Sébastien Dailly Date: Tue, 6 May 2025 19:33:30 +0200 Subject: Added a dedicated thread for handling events sent to the endpoint --- consumer.py | 143 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 143 insertions(+) create mode 100644 consumer.py (limited to 'consumer.py') diff --git a/consumer.py b/consumer.py new file mode 100644 index 0000000..9e36dd0 --- /dev/null +++ b/consumer.py @@ -0,0 +1,143 @@ +from typing import Dict + +from zope import component, interface + +from interfaces.message import ISocketMessage, Debug + +@interface.implementer(ISocketMessage) +class Mapping(object): + """ Send an event requesting a layer change to the endpoint. + """ + + def __init__(self, message): + self.content = {"layer": message} + +import abc +from threading import Thread +from queue import Queue, Empty, Full + +class EventConsummer(Thread, metaclass=abc.ABCMeta): + """ Thread processing messages. This class does nothing and is intended to + be inherited. + + The method `process` need to be overriden. + """ + + def __init__(self, timeout = None): + super().__init__() + self._queue = Queue() + self.timeout = timeout + self.daemon = True + + def run(self): + """ Read and process the messages from the queue, and call the + handler. + """ + + while True: + # Block the thread for at most timeout seconds + try: + message = self._queue.get(timeout=self.timeout) + self._process(message) + self._queue.task_done() + except Empty: + self._process(None) + + @abc.abstractmethod + def _process(self, message) -> None: + raise NotImplementedError() + + def handle(self, message) -> None: + """ Register a new event in the queue. + This method is intended to be public. + """ + + # Try to add the message in the queue, if the queue is full, discard + # the firsts elements and try again. + try: + self._queue.put(message) + except Full: + while self._queue.full(): + _ = self._queue.get() + # Discard another one more, just for sure. + try: + self._queue.put(message) + except Full: + component.handle(Debug("Ignoring event: queue is full")) + +import time +from interfaces import endpoint, configuration +from interfaces.configuration import IConfiguration + +class SocketMessageConsumer(EventConsummer): + """ Provide a handler consuming events and sending them to the endpoint. + It’s not an adapter as it’s not transforming the endpoint into + something else; instead, it requires an existing endpoint to + be declared in the registry. + + The class register a handler_ for SocketEvents. They are queued and + processed when as the endpoint is connected. Events can be send as + usual using the component registry: + + component.handle(Mapping(…)) + + .. _handler: https://zopecomponent.readthedocs.io/en/latest/narr.html#handlers + """ + + def __init__(self): + super().__init__(timeout = 2) + self._endpoint = component.getUtility(endpoint.IEndpoint) + self._last_mapping = None + component.provideHandler(self.handle, [ISocketMessage]) + + # Register the functions to associate for each kind of event. + self.function_table = { + "layer": self._process_layer, + } + + def _process(self, message) -> None: + + # We are here either because we have a message to process, or because + # we didn’t got any message before the timeout. We check if we are + # still connected to the endpoint. + while not self._endpoint.isConnected(): + # Loop until we get the connection back. During this time, the + # message will stack in the queue. + component.handle(Debug("Reconnecting …")) + self._endpoint.state = self._endpoint.STATE_CONNECTING + self._endpoint.connect() + time.sleep(2) + # Also check if we have something to read from the server. + self._endpoint.fetch() + + # The endpoint is connected and we have no message the process. + if message is None: + return + + # Dispatch the event to the appropriate function. + for key, value in message.content.items(): + self.function_table[key](value) + + def _process_layer(self, message) -> None: + + mapping, name = message + if mapping == self._last_mapping: + return + self._last_mapping = mapping + + configuration = component.getUtility(IConfiguration) + if isinstance(mapping, dict): + self._endpoint.send(mapping) + elif isinstance(data, str): + # TODO Query the json from the configuration + layer = configuration.get(mapping, None) + if layer is not None: + self._endpoint.send(layer) + + if name is not None: + # We received a name to associate the configuration with. + # Register the element in the configuration + # TODO use an event + configuration[name] = mapping + for key in mapping.keys(): + component.handle(Debug("Associating %s with %s" % (name, key))) -- cgit v1.2.3