diff options
-rwxr-xr-x | client.py | 4 | ||||
-rw-r--r-- | configuration.py | 47 | ||||
-rwxr-xr-x | interfaces/endpoint.py | 15 | ||||
-rwxr-xr-x | macropad.pyw | 107 | ||||
-rwxr-xr-x | readme.rst | 4 | ||||
-rwxr-xr-x | serial_conn.py | 1 | ||||
-rw-r--r--[-rwxr-xr-x] | socketserver.py | 27 | ||||
-rw-r--r--[-rwxr-xr-x] | win32.py | 22 | ||||
-rw-r--r-- | xlib.py | 108 |
9 files changed, 220 insertions, 115 deletions
@@ -37,11 +37,11 @@ elif config.has_section("connection.serial"): # Connect to the endpoint right now
component.provideAdapter(endpoint.EndPoint)
s = component.queryAdapter(conn, endpoint.IEndpoint)
-s.connect()
if args.layer is not None:
print(args.layer)
s.queue = Queue()
+ s.connect()
with open(args.layer, "r") as json_file:
json_data = json_file.read()
j = json.loads(json_data)
@@ -58,6 +58,7 @@ class Conn(Thread): self.queue = queue
self.in_queue = Queue()
s.queue = self.in_queue
+ s.connect()
def run(self):
@@ -76,7 +77,6 @@ class Conn(Thread): s.connect()
continue
-
s.fetch()
while not self.in_queue.empty():
diff --git a/configuration.py b/configuration.py new file mode 100644 index 0000000..fceef14 --- /dev/null +++ b/configuration.py @@ -0,0 +1,47 @@ +from collections import OrderedDict
+from os import path
+import json
+class Mapping():
+
+ def __init__(self, configuration):
+ self.init_mapping = configuration["mapping"]
+ self.mapping = OrderedDict()
+ tmp_mapping = dict(self.init_mapping)
+ for key in tmp_mapping.keys() :
+ json_file = self.init_mapping[key]
+ if not path.exists(json_file):
+ print("The file '%s' does not exists" % json_file)
+ continue
+ with open(json_file, "r") as file:
+ json_data = file.read()
+ j = json.loads(json_data)
+ self.mapping[key] = j
+
+ def reset(self):
+ """ Remove all the keys added and not in the configuration file.
+ """
+ # Create a copy of the dictonnary before updating the keys
+ tmp_mapping = dict(self.mapping)
+ for key in tmp_mapping.keys() :
+ if key not in self.init_mapping.keys():
+ del self.mapping[key]
+
+ def get(self, key, default):
+ return self.mapping.get(key, default)
+
+ def __getitem__(self, key):
+ return self.mapping.get(key, None)
+
+ def __setitem__(self, key, value):
+ self.mapping[key] = value
+
+ def items(self):
+ """ Implement the keys items from the dictionnary
+ """
+ return self.mapping.items()
+
+ def keys(self):
+ """ Implement the keys method from the dictionnary
+ """
+ return self.mapping.keys()
+
diff --git a/interfaces/endpoint.py b/interfaces/endpoint.py index 15356c2..2d77b66 100755 --- a/interfaces/endpoint.py +++ b/interfaces/endpoint.py @@ -65,10 +65,17 @@ class EndPoint(object): return self.state != self.STATE_DISCONNECTED
def connect(self):
+ state = self.state
try:
self.connection.connect()
self.state = self.STATE_CONNECTED
component.handle(Debug("Connected"))
+ if state == self.STATE_DISCONNECTED:
+ # This is the first connection
+ # Otherwise the state should be STATE_CONNECTING
+ # Initialize with the default layer
+ self.queue.put ( ("default", None) )
+
except Exception as e:
print(e)
self.state = self.STATE_DISCONNECTED
@@ -90,7 +97,6 @@ class EndPoint(object): # If we do not have any entry from the macropad, just return
return
- print("recv", received)
layout = str(received, "utf-8").strip()
desktop = component.queryUtility(desktopEvent.IDesktop)
if desktop is not None:
@@ -100,13 +106,14 @@ class EndPoint(object): self.queue.put((layout, title))
def send(self, data: list[dict[str, str]]):
- """ Send the data to the macropad
+ """ Send the data to the endpoint. The data must be the representation
+ of a json element.
"""
if self.state != self.STATE_CONNECTED:
return
try:
- j = json.dumps( data )
- self.connection.write(bytes(j, "utf-8"))
+ j = json.JSONEncoder().encode( data ) + "\n"
+ self.connection.write(str.encode(j))
except Exception as e:
print("send error", e)
self.state = self.STATE_DISCONNECTED
diff --git a/macropad.pyw b/macropad.pyw index 87dfd40..f73cd1c 100755 --- a/macropad.pyw +++ b/macropad.pyw @@ -8,8 +8,8 @@ import pystray from pystray import MenuItem as item from PIL import Image, ImageTk +from typing import Dict import threading -from os import path import sys from zope import component @@ -19,6 +19,7 @@ from interfaces.message import IMessage, Debug import configparser +from os import path script_path = path.dirname(path.realpath(__file__)) config_file = path.join(script_path, "config.ini") @@ -27,8 +28,9 @@ config.read(config_file) from collections import OrderedDict -init_mapping = config["mapping"] -mapping = OrderedDict(init_mapping) +from configuration import Mapping
+mapping = Mapping(config)
+ from queue import Queue q = Queue() @@ -39,19 +41,6 @@ component.provideAdapter(interfaces.endpoint.EndPoint) # Guess the platform and the load the corresponding event listener # -from sys import platform -if platform == "win32": - import win32 - window_listener = win32.Listener(mapping, q) - component.provideUtility(window_listener, interfaces.desktopEvent.IDesktop) - window_listener.start() - -elif platform == 'linux': - import xlib - xlib_listener = xlib.Listener(mapping, q) - component.provideUtility(xlib_listener, interfaces.desktopEvent.IDesktop) - xlib_listener.start() - # # How to connect to the peripherical # @@ -80,6 +69,7 @@ else: server = None + class Icon(object): def __init__(self, image): @@ -93,12 +83,16 @@ class Icon(object): self.stop = threading.Event() self.show_hide = threading.Event() - # Start the icon into a new thread in order to keep the main loop - # control + # Start the icon into a new thread in order to keep the main loop control icon_thread = threading.Thread(target=self.icon.run) self.icon_thread = icon_thread def start(self): + """ Start the icon. + + Handler is runned in a dedicated thread to avoid blocking the + events from the main loop. + """ self.icon_thread.start() def quit(self): @@ -111,16 +105,13 @@ class Icon(object): self.show_hide.set() def reset(self): - # Create a copy of the dictonnary before updating the keys - tmp_mapping = dict(mapping) - for key in tmp_mapping.keys() : - if key not in init_mapping.keys(): - del mapping[key] + mapping.reset() -import json class Application(object): def __init__(self): + # Override the default function called when exception are reported + tk.Tk.report_callback_exception = self.report_callback_exception self.window = tk.Tk() self.text = Text(self.window, height=8) ## State of the #pplication @@ -149,15 +140,44 @@ class Application(object): self.icon.start() component.handle(Debug("Started")) + def connect_desktop(self): + """ Launch the thread listening events from the desktop + """ + from sys import platform + component.handle(Debug(platform)) + if platform == "win32": + import win32 + listener = win32.Listener(mapping, q) + elif platform == 'linux': + import xlib + listener = xlib.Listener(mapping, q) + component.handle(Debug("Starting xlib")) + component.provideUtility(listener, interfaces.desktopEvent.IDesktop) + listener.start() + + def report_callback_exception(self, exc, val, tb): + """ Handle exception reported inside the Tk application. + This method overrid the default Tk.tk.report_callback_exception + method. + """ + import traceback + traceback.print_exception(exc, value=val, tb=tb) + self.icon.stop.set() + self.icon.quit() + self.running = False + self.window.destroy() + component.queryUtility(interfaces.desktopEvent.IDesktop).stop() + return + def hide(self): self.icon.show_hide.clear() self.visible = False self.window.withdraw() self.update() - def update(self): if self.icon.stop.is_set(): + print("stopping") self.icon.quit() self.running = False self.window.destroy() @@ -176,39 +196,37 @@ class Application(object): def log(self, message : str): print(message.content) try: - f = open("/tmp/macropad.log", "a") - f.write(message.content) - f.write("\n") - f.close() self.text.insert("1.0", "\n") self.text.insert("1.0", message.content) self.text.delete("200.0", "end") except Exception as e: print(e) + def send(self, data: str): + """ Send the configuration to the device. + The configuration can be either + - a dictionnary, and will be send as is + - a string, and will be load in a file + If the content is the same, ignore the message and return. + """ if data == self.last_layout: return + # Merge the new layout with the previous one, ignoring all the null. self.last_layout = data conn = component.queryUtility(interfaces.endpoint.IEndpoint) if isinstance(data, dict): conn.send(data) - return elif isinstance(data, str): - if not path.exists(data): - print("The file '%s' does not exists" % data) - return - with open(data, "r") as json_file: - json_data = json_file.read() - j = json.loads(json_data) - content = json.dumps(j) - - conn.send(j) + layer = mapping.get(data, None) + if layer is not None: + conn.send(layer) - def associate(self, layout: str, name: str): + def associate(self, layout: Dict, name: str): mapping[name] = layout - component.handle(Debug("Associating %s with %s" % (name, layout))) + for key in layout.keys(): + component.handle(Debug("Associating %s with %s" % (name, key))) def exec(self): try: @@ -224,7 +242,7 @@ class Application(object): # Check if we have something to read from the server, and by # the by if the server is still active. conn.fetch() - except Exception as e: + except BaseException as e: component.handle(Debug( str(e) )) print(e) # Got any error, stop the application properly @@ -239,13 +257,16 @@ class Application(object): if app_ is not None: self.associate(last_layout, app_) if __name__ == '__main__': + # Start the main application, Initializing the message listener before + # listening desktop events app = Application() + app.connect_desktop() # Initialize the main loop app.exec() try: app.window.mainloop() - except: + except BaseException as e: app.running = False app.window.destroy() app.icon.quit() @@ -65,7 +65,9 @@ Socket server host = localhost
name = The name of the application to associate with
-Use a proxy to connect with the keyboard (as server)
+Use a proxy to connect with the keyboard (as server). The name will be used to
+restore the registered layout if the application is selected. You can declare
+multiple application using a `,` as a separator.
The mapping
-----------
diff --git a/serial_conn.py b/serial_conn.py index a08543a..7925673 100755 --- a/serial_conn.py +++ b/serial_conn.py @@ -44,3 +44,4 @@ class SerialConnection(object): """ Write into the connection.
Raise an exception if disconnected """
self.s.write(content)
+ self.s.write(bytes("\n", "utf-8"))
diff --git a/socketserver.py b/socketserver.py index 57000d8..74f0940 100755..100644 --- a/socketserver.py +++ b/socketserver.py @@ -10,6 +10,7 @@ import socket
import selectors
import json
+import csv
from dataclasses import dataclass
from typing import Callable
@@ -54,9 +55,15 @@ class Handler(object): c = waitEvent()
c.callback = self.accept
self.sel.register(self.socket, selectors.EVENT_READ, c)
- self.application_name = configuration.get("name", None)
- if self.application_name is not None:
- self.application_name = self.application_name.lower()
+ # Read the name to associate the connection with.
+ application_name = configuration.get("name", None)
+ if application_name is not None:
+ # Split the values using `,` as separator
+ reader = csv.reader([application_name])
+ self.application_name = [name.lower().strip() for name in next(reader)]
+ else:
+ self.application_name = None
+
self.connexions = []
self.layout_queue = layout_queue
@@ -114,7 +121,7 @@ class Handler(object): try:
js = json.loads(json_data)
for key in js.keys():
- component.handle(Debug("Received %s from the socket" % (key)))
+ component.handle(Debug("Received '%s' from the socket" % (key)))
except Exception as e:
print("Can’t read", json_data, e)
return
@@ -122,18 +129,12 @@ class Handler(object): if self.application_name is not None:
# As we have a name in the configuration to associate with, use
# this name as a reference
- title = self.application_name
+ for title in self.application_name:
+ self.layout_queue.put((js, title))
else:
# Associate the layout with the current window
title = component.queryUtility(desktopEvent.IDesktop).getForegroundWindowTitle()
- self.layout_queue.put((js, title))
-
- # The application name is hardcoded in the code, and should be reported
- # in the configuration or somewhere else.
-
- # The name of the current application is not reliable because the
- # message can be send with some lattency.
- self.layout_queue.put((js, self.application_name))
+ self.layout_queue.put((js, title))
def _send(self:object, conn:socket, mask:int, text) -> None:
""" Internal method used to dispatch the message to the socket. """
@@ -76,6 +76,7 @@ class Listener(object): else:
return None
+
def callback(self, hWinEventHook, event, hwnd, idObject, idChild, dwEventThread,
dwmsEventTime) -> None:
@@ -90,17 +91,30 @@ class Listener(object): title = str(title.value).lower()
if title == "":
return
+
+ foreground_hwnd = windll.user32.GetForegroundWindow()
+ if event != win32con.EVENT_OBJECT_FOCUS and foreground_hwnd != hwnd:
+ return
+
for pattern, code in self.mapping.items():
if pattern == "default":
continue
if pattern in title:
self.queue.put ( (code, None) )
return
- print("Matching '%s' to default" % title)
# Get the default mapping to apply if there is any defined
- default = self.mapping.get("default", None)
- if default is not None:
- self.queue.put ( (default, None) )
+ # This only applies when the window raising the event is the main
+ # window
+ if hwnd == foreground_hwnd:
+ # We are not allowed to display message into the handler, because
+ # the GIL is released in the callback. We can’t modifiy elements
+ # from other threads, as it’s create weird errors :
+ #
+ # Fatal Python error: PyEval_RestoreThread: the function must be
+ # called with the GIL held, but the GIL is released (the current
+ # Python thread state is NULL)
+ print("Mapping '%s' to default" % title)
+ self.queue.put ( ("default", None) )
def start(self) -> None:
self.hookIDs = [setHook(self.WinEventProc, et) for et in eventTypes.keys()]
@@ -1,4 +1,3 @@ -#!/usr/bin/python3
import Xlib
import Xlib.display
@@ -26,62 +25,75 @@ class Listener(Thread): return self.active_window
def run(self):
- disp = Xlib.display.Display()
- NET_WM_NAME = disp.intern_atom('_NET_WM_NAME')
- WM_CLASS = disp.intern_atom('WM_CLASS')
- NET_ACTIVE_WINDOW = disp.intern_atom('_NET_ACTIVE_WINDOW')
- root = disp.screen().root
+ self.disp = Xlib.display.Display()
+ root = self.disp.screen().root
root.change_attributes(event_mask=Xlib.X.PropertyChangeMask)
+ self.NET_WM_NAME = self.disp.intern_atom('_NET_WM_NAME')
+ self.WM_CLASS = self.disp.intern_atom('WM_CLASS')
+ self.NET_ACTIVE_WINDOW = self.disp.intern_atom('_NET_ACTIVE_WINDOW')
component.handle(Debug("Waiting for xlib event"))
self.running = True
while self.running:
- event = disp.next_event()
+
try:
- window_id = root.get_full_property(NET_ACTIVE_WINDOW, Xlib.X.AnyPropertyType).value[0]
- window = disp.create_resource_object('window', window_id)
- window.change_attributes(event_mask=Xlib.X.PropertyChangeMask)
- window_prop = window.get_full_property(NET_WM_NAME, 0)
- if window_prop is None:
- continue
- window_name = str(window_prop.value).lower()
- class_name = str(window.get_full_property(WM_CLASS, 0).value).lower()
- except Xlib.error.XError:
- continue
+ self.handle_event(root, self.disp.next_event())
+ except Exception as e:
+ component.handle(Debug( str(e) ))
+ print(e)
+
+ def handle_event(self, root, event):
+ try:
+ window_id_property = root.get_full_property(self.NET_ACTIVE_WINDOW, Xlib.X.AnyPropertyType)
+ if window_id_property is None:
+ return
+ window_id = window_id_property.value[0]
+ window = self.disp.create_resource_object('window', window_id)
+ window_prop = window.get_full_property(self.NET_WM_NAME, 0)
+ if window_prop is None:
+ return
+ window_name = str(window_prop.value).lower()
+ class_name = str(window.get_full_property(self.WM_CLASS, 0).value).lower()
+ except Xlib.error.XError:
+ return
+
+ if window_name is None or window_name == self.active_window:
+ return
- if window_name is None or window_name == self.active_window:
+ self.active_window = window_name
+ found = False
+
+ # Create a reveverse dictionnary for getting the users added first
+ # In python, the elements in a dictionnary are sorted by insertion
+ # order, so we get the user added first here
+ for pattern in reversed(self.mapping.keys()):
+ # Ignore the default layout
+ if pattern == "default":
+ continue
+ if not (pattern.lstrip() in window_name or pattern.lstrip() in class_name):
continue
+ code = self.mapping[pattern]
+ # We found something. At this point, even if we do not update
+ # the layer (because it is the same as the previous) we do not
+ # switch back to the default layer.
+ found = True
+ if code != self.last_code:
+
+ component.handle(Debug("Switching to '%s' for '%s'" % (pattern, window_name)))
+ self.queue.put ( (code, None) )
+ self.last_code = code
+ # We found a matching configuration. Even if the match is the
+ # same as the current one, we break the loop in order to
+ # prevent another layer to update.
+ break
+ if not found and self.last_code != "default":
+ default = self.mapping.get("default", None)
+ if default is None:
+ return
+
+ self.queue.put ( (default, None) )
+ self.last_code = "default"
- self.active_window = window_name
- found = False
-
- # Create a reveverse dictionnary for getting the users added first
- # In python, the elements in a dictionnary are sorted by insertion
- # order, so we get the user added first here
- for pattern in reversed(self.mapping):
- # Ignore the default layout
- if pattern == "default":
- continue
- if not (pattern.lstrip() in window_name or pattern.lstrip() in class_name):
- continue
- code = self.mapping[pattern]
- # We found something. At this point, even if we do not update
- # the layer (because it is the same as the previous) we do not
- # switch back to the default layer.
- found = True
- if code != self.last_code:
-
- component.handle(Debug("Switching to '%s' for '%s'" % (pattern, window_name)))
- self.queue.put ( (code, None) )
- self.last_code = code
- break
- if not found and self.last_code != "default":
- default = self.mapping.get("default", None)
- if default is None:
- continue
-
- self.queue.put ( (default, None) )
- self.last_code = "default"
def stop(self):
self.running = False
|