diff --git a/controller/python/controller.py b/controller/python/controller.py index dca5074e409e0a2afe1ce0fdb2ea347d1af19a91..60750a8bddf7e6ef3d46c1b7a8b6fe1bfa960177 100644 --- a/controller/python/controller.py +++ b/controller/python/controller.py @@ -1,6 +1,6 @@ import smbus import struct -from typing import Optional +from typing import Any, Optional class Controller: @@ -12,30 +12,43 @@ class Controller: MAX_MOTOR_PERCENTAGE = 0x11 MOTOR_SPEED = 0x30 ENCODER_TICKS = 0x32 + STATUS = 0x36 def __init__(self): self.i2c = smbus.SMBus(self.I2C_BUS) + def _read(self, command, n, unpack_spec) -> tuple: + return struct.unpack( + unpack_spec, + bytes(self.i2c.read_i2c_block_data(self.I2C_ADDR, command, n)), + ) + + def _write(self, command: int, data: list[int]): + self.i2c.write_i2c_block_data(self.I2C_ADDR, command, data) + def who_am_i(self) -> int: """Check that the motors controller board is present. This should return the same value as Controller.I2C_ADDR.""" - return self.i2c.read_i2c_block_data(self.I2C_ADDR, self.WHO_AM_I, 1)[0] + return self._read(self.WHO_AM_I, 1, "B") def set_max_percentage(self, percent: int): """Set the maximum percentage of power which will be used (between 1 and 100) - for further speed instructions. This has no effect on an running command.""" + for further speed instructions. This has no effect on an running command. + """ if percent <= 0 or percent > 100: raise ValueError - self.i2c.write_i2c_block_data( - self.I2C_ADDR, self.MAX_MOTOR_PERCENTAGE, [percent] - ) + self._write(self.MAX_MOTOR_PERCENTAGE, [percent]) + def get_max_percentage(self) -> int: + """Get the maximum percentage of power which will be used (between 1 and 100).""" + return self._read(self.MAX_MOTOR_PERCENTAGE, 1, "B")[0] + def set_motor_speed(self, left: Optional[int], right: Optional[int]): """Set the motor speed between -100 and 100. None means not to change the motor value. Using None for both motors will put the controller board in standby mode and motors will stop.""" - def convert(v, arg): + def convert(v: Optional[int], arg: str): if v is None: return -128 if not isinstance(v, int) or v < -100 or v > 100: @@ -44,12 +57,18 @@ class Controller: ) return v - self.i2c.write_i2c_block_data( - self.I2C_ADDR, + self._write( self.MOTOR_SPEED, - list(struct.pack("bb", convert(left, "left"), convert(right, "right"))), + list( + struct.pack( + "bb", convert(left, "left"), convert(right, "right") + ) + ), ) + def get_motor_speed(self) -> tuple[int, int]: + return self._read(self.MOTOR_SPEED, 2, "bb") + def set_left_motor_speed(self, speed: int): """Set the left motor speed between -127 and 127.""" self.set_motor_speed(speed, None) @@ -62,11 +81,15 @@ class Controller: """Stop the motors by putting the controller board in standby mode.""" self.set_motor_speed(None, None) - def get_encoder_ticks(self) -> (int, int): + def get_encoder_ticks(self) -> tuple[int, int]: """Retrieve the encoder ticks since the last time it was queried. The ticks must be retrieved before they overflow a 2 byte signed integer (-32768..32767) or the result will make no sense. Return a pair with left and right data.""" - return struct.unpack( - "hh", - bytes(self.i2c.read_i2c_block_data(self.I2C_ADDR, self.ENCODER_TICKS, 4)), - ) + return self._read(self.ENCODER_TICKS, 4, "hh") + + def get_status(self) -> dict[str, Any]: + """Return a dict with various status fields: + - "moving": True if at least one motor is moving, False otherwise + """ + (status,) = self._read(self.STATUS, 1, "?") + return {"moving": (status & 1) != 0} diff --git a/controller/src/logic.rs b/controller/src/logic.rs index 23cc17d8dcf2b059072263ac18296890a2920c38..3168fa972ae2f50a09f942e35cbbb420b1cbc4b8 100644 --- a/controller/src/logic.rs +++ b/controller/src/logic.rs @@ -48,6 +48,7 @@ const WHO_AM_I: u8 = 0x0f; const MAX_MOTOR_PERCENTAGE: u8 = 0x11; const MOTOR_SPEED: u8 = 0x30; const ENCODER_TICKS: u8 = 0x32; +const STATUS: u8 = 0x36; fn handle_command(command: &[u8], response: &mut Deque<u8, MESSAGE_SIZE>) { #[allow(static_mut_refs)] @@ -55,7 +56,7 @@ fn handle_command(command: &[u8], response: &mut Deque<u8, MESSAGE_SIZE>) { defmt::trace!("Processing command {:?}", command); match command { [WHO_AM_I, ..] => { - let _ = response.push_back(0x57); + response.push_back(0x57).unwrap(); } &[MAX_MOTOR_PERCENTAGE, p] => { if (1..=100).contains(&p) { @@ -65,7 +66,7 @@ fn handle_command(command: &[u8], response: &mut Deque<u8, MESSAGE_SIZE>) { } } [MAX_MOTOR_PERCENTAGE] => { - let _ = response.push_back(state.max_motor_percentage); + response.push_back(state.max_motor_percentage).unwrap(); } [MOTOR_SPEED, ms @ ..] if ms.len() == 2 => { let (max_pwm, mmp) = ( @@ -101,17 +102,20 @@ fn handle_command(command: &[u8], response: &mut Deque<u8, MESSAGE_SIZE>) { } } [MOTOR_SPEED] => { - for &s in &state.motor_speed { - let _ = response.push_back(s); - } + response.push_back(state.motor_speed[0]).unwrap(); + response.push_back(state.motor_speed[1]).unwrap(); } [ENCODER_TICKS] => { let (left, right) = state.encoders.ticks(); let (left, right) = (left.to_le_bytes(), right.to_le_bytes()); - let _ = response.push_back(left[0]); - let _ = response.push_back(left[1]); - let _ = response.push_back(right[0]); - let _ = response.push_back(right[1]); + response.push_back(left[0]).unwrap(); + response.push_back(left[1]).unwrap(); + response.push_back(right[0]).unwrap(); + response.push_back(right[1]).unwrap(); + } + [STATUS] => { + let moving = (state.motor_speed != [0, 0]) as u8; + response.push_back(moving << 0).unwrap(); } &[_, ..] => { defmt::warn!("unknown command or args {:#04x}", command);