from typing import Dict from zope import component, interface from interfaces.message import ISocketMessage, Debug @interface.implementer(ISocketMessage) class Mapping(object): """ Send an event requesting a layer change to the endpoint. """ def __init__(self, message): self.content = {"layer": message} import abc from threading import Thread from queue import Queue, Empty, Full class EventConsummer(Thread, metaclass=abc.ABCMeta): """ Thread processing messages. This class does nothing and is intended to be inherited. The method `process` need to be overriden. """ def __init__(self, timeout = None): super().__init__() self._queue = Queue() self.timeout = timeout self.daemon = True def run(self): """ Read and process the messages from the queue, and call the handler. """ while True: # Block the thread for at most timeout seconds try: message = self._queue.get(timeout=self.timeout) self._process(message) self._queue.task_done() except Empty: self._process(None) @abc.abstractmethod def _process(self, message) -> None: raise NotImplementedError() def handle(self, message) -> None: """ Register a new event in the queue. This method is intended to be public. """ # Try to add the message in the queue, if the queue is full, discard # the firsts elements and try again. try: self._queue.put(message) except Full: while self._queue.full(): _ = self._queue.get() # Discard another one more, just for sure. try: self._queue.put(message) except Full: component.handle(Debug("Ignoring event: queue is full")) import time from interfaces import endpoint, configuration from interfaces.configuration import IConfiguration class SocketMessageConsumer(EventConsummer): """ Provide a handler consuming events and sending them to the endpoint. It’s not an adapter as it’s not transforming the endpoint into something else; instead, it requires an existing endpoint to be declared in the registry. The class register a handler_ for SocketEvents. They are queued and processed when as the endpoint is connected. Events can be send as usual using the component registry: component.handle(Mapping(…)) .. _handler: https://zopecomponent.readthedocs.io/en/latest/narr.html#handlers """ def __init__(self): super().__init__(timeout = 2) self._endpoint = component.getUtility(endpoint.IEndpoint) self._last_mapping = None component.provideHandler(self.handle, [ISocketMessage]) # Register the functions to associate for each kind of event. self.function_table = { "layer": self._process_layer, } def _process(self, message) -> None: # We are here either because we have a message to process, or because # we didn’t got any message before the timeout. We check if we are # still connected to the endpoint. while not self._endpoint.isConnected(): # Loop until we get the connection back. During this time, the # message will stack in the queue. component.handle(Debug("Reconnecting …")) self._endpoint.state = self._endpoint.STATE_CONNECTING self._endpoint.connect() time.sleep(2) # Also check if we have something to read from the server. self._endpoint.fetch() # The endpoint is connected and we have no message the process. if message is None: return # Dispatch the event to the appropriate function. for key, value in message.content.items(): self.function_table[key](value) def _process_layer(self, message) -> None: mapping, name = message if mapping == self._last_mapping: return self._last_mapping = mapping configuration = component.getUtility(IConfiguration) if isinstance(mapping, dict): self._endpoint.send(mapping) elif isinstance(data, str): # TODO Query the json from the configuration layer = configuration.get(mapping, None) if layer is not None: self._endpoint.send(layer) if name is not None: # We received a name to associate the configuration with. # Register the element in the configuration # TODO use an event configuration[name] = mapping for key in mapping.keys(): component.handle(Debug("Associating %s with %s" % (name, key)))