Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 188 additions & 0 deletions roborock/data/b01_q10/b01_q10_containers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
automatically update objects from raw device responses.
"""

import base64
from dataclasses import dataclass, field

from ..containers import RoborockBase
Expand All @@ -23,6 +24,14 @@
YXWaterLevel,
)

_Q10_CUSTOMER_CLEAN_COUNT_SIZE = 1
_Q10_CUSTOMER_CLEAN_ROOM_BLOCK_SIZE = 26
_Q10_CUSTOMER_CLEAN_NAME_BLOCK_SIZE = 20
_Q10_CUSTOMER_CLEAN_VERTEX_SIZE = 4
_Q10_CUSTOMER_CLEAN_NAME_MAX_LENGTH = _Q10_CUSTOMER_CLEAN_NAME_BLOCK_SIZE - 1
_Q10_SENTINEL_U8 = 0xFF
_Q10_SENTINEL_U16 = 0xFFFF


@dataclass
class dpCleanRecord(RoborockBase):
Expand Down Expand Up @@ -91,6 +100,185 @@ class dpTimeZone(RoborockBase):
time_zone_sec: int | None = None


@dataclass
class Q10RoomVertex(RoborockBase):
x_raw: int
y_raw: int
x: float
y: float


@dataclass
class Q10RoomConfig(RoborockBase):
index: int
room_id: int
room_type: int
Comment thread
lboue marked this conversation as resolved.
clean_order: int | None
clean_count: int
clean_type: int | None
fan_level: int | None
water_level: int | None
material: int | None
clean_line: int | None
raw_room_name: str
vertices_num: int
vertices: list[Q10RoomVertex] = field(default_factory=list)

@property
def room_name(self) -> str:
"""Return a normalized user-facing room name."""
return normalize_q10_room_name(self.raw_room_name)


@dataclass
class Q10RoomsConfig(RoborockBase):
raw_length: int = 0
declared_count: int = 0
rooms: list[Q10RoomConfig] = field(default_factory=list)

@property
def parsed_count(self) -> int:
return len(self.rooms)


def parse_customer_clean_payload(payload_b64: str) -> Q10RoomsConfig:
Comment thread
lboue marked this conversation as resolved.
"""Parse CUSTOMER_CLEAN payload.

Layout:
- 1 byte: room count
- For each room:
- 26-byte room metadata block
- 20-byte room name block
- 1 byte vertex count followed by 4 bytes per vertex (x, y as u16)

Example room bytes:
- metadata starts with room id and room type (e.g. ``00 2A 07`` for room id 42)
- name block starts with length, then UTF-8 bytes (e.g. ``0E rr_living_room``)
- each vertex is ``x_hi x_lo y_hi y_lo``
"""
raw = base64.b64decode(payload_b64)
if not raw:
return Q10RoomsConfig()

count = raw[0]
if not _validate_customer_clean_layout(raw, count):
return Q10RoomsConfig(raw_length=len(raw), declared_count=count, rooms=[])

offset = _Q10_CUSTOMER_CLEAN_COUNT_SIZE
rooms: list[Q10RoomConfig] = []

for index in range(count):
room_block = raw[offset : offset + _Q10_CUSTOMER_CLEAN_ROOM_BLOCK_SIZE]
room_id = int.from_bytes(room_block[0:2], "big", signed=False)
room_type = room_block[2]
clean_order = _u16_to_optional(int.from_bytes(room_block[3:5], "big", signed=False))
clean_count = int.from_bytes(room_block[5:7], "big", signed=False)
clean_type = _u8_to_optional(room_block[7])
fan_level = _u8_to_optional(room_block[8])
water_level = _u8_to_optional(room_block[9])
material = _u8_to_optional(room_block[10])
clean_line = _u8_to_optional(room_block[11])
offset += _Q10_CUSTOMER_CLEAN_ROOM_BLOCK_SIZE

room_name_bytes = raw[offset : offset + _Q10_CUSTOMER_CLEAN_NAME_BLOCK_SIZE]
raw_room_name = _decode_q10_room_name(room_name_bytes)
offset += _Q10_CUSTOMER_CLEAN_NAME_BLOCK_SIZE

vertices_num = raw[offset]
offset += 1

vertices: list[Q10RoomVertex] = []
for _ in range(vertices_num):
x_raw = int.from_bytes(raw[offset : offset + 2], "big", signed=False)
offset += 2
y_raw = int.from_bytes(raw[offset : offset + 2], "big", signed=False)
offset += 2
vertices.append(Q10RoomVertex(x_raw=x_raw, y_raw=y_raw, x=x_raw / 10.0, y=y_raw / 10.0))

rooms.append(
Q10RoomConfig(
index=index,
room_id=room_id,
room_type=room_type,
clean_order=clean_order,
clean_count=clean_count,
clean_type=clean_type,
fan_level=fan_level,
water_level=water_level,
material=material,
clean_line=clean_line,
raw_room_name=raw_room_name,
vertices_num=vertices_num,
vertices=vertices,
)
)

return Q10RoomsConfig(raw_length=len(raw), declared_count=count, rooms=rooms)


def normalize_q10_room_name(room_name: str) -> str:
Comment thread
lboue marked this conversation as resolved.
"""Normalize room names reported by the Q10 firmware.

