aboutsummaryrefslogtreecommitdiff
path: root/consumer.py
blob: 9e36dd0b7d8f4aa696b716ac1da4efdfe7120090 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
from typing import Dict

from zope import component, interface

from interfaces.message import ISocketMessage, Debug

@interface.implementer(ISocketMessage)
class Mapping(object):
    """ Send an event requesting a layer change to the endpoint.
    """

    def __init__(self, message):
        self.content = {"layer": message}

import abc
from threading import Thread
from queue import Queue, Empty, Full

class EventConsummer(Thread, metaclass=abc.ABCMeta):
    """ Thread processing messages. This class does nothing and is intended to
        be inherited.

        The method `process` need to be overriden.
        """

    def __init__(self, timeout = None):
        super().__init__()
        self._queue = Queue()
        self.timeout = timeout
        self.daemon = True

    def run(self):
        """ Read and process the messages from the queue, and call the
            handler.
        """

        while True:
            # Block the thread for at most timeout seconds
            try:
                message = self._queue.get(timeout=self.timeout)
                self._process(message)
                self._queue.task_done()
            except Empty:
                self._process(None)

    @abc.abstractmethod
    def _process(self, message) -> None:
        raise NotImplementedError()

    def handle(self, message) -> None:
        """ Register a new event in the queue.
            This method is intended to be public.
        """

        # Try to add the message in the queue, if the queue is full, discard
        # the firsts elements and try again.
        try:
            self._queue.put(message)
        except Full:
            while self._queue.full():
                _ = self._queue.get()
            # Discard another one more, just for sure.
            try:
                self._queue.put(message)
            except Full:
                component.handle(Debug("Ignoring event: queue is full"))

import time
from interfaces import endpoint, configuration
from interfaces.configuration import IConfiguration

class SocketMessageConsumer(EventConsummer):
    """ Provide a handler consuming events and sending them to the endpoint.
        It’s not an adapter as it’s not transforming the endpoint into
        something else; instead, it requires an existing endpoint to
        be declared in the registry.

        The class register a handler_ for SocketEvents. They are queued and
        processed when as the endpoint is connected. Events can be send as
        usual using the component registry:

            component.handle(Mapping(…))

    .. _handler: https://zopecomponent.readthedocs.io/en/latest/narr.html#handlers
    """

    def __init__(self):
        super().__init__(timeout = 2)
        self._endpoint = component.getUtility(endpoint.IEndpoint)
        self._last_mapping = None
        component.provideHandler(self.handle, [ISocketMessage])

        # Register the functions to associate for each kind of event.
        self.function_table = {
            "layer": self._process_layer,
        }

    def _process(self, message) -> None:

        # We are here either because we have a message to process, or because
        # we didn’t got any message before the timeout. We check if we are
        # still connected to the endpoint.
        while not self._endpoint.isConnected():
            # Loop until we get the connection back. During this time, the
            # message will stack in the queue.
            component.handle(Debug("Reconnecting …"))
            self._endpoint.state = self._endpoint.STATE_CONNECTING
            self._endpoint.connect()
            time.sleep(2)
        # Also check if we have something to read from the server.
        self._endpoint.fetch()

        # The endpoint is connected and we have no message the process.
        if message is None:
            return

        # Dispatch the event to the appropriate function.
        for key, value in message.content.items():
            self.function_table[key](value)

    def _process_layer(self, message) -> None:

        mapping, name = message
        if mapping == self._last_mapping:
            return
        self._last_mapping = mapping

        configuration = component.getUtility(IConfiguration)
        if isinstance(mapping, dict):
            self._endpoint.send(mapping)
        elif isinstance(data, str):
            # TODO Query the json from the configuration
            layer = configuration.get(mapping, None)
            if layer is not None:
                self._endpoint.send(layer)

        if name is not None:
            # We received a name to associate the configuration with.
            # Register the element in the configuration
            # TODO use an event
            configuration[name] = mapping
            for key in mapping.keys():
              component.handle(Debug("Associating %s with %s" % (name, key)))