#!/usr/bin/env python3 import serial import tkinter as tk from tkinter import Text import pystray from pystray import MenuItem as item from PIL import Image, ImageTk import threading from os import path import sys from zope import component import interfaces from interfaces import endpoint from interfaces.message import IMessage, Debug import configparser script_path = path.dirname(path.realpath(__file__)) config_file = path.join(script_path, "config.ini") config = configparser.ConfigParser(delimiters="=") config.read(config_file) init_mapping = config["mapping"] mapping = dict(init_mapping) from queue import Queue q = Queue() component.provideAdapter(interfaces.endpoint.EndPoint) # # Guess the platform and the load the corresponding event listener # from sys import platform if platform == "win32": import win32 window_listener = win32.Listener(mapping, q) component.provideUtility(window_listener, interfaces.desktopEvent.IDesktop) window_listener.start() elif platform == 'linux': import xlib xlib_listener = xlib.Listener(mapping, q) component.provideUtility(xlib_listener, interfaces.desktopEvent.IDesktop) xlib_listener.start() # # How to connect to the peripherical # if config.has_section("connection.serial"): from serial_conn import SerialConnection endpoint = component.queryAdapter(SerialConnection(config["connection.serial"]), endpoint.IEndpoint) endpoint.queue = q endpoint.connect() component.provideUtility(endpoint, interfaces.endpoint.IEndpoint) elif config.has_section("connection.socket"): from socket_conn import SocketConnection endpoint = component.queryAdapter(SocketConnection(config["connection.socket"]), endpoint.IEndpoint) endpoint.queue = q component.provideUtility(endpoint, interfaces.endpoint.IEndpoint) endpoint.connect() if config.has_section("socket.serve"): import socketserver server = socketserver.Handler(config["socket.serve"], q) else: server = None class Icon(object): def __init__(self, image): menu=( item('Quit', self.quit_window), item('Show', self.show_window, default=True), item('Reset',self.reset), ) self.icon=pystray.Icon("name", image, "Macropad companion", menu) self.stop = threading.Event() self.show_hide = threading.Event() # Start the icon into a new thread in order to keep the main loop # control icon_thread = threading.Thread(target=self.icon.run) self.icon_thread = icon_thread def start(self): self.icon_thread.start() def quit(self): self.icon.stop() def quit_window(self): self.stop.set() def show_window(self): self.show_hide.set() def reset(self): # Create a copy of the dictonnary before updating the keys tmp_mapping = dict(mapping) for key in tmp_mapping.keys() : if key not in init_mapping.keys(): del mapping[key] import json class Application(object): def __init__(self): self.window = tk.Tk() self.text = Text(self.window, height=8) ## State of the #pplication self.running = True self.visible = False self.focused_window = None self.last_layout = None component.provideHandler(self.log) # Window property self.window.withdraw() self.window.title("Macropad companion") icon = path.join(script_path, "favicon.ico") try: self.window.iconbitmap(icon) except: pass self.text.pack() # When closing, return back to the iconified mode self.window.protocol("WM_DELETE_WINDOW", self.hide) # Start the application in iconified mode image=Image.open(icon) self.icon = Icon(image) self.icon.start() component.handle(Debug("Started")) def hide(self): self.icon.show_hide.clear() self.visible = False self.window.withdraw() self.update() def update(self): if self.icon.stop.is_set(): self.icon.quit() self.running = False self.window.destroy() component.queryUtility(interfaces.desktopEvent.IDesktop).stop() return if self.icon.show_hide.is_set(): if not self.visible: self.window.deiconify() else: self.window.withdraw() self.icon.show_hide.clear() self.visible = not self.visible @component.adapter(IMessage) def log(self, message : str): print(message.content) try: f = open("/tmp/macropad.log", "a") f.write(message.content) f.write("\n") f.close() self.text.insert("1.0", "\n") self.text.insert("1.0", message.content) self.text.delete("200.0", "end") except Exception as e: print(e) def send(self, data: str): if data == self.last_layout: return self.last_layout = data if isinstance(data, dict): component.queryUtility(interfaces.endpoint.IEndpoint).send(data) return elif isinstance(data, str): if not path.exists(data): print("The file '%s' does not exists" % data) return with open(data, "r") as json_file: json_data = json_file.read() j = json.loads(json_data) content = json.dumps(j) component.queryUtility(interfaces.endpoint.IEndpoint).send(j) def associate(self, layout: str, name: str): mapping[name] = layout component.handle(Debug("Associating %s with %s" % (name, layout))) def exec(self): try: self.update() if server is not None: server.update() conn = component.queryUtility(interfaces.endpoint.IEndpoint) if not conn.isConnected(): component.handle(Debug("Reconnecting…")) conn.state = conn.STATE_CONNECTING self.window.after(5000, conn.connect) else: conn.fetch() except Exception as e: component.handle(Debug( str(e) )) print(e) # Got any error, stop the application properly self.icon.stop.set() if app.running: self.window.after(200, self.exec) while not q.empty(): last_layout, app_ = q.get(False) self.send(last_layout) if app_ is not None: self.associate(last_layout, app_) if __name__ == '__main__': app = Application() # Initialize the main loop app.exec() try: app.window.mainloop() except: app.running = False app.window.destroy() app.icon.quit() component.queryUtility(interfaces.desktopEvent.IDesktop).stop()