Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions examples/bt_api_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ def main(args=None):
+ "]"
)

hw_info = shim_dev.get_device_hardware_version()
print(f"- hardware: [{hw_info.name}]")

shim_dev.add_stream_callback(stream_cb)

shim_dev.start_streaming()
Expand Down
8 changes: 6 additions & 2 deletions pyshimmer/bluetooth/bt_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
ChannelDataType,
EChannelType,
ESensorGroup,
set_active_dtype_assignment,
)
from pyshimmer.dev.exg import ExGRegister
from pyshimmer.dev.fw_version import (
Expand Down Expand Up @@ -384,7 +385,9 @@ def initialize(self) -> None:
"""
self._thread.start()
self._set_fw_capabilities()

# Select the active channel dtype map for the detected hardware.
set_active_dtype_assignment(self._hw_version)

if self.capabilities.supports_ack_disable and self._disable_ack:
self.set_status_ack(enabled=False)

Expand Down Expand Up @@ -604,7 +607,8 @@ def get_inquiry(self) -> tuple[float, int, list[EChannelType]]:
- The active data channels of the device as list, does not include the
TIMESTAMP channel
"""
return self._process_and_wait(InquiryCommand())
cmd = InquiryCommand(self._hw_version) # pass HW so the command knows header length
return self._process_and_wait(cmd)

def get_data_types(self):
"""Get the active data channels of the device
Expand Down
10 changes: 8 additions & 2 deletions pyshimmer/bluetooth/bt_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,8 +400,9 @@ class InquiryCommand(ResponseCommand):
channels
"""

def __init__(self):
def __init__(self, hw_version: HardwareVersion | None = None):
super().__init__(INQUIRY_RESPONSE)
self._hw_version = hw_version

@staticmethod
def decode_channel_types(ct_bin: bytes) -> list[EChannelType]:
Expand All @@ -412,9 +413,14 @@ def decode_channel_types(ct_bin: bytes) -> list[EChannelType]:
def send(self, ser: BluetoothSerial) -> None:
ser.write_command(INQUIRY_COMMAND)

def _is_shimmer3r(self) -> bool:
return self._hw_version == HardwareVersion.SHIMMER3R


def receive(self, ser: BluetoothSerial) -> any:
fmt = "<HI3xBB" if self._is_shimmer3r() else "<HIBB"
sr_val, _, n_ch, buf_size = ser.read_response(
INQUIRY_RESPONSE, arg_format="<HIBB"
INQUIRY_RESPONSE, arg_format=fmt
)
channel_conf = ser.read(n_ch)

Expand Down
63 changes: 62 additions & 1 deletion pyshimmer/dev/channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from collections.abc import Iterable
from enum import Enum, auto, unique

from pyshimmer.dev.fw_version import HardwareVersion
from pyshimmer.util import raise_to_next_pow, unpack, flatten_list, bit_is_set


Expand Down Expand Up @@ -355,7 +356,7 @@ class ESensorGroup(Enum):
"""
Assigns each channel type its appropriate data type.
"""
ChDataTypeAssignment: dict[EChannelType, ChannelDataType] = {
ChDataTypeAssignmentShimmer3: dict[EChannelType, ChannelDataType] = {
EChannelType.ACCEL_LN_X: ChannelDataType(2, signed=True, le=True),
EChannelType.ACCEL_LN_Y: ChannelDataType(2, signed=True, le=True),
EChannelType.ACCEL_LN_Z: ChannelDataType(2, signed=True, le=True),
Expand Down Expand Up @@ -400,6 +401,58 @@ class ESensorGroup(Enum):
EChannelType.TIMESTAMP: ChannelDataType(3, signed=False, le=True),
}

"""
Assigns each channel type its appropriate data type.
"""
ChDataTypeAssignmentShimmer3R: dict[EChannelType, ChannelDataType] = {
EChannelType.ACCEL_LN_X: ChannelDataType(2, signed=True, le=True),
EChannelType.ACCEL_LN_Y: ChannelDataType(2, signed=True, le=True),
EChannelType.ACCEL_LN_Z: ChannelDataType(2, signed=True, le=True),
EChannelType.VBATT: ChannelDataType(2, signed=True, le=True),
EChannelType.ACCEL_WR_X: ChannelDataType(2, signed=True, le=True),
EChannelType.ACCEL_WR_Y: ChannelDataType(2, signed=True, le=True),
EChannelType.ACCEL_WR_Z: ChannelDataType(2, signed=True, le=True),
EChannelType.MAG_REG_X: ChannelDataType(2, signed=True, le=True),
EChannelType.MAG_REG_Y: ChannelDataType(2, signed=True, le=True),
EChannelType.MAG_REG_Z: ChannelDataType(2, signed=True, le=True),
EChannelType.GYRO_X: ChannelDataType(2, signed=True, le=True),
EChannelType.GYRO_Y: ChannelDataType(2, signed=True, le=True),
EChannelType.GYRO_Z: ChannelDataType(2, signed=True, le=True),
EChannelType.EXTERNAL_ADC_A0: ChannelDataType(2, signed=False, le=True),
EChannelType.EXTERNAL_ADC_A1: ChannelDataType(2, signed=False, le=True),
EChannelType.EXTERNAL_ADC_A2: ChannelDataType(2, signed=False, le=True),
EChannelType.INTERNAL_ADC_A3: ChannelDataType(2, signed=False, le=True),
EChannelType.INTERNAL_ADC_A0: ChannelDataType(2, signed=False, le=True),
EChannelType.INTERNAL_ADC_A1: ChannelDataType(2, signed=False, le=True),
EChannelType.INTERNAL_ADC_A2: ChannelDataType(2, signed=False, le=True),
EChannelType.ACCEL_HG_X: None,
EChannelType.ACCEL_HG_Y: None,
EChannelType.ACCEL_HG_Z: None,
EChannelType.MAG_WR_X: None,
EChannelType.MAG_WR_Y: None,
EChannelType.MAG_WR_Z: None,
EChannelType.TEMPERATURE: ChannelDataType(2, signed=False, le=False),
EChannelType.PRESSURE: ChannelDataType(3, signed=False, le=False),
EChannelType.GSR_RAW: ChannelDataType(2, signed=False, le=True),
EChannelType.EXG1_STATUS: ChannelDataType(1, signed=False, le=True),
EChannelType.EXG1_CH1_24BIT: ChannelDataType(3, signed=True, le=False),
EChannelType.EXG1_CH2_24BIT: ChannelDataType(3, signed=True, le=False),
EChannelType.EXG2_STATUS: ChannelDataType(1, signed=False, le=True),
EChannelType.EXG2_CH1_24BIT: ChannelDataType(3, signed=True, le=False),
EChannelType.EXG2_CH2_24BIT: ChannelDataType(3, signed=True, le=False),
EChannelType.EXG1_CH1_16BIT: ChannelDataType(2, signed=True, le=False),
EChannelType.EXG1_CH2_16BIT: ChannelDataType(2, signed=True, le=False),
EChannelType.EXG2_CH1_16BIT: ChannelDataType(2, signed=True, le=False),
EChannelType.EXG2_CH2_16BIT: ChannelDataType(2, signed=True, le=False),
EChannelType.STRAIN_HIGH: ChannelDataType(2, signed=False, le=True),
EChannelType.STRAIN_LOW: ChannelDataType(2, signed=False, le=True),
EChannelType.TIMESTAMP: ChannelDataType(3, signed=False, le=True),
}

ChDataTypeAssignmentShimmer: dict[EChannelType, ChannelDataType] = dict(ChDataTypeAssignmentShimmer3)

ChDataTypeAssignment = ChDataTypeAssignmentShimmer

"""
This dictionary contains the mapping from sensor to data channels. Since one sensor can
record on multiple channels, the mapping is one-to-many.
Expand Down Expand Up @@ -529,6 +582,14 @@ class ESensorGroup(Enum):
ENABLED_SENSORS_LEN = 0x03
SENSOR_DTYPE = ChannelDataType(size=ENABLED_SENSORS_LEN, signed=False, le=True)

def set_active_dtype_assignment(hw: HardwareVersion | None) -> None:
"""
Mutate the active dtype map in-place to match the given hardware.
Using in-place mutation ensures modules that imported the dict earlier see the update.
"""
target = ChDataTypeAssignmentShimmer3R if hw == HardwareVersion.SHIMMER3R else ChDataTypeAssignmentShimmer3
ChDataTypeAssignmentShimmer.clear()
ChDataTypeAssignmentShimmer.update(target)

def get_enabled_channels(sensors: list[ESensorGroup]) -> list[EChannelType]:
"""Determine the set of data channels for a set of enabled sensors
Expand Down
26 changes: 26 additions & 0 deletions test/bluetooth/test_bt_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,32 @@ def test_inquiry_command(self):
self.assertEqual(buf_size, 1)
self.assertEqual(ctypes, [EChannelType.INTERNAL_ADC_A1])

def test_inquiry_command_shimmer3r(self):
# INQUIRY response layout for 3R:
# 0x02 (resp code)
# sr_val = 0x0040
# misc = 0x0901FF01 (LE bytes: 01 FF 01 09)
# + 3 extra bytes (skipped by parser)
# n_ch = 0x01
# buf = 0x01
# channels[0] = 0x12 (INTERNAL_ADC_A1)
resp_3r = (
b"\x02" # INQUIRY_RESPONSE
b"\x40\x00" # sr_val (0x0040 = 512 Hz after dr2sr)
b"\x01\xff\x01\x09" # misc (LE)
b"\x00\x00\x00" # the 3 extra bytes for Shimmer3R
b"\x01" # n_ch
b"\x01" # buf_size
b"\x00" # channel id
)

cmd = InquiryCommand(HardwareVersion.SHIMMER3R)
sr, buf_size, ctypes = self.assert_cmd(cmd, b"\x01", b"\x02", resp_3r)

self.assertEqual(sr, 512.0)
self.assertEqual(buf_size, 1)
self.assertEqual(ctypes, [EChannelType.ACCEL_LN_X]) # 0x01 = ACCEL_LN_X

def test_start_streaming_command(self):
cmd = StartStreamingCommand()
self.assert_cmd(cmd, b"\x07")
Expand Down