1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
|
from typing import Optional
from zope import interface
from zope.interface import Attribute
class IEndpoint(interface.Interface):
    """ Contract of an endpoint exchanging messages over a connection.

    Method signatures are declared without ``self``: a zope interface
    describes how a provider is called, and signature verification compares
    these declarations against the *bound* methods of the implementation.
    """

    queue = Attribute("""The queue to send the message""")

    state = Attribute("""The connection status""")

    def isConnected() -> bool:
        """ Return True if the endpoint is connected. This function and
        [connect] are supposed to work together: the application main loop
        is expected to call [connect] again after a while when isConnected
        returns False.
        """

    def connect():
        """ Connect or reconnect to the endpoint
        """

    def fetch():
        """ Check the elements from the connection
        """

    def send(data: list[dict[str, str]]):
        """ Send element to the connection
        """
class IConnection(interface.Interface):
    """ Define a connected element (serial, network…)

    Method signatures are declared without ``self``, as zope.interface
    expects for interface declarations. Data crossing the connection is
    raw bytes in both directions.
    """

    def connect() -> None:
        """ Connect """

    def read() -> Optional[bytes]:
        """ Read from the connection and return the bytes.
        Return None if there is nothing to read (non-blocking)
        Raise an exception if disconnected """

    def write(content: bytes) -> None:
        """ Write the given bytes into the connection.
        Raise an exception if disconnected """
from interfaces import desktopEvent
from zope import component
import json
from interfaces.message import IMessage, Debug
@component.adapter(IConnection)
@interface.implementer(IEndpoint)
class EndPoint(object):
    """ Adapt an IConnection into an IEndpoint.

    The endpoint tracks the connection status in [state], pushes what it
    reads from the connection into [queue] and serialises outgoing data as
    one JSON document per line.
    """

    STATE_DISCONNECTED = 0
    STATE_CONNECTING = 1
    STATE_CONNECTED = 2

    def __init__(self, connection):
        """ Wrap [connection]; the endpoint starts disconnected. """
        self.connection = connection
        # No queue is attached here: it has to be assigned on the instance
        # before fetch() delivers anything, since fetch() calls queue.put().
        self.queue = None
        self.state = self.STATE_DISCONNECTED

    def isConnected(self) -> bool:
        """ Return True unless the endpoint is in the disconnected state. """
        return self.state != self.STATE_DISCONNECTED

    def connect(self):
        """ Connect or reconnect the underlying connection.

        Any failure is printed and leaves the endpoint disconnected; nothing
        is raised to the caller.
        """
        try:
            self.connection.connect()
            self.state = self.STATE_CONNECTED
            component.handle(Debug("Connected"))
        except Exception as e:
            print(e)
            self.state = self.STATE_DISCONNECTED

    def fetch(self):
        """ Read from the peripheral and put the result in the queue if any
        data are available.

        The queued element is the tuple (layout, title) where layout is the
        stripped text received and title is the foreground window title
        given by the registered IDesktop utility (None without utility).
        A read error marks the endpoint as disconnected.
        """
        if self.state != self.STATE_CONNECTED:
            return

        try:
            received = self.connection.read()
        except Exception as e:
            print("fetch error", e)
            self.state = self.STATE_DISCONNECTED
            return

        if not received:
            # If we do not have any entry from the macropad, just return
            return

        print("recv", received)
        if isinstance(received, str):
            # Tolerate a connection that already hands out decoded text.
            layout = received.strip()
        else:
            try:
                layout = str(received, "utf-8").strip()
            except UnicodeDecodeError as e:
                # A garbled frame is not a connection failure: report it,
                # drop the frame and keep the endpoint connected.
                print("fetch error", e)
                return

        desktop = component.queryUtility(desktopEvent.IDesktop)
        title = None if desktop is None else desktop.getForegroundWindowTitle()
        self.queue.put((layout, title))

    def send(self, data: list[dict[str, str]]):
        """ Send the data to the macropad. The data must be the representation
        of a json element.

        The payload is written as UTF-8 encoded JSON followed by a newline.
        A write error marks the endpoint as disconnected; data that cannot
        be serialised is reported and dropped without touching the state.
        """
        if self.state != self.STATE_CONNECTED:
            return

        try:
            payload = (json.dumps(data) + "\n").encode()
        except (TypeError, ValueError) as e:
            # The caller gave something json cannot represent; the
            # connection itself is still healthy.
            print("send error", e)
            return

        try:
            self.connection.write(payload)
        except Exception as e:
            print("send error", e)
            self.state = self.STATE_DISCONNECTED
|