aboutsummaryrefslogtreecommitdiff
path: root/consumer.py
blob: 8c577c885a493eeb758147a43fcf6b51afb81a43 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
""" This module provide a thread reading the events we need to send to the endpoint.
"""

import time
import abc
from threading import Thread
from queue import Queue, Empty, Full

from zope import component, interface

from interfaces import endpoint, configuration
from interfaces.configuration import IConfiguration
from interfaces.message import ISocketMessage, Debug

@interface.implementer(ISocketMessage)
class Mapping():
    """ Message requesting a layer change to the endpoint.
    """

    def __init__(self, message):
        self.content = {"layer": message}


class EventConsummer(Thread, metaclass=abc.ABCMeta):
    """ Thread processing messages. This class does nothing and is intended to
        be inherited.

        The method `process` need to be overriden.
        """

    def __init__(self, timeout = None):
        super().__init__()
        self._queue = Queue()
        self.timeout = timeout
        self.daemon = True

    def run(self):
        """ Read and process the messages from the queue, and call the
            handler.
        """

        while True:
            # Block the thread for at most timeout seconds
            try:
                message = self._queue.get(timeout=self.timeout)
                self._process(message)
                self._queue.task_done()
            except Empty:
                self._process(None)

    @abc.abstractmethod
    def _process(self, message) -> None:
        raise NotImplementedError()

    def handle(self, message) -> None:
        """ Register a new event in the queue.
            This method is intended to be public.
        """

        # Try to add the message in the queue, if the queue is full, discard
        # the firsts elements and try again.
        try:
            self._queue.put(message)
        except Full:
            while self._queue.full():
                _ = self._queue.get()
            # Discard another one more, just for sure.
            try:
                self._queue.put(message)
            except Full:
                component.handle(Debug("Ignoring event: queue is full"))


class SocketMessageConsumer(EventConsummer):
    """ Provide a handler consuming events and sending them to the endpoint.
        It’s not an adapter as it’s not transforming the endpoint into
        something else; instead, it requires an existing endpoint to
        be declared in the registry.

        The class register a handler_ for SocketEvents. They are queued and
        processed when as the endpoint is connected. Events can be send as
        usual using the component registry:

            component.handle(Mapping(…))

    .. _handler: https://zopecomponent.readthedocs.io/en/latest/narr.html#handlers
    """

    def __init__(self):
        super().__init__(timeout = 2)
        self._endpoint = component.getUtility(endpoint.IEndpoint)
        self._last_mapping = None
        component.provideHandler(self.handle, [ISocketMessage])

        # Register the functions to associate for each kind of event.
        self.function_table = {
            "layer": self._process_layer,
        }

    def _process(self, message) -> None:

        # We are here either because we have a message to process, or because
        # we didn’t got any message before the timeout. We check if we are
        # still connected to the endpoint.
        while not self._endpoint.isConnected():
            # Loop until we get the connection back. During this time, the
            # message will stack in the queue.
            component.handle(Debug("Reconnecting …"))
            self._endpoint.state = self._endpoint.STATE_CONNECTING
            self._endpoint.connect()
            time.sleep(2)
        # Also check if we have something to read from the server.
        self._endpoint.fetch()

        # The endpoint is connected and we have no message the process.
        if message is None:
            return

        # Dispatch the event to the appropriate function.
        for key, value in message.content.items():
            self.function_table[key](value)

    def _process_layer(self, message) -> None:

        mapping, name = message
        configuration = component.getUtility(IConfiguration)
        if mapping != self._last_mapping:
            self._last_mapping = mapping

            if isinstance(mapping, dict):
                self._endpoint.send(mapping)
            elif isinstance(mapping, str):
                layer = configuration.get(mapping, None)
                if layer is not None:
                    self._endpoint.send(layer)

        # Even if the mapping is the same and does not need to be updated, we
        # may need to associate this new layer with another application.
        if name is not None:
            # We received a name to associate the configuration with.
            configuration[name] = mapping
            for key in mapping.keys():
                # We do not want to log the keycode sent to the keyboard, only
                # the name is interresting. We are supposed to have only one, but
                # it’s the easer way to log it
                component.handle(Debug(f"Associating {name} with {key}"))