Strips leading/trailing whitespace, then converts firmware-prefixed names
(``rr_<slug>``) into title-cased human-readable strings.

Examples::

normalize_q10_room_name("rr_living_room") # "Living Room"
normalize_q10_room_name("rr_entrance_hall") # "Entrance Hall"
normalize_q10_room_name("Kitchen") # "Kitchen"
normalize_q10_room_name("rr_") # "rr_" (no slug after prefix)
"""
raw_name = room_name.strip()
if not raw_name:
return raw_name
if raw_name.startswith("rr_"):
normalized = raw_name[3:].replace("_", " ").strip()
if not normalized:
return raw_name
return " ".join(part.capitalize() for part in normalized.split())
return raw_name


def _validate_customer_clean_layout(raw: bytes, count: int) -> bool:
"""Ensure the full payload can be parsed before building room objects."""
offset = _Q10_CUSTOMER_CLEAN_COUNT_SIZE
for _ in range(count):
minimum_bytes = (
_Q10_CUSTOMER_CLEAN_ROOM_BLOCK_SIZE + _Q10_CUSTOMER_CLEAN_NAME_BLOCK_SIZE + _Q10_CUSTOMER_CLEAN_COUNT_SIZE
)
if offset + minimum_bytes > len(raw):
return False

vertices_num = raw[offset + _Q10_CUSTOMER_CLEAN_ROOM_BLOCK_SIZE + _Q10_CUSTOMER_CLEAN_NAME_BLOCK_SIZE]
offset += minimum_bytes
vertex_bytes = vertices_num * _Q10_CUSTOMER_CLEAN_VERTEX_SIZE
if offset + vertex_bytes > len(raw):
return False
offset += vertex_bytes

return True


def _u8_to_optional(value: int) -> int | None:
if value == _Q10_SENTINEL_U8:
return None
return value


def _u16_to_optional(value: int) -> int | None:
if value == _Q10_SENTINEL_U16:
return None
return value


def _decode_q10_room_name(room_name_bytes: bytes) -> str:
name_len = room_name_bytes[0]
if 0 < name_len <= _Q10_CUSTOMER_CLEAN_NAME_MAX_LENGTH:
return room_name_bytes[1 : 1 + name_len].decode("utf-8", errors="replace")
return room_name_bytes[1:].split(b"\x00", 1)[0].decode("utf-8", errors="replace")


@dataclass
class Q10Status(RoborockBase):
"""Core vacuum status for Q10 devices.
Expand Down
7 changes: 7 additions & 0 deletions roborock/devices/traits/b01/q10/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from .map import MapContentTrait
from .network_info import NetworkInfoTrait
from .remote import RemoteTrait
from .rooms import RoomsTrait
from .status import StatusTrait
from .vacuum import VacuumTrait
from .volume import SoundVolumeTrait
Expand All @@ -32,6 +33,7 @@
"DustCollectionTrait",
"MapContentTrait",
"NetworkInfoTrait",
"RoomsTrait",
"SoundVolumeTrait",
"StatusTrait",
]
Expand Down Expand Up @@ -78,12 +80,16 @@ class Q10PropertiesApi(Trait):
map: MapContentTrait
"""Trait for fetching the current parsed map (image + rooms)."""

