diff options
author | Sébastien Dailly <sebastien@dailly.me> | 2023-07-15 14:44:41 +0200 |
---|---|---|
committer | Sébastien Dailly <sebastien@dailly.me> | 2023-07-15 14:59:43 +0200 |
commit | 02d676bda89c2fb8469ea81f7429c19c1e29df7c (patch) | |
tree | 11dae86a561a4800661e6c174b47611a704f857e /interfaces/endpoint.py |
Initial commit
Diffstat (limited to 'interfaces/endpoint.py')
-rwxr-xr-x | interfaces/endpoint.py | 112 |
1 files changed, 112 insertions, 0 deletions
diff --git a/interfaces/endpoint.py b/interfaces/endpoint.py new file mode 100755 index 0000000..5aa369b --- /dev/null +++ b/interfaces/endpoint.py @@ -0,0 +1,112 @@ +from typing import Optional
+from zope import interface
+from zope.interface import Attribute
+
+
+class IEndpoint(interface.Interface):
+
+ queue = Attribute("""The queue to send the message""")
+ state = Attribute("""The connection status""")
+
+ def isConnected(self) -> bool:
+ """ Return True if the endpoint is connected. This function and
+ [connect] are supposed to work together : the main loop application
+ will call reconnect after a while when isConnected return False.
+ """
+
+ def connect(self):
+ """ Connect or reconnect to the endpoint
+ """
+
+ def fetch(self):
+ """ Check the elements from the connection
+ """
+
+ def send(self, data: list[dict[str, str]]):
+ """ Send element to the connection
+ """
+
+class IConnection(interface.Interface):
+ """ Define a connected element (serial, network…)
+ """
+
+ def connect(self) -> None:
+ """ Connect """
+
+ def read(self) -> str:
+ """ Read from the connection and return the bytes.
+ Return None if there is nothing to read (non-blocking)
+ Raise an exception if disconnected """
+
+ def write(self, content:str) -> None:
+ """ Write into the connection.
+ Raise an exception if disconnected """
+
+from interfaces import desktopEvent
+from zope import component
+import json
+
+from interfaces.message import IMessage, Debug
+
+@component.adapter(IConnection)
+@interface.implementer(IEndpoint)
+class EndPoint(object):
+
+ STATE_DISCONNECTED = 0
+ STATE_CONNECTING = 1
+ STATE_CONNECTED = 2
+
+ def __init__(self, connection):
+ self.connection = connection
+ self.queue = None
+ self.state = self.STATE_DISCONNECTED
+
+ def isConnected(self) -> bool:
+ return self.state != self.STATE_DISCONNECTED
+
+ def connect(self):
+ try:
+ self.connection.connect()
+ self.state = self.STATE_CONNECTED
+ component.handle(Debug("Connected"))
+ except Exception as e:
+ print(e)
+ self.state = self.STATE_DISCONNECTED
+
+ def fetch(self):
+ """ Read from the peripherical and put them in the queue if any data
+ are available.
+ """
+ if self.state != self.STATE_CONNECTED:
+ return
+ try:
+ received = self.connection.read()
+ except Exception as e:
+ print(e)
+ self.state = self.STATE_DISCONNECTED
+ return
+ else:
+ if received is None or received == b'':
+ # If we do not have any entry from the macropad, just return
+ return
+
+ print("recv", received)
+ layout = str(received, "utf-8").strip()
+ desktop = component.queryUtility(desktopEvent.IDesktop)
+ if desktop is not None:
+ title = desktop.getForegroundWindowTitle()
+ else:
+ title = None
+ self.queue.put((layout, title))
+
+ def send(self, data: list[dict[str, str]]):
+ """ Send the data to the macropad
+ """
+ if self.state != self.STATE_CONNECTED:
+ return
+ try:
+ j = json.dumps( data )
+ self.connection.write(bytes(j, "utf-8"))
+ except Exception as e:
+ print(e)
+ self.state = self.STATE_DISCONNECTED
|