diff options
Diffstat (limited to 'consumer.py')
-rw-r--r-- | consumer.py | 143 |
1 files changed, 143 insertions, 0 deletions
diff --git a/consumer.py b/consumer.py new file mode 100644 index 0000000..9e36dd0 --- /dev/null +++ b/consumer.py @@ -0,0 +1,143 @@ +from typing import Dict
+
+from zope import component, interface
+
+from interfaces.message import ISocketMessage, Debug
+
+@interface.implementer(ISocketMessage)
+class Mapping(object):
+ """ Send an event requesting a layer change to the endpoint.
+ """
+
+ def __init__(self, message):
+ self.content = {"layer": message}
+
+import abc
+from threading import Thread
+from queue import Queue, Empty, Full
+
+class EventConsummer(Thread, metaclass=abc.ABCMeta):
+ """ Thread processing messages. This class does nothing and is intended to
+ be inherited.
+
+ The method `process` need to be overriden.
+ """
+
+ def __init__(self, timeout = None):
+ super().__init__()
+ self._queue = Queue()
+ self.timeout = timeout
+ self.daemon = True
+
+ def run(self):
+ """ Read and process the messages from the queue, and call the
+ handler.
+ """
+
+ while True:
+ # Block the thread for at most timeout seconds
+ try:
+ message = self._queue.get(timeout=self.timeout)
+ self._process(message)
+ self._queue.task_done()
+ except Empty:
+ self._process(None)
+
+ @abc.abstractmethod
+ def _process(self, message) -> None:
+ raise NotImplementedError()
+
+ def handle(self, message) -> None:
+ """ Register a new event in the queue.
+ This method is intended to be public.
+ """
+
+ # Try to add the message in the queue, if the queue is full, discard
+ # the firsts elements and try again.
+ try:
+ self._queue.put(message)
+ except Full:
+ while self._queue.full():
+ _ = self._queue.get()
+ # Discard another one more, just for sure.
+ try:
+ self._queue.put(message)
+ except Full:
+ component.handle(Debug("Ignoring event: queue is full"))
+
+import time
+from interfaces import endpoint, configuration
+from interfaces.configuration import IConfiguration
+
+class SocketMessageConsumer(EventConsummer):
+ """ Provide a handler consuming events and sending them to the endpoint.
+ It’s not an adapter as it’s not transforming the endpoint into
+ something else; instead, it requires an existing endpoint to
+ be declared in the registry.
+
+ The class register a handler_ for SocketEvents. They are queued and
+ processed when as the endpoint is connected. Events can be send as
+ usual using the component registry:
+
+ component.handle(Mapping(…))
+
+ .. _handler: https://zopecomponent.readthedocs.io/en/latest/narr.html#handlers
+ """
+
+ def __init__(self):
+ super().__init__(timeout = 2)
+ self._endpoint = component.getUtility(endpoint.IEndpoint)
+ self._last_mapping = None
+ component.provideHandler(self.handle, [ISocketMessage])
+
+ # Register the functions to associate for each kind of event.
+ self.function_table = {
+ "layer": self._process_layer,
+ }
+
+ def _process(self, message) -> None:
+
+ # We are here either because we have a message to process, or because
+ # we didn’t got any message before the timeout. We check if we are
+ # still connected to the endpoint.
+ while not self._endpoint.isConnected():
+ # Loop until we get the connection back. During this time, the
+ # message will stack in the queue.
+ component.handle(Debug("Reconnecting …"))
+ self._endpoint.state = self._endpoint.STATE_CONNECTING
+ self._endpoint.connect()
+ time.sleep(2)
+ # Also check if we have something to read from the server.
+ self._endpoint.fetch()
+
+ # The endpoint is connected and we have no message the process.
+ if message is None:
+ return
+
+ # Dispatch the event to the appropriate function.
+ for key, value in message.content.items():
+ self.function_table[key](value)
+
+ def _process_layer(self, message) -> None:
+
+ mapping, name = message
+ if mapping == self._last_mapping:
+ return
+ self._last_mapping = mapping
+
+ configuration = component.getUtility(IConfiguration)
+ if isinstance(mapping, dict):
+ self._endpoint.send(mapping)
+ elif isinstance(data, str):
+ # TODO Query the json from the configuration
+ layer = configuration.get(mapping, None)
+ if layer is not None:
+ self._endpoint.send(layer)
+
+ if name is not None:
+ # We received a name to associate the configuration with.
+ # Register the element in the configuration
+ # TODO use an event
+ configuration[name] = mapping
+ for key in mapping.keys():
+ component.handle(Debug("Associating %s with %s" % (name, key)))
|