From 02d676bda89c2fb8469ea81f7429c19c1e29df7c Mon Sep 17 00:00:00 2001 From: Sébastien Dailly Date: Sat, 15 Jul 2023 14:44:41 +0200 Subject: Initial commit --- interfaces/__init__.py | 0 interfaces/desktopEvent.py | 16 +++++++ interfaces/endpoint.py | 112 +++++++++++++++++++++++++++++++++++++++++++++ interfaces/message.py | 14 ++++++ 4 files changed, 142 insertions(+) create mode 100755 interfaces/__init__.py create mode 100755 interfaces/desktopEvent.py create mode 100755 interfaces/endpoint.py create mode 100755 interfaces/message.py (limited to 'interfaces') diff --git a/interfaces/__init__.py b/interfaces/__init__.py new file mode 100755 index 0000000..e69de29 diff --git a/interfaces/desktopEvent.py b/interfaces/desktopEvent.py new file mode 100755 index 0000000..e8a30a4 --- /dev/null +++ b/interfaces/desktopEvent.py @@ -0,0 +1,16 @@ +from typing import Optional +from zope import interface +from zope.interface import Attribute + +class IDesktop(interface.Interface): + + queue = Attribute("""The queue to send the message""") + mapping = Attribute("""Correspondance between application and layer""") + + def getForegroundWindowTitle(sel) -> Optional[str]: + """ Return the name of the selected window + """ + + def start(self) -> None: + """ Start listening the events + """ diff --git a/interfaces/endpoint.py b/interfaces/endpoint.py new file mode 100755 index 0000000..5aa369b --- /dev/null +++ b/interfaces/endpoint.py @@ -0,0 +1,112 @@ +from typing import Optional +from zope import interface +from zope.interface import Attribute + + +class IEndpoint(interface.Interface): + + queue = Attribute("""The queue to send the message""") + state = Attribute("""The connection status""") + + def isConnected(self) -> bool: + """ Return True if the endpoint is connected. This function and + [connect] are supposed to work together : the main loop application + will call reconnect after a while when isConnected return False. + """ + + def connect(self): + """ Connect or reconnect to the endpoint + """ + + def fetch(self): + """ Check the elements from the connection + """ + + def send(self, data: list[dict[str, str]]): + """ Send element to the connection + """ + +class IConnection(interface.Interface): + """ Define a connected element (serial, network…) + """ + + def connect(self) -> None: + """ Connect """ + + def read(self) -> str: + """ Read from the connection and return the bytes. + Return None if there is nothing to read (non-blocking) + Raise an exception if disconnected """ + + def write(self, content:str) -> None: + """ Write into the connection. + Raise an exception if disconnected """ + +from interfaces import desktopEvent +from zope import component +import json + +from interfaces.message import IMessage, Debug + +@component.adapter(IConnection) +@interface.implementer(IEndpoint) +class EndPoint(object): + + STATE_DISCONNECTED = 0 + STATE_CONNECTING = 1 + STATE_CONNECTED = 2 + + def __init__(self, connection): + self.connection = connection + self.queue = None + self.state = self.STATE_DISCONNECTED + + def isConnected(self) -> bool: + return self.state != self.STATE_DISCONNECTED + + def connect(self): + try: + self.connection.connect() + self.state = self.STATE_CONNECTED + component.handle(Debug("Connected")) + except Exception as e: + print(e) + self.state = self.STATE_DISCONNECTED + + def fetch(self): + """ Read from the peripherical and put them in the queue if any data + are available. + """ + if self.state != self.STATE_CONNECTED: + return + try: + received = self.connection.read() + except Exception as e: + print(e) + self.state = self.STATE_DISCONNECTED + return + else: + if received is None or received == b'': + # If we do not have any entry from the macropad, just return + return + + print("recv", received) + layout = str(received, "utf-8").strip() + desktop = component.queryUtility(desktopEvent.IDesktop) + if desktop is not None: + title = desktop.getForegroundWindowTitle() + else: + title = None + self.queue.put((layout, title)) + + def send(self, data: list[dict[str, str]]): + """ Send the data to the macropad + """ + if self.state != self.STATE_CONNECTED: + return + try: + j = json.dumps( data ) + self.connection.write(bytes(j, "utf-8")) + except Exception as e: + print(e) + self.state = self.STATE_DISCONNECTED diff --git a/interfaces/message.py b/interfaces/message.py new file mode 100755 index 0000000..23ce2ec --- /dev/null +++ b/interfaces/message.py @@ -0,0 +1,14 @@ +from zope import interface +from zope.interface import Attribute + +class IMessage(interface.Interface): + content = Attribute("""The text message to log""") + +class ISocketMessage(interface.Interface): + content = Attribute("""Content to send""") + +@interface.implementer(IMessage) +class Debug(object): + + def __init__(self, message): + self.content = message -- cgit v1.2.3