blob: 79256731c8ad0032291fda5e90d4e102f26ea315 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
from zope import interface
from interfaces.message import ISocketMessage
@interface.implementer(ISocketMessage)
class Layout:
    """Message object declared as an ``ISocketMessage`` provider.

    The value given to the constructor is exposed unchanged through the
    ``content`` attribute.
    """

    def __init__(self, message):
        """Keep *message* as this object's ``content``."""
        self.content = message
from zope import component
from interfaces.endpoint import IConnection
import serial
from queue import Queue
@interface.implementer(IConnection)
class SerialConnection(object):
    """Connected element backed by a serial port.

    Declared as an ``IConnection`` provider. The port is opened lazily by
    :meth:`connect`; :meth:`read` and :meth:`write` require that it has been
    called first, since they use the ``self.s`` handle it creates.
    """

    def __init__(self, configuration):
        """Remember the serial port to use.

        *configuration* is a mapping; only its ``"port"`` entry is read.
        """
        self.serial_port = configuration["port"]

    def connect(self) -> None:
        """Open the serial port.

        ``timeout=0`` puts the port in non-blocking mode, so reads return
        immediately with whatever is available.
        """
        self.s = serial.Serial(port=self.serial_port, timeout=0)

    def read(self) -> "bytes | None":
        """Read one line from the connection and return the raw bytes.

        Return None if there is nothing to read (non-blocking).
        When a line is read, its UTF-8 decoded and stripped text is also
        dispatched as a ``Layout`` object through ``component.handle``.
        Raise an exception if disconnected.
        """
        if not self.s.in_waiting:
            # If we do not have any entry from the macropad, just return
            return None

        line = self.s.readline()
        # NOTE(review): with timeout=0, readline() may return a partial line
        # if the terminating newline has not arrived yet — confirm whether
        # callers rely on complete lines.
        layout = str(line, "utf-8").strip()
        component.handle(Layout(layout))
        return line

    def write(self, content: "str | bytes") -> None:
        """Write *content* followed by a newline into the connection.

        *content* may be text or bytes. Text is encoded as UTF-8 first,
        because the serial port only accepts bytes-like data.
        Raise an exception if disconnected.
        """
        if isinstance(content, str):
            content = content.encode("utf-8")
        self.s.write(content)
        self.s.write(b"\n")
|