rooms: RoomsTrait
"""Trait for reading room configuration from Q10 devices."""

def __init__(self, channel: MqttChannel) -> None:
"""Initialize the B01Props API."""
self._channel = channel
self.command = CommandTrait(channel)
self.vacuum = VacuumTrait(self.command)
self.remote = RemoteTrait(self.command)
self.rooms = RoomsTrait(self.command)
self.status = StatusTrait()
self.volume = SoundVolumeTrait(self.command)
self.child_lock = ChildLockTrait(self.command)
Expand Down Expand Up @@ -148,6 +154,7 @@ def _handle_message(self, message: Q10Message) -> None:
# only updates the fields that it is responsible for.
for trait in self._updatable_traits:
trait.update_from_dps(message.dps)
self.rooms.update_from_dps(message.dps)


def create(channel: MqttChannel) -> Q10PropertiesApi:
Expand Down
73 changes: 73 additions & 0 deletions roborock/devices/traits/b01/q10/rooms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
"""Rooms trait for Q10 B01 devices."""

import logging
from typing import Any

from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP
from roborock.data.b01_q10.b01_q10_containers import (
Q10RoomConfig,
Q10RoomsConfig,
parse_customer_clean_payload,
)
from roborock.devices.traits.common import TraitUpdateListener

from .command import CommandTrait

_LOGGER = logging.getLogger(__name__)


class RoomsTrait(Q10RoomsConfig, TraitUpdateListener):
"""Trait for managing Q10 room configuration."""

def __init__(self, command: CommandTrait) -> None:
super().__init__()
TraitUpdateListener.__init__(self, logger=_LOGGER)
self._command = command

async def refresh(self) -> None:
"""Request the current room configuration from the device."""
await self._command.send(B01_Q10_DP.COMMON, params={B01_Q10_DP.CUSTOMER_CLEAN_REQUEST.code: 0})

@property
def room_map(self) -> dict[int, Q10RoomConfig]:
"""Return the current room configurations keyed by room id."""
return {room.room_id: room for room in self.rooms}

@property
def room_names(self) -> dict[int, str]:
"""Return the current room names keyed by room id."""
return {room.room_id: room.room_name for room in self.rooms}

def get_room(self, room_id: int) -> Q10RoomConfig | None:
"""Return a room configuration by room id, if known."""
return self.room_map.get(int(room_id))

def get_room_name(self, room_id: int, default: str | None = None) -> str | None:
"""Return a room name by room id, optionally with a default."""
room = self.get_room(room_id)
if room is None:
return default
return room.room_name

def update_from_dps(self, decoded_dps: dict[B01_Q10_DP, Any]) -> None:
"""Update the trait from raw DPS data."""
payload = decoded_dps.get(B01_Q10_DP.CUSTOMER_CLEAN)
if not isinstance(payload, str):
return

try:
parsed = parse_customer_clean_payload(payload)
except Exception:
_LOGGER.debug("Failed to parse CUSTOMER_CLEAN payload", exc_info=True)
return

changed = (
self.raw_length != parsed.raw_length
or self.declared_count != parsed.declared_count
or self.rooms != parsed.rooms
)
self.raw_length = parsed.raw_length
self.declared_count = parsed.declared_count
self.rooms = parsed.rooms
if changed:
self._notify_update()
Loading
Loading