diff options
| -rwxr-xr-x | client.py | 7 | ||||
| -rw-r--r-- | configuration.py | 60 | ||||
| -rw-r--r-- | consumer.py | 146 | ||||
| -rw-r--r-- | interfaces/configuration.py | 14 | ||||
| -rwxr-xr-x | interfaces/desktopEvent.py | 3 | ||||
| -rwxr-xr-x | interfaces/endpoint.py | 20 | ||||
| -rwxr-xr-x | interfaces/message.py | 19 | ||||
| -rwxr-xr-x | macropad.pyw | 198 | ||||
| -rwxr-xr-x | readme.rst | 10 | ||||
| -rwxr-xr-x | serial_conn.py | 1 | ||||
| -rw-r--r--[-rwxr-xr-x] | socketserver.py | 67 | ||||
| -rw-r--r--[-rwxr-xr-x] | win32.py | 36 | ||||
| -rw-r--r-- | xlib.py | 144 |
13 files changed, 509 insertions, 216 deletions
@@ -37,11 +37,10 @@ elif config.has_section("connection.serial"): # Connect to the endpoint right now
component.provideAdapter(endpoint.EndPoint)
s = component.queryAdapter(conn, endpoint.IEndpoint)
-s.connect()
if args.layer is not None:
print(args.layer)
- s.queue = Queue()
+ s.connect()
with open(args.layer, "r") as json_file:
json_data = json_file.read()
j = json.loads(json_data)
@@ -57,7 +56,8 @@ class Conn(Thread): Thread.__init__(self)
self.queue = queue
self.in_queue = Queue()
- s.queue = self.in_queue
+ #s.queue = self.in_queue
+ s.connect()
def run(self):
@@ -76,7 +76,6 @@ class Conn(Thread): s.connect()
continue
-
s.fetch()
while not self.in_queue.empty():
diff --git a/configuration.py b/configuration.py new file mode 100644 index 0000000..fd73fad --- /dev/null +++ b/configuration.py @@ -0,0 +1,60 @@ +""" The configuration from the user.
+"""
+
+from collections import OrderedDict
+from os import path
+import json
+from zope import component
+from interfaces.message import Error
+
+class Mapping():
+ """Represent the configuration. This class is provided as an utility and is
+ used in the IDesktop interfaces.
+ """
+
+ def __init__(self, configuration):
+ self.set(configuration)
+
+ def set(self, configuration):
+ """ Associate the mapping defined in the dictionnary.
+
+ Only the lines pointing to a valid file or configuration will be
+ loaded.
+ """
+ init_mapping = configuration["mapping"]
+ self.mapping = OrderedDict()
+ tmp_mapping = dict(init_mapping)
+ for key in tmp_mapping.keys() :
+ json_file = init_mapping[key]
+ if not path.exists(json_file):
+ component.handle(Error(f"The file '{json_file}' does not exists"))
+ continue
+ with open(json_file, "r") as file:
+ json_data = file.read()
+ try:
+ j = json.loads(json_data)
+ self.mapping[key] = j
+ except json.decoder.JSONDecodeError:
+ component.handle(Error(f"Json syntax error in '{json_file}'"))
+
+
+ def get(self, key, default):
+ """ This function return the mapping associated with the given key.
+ """
+ return self.mapping.get(key, default)
+
+ def __getitem__(self, key):
+ return self.mapping.get(key, None)
+
+ def __setitem__(self, key, value):
+ self.mapping[key] = value
+
+ def items(self):
+ """ Implement the keys items from the dictionnary
+ """
+ return self.mapping.items()
+
+ def keys(self):
+ """ Implement the keys method from the dictionnary
+ """
+ return self.mapping.keys()
diff --git a/consumer.py b/consumer.py new file mode 100644 index 0000000..8c577c8 --- /dev/null +++ b/consumer.py @@ -0,0 +1,146 @@ +""" This module provide a thread reading the events we need to send to the endpoint.
+"""
+
+import time
+import abc
+from threading import Thread
+from queue import Queue, Empty, Full
+
+from zope import component, interface
+
+from interfaces import endpoint, configuration
+from interfaces.configuration import IConfiguration
+from interfaces.message import ISocketMessage, Debug
+
+@interface.implementer(ISocketMessage)
+class Mapping():
+ """ Message requesting a layer change to the endpoint.
+ """
+
+ def __init__(self, message):
+ self.content = {"layer": message}
+
+
+class EventConsummer(Thread, metaclass=abc.ABCMeta):
+ """ Thread processing messages. This class does nothing and is intended to
+ be inherited.
+
+ The method `process` need to be overriden.
+ """
+
+ def __init__(self, timeout = None):
+ super().__init__()
+ self._queue = Queue()
+ self.timeout = timeout
+ self.daemon = True
+
+ def run(self):
+ """ Read and process the messages from the queue, and call the
+ handler.
+ """
+
+ while True:
+ # Block the thread for at most timeout seconds
+ try:
+ message = self._queue.get(timeout=self.timeout)
+ self._process(message)
+ self._queue.task_done()
+ except Empty:
+ self._process(None)
+
+ @abc.abstractmethod
+ def _process(self, message) -> None:
+ raise NotImplementedError()
+
+ def handle(self, message) -> None:
+ """ Register a new event in the queue.
+ This method is intended to be public.
+ """
+
+ # Try to add the message in the queue, if the queue is full, discard
+ # the firsts elements and try again.
+ try:
+ self._queue.put(message)
+ except Full:
+ while self._queue.full():
+ _ = self._queue.get()
+ # Discard another one more, just for sure.
+ try:
+ self._queue.put(message)
+ except Full:
+ component.handle(Debug("Ignoring event: queue is full"))
+
+
+class SocketMessageConsumer(EventConsummer):
+ """ Provide a handler consuming events and sending them to the endpoint.
+ It’s not an adapter as it’s not transforming the endpoint into
+ something else; instead, it requires an existing endpoint to
+ be declared in the registry.
+
+ The class register a handler_ for SocketEvents. They are queued and
+ processed when as the endpoint is connected. Events can be send as
+ usual using the component registry:
+
+ component.handle(Mapping(…))
+
+ .. _handler: https://zopecomponent.readthedocs.io/en/latest/narr.html#handlers
+ """
+
+ def __init__(self):
+ super().__init__(timeout = 2)
+ self._endpoint = component.getUtility(endpoint.IEndpoint)
+ self._last_mapping = None
+ component.provideHandler(self.handle, [ISocketMessage])
+
+ # Register the functions to associate for each kind of event.
+ self.function_table = {
+ "layer": self._process_layer,
+ }
+
+ def _process(self, message) -> None:
+
+ # We are here either because we have a message to process, or because
+ # we didn’t got any message before the timeout. We check if we are
+ # still connected to the endpoint.
+ while not self._endpoint.isConnected():
+ # Loop until we get the connection back. During this time, the
+ # message will stack in the queue.
+ component.handle(Debug("Reconnecting …"))
+ self._endpoint.state = self._endpoint.STATE_CONNECTING
+ self._endpoint.connect()
+ time.sleep(2)
+ # Also check if we have something to read from the server.
+ self._endpoint.fetch()
+
+ # The endpoint is connected and we have no message the process.
+ if message is None:
+ return
+
+ # Dispatch the event to the appropriate function.
+ for key, value in message.content.items():
+ self.function_table[key](value)
+
+ def _process_layer(self, message) -> None:
+
+ mapping, name = message
+ configuration = component.getUtility(IConfiguration)
+ if mapping != self._last_mapping:
+ self._last_mapping = mapping
+
+ if isinstance(mapping, dict):
+ self._endpoint.send(mapping)
+ elif isinstance(mapping, str):
+ layer = configuration.get(mapping, None)
+ if layer is not None:
+ self._endpoint.send(layer)
+
+ # Even if the mapping is the same and does not need to be updated, we
+ # may need to associate this new layer with another application.
+ if name is not None:
+ # We received a name to associate the configuration with.
+ configuration[name] = mapping
+ for key in mapping.keys():
+ # We do not want to log the keycode sent to the keyboard, only
+ # the name is interresting. We are supposed to have only one, but
+ # it’s the easer way to log it
+ component.handle(Debug(f"Associating {name} with {key}"))
diff --git a/interfaces/configuration.py b/interfaces/configuration.py new file mode 100644 index 0000000..9449f41 --- /dev/null +++ b/interfaces/configuration.py @@ -0,0 +1,14 @@ +from zope import interface +from zope.interface import Attribute + +from typing import Dict + +class IConfiguration(interface.Interface): + + def get(self, key : str, default) -> Dict: + """ Load the mapping for a given key + """ + + def __setitem__(self, key : str, value : Dict) -> None: + """ Set a value + """ diff --git a/interfaces/desktopEvent.py b/interfaces/desktopEvent.py index e8a30a4..9030ccd 100755 --- a/interfaces/desktopEvent.py +++ b/interfaces/desktopEvent.py @@ -4,10 +4,9 @@ from zope.interface import Attribute class IDesktop(interface.Interface):
- queue = Attribute("""The queue to send the message""")
mapping = Attribute("""Correspondance between application and layer""")
- def getForegroundWindowTitle(sel) -> Optional[str]:
+ def getForegroundWindowTitle(self) -> Optional[str]:
""" Return the name of the selected window
"""
diff --git a/interfaces/endpoint.py b/interfaces/endpoint.py index 15356c2..2f3af95 100755 --- a/interfaces/endpoint.py +++ b/interfaces/endpoint.py @@ -5,7 +5,6 @@ from zope.interface import Attribute class IEndpoint(interface.Interface):
- queue = Attribute("""The queue to send the message""")
state = Attribute("""The connection status""")
def isConnected(self) -> bool:
@@ -47,6 +46,7 @@ from zope import component import json
from interfaces.message import IMessage, Debug
+from consumer import Mapping
@component.adapter(IConnection)
@interface.implementer(IEndpoint)
@@ -58,17 +58,23 @@ class EndPoint(object): def __init__(self, connection):
self.connection = connection
- self.queue = None
self.state = self.STATE_DISCONNECTED
def isConnected(self) -> bool:
return self.state != self.STATE_DISCONNECTED
def connect(self):
+ state = self.state
try:
self.connection.connect()
self.state = self.STATE_CONNECTED
component.handle(Debug("Connected"))
+ if state == self.STATE_DISCONNECTED:
+ # This is the first connection
+ # Otherwise the state should be STATE_CONNECTING
+ # Initialize with the default layer
+ component.handle(Mapping(("default", None)))
+
except Exception as e:
print(e)
self.state = self.STATE_DISCONNECTED
@@ -90,23 +96,23 @@ class EndPoint(object): # If we do not have any entry from the macropad, just return
return
- print("recv", received)
layout = str(received, "utf-8").strip()
desktop = component.queryUtility(desktopEvent.IDesktop)
if desktop is not None:
title = desktop.getForegroundWindowTitle()
else:
title = None
- self.queue.put((layout, title))
+ component.handle(Mapping((layout, title)))
def send(self, data: list[dict[str, str]]):
- """ Send the data to the macropad
+ """ Send the data to the endpoint. The data must be the representation
+ of a json element.
"""
if self.state != self.STATE_CONNECTED:
return
try:
- j = json.dumps( data )
- self.connection.write(bytes(j, "utf-8"))
+ j = json.JSONEncoder().encode( data ) + "\n"
+ self.connection.write(str.encode(j))
except Exception as e:
print("send error", e)
self.state = self.STATE_DISCONNECTED
diff --git a/interfaces/message.py b/interfaces/message.py index 23ce2ec..bf05bef 100755 --- a/interfaces/message.py +++ b/interfaces/message.py @@ -1,14 +1,31 @@ +"""Messages sent to the user.
+"""
+
from zope import interface
from zope.interface import Attribute
class IMessage(interface.Interface):
+ """Interface for all the user messages.
+ """
content = Attribute("""The text message to log""")
+ level = Attribute("""Level of the message, 0 is high level and 10 is the lower""")
class ISocketMessage(interface.Interface):
+ """Message to sent to the endpoint."""
content = Attribute("""Content to send""")
@interface.implementer(IMessage)
-class Debug(object):
+class Debug():
+ """Send a message with a low level"""
+
+ def __init__(self, message):
+ self.content = message
+ self.level = 10
+
+@interface.implementer(IMessage)
+class Error():
+ """Send a message with a high level"""
def __init__(self, message):
self.content = message
+ self.level = 0
diff --git a/macropad.pyw b/macropad.pyw index 87dfd40..98e97e4 100755 --- a/macropad.pyw +++ b/macropad.pyw @@ -1,23 +1,25 @@ #!/usr/bin/env python3 -import serial - +import threading +from os import path import tkinter as tk from tkinter import Text +import configparser +from sys import platform + import pystray from pystray import MenuItem as item -from PIL import Image, ImageTk -import threading -from os import path -import sys +from PIL import Image from zope import component import interfaces from interfaces import endpoint -from interfaces.message import IMessage, Debug +from interfaces.message import IMessage, Debug, Error + +import consumer +from configuration import Mapping -import configparser script_path = path.dirname(path.realpath(__file__)) config_file = path.join(script_path, "config.ini") @@ -25,66 +27,43 @@ config_file = path.join(script_path, "config.ini") config = configparser.ConfigParser(delimiters="=") config.read(config_file) -from collections import OrderedDict - -init_mapping = config["mapping"] -mapping = OrderedDict(init_mapping) - -from queue import Queue -q = Queue() - -component.provideAdapter(interfaces.endpoint.EndPoint) -# -# Guess the platform and the load the corresponding event listener -# - -from sys import platform -if platform == "win32": - import win32 - window_listener = win32.Listener(mapping, q) - component.provideUtility(window_listener, interfaces.desktopEvent.IDesktop) - window_listener.start() - -elif platform == 'linux': - import xlib - xlib_listener = xlib.Listener(mapping, q) - component.provideUtility(xlib_listener, interfaces.desktopEvent.IDesktop) - xlib_listener.start() # # How to connect to the peripherical # +component.provideAdapter(interfaces.endpoint.EndPoint) if config.has_section("connection.serial"): - from serial_conn import SerialConnection - endpoint = component.queryAdapter(SerialConnection(config["connection.serial"]), endpoint.IEndpoint) - endpoint.queue = q - endpoint.connect() - component.provideUtility(endpoint, interfaces.endpoint.IEndpoint) + endpoint = component.queryAdapter( + SerialConnection(config["connection.serial"]), endpoint.IEndpoint) elif config.has_section("connection.socket"): - from socket_conn import SocketConnection - endpoint = component.queryAdapter(SocketConnection(config["connection.socket"]), endpoint.IEndpoint) - endpoint.queue = q - component.provideUtility(endpoint, interfaces.endpoint.IEndpoint) - endpoint.connect() + endpoint = component.queryAdapter( + SocketConnection(config["connection.socket"]), endpoint.IEndpoint) + +component.provideUtility(endpoint, interfaces.endpoint.IEndpoint) +endpoint.connect() if config.has_section("socket.serve"): import socketserver - server = socketserver.Handler(config["socket.serve"], q) + server = socketserver.Handler(config["socket.serve"]) else: server = None +handler = consumer.SocketMessageConsumer() +handler.start() + -class Icon(object): +class Icon(): + """Icon displayed in the notification bar.""" def __init__(self, image): menu=( - item('Quit', self.quit_window), + item('Quit', self.quit_window), item('Show', self.show_window, default=True), item('Reset',self.reset), ) @@ -93,12 +72,16 @@ class Icon(object): self.stop = threading.Event() self.show_hide = threading.Event() - # Start the icon into a new thread in order to keep the main loop - # control + # Start the icon into a new thread in order to keep the main loop control icon_thread = threading.Thread(target=self.icon.run) self.icon_thread = icon_thread def start(self): + """ Start the icon. + + Handler is runned in a dedicated thread to avoid blocking the + events from the main loop. + """ self.icon_thread.start() def quit(self): @@ -111,18 +94,21 @@ class Icon(object): self.show_hide.set() def reset(self): - # Create a copy of the dictonnary before updating the keys - tmp_mapping = dict(mapping) - for key in tmp_mapping.keys() : - if key not in init_mapping.keys(): - del mapping[key] + """ Read the configuration file again, and update the mapping list. + """ + config.read(config_file) + mapping.set(config) -import json -class Application(object): +class Application(): + """ The main application. + """ def __init__(self): + # Override the default function called when exception are reported + tk.Tk.report_callback_exception = self.report_callback_exception self.window = tk.Tk() self.text = Text(self.window, height=8) + self.text.tag_config("error", background="red", foreground="white") ## State of the #pplication self.running = True self.visible = False @@ -149,15 +135,46 @@ class Application(object): self.icon.start() component.handle(Debug("Started")) + def connect_desktop(self): + """ Launch the thread listening events from the desktop + """ + component.handle(Debug(platform)) + listener = None + if platform == "win32": + import win32 + listener = win32.Listener(mapping) + elif platform == 'linux': + import xlib + listener = xlib.Listener(mapping) + if listener is not None: + component.provideUtility(listener, interfaces.desktopEvent.IDesktop) + listener.start() + else: + component.handle(Error("Unsupported system %s" % platform)) + + def report_callback_exception(self, exc, val, tb): + """ Handle exception reported inside the Tk application. + This method overrid the default Tk.tk.report_callback_exception + method. + """ + import traceback + traceback.print_exception(exc, value=val, tb=tb) + self.icon.stop.set() + self.icon.quit() + self.running = False + self.window.destroy() + component.queryUtility(interfaces.desktopEvent.IDesktop).stop() + return + def hide(self): self.icon.show_hide.clear() self.visible = False self.window.withdraw() self.update() - def update(self): if self.icon.stop.is_set(): + print("stopping") self.icon.quit() self.running = False self.window.destroy() @@ -173,58 +190,28 @@ class Application(object): self.visible = not self.visible @component.adapter(IMessage) - def log(self, message : str): + def log(self, message : IMessage): + """ Log a message. + The message is printed in the console, and displayed in the + application window. + """ print(message.content) try: - f = open("/tmp/macropad.log", "a") - f.write(message.content) - f.write("\n") - f.close() self.text.insert("1.0", "\n") self.text.insert("1.0", message.content) self.text.delete("200.0", "end") + if message.level == 0: + self.text.tag_add("error", "1.0", "1.end") + self.icon.show_hide.set() except Exception as e: print(e) - def send(self, data: str): - if data == self.last_layout: - return - self.last_layout = data - - conn = component.queryUtility(interfaces.endpoint.IEndpoint) - if isinstance(data, dict): - conn.send(data) - return - elif isinstance(data, str): - if not path.exists(data): - print("The file '%s' does not exists" % data) - return - with open(data, "r") as json_file: - json_data = json_file.read() - j = json.loads(json_data) - content = json.dumps(j) - - conn.send(j) - - def associate(self, layout: str, name: str): - mapping[name] = layout - component.handle(Debug("Associating %s with %s" % (name, layout))) - def exec(self): try: self.update() - if server is not None: server.update() - conn = component.queryUtility(interfaces.endpoint.IEndpoint) - if not conn.isConnected(): - component.handle(Debug("Reconnecting…")) - - conn.state = conn.STATE_CONNECTING - self.window.after(1000, conn.connect) - else: - # Check if we have something to read from the server, and by - # the by if the server is still active. - conn.fetch() - except Exception as e: + if server is not None: + server.update() + except BaseException as e: component.handle(Debug( str(e) )) print(e) # Got any error, stop the application properly @@ -233,20 +220,23 @@ class Application(object): if app.running: self.window.after(200, self.exec) - while not q.empty(): - last_layout, app_ = q.get(False) - self.send(last_layout) - if app_ is not None: self.associate(last_layout, app_) - if __name__ == '__main__': + # Start the main application, Initializing the message listener before + # listening desktop events app = Application() + # The application is started, ready to display any error message. + mapping = Mapping(config) + component.provideUtility(mapping, interfaces.configuration.IConfiguration) + app.connect_desktop() # Initialize the main loop app.exec() try: app.window.mainloop() - except: + except BaseException: app.running = False app.window.destroy() app.icon.quit() - component.queryUtility(interfaces.desktopEvent.IDesktop).stop() + desktop = component.queryUtility(interfaces.desktopEvent.IDesktop) + if desktop is not None: + desktop.stop() @@ -28,6 +28,12 @@ for debian you may need to install : sudo apt install python3-pil.imagetk python3-serial python3-zope.component python3-pystray
+for windows, you will also need
+
+.. code:: bash
+
+ python3 -m pip install pywin32
+
Configuration
=============
@@ -65,7 +71,9 @@ Socket server host = localhost
name = The name of the application to associate with
-Use a proxy to connect with the keyboard (as server)
+Use a proxy to connect with the keyboard (as server). The name will be used to
+restore the registered layout if the application is selected. You can declare
+multiple application using a `,` as a separator.
The mapping
-----------
diff --git a/serial_conn.py b/serial_conn.py index a08543a..7925673 100755 --- a/serial_conn.py +++ b/serial_conn.py @@ -44,3 +44,4 @@ class SerialConnection(object): """ Write into the connection.
Raise an exception if disconnected """
self.s.write(content)
+ self.s.write(bytes("\n", "utf-8"))
diff --git a/socketserver.py b/socketserver.py index 57000d8..b2234bc 100755..100644 --- a/socketserver.py +++ b/socketserver.py @@ -10,6 +10,7 @@ import socket
import selectors
import json
+import csv
from dataclasses import dataclass
from typing import Callable
@@ -17,7 +18,8 @@ from typing import Callable from zope import component
from interfaces import desktopEvent
-from interfaces.message import Debug, ISocketMessage
+from interfaces.message import Debug
+from consumer import Mapping # Event used to send the mapping change request
@dataclass
class waitEvent:
@@ -33,18 +35,17 @@ actions = { "layout" : lambda x : x,
}
-class Handler(object):
+
+class Handler():
""" Listen the incomming connexions and dispatch them into the connexion
object """
- def __init__(self, configuration, layout_queue):
+ def __init__(self, configuration):
""" Initialize the the socket server
-
- The layout_queue will be populated with all the elements received from the socket.
"""
super().__init__()
self.sel = selectors.DefaultSelector()
- component.provideHandler(self.sendMessage)
+ #component.provideHandler(self.sendMessage)
self.socket = socket.socket()
self.socket.bind((configuration["host"], int(configuration["port"])))
@@ -54,25 +55,30 @@ class Handler(object): c = waitEvent()
c.callback = self.accept
self.sel.register(self.socket, selectors.EVENT_READ, c)
- self.application_name = configuration.get("name", None)
- if self.application_name is not None:
- self.application_name = self.application_name.lower()
+ # Read the name to associate the connection with.
+ application_name = configuration.get("name", None)
+ if application_name is not None:
+ # Split the values using `,` as separator
+ reader = csv.reader([application_name])
+ self.application_name = [name.lower().strip() for name in next(reader)]
+ else:
+ self.application_name = None
+
self.connexions = []
- self.layout_queue = layout_queue
- @component.adapter(ISocketMessage)
- def sendMessage(self, content:ISocketMessage) -> None:
- """ Send a message to all the sockets connected to the server
- """
- c = sendEvent(message = content.content)
- c.callback = self._send
- map = self.sel.get_map()
- for value in list(map.values())[:]:
- if value.fileobj == self.socket:
- # Ignore the main connexion, keep it only in accept mode.
- continue
- self.sel.modify(value.fileobj, selectors.EVENT_WRITE, c)
+ #@component.adapter(ISocketMessage)
+ #def sendMessage(self, content:ISocketMessage) -> None:
+ # """ Send a message to all the sockets connected to the server
+ # """
+ # c = sendEvent(message = content.content)
+ # c.callback = self._send
+ # map = self.sel.get_map()
+ # for value in list(map.values())[:]:
+ # if value.fileobj == self.socket:
+ # # Ignore the main connexion, keep it only in accept mode.
+ # continue
+ # self.sel.modify(value.fileobj, selectors.EVENT_WRITE, c)
def update(self):
""" Read the status for all the sockets, if they are available.
@@ -114,7 +120,7 @@ class Handler(object): try:
js = json.loads(json_data)
for key in js.keys():
- component.handle(Debug("Received %s from the socket" % (key)))
+ component.handle(Debug("Received '%s' from the socket" % (key)))
except Exception as e:
print("Can’t read", json_data, e)
return
@@ -122,21 +128,16 @@ class Handler(object): if self.application_name is not None:
# As we have a name in the configuration to associate with, use
# this name as a reference
- title = self.application_name
+ for title in self.application_name:
+ component.handle(Mapping((js, title)))
else:
# Associate the layout with the current window
title = component.queryUtility(desktopEvent.IDesktop).getForegroundWindowTitle()
- self.layout_queue.put((js, title))
-
- # The application name is hardcoded in the code, and should be reported
- # in the configuration or somewhere else.
-
- # The name of the current application is not reliable because the
- # message can be send with some lattency.
- self.layout_queue.put((js, self.application_name))
+ component.handle(Mapping((js, title)))
def _send(self:object, conn:socket, mask:int, text) -> None:
""" Internal method used to dispatch the message to the socket. """
+
try:
conn.sendall(bytes(text , "utf-8"))
except:
@@ -145,5 +146,7 @@ class Handler(object): self._switch_read(conn)
def close(self, conn):
+ """ Close all the existing connexions
+ """
self.sel.unregister(conn)
conn.close()
@@ -1,6 +1,6 @@ -# Required for the window title name
from ctypes import wintypes, windll, create_unicode_buffer, WINFUNCTYPE
from typing import Optional, Dict
+import sys
from zope import interface
from interfaces import desktopEvent
@@ -9,6 +9,7 @@ import win32gui import win32con
from zope import component
from interfaces.message import IMessage, Debug
+from consumer import Mapping
# This code tries to hook the Focus changes events with the callback method.
@@ -54,10 +55,9 @@ def setHook(WinEventProc, eventType): @interface.implementer(desktopEvent.IDesktop)
class Listener(object):
- def __init__(self, mapping: Dict[str, str], queue):
+ def __init__(self, mapping: Dict[str, str]):
self.WinEventProc = WinEventProcType(self.callback)
self.mapping = mapping
- self.queue = queue
def getForegroundWindowTitle(self) -> Optional[str]:
""" Get the window title name.
@@ -76,31 +76,49 @@ class Listener(object): else:
return None
+
def callback(self, hWinEventHook, event, hwnd, idObject, idChild, dwEventThread,
dwmsEventTime) -> None:
if hwnd is None:
return
+ match_event = event in (win32con.EVENT_OBJECT_FOCUS,
+ win32con.EVENT_OBJECT_NAMECHANGE)
+ if not match_event:
+ return
+
+ # Loop until we found the main window
+ parent = windll.user32.GetParent(hwnd)
+ while parent != 0:
+ hwnd = parent
+ parent = windll.user32.GetParent(hwnd)
+
+ is_foreground = (hwnd == windll.user32.GetForegroundWindow())
+ if not is_foreground:
+ return
length = windll.user32.GetWindowTextLengthW(hwnd)
title = create_unicode_buffer(length + 1)
windll.user32.GetWindowTextW(hwnd, title, length + 1)
if title.value is None:
return
- title = str(title.value).lower()
+ title = str(title.value).lower().strip()
if title == "":
return
+
for pattern, code in self.mapping.items():
if pattern == "default":
continue
if pattern in title:
- self.queue.put ( (code, None) )
+ for key in code.keys():
+ print(f"Mapping '{title}' to {key}")
+ component.handle(Mapping((code, None)))
return
- print("Matching '%s' to default" % title)
# Get the default mapping to apply if there is any defined
- default = self.mapping.get("default", None)
- if default is not None:
- self.queue.put ( (default, None) )
+ # This only applies when the window raising the event is the main
+ # window
+ print(f"Mapping '{title}' to default")
+ component.handle(Mapping(("default", None)))
def start(self) -> None:
self.hookIDs = [setHook(self.WinEventProc, et) for et in eventTypes.keys()]
@@ -1,24 +1,26 @@ -#!/usr/bin/python3
+from threading import Thread
+
import Xlib
import Xlib.display
from zope import interface
-from interfaces import desktopEvent
-from threading import Thread
-from interfaces.message import IMessage, Debug
from zope import component
+from interfaces.message import Debug
+from interfaces import desktopEvent
+from consumer import Mapping
+
@interface.implementer(desktopEvent.IDesktop)
class Listener(Thread):
- def __init__(self, mapping, queue):
- Thread.__init__(self)
- self.queue = queue
+ def __init__(self, mapping):
+ super().__init__()
self.mapping = mapping
self.active_window = None
self.last_code = None
self.running = False
+ self.daemon = True
def getForegroundWindowTitle(self):
""" Return the name of the selected window
@@ -26,62 +28,92 @@ class Listener(Thread): return self.active_window
def run(self):
- disp = Xlib.display.Display()
- NET_WM_NAME = disp.intern_atom('_NET_WM_NAME')
- WM_CLASS = disp.intern_atom('WM_CLASS')
- NET_ACTIVE_WINDOW = disp.intern_atom('_NET_ACTIVE_WINDOW')
- root = disp.screen().root
+ self.disp = Xlib.display.Display()
+ root = self.disp.screen().root
root.change_attributes(event_mask=Xlib.X.PropertyChangeMask)
+ self.NET_WM_NAME = self.disp.intern_atom('_NET_WM_NAME')
+ self.WM_CLASS = self.disp.intern_atom('WM_CLASS')
+ self.WM_NAME = self.disp.intern_atom('WM_NAME')
+ self.NET_ACTIVE_WINDOW = self.disp.intern_atom('_NET_ACTIVE_WINDOW')
component.handle(Debug("Waiting for xlib event"))
self.running = True
while self.running:
- event = disp.next_event()
+
try:
- window_id = root.get_full_property(NET_ACTIVE_WINDOW, Xlib.X.AnyPropertyType).value[0]
- window = disp.create_resource_object('window', window_id)
- window.change_attributes(event_mask=Xlib.X.PropertyChangeMask)
- window_prop = window.get_full_property(NET_WM_NAME, 0)
- if window_prop is None:
- continue
- window_name = str(window_prop.value).lower()
- class_name = str(window.get_full_property(WM_CLASS, 0).value).lower()
- except Xlib.error.XError:
- continue
+ self.handle_event(root, self.disp.next_event())
+ except Exception as e:
+ component.handle(Debug( str(e) ))
+ print(e)
+
+ def get_property_value(self, window, property_):
+ """ Return the requested property for the given window, or None if the
+ property is not defined. """
+ prop_name = window.get_full_property(property_, 0)
+ if prop_name is not None:
+ return str(prop_name.value).lower()
+ return None
+
- if window_name is None or window_name == self.active_window:
+ def handle_event(self, root, _event):
+ try:
+ window_id_property = root.get_full_property(
+ self.NET_ACTIVE_WINDOW,
+ Xlib.X.AnyPropertyType
+ )
+ if window_id_property is None:
+ return
+ window_id = window_id_property.value[0]
+ window = self.disp.create_resource_object('window', window_id)
+
+ window_name = self.get_property_value(window, self.NET_WM_NAME) \
+ or self.get_property_value(window, self.WM_NAME)
+ class_name = self.get_property_value(window, self.WM_CLASS)
+ except Xlib.error.XError:
+ return
+
+ if window_name is None or window_name == self.active_window:
+ return
+
+ self.active_window = window_name
+ found = False
+
+ # Create a reveverse dictionnary for getting the users added first
+ # In python, the elements in a dictionnary are sorted by insertion
+ # order, so we get the user added first here
+ for pattern in reversed(self.mapping.keys()):
+ # Ignore the default layout
+ if pattern == "default":
+ continue
+ if not ((window_name is not None and pattern.lstrip() in window_name)
+ or (class_name is not None and pattern.lstrip() in class_name)):
continue
+ code = self.mapping[pattern]
+ # We found something. At this point, even if we do not update
+ # the layer (because it is the same as the previous) we do not
+ # switch back to the default layer.
+ found = True
+ if code != self.last_code:
- self.active_window = window_name
- found = False
-
- # Create a reveverse dictionnary for getting the users added first
- # In python, the elements in a dictionnary are sorted by insertion
- # order, so we get the user added first here
- for pattern in reversed(self.mapping):
- # Ignore the default layout
- if pattern == "default":
- continue
- if not (pattern.lstrip() in window_name or pattern.lstrip() in class_name):
- continue
- code = self.mapping[pattern]
- # We found something. At this point, even if we do not update
- # the layer (because it is the same as the previous) we do not
- # switch back to the default layer.
- found = True
- if code != self.last_code:
-
- component.handle(Debug("Switching to '%s' for '%s'" % (pattern, window_name)))
- self.queue.put ( (code, None) )
- self.last_code = code
- break
- if not found and self.last_code != "default":
- default = self.mapping.get("default", None)
- if default is None:
- continue
-
- self.queue.put ( (default, None) )
- self.last_code = "default"
-
- def stop(self):
+ component.handle(Debug(f"Switching to '{pattern}' for '{window_name}'"))
+ component.handle(Mapping((code, None)))
+ self.last_code = code
+ # We found a matching configuration. Even if the match is the
+ # same as the current one, we break the loop in order to
+ # prevent another layer to update.
+ break
+ if not found:
+ print(f"Mapping '{window_name}' / '{class_name}' to default")
+ if not found and self.last_code != "default":
+ default = self.mapping.get("default", None)
+ if default is None:
+ return
+
+ component.handle(Mapping((default, None)))
+ self.last_code = "default"
+
+
+ def stop(self) -> None:
+ """ Stop the thread properly.
+ """
self.running = False
|
