diff --git a/README.md b/README.md index 56738b2..28146e3 100644 --- a/README.md +++ b/README.md @@ -77,6 +77,13 @@ This demo cycles through 15 different haptic patterns with 3-second intervals to The application uses `dbus-monitor` to listen for notifications on the D-Bus session bus. When a notification is detected on the `org.freedesktop.Notifications` interface, it sends a HID++ command to the MX Master 4 to trigger its built-in haptic motor. +If both USB receiver and Bluetooth are connected, the app prefers USB by default. You can force the connection mode by setting `MX4_CONNECTION`: + +- `MX4_CONNECTION=bluetooth` +- `MX4_CONNECTION=usb` + +Bluetooth uses a WebHID-style report payload for haptic effects, inspired by [mx-master-4-webhid](https://github.com/mario-gutierrez/mx-master-4-webhid). + This works with any application that sends notifications through the standard freedesktop.org notification specification, including: - System notifications diff --git a/src/mx_master_4.py b/src/mx_master_4.py index 9a13225..dad813b 100644 --- a/src/mx_master_4.py +++ b/src/mx_master_4.py @@ -5,6 +5,13 @@ import hid LOGITECH_VID = 0x046D +USAGE_PAGE_HIDPP = 0xFF00 +BUS_USB = 0x03 +BUS_BLUETOOTH = 0x05 +PRODUCT_HINTS = ("mx master 4", "mx master") +WEBHID_REPORT_ID = 0x11 +WEBHID_REPORT_LENGTH = 19 +WEBHID_DEVICE_INDEX = 0x02 class ReportID(IntEnum): @@ -23,28 +30,75 @@ class FunctionID(IntEnum): class MXMaster4: device: hid.Device | None = None - def __init__(self, path: str, device_idx: int): + def __init__(self, path: str, device_idx: int, bus_type: int | None): self.path = path self.device_idx = device_idx + self.is_bluetooth = self._is_bluetooth(bus_type) + + @staticmethod + def _is_bluetooth(bus_type: int | str | None) -> bool: + if hasattr(bus_type, "name"): + try: + return "bluetooth" in bus_type.name.lower() + except Exception: + pass + if isinstance(bus_type, str): + return "bluetooth" in bus_type.lower() + if isinstance(bus_type, int): + return bus_type == BUS_BLUETOOTH + return False @classmethod - def find(cls): + def find(cls, prefer_bluetooth: bool = False): devices = hid.enumerate(LOGITECH_VID) + candidates = [] for device in devices: - if device["usage_page"] == 65280: - path = device["path"].decode("utf-8") - logging.debug(f"Found: %s", device["product_string"]) - logging.debug(f"\tPath: %s", path) - logging.debug( - f"\tVID:PID: %.04X:%.04X", - device["vendor_id"], - device["product_id"], - ) - logging.debug(f"\tInterface: %s", device.get("interface_number")) - return cls(path, device["interface_number"]) - - return None + product = (device.get("product_string") or "").lower() + usage_page = device.get("usage_page") + if usage_page != USAGE_PAGE_HIDPP and not any( + hint in product for hint in PRODUCT_HINTS + ): + continue + + path = device.get("path") + if isinstance(path, bytes): + path = path.decode("utf-8", errors="ignore") + else: + path = str(path) + + device_idx = device.get("interface_number") + if not isinstance(device_idx, int) or not 0 <= device_idx <= 0xFF: + device_idx = 0x00 + + bus_type = device.get("bus_type") + score = 0 + if usage_page == USAGE_PAGE_HIDPP: + score += 2 + if any(hint in product for hint in PRODUCT_HINTS): + score += 1 + if bus_type == BUS_BLUETOOTH: + score += 2 if prefer_bluetooth else 1 + elif bus_type == BUS_USB: + score += 2 if not prefer_bluetooth else 1 + + candidates.append((score, path, device_idx, device)) + + if not candidates: + return None + + candidates.sort(key=lambda entry: entry[0], reverse=True) + _, path, device_idx, device = candidates[0] + logging.debug("Found: %s", device.get("product_string")) + logging.debug("\tPath: %s", path) + logging.debug( + "\tVID:PID: %.04X:%.04X", + device.get("vendor_id", 0), + device.get("product_id", 0), + ) + logging.debug("\tInterface: %s", device.get("interface_number")) + logging.debug("\tBus: %s", device.get("bus_type")) + return cls(path, device_idx, device.get("bus_type")) def __enter__(self): self.device = hid.Device(path=self.path.encode()) @@ -58,12 +112,32 @@ def write(self, data: bytes): raise Exception("Device not open") self.device.write(data) + def haptic(self, pattern_id: int): + if self.is_bluetooth: + return self._webhid_haptic(pattern_id) + return self.hidpp(FunctionID.Haptic, pattern_id) + + def _webhid_haptic(self, pattern_id: int): + if not 0 <= pattern_id <= 0xFF: + raise Exception("Haptic pattern out of range") + payload = bytearray(WEBHID_REPORT_LENGTH) + payload[0] = WEBHID_DEVICE_INDEX + payload[1] = (int(FunctionID.Haptic) >> 8) & 0xFF + payload[2] = int(FunctionID.Haptic) & 0xFF + payload[3] = pattern_id + report = bytes([WEBHID_REPORT_ID]) + payload + logging.debug( + "Sending: %02X %s", + WEBHID_REPORT_ID, + payload.hex(), + ) + self.write(report) + def hidpp( - self, - feature_idx: FunctionID, - *args: int, + self, + feature_idx: FunctionID, + *args: int, ) -> tuple[int, bytes]: - if len(args) > 16: raise Exception("Too many arguments") @@ -118,10 +192,7 @@ def demo(): with mx_master_4 as dev: for i in range(15): logging.info("Haptic %d", i) - dev.hidpp( - FunctionID.Haptic, - i, - ) + dev.haptic(i) sleep(3) diff --git a/src/watch.py b/src/watch.py index 6d861a7..53738fb 100644 --- a/src/watch.py +++ b/src/watch.py @@ -1,8 +1,8 @@ import logging +import os import subprocess -import threading -from mx_master_4 import FunctionID, MXMaster4 +from mx_master_4 import MXMaster4 def monitor_notifications(device): @@ -26,7 +26,7 @@ def monitor_notifications(device): # When we see a Notify method call, trigger haptic if "member=Notify" in line or "method call" in line.lower(): try: - device.hidpp(FunctionID.Haptic, 0) + device.haptic(0) logging.info("✓ Haptic feedback triggered!") except Exception as e: logging.error("Failed to trigger haptic: %s", e) @@ -38,7 +38,14 @@ def monitor_notifications(device): def main(): logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") - device = MXMaster4.find() + preference = (os.getenv("MX4_CONNECTION") or "auto").lower() + if preference == "bluetooth": + device = MXMaster4.find(prefer_bluetooth=True) + elif preference == "usb": + device = MXMaster4.find(prefer_bluetooth=False) + else: + device = MXMaster4.find(prefer_bluetooth=False) + if not device: logging.error("MX Master 4 not found!") exit(1)