diff --git a/custom_components/ble_monitor/ble_parser/xiaomi.py b/custom_components/ble_monitor/ble_parser/xiaomi.py old mode 100755 new mode 100644 index 1fb39cb0..04577b47 --- a/custom_components/ble_monitor/ble_parser/xiaomi.py +++ b/custom_components/ble_monitor/ble_parser/xiaomi.py @@ -86,7 +86,8 @@ 0x3E17: "KS1BP", 0x3BD5: "MJTZC01YM", 0x50FB: "ES3", - 0x5DB1: "MBS17" + 0x5DB1: "MBS17", + 0x64C5: "PTX-F1-Display" } # Structured objects for data conversions @@ -731,6 +732,19 @@ def obj4803(xobj): return {"battery": batt} +def obj605d(xobj): + """Temperature""" + if len(xobj) == 1: + temp = xobj[0] + return {"temperature": temp} + else: + return {} + + +def obj6012(xobj): + """Humidity""" + return obj4802(xobj) + def obj4804(xobj): """Opening status""" opening_state = xobj[0] @@ -1047,6 +1061,8 @@ def obj4e0c(xobj, device_type): "one btn switch": "toggle", "button switch": "single press", } + elif device_type == "PTX-F1-Display": + return obj560c(xobj, "KS1BP") else: result = {} return result @@ -1077,6 +1093,8 @@ def obj4e0d(xobj, device_type): "one btn switch": "toggle", "button switch": "double press", } + elif device_type == "PTX-F1-Display": + return obj560d(xobj, "KS1BP") else: result = {} return result @@ -1107,6 +1125,8 @@ def obj4e0e(xobj, device_type): "one btn switch": "toggle", "button switch": "long press", } + elif device_type == "PTX-F1-Display": + return obj560e(xobj, "KS1BP") else: result = {} return result @@ -1391,7 +1411,9 @@ def obj6e16(xobj): 0x560d: obj560d, 0x560e: obj560e, 0x5a16: obj5a16, - 0x6E16: obj6e16, + 0x605d: obj605d, + 0x6012: obj6012, + 0x6E16: obj6e16 } @@ -1495,6 +1517,8 @@ def parse_xiaomi(self, data: bytes, mac: bytes): # only process messages with same priority that have a unique packet id if prev_packet == packet_id: if self.filter_duplicates is True: + if _LOGGER.isEnabledFor(logging.DEBUG): + _LOGGER.debug("Duplicate packet received, not processing. Data: %s", data.hex()) return None else: pass @@ -1504,11 +1528,15 @@ def parse_xiaomi(self, data: bytes, mac: bytes): # do not process advertisements with lower priority (ATC advertisements will be used instead) prev_adv_priority -= 1 self.adv_priority[mac] = prev_adv_priority + if _LOGGER.isEnabledFor(logging.DEBUG): + _LOGGER.debug("Lower priority advertisement received, not processing. Data: %s", data.hex()) return None else: if prev_packet == packet_id: if self.filter_duplicates is True: # only process messages with highest priority and messages with unique packet id + if _LOGGER.isEnabledFor(logging.DEBUG): + _LOGGER.debug("Duplicate packet received, not processing. Data: %s", data.hex()) return None self.lpacket_ids[mac] = packet_id @@ -1591,7 +1619,7 @@ def parse_xiaomi(self, data: bytes, mac: bytes): "0x4e0e", "0x560c", "0x560d", - "0x560e" + "0x560e", ]: result.update(resfunc(dobject, device_type)) else: @@ -1620,6 +1648,8 @@ def decrypt_mibeacon_v4_v5(self, data, i, mac): if mac not in self.no_key_message: _LOGGER.error("No encryption key found for device with MAC %s", to_mac(mac)) self.no_key_message.append(mac) + if _LOGGER.isEnabledFor(logging.DEBUG): + _LOGGER.debug("Key error for device with MAC %s, cannot decrypt data. Data: %s", to_mac(mac), data.hex()) return None nonce = b"".join([mac[::-1], data[6:9], data[-7:-4]]) diff --git a/custom_components/ble_monitor/const.py b/custom_components/ble_monitor/const.py old mode 100755 new mode 100644 index db1251f0..cf8e9793 --- a/custom_components/ble_monitor/const.py +++ b/custom_components/ble_monitor/const.py @@ -2120,6 +2120,7 @@ class BLEMonitorBinarySensorEntityDescription( 'XMWXKG01YL' : [["rssi"], ["two btn switch left", "two btn switch right"], []], 'XMWXKG01LM' : [["battery", "rssi"], ["one btn switch"], []], 'PTX' : [["battery", "rssi"], ["one btn switch"], []], + 'PTX-F1-Display' : [["temperature", "humidity", "rssi"], ["four btn switch 1", "four btn switch 2", "four btn switch 3", "four btn switch 4"], []], 'YLAI003' : [["rssi", "battery"], ["button"], []], 'YLYK01YL' : [["rssi"], ["remote"], ["remote single press", "remote long press"]], 'YLYK01YL-FANCL' : [["rssi"], ["fan remote"], []], @@ -2280,6 +2281,7 @@ class BLEMonitorBinarySensorEntityDescription( 'XMWXKG01YL' : 'Xiaomi', 'XMWXKG01LM' : 'Xiaomi', 'PTX' : 'Xiaomi', + 'PTX-F1-Display' : 'Xiaomi', 'SV40' : 'Lockin', 'SU001-T' : 'Petoneer', 'ATC' : 'ATC', diff --git a/custom_components/ble_monitor/test/test_xiaomi_parser.py b/custom_components/ble_monitor/test/test_xiaomi_parser.py index e374595a..d0393b02 100644 --- a/custom_components/ble_monitor/test/test_xiaomi_parser.py +++ b/custom_components/ble_monitor/test/test_xiaomi_parser.py @@ -1215,6 +1215,90 @@ def test_Xiaomi_PTX(self): assert sensor_msg["button switch"] == "single press" assert sensor_msg["rssi"] == -52 + def test_Xiaomi_PTX_F1_Display_single_press(self): + """Test Xiaomi parser for PTX-F1-Display single press on switch 4.""" + self.aeskeys = {} + data_string = "043E3E0201000066554433221132020106191695FE5859C5642D6655443322112D9475BB270100AB9CBFA914093039303631352E72656D6F74652E7831737764C6".replace(" ", "") + data = bytes(bytearray.fromhex(data_string)) + + aeskey = "00112233445566778899aabbccddeeff" + + is_ext_packet = True if data[3] == 0x0D else False + mac = (data[8 if is_ext_packet else 7:14 if is_ext_packet else 13])[::-1] + mac_address = mac.hex() + p_mac = bytes.fromhex(mac_address.replace(":", "").lower()) + p_key = bytes.fromhex(aeskey.lower()) + self.aeskeys[p_mac] = p_key + # pylint: disable=unused-variable + ble_parser = BleParser(aeskeys=self.aeskeys) + sensor_msg, tracker_msg = ble_parser.parse_raw_data(data) + + assert sensor_msg["firmware"] == "Xiaomi (MiBeacon V5 encrypted)" + assert sensor_msg["type"] == "PTX-F1-Display" + assert sensor_msg["mac"] == "112233445566" + assert sensor_msg["packet"] == 45 + assert sensor_msg["data"] + assert sensor_msg["four btn switch 4"] == "toggle" + assert sensor_msg["button switch"] == "single press" + assert sensor_msg["rssi"] == -58 + assert sensor_msg["local_name"] == "090615.remote.x1swd" + + def test_Xiaomi_PTX_F1_Display_temperature(self): + """Test Xiaomi parser for PTX-F1-Display temperature.""" + self.aeskeys = {} + data_string = ( + "043E29020100006655443322111D020106191695FE5859C56433665544332211997B546B0C01009089C35AC4" + ).replace(" ", "") + data = bytes(bytearray.fromhex(data_string)) + + aeskey = "00112233445566778899aabbccddeeff" + + is_ext_packet = True if data[3] == 0x0D else False + mac = (data[8 if is_ext_packet else 7:14 if is_ext_packet else 13])[::-1] + mac_address = mac.hex() + p_mac = bytes.fromhex(mac_address.replace(":", "").lower()) + p_key = bytes.fromhex(aeskey.lower()) + self.aeskeys[p_mac] = p_key + # pylint: disable=unused-variable + ble_parser = BleParser(aeskeys=self.aeskeys) + sensor_msg, tracker_msg = ble_parser.parse_raw_data(data) + + assert sensor_msg["firmware"] == "Xiaomi (MiBeacon V5 encrypted)" + assert sensor_msg["type"] == "PTX-F1-Display" + assert sensor_msg["mac"] == "112233445566" + assert sensor_msg["packet"] == 51 + assert sensor_msg["data"] + assert sensor_msg["temperature"] == 25 + assert sensor_msg["rssi"] == -60 + assert sensor_msg["local_name"] == "" + + def test_Xiaomi_PTX_F1_Display_humidity(self): + """Test Xiaomi parser for PTX-F1-Display humidity.""" + self.aeskeys = {} + data_string = "043E29020100006655443322111D020106191695FE5859C56433665544332211D67B54550C01001D8F98BBC4".replace(" ", "") + data = bytes(bytearray.fromhex(data_string)) + + aeskey = "00112233445566778899aabbccddeeff" + + is_ext_packet = True if data[3] == 0x0D else False + mac = (data[8 if is_ext_packet else 7:14 if is_ext_packet else 13])[::-1] + mac_address = mac.hex() + p_mac = bytes.fromhex(mac_address.replace(":", "").lower()) + p_key = bytes.fromhex(aeskey.lower()) + self.aeskeys[p_mac] = p_key + # pylint: disable=unused-variable + ble_parser = BleParser(aeskeys=self.aeskeys) + sensor_msg, tracker_msg = ble_parser.parse_raw_data(data) + + assert sensor_msg["firmware"] == "Xiaomi (MiBeacon V5 encrypted)" + assert sensor_msg["type"] == "PTX-F1-Display" + assert sensor_msg["mac"] == "112233445566" + assert sensor_msg["packet"] == 51 + assert sensor_msg["data"] + assert sensor_msg["humidity"] == 39 + assert sensor_msg["rssi"] == -60 + assert sensor_msg["local_name"] == "" + def test_Xiaomi_XMPIRO2SXS(self): """Test Xiaomi parser for XMPIRO2SXS.""" self.aeskeys = {} diff --git a/docs/_devices/Xiaomi_PTX_F1_Display.md b/docs/_devices/Xiaomi_PTX_F1_Display.md new file mode 100644 index 00000000..9ea63b47 --- /dev/null +++ b/docs/_devices/Xiaomi_PTX_F1_Display.md @@ -0,0 +1,32 @@ +--- +manufacturer: Xiaomi +name: PTX F1 4-Button Wireless Switch (Display Version) +model: F1 +image: PTX_F1_Display.webp +physical_description: +broadcasted_properties: + - temperature + - humidity + - four btn switch 1 + - four btn switch 2 + - four btn switch 3 + - four btn switch 4 + - button switch + - rssi +broadcasted_property_notes: + - property: four btn switch 1 + note: always "toggle"; actual press type ('short press', 'double press', 'long press') is reported via the 'button switch' property + - property: four btn switch 2 + note: always "toggle"; actual press type ('short press', 'double press', 'long press') is reported via the 'button switch' property + - property: four btn switch 3 + note: always "toggle"; actual press type ('short press', 'double press', 'long press') is reported via the 'button switch' property + - property: four btn switch 4 + note: always "toggle"; actual press type ('short press', 'double press', 'long press') is reported via the 'button switch' property +broadcast_rate: +active_scan: +encryption_key: true +custom_firmware: +notes: + - Unable to retrieve the battery percentage right now. (need help!) + - The switch sensor state will return to `no press` after the time set with the [reset_timer](configuration_params#reset_timer) option. It is advised to change the reset time to 1 second (default = 35 seconds). +--- diff --git a/docs/assets/images/PTX_F1_Display.webp b/docs/assets/images/PTX_F1_Display.webp new file mode 100644 index 00000000..99d9408a Binary files /dev/null and b/docs/assets/images/PTX_F1_Display.webp differ diff --git a/tools/anonymize_mibeacon_v5.py b/tools/anonymize_mibeacon_v5.py new file mode 100644 index 00000000..3ee5b349 --- /dev/null +++ b/tools/anonymize_mibeacon_v5.py @@ -0,0 +1,162 @@ +#!/usr/bin/env python3 +"""Anonymize Xiaomi MiBeacon V4/V5 encrypted raw advertisements.""" + +from __future__ import annotations + +import argparse +from dataclasses import dataclass + +from Cryptodome.Cipher import AES + + +@dataclass +class AdStructure: + start: int + size: int + value: bytes + + +def _parse_ad_structures(raw: bytes) -> tuple[list[AdStructure], int, int, bool]: + if len(raw) < 4: + raise ValueError("Raw BLE HCI event is too short to determine advertisement packet type") + is_ext_packet = raw[3] == 0x0D + adpayload_start = 29 if is_ext_packet else 14 + if len(raw) < adpayload_start: + packet_type = "extended" if is_ext_packet else "legacy" + raise ValueError(f"Raw {packet_type} BLE HCI event is too short; expected at least {adpayload_start} bytes") + adpayload_size = raw[adpayload_start - 1] + if len(raw) < adpayload_start + adpayload_size: + raise ValueError( + f"Raw BLE HCI event is shorter than the advertised payload length ({adpayload_size} bytes)" + ) + structures: list[AdStructure] = [] + cursor = adpayload_start + remaining = adpayload_size + while remaining > 1: + adstruct_size = raw[cursor] + 1 + if adstruct_size <= 1 or adstruct_size > remaining: + break + chunk = raw[cursor:cursor + adstruct_size] + structures.append(AdStructure(cursor, adstruct_size, chunk)) + cursor += adstruct_size + remaining -= adstruct_size + return structures, adpayload_start, adpayload_size, is_ext_packet + + +def _extract_mac(raw: bytes, is_ext_packet: bool) -> bytes: + min_length = 14 if is_ext_packet else 13 + if len(raw) < min_length: + packet_type = "extended" if is_ext_packet else "legacy" + raise ValueError(f"Raw {packet_type} BLE HCI event is too short to contain a MAC address") + return (raw[8:14] if is_ext_packet else raw[7:13])[::-1] + + +def _calc_payload_start(service_data: bytes, mac: bytes) -> int: + if len(service_data) < 9: + raise ValueError("Xiaomi FE95 service data is too short; expected at least 9 bytes") + i = 9 + frame_control = service_data[4] + (service_data[5] << 8) + mac_include = (frame_control >> 4) & 1 + capability_include = (frame_control >> 5) & 1 + if mac_include: + i += 6 + if len(service_data) < i: + raise ValueError("Xiaomi FE95 service data is too short to contain the embedded MAC address") + embedded_mac = service_data[9:15][::-1] + if embedded_mac != mac: + raise ValueError("MAC in Xiaomi payload does not match advertisement MAC") + if capability_include: + i += 1 + if len(service_data) < i: + raise ValueError("Xiaomi FE95 service data is too short to contain the capability byte") + capability_types = service_data[i - 1] + if capability_types & 0x20: + i += 1 + if len(service_data) < i: + raise ValueError("Xiaomi FE95 service data is too short to contain the capability IO byte") + return i + + +def _decrypt_payload(service_data: bytes, mac: bytes, key: bytes) -> tuple[bytes, int]: + payload_start = _calc_payload_start(service_data, mac) + if len(service_data) < payload_start + 7: + raise ValueError("Xiaomi FE95 service data is too short to contain an encrypted payload and authentication tag") + nonce = b"".join([mac[::-1], service_data[6:9], service_data[-7:-4]]) + cipher = AES.new(key, AES.MODE_CCM, nonce=nonce, mac_len=4) + cipher.update(b"\x11") + plaintext = cipher.decrypt_and_verify(service_data[payload_start:-7], service_data[-4:]) + return plaintext, payload_start + + +def _encrypt_payload(service_data: bytearray, new_mac: bytes, new_key: bytes, payload_start: int, plaintext: bytes) -> bytes: + frame_control = service_data[4] + (service_data[5] << 8) + mac_include = (frame_control >> 4) & 1 + if mac_include: + service_data[9:15] = new_mac[::-1] + nonce = b"".join([new_mac[::-1], bytes(service_data[6:9]), bytes(service_data[-7:-4])]) + cipher = AES.new(new_key, AES.MODE_CCM, nonce=nonce, mac_len=4) + cipher.update(b"\x11") + encrypted = cipher.encrypt(plaintext) + token = cipher.digest() + rebuilt = bytearray(service_data) + rebuilt[payload_start:-7] = encrypted + rebuilt[-4:] = token + return bytes(rebuilt) + + +def anonymize_xiaomi_mibeacon_v5(raw_hex: str, real_key_hex: str, new_mac_hex: str, new_key_hex: str) -> dict[str, str]: + raw = bytearray.fromhex(raw_hex) + real_key = bytes.fromhex(real_key_hex) + new_key = bytes.fromhex(new_key_hex) + new_mac = bytes.fromhex(new_mac_hex) + structures, _, _, is_ext_packet = _parse_ad_structures(raw) + old_mac = _extract_mac(raw, is_ext_packet) + service_ad = next( + ( + item + for item in structures + if item.size > 4 and item.value[1] == 0x16 and item.value[2] == 0x95 and item.value[3] == 0xFE + ), + None, + ) + if service_ad is None: + raise ValueError("No Xiaomi FE95 service data found in raw advertisement") + service_data = service_ad.value + plaintext, payload_start = _decrypt_payload(service_data, old_mac, real_key) + encrypted_service_data = _encrypt_payload(bytearray(service_data), new_mac, new_key, payload_start, plaintext) + raw[service_ad.start:service_ad.start + service_ad.size] = encrypted_service_data + if is_ext_packet: + raw[8:14] = new_mac[::-1] + else: + raw[7:13] = new_mac[::-1] + verify_plaintext, _ = _decrypt_payload(bytes(encrypted_service_data), new_mac, new_key) + if verify_plaintext != plaintext: + raise ValueError("Verification failed: payload mismatch after re-encryption") + return { + "old_mac": old_mac.hex().upper(), + "new_mac": new_mac.hex().upper(), + "new_key": new_key.hex(), + "raw": bytes(raw).hex().upper(), + } + + +def main() -> None: + parser = argparse.ArgumentParser(description="Anonymize Xiaomi MiBeacon V4/V5 encrypted advertisements") + parser.add_argument("--raw", required=True, help="Raw BLE HCI event hex string") + parser.add_argument("--key", required=True, help="Original 16-byte AES key hex") + parser.add_argument("--new-mac", default="112233445566", help="Anonymized MAC hex, default: 112233445566") + parser.add_argument( + "--new-key", + default="00112233445566778899aabbccddeeff", + help="Anonymized 16-byte AES key hex, default: 00112233445566778899aabbccddeeff", + ) + args = parser.parse_args() + result = anonymize_xiaomi_mibeacon_v5(args.raw, args.key, args.new_mac, args.new_key) + print(f"old_mac={result['old_mac']}") + print(f"new_mac={result['new_mac']}") + print(f"new_key={result['new_key']}") + print(f"raw={result['raw']}") + + +if __name__ == "__main__": + main()