import time
import abc
from threading import Thread
from queue import Queue, Empty, Full

from zope import component, interface

from interfaces import endpoint, configuration
from interfaces.configuration import IConfiguration
from interfaces.message import ISocketMessage, Debug


@interface.implementer(ISocketMessage)
class Mapping:
    """Send an event requesting a layer change to the endpoint.

    The payload is stored under the ``"layer"`` key of ``content``;
    `SocketMessageConsumer` dispatches on that key and unpacks the value as
    a ``(mapping, name)`` pair.
    """

    def __init__(self, message):
        self.content = {"layer": message}


class EventConsummer(Thread, metaclass=abc.ABCMeta):
    """Thread processing messages.

    This class does nothing by itself and is intended to be inherited.
    The method `_process` needs to be overridden.
    """

    def __init__(self, timeout=None):
        """Create the (daemon) thread and its message queue.

        :param timeout: maximum number of seconds `run` waits for a message
            before calling `_process(None)`; ``None`` waits forever.
        """
        super().__init__()
        self._queue = Queue()
        self.timeout = timeout
        self.daemon = True

    def run(self):
        """Read the messages from the queue and pass them to `_process`.

        When no message arrives within `timeout` seconds, `_process` is
        called with ``None`` so subclasses can do periodic work.
        """
        while True:
            # Only the `get` is guarded: an `Empty` raised while processing a
            # message must not be mistaken for a timeout.
            try:
                # Block the thread for at most timeout seconds.
                message = self._queue.get(timeout=self.timeout)
            except Empty:
                self._process(None)
            else:
                try:
                    self._process(message)
                finally:
                    # Keep the queue accounting balanced even when the
                    # processing fails.
                    self._queue.task_done()

    @abc.abstractmethod
    def _process(self, message) -> None:
        """Process one message, or ``None`` when the wait timed out."""
        raise NotImplementedError()

    def handle(self, message) -> None:
        """Register a new event in the queue.

        This method is intended to be public. It never blocks the caller:
        if the queue is full, the oldest elements are discarded to make
        room, and if there is still no room the event is dropped with a
        `Debug` notification.
        """
        # `put_nowait` is required here: a plain blocking `put` never raises
        # `Full`, it just waits for a free slot.
        try:
            self._queue.put_nowait(message)
            return
        except Full:
            pass

        # Discard the oldest elements until there is room again. The consumer
        # thread may drain the queue concurrently, hence the `Empty` guard.
        while self._queue.full():
            try:
                self._queue.get_nowait()
            except Empty:
                break
            # The discarded element will never be processed; mark it done.
            self._queue.task_done()

        try:
            self._queue.put_nowait(message)
        except Full:
            component.handle(Debug("Ignoring event: queue is full"))


class SocketMessageConsumer(EventConsummer):
    """Provide a handler consuming events and sending them to the endpoint.

    It’s not an adapter as it’s not transforming the endpoint into something
    else; instead, it requires an existing endpoint to be declared in the
    registry.

    The class registers a handler_ for `ISocketMessage` events. They are
    queued, and processed once the endpoint is connected.

    Events can be sent as usual using the component registry:

        component.handle(Mapping(…))

    .. _handler: https://zopecomponent.readthedocs.io/en/latest/narr.html#handlers
    """

    def __init__(self):
        super().__init__(timeout=2)
        self._endpoint = component.getUtility(endpoint.IEndpoint)
        # Last mapping sent, used to skip consecutive duplicates.
        self._last_mapping = None
        component.provideHandler(self.handle, [ISocketMessage])

        # Register the functions to associate for each kind of event.
        self.function_table = {
            "layer": self._process_layer,
        }

    def _process(self, message) -> None:
        """Ensure the endpoint is connected, then dispatch `message`.

        :param message: an object with a ``content`` dict, or ``None`` when
            no message was received before the timeout.
        :raises KeyError: if a key of ``message.content`` has no entry in
            `function_table`.
        """
        # We are here either because we have a message to process, or because
        # we did not get any message before the timeout. We check if we are
        # still connected to the endpoint.
        while not self._endpoint.isConnected():
            # Loop until we get the connection back. During this time, the
            # messages will stack in the queue.
            component.handle(Debug("Reconnecting …"))
            self._endpoint.state = self._endpoint.STATE_CONNECTING
            self._endpoint.connect()
            time.sleep(2)

        # Also check if we have something to read from the server.
        self._endpoint.fetch()

        # The endpoint is connected and we have no message to process.
        if message is None:
            return

        # Dispatch the event to the appropriate function.
        for key, value in message.content.items():
            self.function_table[key](value)

    def _process_layer(self, message) -> None:
        """Send a layer mapping to the endpoint and optionally register it.

        :param message: a ``(mapping, name)`` pair. A ``dict`` mapping is
            sent as is; a ``str`` mapping is looked up in the configuration
            and the stored layer, if any, is sent. When ``name`` is not
            ``None`` the mapping is stored in the configuration under it.
        """
        mapping, name = message

        # Nothing to do when the same mapping is requested twice in a row.
        if mapping == self._last_mapping:
            return
        self._last_mapping = mapping

        config = component.getUtility(IConfiguration)
        if isinstance(mapping, dict):
            self._endpoint.send(mapping)
        elif isinstance(mapping, str):
            layer = config.get(mapping, None)
            if layer is not None:
                self._endpoint.send(layer)

        if name is None:
            return

        # We received a name to associate the configuration with.
        # Register the element in the configuration.
        config[name] = mapping

        # A mapping given by name (str) has no keys to report; iterating it
        # blindly would raise and kill the consumer thread.
        if hasattr(mapping, "keys"):
            for key in mapping.keys():
                # We do not want to log the keycode sent to the keyboard,
                # only the name is interesting. We are supposed to have only
                # one, but it’s the easiest way to log it.
                component.handle(Debug(f"Associating {name} with {key}"))