from queue import Queue
from typing import Optional, Union

import serial
from zope import component
from zope import interface

from interfaces.endpoint import IConnection
from interfaces.message import ISocketMessage


@interface.implementer(ISocketMessage)
class Layout(object):
    """Message wrapping the text of one line received from a connection.

    Declared as an implementation of ``ISocketMessage`` so it can be
    dispatched through ``zope.component``.
    """

    def __init__(self, message):
        # Text payload of the message (the decoded, stripped line).
        self.content = message


@interface.implementer(IConnection)
class SerialConnection(object):
    """ Define a connected element (serial, network…)

    This implementation talks to a serial port through ``pyserial``.
    """

    def __init__(self, configuration):
        """Store the serial port name found under ``configuration["port"]``.

        The port is not opened here; call :meth:`connect` first.
        """
        self.serial_port = configuration["port"]

    def connect(self) -> None:
        """Open the serial port in non-blocking mode (``timeout=0``)."""
        self.s = serial.Serial(port=self.serial_port, timeout=0)

    def read(self) -> Optional[bytes]:
        """Read one line from the connection without blocking.

        When data is available, the line is decoded as UTF-8, stripped and
        dispatched as a :class:`Layout` message via ``component.handle``.

        Returns:
            The raw bytes of the line that was read, or ``None`` if there is
            nothing to read.

        Raises:
            UnicodeDecodeError: if the received bytes are not valid UTF-8.
            An exception from the serial layer if disconnected.
        """
        if self.s.in_waiting == 0:
            # No entry pending on the port: just return.
            return None

        # NOTE(review): the port is opened with timeout=0, so readline() may
        # presumably return a partial line if the newline has not arrived
        # yet — confirm whether the sender always writes whole lines.
        raw_line = self.s.readline()
        layout = str(raw_line, "utf-8").strip()
        component.handle(Layout(layout))
        return raw_line

    def write(self, content: Union[str, bytes]) -> None:
        """Write ``content`` followed by a newline into the connection.

        ``str`` content is encoded as UTF-8 first, because the serial port
        only accepts bytes-like data; bytes-like content is sent unchanged.

        Raises:
            An exception from the serial layer if disconnected.
        """
        if isinstance(content, str):
            payload = content.encode("utf-8")
        else:
            payload = bytes(content)
        # Send the payload and its line terminator in a single write.
        self.s.write(payload + b"\n")