Skip to content

Developer Guide

Architecture Overview

src/
├── main.py                 # FastAPI application entry point
├── schemas.py             # Pydantic response models
├── core/                  # Core business logic
│   ├── config_loader.py   # Configuration management
│   ├── service_manager.py # Service orchestration
│   ├── models/            # Data structures and models
│   ├── services/          # Microservices (sensor, serial, test)
│   └── processing/        # Data processing pipeline
└── routers/               # API endpoints
    ├── api.py            # Main router aggregation
    ├── sensor.py         # Sensor operations
    ├── test.py           # Test management
    ├── history.py        # Historical data
    └── graphique.py      # Data visualization

Core Services

Sensor Manager

SensorManager

Manages sensor data acquisition from serial ports (via EventHub) or emulation.

Source code in src/core/services/sensor_manager.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
class SensorManager:
    """
    Manages sensor data acquisition from serial ports (via EventHub) or emulation.

    Raw text lines are pushed on ``self.queue`` as ``(sensor_id, line, timestamp)``
    tuples, either by one ``SerialHandler`` per hardware sensor or by the
    emulation loop. ``self._sensors_task`` is handed that queue and the
    ``_on_serial_data`` callback; presumably it drains the queue and invokes the
    callback for every line (NOTE(review): confirm in ``SensorsTask``, including
    the argument order it uses when calling the callback).

    Parsed samples are offset-corrected, cached in ``self.sensors`` and forwarded
    to every function registered through ``add_func_notify``.
    """
    def __init__(self):
        self.running = False
        self.emulated_sensors: list[SensorId] = []
        # Latest sample of every sensor, stored at index ``SensorId.value``.
        self.sensors: list[SensorData] = [
            SensorData(0.0, sensor_id, math.nan, math.nan) for sensor_id in SensorId
        ]
        self._emulation_task: Optional[asyncio.Task] = None
        # Zero offset subtracted from each raw value, indexed by ``SensorId.value``.
        self.offsets: list[float] = [0.0 for _ in SensorId]
        self._serial_handlers: list[SerialHandler] = []
        self.queue: asyncio.Queue[tuple[SensorId, str, float]] = asyncio.Queue(maxsize=1024)
        self._sensors_task: SensorsTask = SensorsTask(self.queue)
        self.notify_funcs: list[Callable[[SensorData], None]] = []
        # Remaining zeroing attempts per sensor (see set_zero / _notify).
        self.zero_requests: dict[SensorId, int] = {}
        # Port name (without PORT_PREFIX) per sensor; "" means no port assigned.
        self.sensor_ports: list[str] = [""] * len(SensorId)
        arc_config = config_loader.get_sensor_config(SensorId.ARC)
        # Sensors whose samples trigger a recomputation of the ARC value.
        self.arc_sensor_dependencies: list[SensorId] = []
        if isinstance(arc_config, calculatedConfigSensorData):
            self.arc_sensor_dependencies = [dep.id for dep in arc_config.dependencies]

        self.add_write_func(self._on_serial_data)

    def start(self, emulated_sensors: Optional[list[SensorId]] = None, sensor_ports: Optional[Dict[SensorId, tuple[str, int]]] = None):
        """Start sensor data acquisition.

        Args:
            emulated_sensors: Sensors whose data is generated by the emulation
                loop instead of being read from hardware. ``None`` means none.
            sensor_ports: Mapping ``sensor -> (port, baudrate)``. ``port`` is
                appended to ``PORT_PREFIX``; an empty port skips the sensor.

        If the manager is already running with the same set of emulated
        sensors the call is a no-op (``sensor_ports`` is ignored in that case);
        with a different set it is stopped first and then restarted.
        """
        emulated_sensors = emulated_sensors or []
        if self.running:
            if set(self.emulated_sensors) != set(emulated_sensors):
                self.stop()
            else:
                return

        self.emulated_sensors = emulated_sensors
        self.running = True
        logger.info(f"SensorManager started (Emulation: {[s.name for s in emulated_sensors]})")

        if self.emulated_sensors:
            self._emulation_task = asyncio.create_task(self._emulation_loop())

        if sensor_ports is None and not self.emulated_sensors:
            logger.warning("No sensor ports provided and no emulation requested.")

        if sensor_ports is not None:
            # Launch a SerialHandler for each real sensor NOT in emulated_sensors
            for sensor_id, (port, baud) in sensor_ports.items():
                if sensor_id in self.emulated_sensors:
                    continue
                if port == "":
                    logger.warning(f"Sensor {sensor_id} has no assigned port, skipping...")
                    continue
                self.sensor_ports[sensor_id.value] = port
                full_port = PORT_PREFIX + port
                serial_handler = SerialHandler(sensor_id=sensor_id, port=full_port, queue=self.queue, baudrate=baud, serial_timeout=0.5)

                serial_handler.start()
                self._serial_handlers.append(serial_handler)

        self._sensors_task.start()

    def set_mode(self, emulated_sensors: list[SensorId]):
        """Set the operation mode (emulation or real hardware).

        When running, the manager is stopped and restarted with the new list.
        NOTE(review): the restart passes no ``sensor_ports``, so no serial
        handler is recreated for hardware sensors - confirm this is intended.
        When not running, only the list of emulated sensors is stored.
        """
        if self.running:
            self.stop()
            self.start(emulated_sensors=emulated_sensors)
        else:
            self.emulated_sensors = emulated_sensors

    def is_sensor_connected(self, sensor_id: SensorId) -> bool:
        """
        Check if a sensor is currently connected.
        Handles ARC and emulation logic.

        Returns:
            For an emulated sensor: whether it is enabled in the configuration.
            For ARC: True only if every ARC dependency is connected.
            Otherwise: whether the device path of its assigned port exists.
        """
        # Emulation mode: enabled in config = connected
        if sensor_id in self.emulated_sensors:
            return config_loader.is_sensor_enabled(sensor_id)

        # ARC is computed from other sensors: connected iff all of them are
        if sensor_id == SensorId.ARC:
            for sensor in self.arc_sensor_dependencies:
                if not self.is_sensor_connected(sensor):
                    return False
            return True

        # Hardware mode: the sensor needs an assigned port whose device exists
        port = self.sensor_ports[sensor_id.value]
        if not port:
            return False

        return os.path.exists(PORT_PREFIX + port)

    def stop(self):
        """Stop sensor data acquisition.

        Cancels the emulation task (if any), stops and forgets every serial
        handler, then stops the sensors task.
        """
        self.running = False
        if self._emulation_task:
            self._emulation_task.cancel()
            self._emulation_task = None
        # Stop all serial handlers
        for handler in self._serial_handlers:
            handler.stop()
        self._serial_handlers.clear()
        self._sensors_task.stop()
        logger.info("SensorManager stopped")

    async def _emulation_loop(self):
        """Produce emulated lines every 0.1 s while running with emulated sensors."""
        start_time = time.time()
        while self.running and self.emulated_sensors:
            await self._emulate_data(start_time)
            await asyncio.sleep(0.1) # Rate limit

    def _on_serial_data(self, sensorId: SensorId, time_val: float, line: str):
        """Process a line of serial data for a given sensor ID and timestamp.

        FORCE lines go to ``_parse_force``, DISP_1..DISP_5 lines go to
        ``_parse_motion``; lines of any other sensor are ignored. Parsing
        errors are logged as warnings and never propagate to the caller.
        """
        try:
            if sensorId == SensorId.FORCE:
                self._parse_force(sensorId, time_val, line)

            elif sensorId in (
                SensorId.DISP_1,
                SensorId.DISP_2,
                SensorId.DISP_3,
                SensorId.DISP_4,
                SensorId.DISP_5,
            ):
                self._parse_motion(sensorId, time_val, line)

        except Exception as e:
            logger.warning(f"Error parsing line: {line} -> {e}")

    def set_zero(self, sensor_id: SensorId):
        """Manually zero a sensor by updating its offset.

        The offset is not changed here: a request with 3 attempts is recorded
        and ``_notify`` uses the next non-NaN value of that sensor as offset.
        """
        if sensor_id in SensorId:
            self.zero_requests[sensor_id] = 3

    def _parse_force(self, sensorId: SensorId, time: float, line: str):
        """Parse a line of serial data for the FORCE sensor.

        The fifth whitespace-separated field is published as the value. Lines
        without the ``ASC2`` marker, with fewer than five fields or with a
        non-numeric fifth field are silently dropped.
        """
        # ASC2 20945595 -165341 -1.527986e-01 -4.965955e+01 -0.000000e+00
        if not line or "ASC2" not in line:
            return

        parts = line.split()
        if len(parts) >= 5:
            try:
                val = float(parts[4]) # Calibrated value
                self._notify(sensorId, time, val)
            except ValueError:
                pass

    def _parse_motion(self, sensorId: SensorId, time: float, line: str):
        """Parse a line of serial data for the DISP sensors.

        The digit groups before the ``us`` token form the sending timestamp and
        ``ulMicros`` is the request timestamp (both in microseconds). The
        reception time ``time`` is moved back by their difference. The sample
        is published only when the sender id, the value and both timestamps
        were parsed and the corrected time is a real number.
        """
        # 76 144 262 us SPC_VAL usSenderId=0x2E01 ulMicros=76071216 Val=0.000
        if not line or "SPC_VAL" not in line:
            return

        parts = line.split()
        sender_id = None
        val = None
        sending_timestamp_str = ""
        sending_timestamp = math.nan
        us_seen = False
        request_timestamp = math.nan
        for part in parts:
            if not us_seen:
                # Leading tokens are the digit groups of the sending timestamp,
                # terminated by a token ending in "us".
                if part.endswith("us"):
                    sending_timestamp_str += part[:-2]
                    try:
                        sending_timestamp = float(sending_timestamp_str) / 1e6
                    except ValueError:
                        pass
                    us_seen = True
                else:
                    sending_timestamp_str += part
            elif part.startswith("usSenderId="):
                sender_id = part.split("=")[1]
            elif part.startswith("ulMicros="):
                try:
                    request_timestamp = float(part.split("=")[1]) / 1e6
                except ValueError:
                    pass
            elif part.startswith("Val="):
                try:
                    val = float(part.split("=")[1])
                except ValueError:
                    pass

        # Without both device timestamps the sample cannot be dated: drop it.
        if math.isnan(sending_timestamp) or math.isnan(request_timestamp):
            return
        time = time - (sending_timestamp - request_timestamp)

        # NaN must be tested with math.isnan(): an identity test against
        # math.nan misses NaN values produced by arithmetic.
        if not sender_id or val is None or math.isnan(time):
            return

        #####################################################################################
        #                                                                                   #
        #                               !!!!! WARNING !!!!!                                 #
        # TODO: Remove this workaround once sensors stop sending zero values randomly       #
        #                                                                                   #
        #####################################################################################
        if val < 0.02:
            # Log before overwriting so the offending reading appears in the message.
            logger.warning(f" LINE: {line}, val: {val}, time: {time}, sender_id: {sender_id}")
            val = math.nan

        self._notify(sensorId, time, val)

    def _calculate_arc(self, data: SensorData):
        """Update the ARC sample when ``data`` comes from one of its dependencies.

        ARC is ``DISP_1 - (DISP_2 + DISP_3) / 2`` and is applied to the raw
        value, the corrected value and the offset. Nothing is computed while
        any of the three corrected values is NaN.

        Returns:
            The updated ARC ``SensorData`` (the object stored in
            ``self.sensors``, modified in place), or ``None`` if not updated.
        """
        if (data.sensor_id in self.arc_sensor_dependencies):
            disp1 = self.sensors[SensorId.DISP_1.value]
            disp2 = self.sensors[SensorId.DISP_2.value]
            disp3 = self.sensors[SensorId.DISP_3.value]
            arc = self.sensors[SensorId.ARC.value]

            if(not math.isnan(disp1.value) and
               not math.isnan(disp2.value) and
               not math.isnan(disp3.value)):
                arc.raw_value = disp1.raw_value - (disp2.raw_value + disp3.raw_value) / 2
                arc.value = disp1.value - (disp2.value + disp3.value) / 2
                arc.offset = disp1.offset - (disp2.offset + disp3.offset) / 2
                arc.timestamp = data.timestamp
                self.sensors[SensorId.ARC.value] = arc

                return arc
        return None

    async def _emulate_data(self, start_time):
        """Queue one emulated line per emulated and enabled sensor.

        Lines use the same text format as the hardware so that they go through
        the regular parsers. Nothing is queued while the queue is full.

        Args:
            start_time: ``time.time()`` value at which the emulation started.
        """
        elapsed = time.time() - start_time

        from core.config_loader import config_loader

        # Emulate Force (Sine wave) only if enabled
        if SensorId.FORCE in self.emulated_sensors and config_loader.is_sensor_enabled(SensorId.FORCE):
            force_val = 500 + 500 * math.sin(elapsed) + random.uniform(-10, 10)
            line = f"ASC2 {int(elapsed * 1e6)} -39696 -3.577285e-02 {force_val:.6e} -0.000000e+00"
            if not self.queue.full():
                await self.queue.put((SensorId.FORCE, line, time.time()))

        # Emulate Displacement (Linear + Noise) with per-sensor phase offsets to avoid overlap
        phase_offsets = {
            SensorId.DISP_1: 0.0,
            SensorId.DISP_2: 1.5,
            SensorId.DISP_3: 3.0,
            SensorId.DISP_4: 4.5,
            SensorId.DISP_5: 6.0,
        }
        # Per-sensor gain so that the emulated curves differ in amplitude
        scale_factors = {
            SensorId.DISP_1: 1.00, SensorId.DISP_2: 1.10, SensorId.DISP_3: 0.90,
            SensorId.DISP_4: 1.20, SensorId.DISP_5: 0.80
        }

        for sensor_id, phase in phase_offsets.items():
            if sensor_id in self.emulated_sensors and config_loader.is_sensor_enabled(sensor_id):
                disp_val = (((elapsed + phase) * 0.1) % 10 + random.uniform(-0.05, 0.05)) * scale_factors[sensor_id]

                elapsed_us = int(elapsed * 1e6)
                line = f"{elapsed_us} us SPC_VAL usSenderId=0x2E01 ulMicros={elapsed_us} Val={disp_val:.3f}"
                if not self.queue.full():
                    await self.queue.put((sensor_id, line, time.time()))

    def _notify(self, sensor_id: SensorId, time: float, value: float):
        """Notify all registered functions with new sensor data, applying zeroing if requested.

        While a zero request is pending for the sensor, the first non-NaN
        value becomes its new offset; each NaN value consumes one attempt.
        The sample is then cached in ``self.sensors`` and sent to every
        registered function, followed by the ARC sample if it was updated.
        """
        if self.zero_requests.get(sensor_id, 0) > 0:
            if not math.isnan(value):
                self.offsets[sensor_id.value] = value
                self.zero_requests[sensor_id] = 0
                logger.info(f"Zeroed sensor {sensor_id}. New offset: {self.offsets[sensor_id.value]}")
            else:
                self.zero_requests[sensor_id] -= 1
                if self.zero_requests[sensor_id] == 0:
                    logger.warning(f"Failed to zero sensor {sensor_id} after 3 attempts with NaN values.")

        # Apply offset
        offset = self.offsets[sensor_id.value]
        corrected_value = value - offset

        # Publish the corrected value together with the raw value and the offset
        data = SensorData(
            timestamp=time,
            sensor_id=sensor_id,
            value=corrected_value,
            raw_value=value,
            offset=offset
        )

        self.sensors[sensor_id.value] = data

        for func in self.notify_funcs:
            func(data)

        if self._calculate_arc(data) is not None:
            arc_data = self.sensors[SensorId.ARC.value]
            for func in self.notify_funcs:
                func(arc_data)

    def add_write_func(self, write_func: Callable[[SensorId, float, str], None]):
        """Add a write function to sensors task."""
        self._sensors_task.add_write_func(write_func)

    def add_func_notify(self, func: Callable[[SensorData], None]):
        """Add a function that will be called with new sensor data."""
        self.notify_funcs.append(func)

add_func_notify(func)

Add a function that will be called with new sensor data.

Source code in src/core/services/sensor_manager.py
312
313
314
def add_func_notify(self, func: Callable[[SensorData], None]):
    """Register ``func`` so that it is appended to ``self.notify_funcs``.

    Args:
        func: Callable taking one ``SensorData`` argument.
    """
    listeners = self.notify_funcs
    listeners.append(func)

add_write_func(write_func)

Add a write function to sensors task.

Source code in src/core/services/sensor_manager.py
308
309
310
def add_write_func(self, write_func: Callable[[SensorId, float, str], None]):
    """Forward ``write_func`` to the sensors task's own ``add_write_func``.

    Args:
        write_func: Callable taking ``(sensor_id, timestamp, line)``.
    """
    sensors_task = self._sensors_task
    sensors_task.add_write_func(write_func)

is_sensor_connected(sensor_id)

Check if a sensor is currently connected. Handles ARC and emulation logic.

Source code in src/core/services/sensor_manager.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
def is_sensor_connected(self, sensor_id: SensorId) -> bool:
    """
    Tell whether a sensor is currently considered connected.

    - Emulated sensor: connected when it is enabled in the configuration.
    - ARC: connected when every sensor in ``arc_sensor_dependencies`` is.
    - Any other sensor: connected when a port is assigned to it and the
      device path ``PORT_PREFIX + port`` exists.
    """
    if sensor_id in self.emulated_sensors:
        return config_loader.is_sensor_enabled(sensor_id)

    if sensor_id == SensorId.ARC:
        # Stops at the first dependency that is not connected
        return all(self.is_sensor_connected(dep) for dep in self.arc_sensor_dependencies)

    assigned_port = self.sensor_ports[sensor_id.value]
    return os.path.exists(PORT_PREFIX + assigned_port) if assigned_port else False

set_mode(emulated_sensors)

Set the operation mode (emulation or real hardware).

Source code in src/core/services/sensor_manager.py
80
81
82
83
84
85
86
def set_mode(self, emulated_sensors: list[SensorId]):
    """Select which sensors are emulated.

    If the manager is not running, the list is only stored. Otherwise the
    manager is stopped and started again with the new list.
    """
    if not self.running:
        self.emulated_sensors = emulated_sensors
        return

    self.stop()
    self.start(emulated_sensors=emulated_sensors)

set_zero(sensor_id)

Manually zero a sensor by updating its offset.

Source code in src/core/services/sensor_manager.py
146
147
148
149
def set_zero(self, sensor_id: SensorId):
    """Request zeroing of a sensor.

    Stores a counter of 3 for the sensor in ``self.zero_requests``; nothing
    is stored when ``sensor_id`` is not a member of ``SensorId``.
    """
    if sensor_id not in SensorId:
        return
    self.zero_requests[sensor_id] = 3

start(emulated_sensors=None, sensor_ports=None)

Start sensor data acquisition.

Source code in src/core/services/sensor_manager.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def start(self, emulated_sensors: Optional[list[SensorId]] = None, sensor_ports: Optional[Dict[SensorId, tuple[str, int]]] = None):
    """Begin acquiring sensor data.

    Args:
        emulated_sensors: Sensors to emulate; ``None`` is treated as empty.
        sensor_ports: Mapping ``sensor -> (port, baudrate)`` for hardware
            sensors. ``port`` is appended to ``PORT_PREFIX``.

    A running manager is left untouched when the emulated set is unchanged,
    and is stopped before restarting when it differs.
    """
    emulated_sensors = emulated_sensors or []
    if self.running:
        if set(self.emulated_sensors) == set(emulated_sensors):
            return
        self.stop()

    self.emulated_sensors = emulated_sensors
    self.running = True
    logger.info(f"SensorManager started (Emulation: {[s.name for s in emulated_sensors]})")

    if self.emulated_sensors:
        self._emulation_task = asyncio.create_task(self._emulation_loop())

    if sensor_ports is None:
        if not self.emulated_sensors:
            logger.warning("No sensor ports provided and no emulation requested.")
    else:
        # One serial handler per hardware sensor that is not emulated
        for sensor_id, (port, baud) in sensor_ports.items():
            if sensor_id in self.emulated_sensors:
                continue
            if port == "":
                logger.warning(f"Sensor {sensor_id} has no assigned port, skipping...")
                continue

            self.sensor_ports[sensor_id.value] = port
            handler = SerialHandler(
                sensor_id=sensor_id,
                port=PORT_PREFIX + port,
                queue=self.queue,
                baudrate=baud,
                serial_timeout=0.5,
            )
            handler.start()
            self._serial_handlers.append(handler)

    self._sensors_task.start()

stop()

Stop sensor data acquisition.

Source code in src/core/services/sensor_manager.py
111
112
113
114
115
116
117
118
119
120
121
122
def stop(self):
    """Halt acquisition: emulation task, serial handlers, then the sensors task."""
    self.running = False

    emulation_task = self._emulation_task
    if emulation_task:
        emulation_task.cancel()
        self._emulation_task = None

    # Stop every serial handler before forgetting them
    for serial_handler in self._serial_handlers:
        serial_handler.stop()
    self._serial_handlers.clear()

    self._sensors_task.stop()
    logger.info("SensorManager stopped")

Serial Handler

SerialHandler

Source code in src/core/services/serial_handler.py
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
class SerialHandler:
    """Read text lines from one serial port and push them on a shared queue.

    Each non-empty decoded line is queued as ``(sensor_id, line, timestamp)``,
    where ``timestamp`` is ``time.time()`` taken right after the line was read.
    The port is opened lazily by the read loop and reopened after errors.
    """
    def __init__(self,
        sensor_id: SensorId,
        port: str,
        queue: asyncio.Queue[tuple[SensorId, str, float]],
        baudrate: int = 9600,
        serial_timeout: float = 0.01,
    ):
        """
        Args:
            sensor_id: Identifier attached to every queued line.
            port: Port passed to ``serial.Serial``.
            queue: Destination queue for the lines read.
            baudrate: Baud rate passed to ``serial.Serial``.
            serial_timeout: Read timeout passed to ``serial.Serial``.
        """

        self.baudrate = baudrate
        self.port = port
        self.timeout = serial_timeout
        self.serial = None
        self.sensor_id = sensor_id
        self.queue = queue
        self.running = False
        # Task running read_serial(); set by start().
        self._task = None

    def start(self):
        """Start the serial handler by launching the read loop in an asynchronous task."""
        self.running = True
        # Keep a strong reference to the task: the event loop only holds weak
        # references, so an unreferenced task may be garbage-collected
        # before it completes.
        self._task = asyncio.create_task(self.read_serial())

    def stop(self):
        """Stop the serial handler and close the serial port if open.

        The read loop ends on its own once it observes ``running`` is False.
        """
        self.running = False
        if self.serial and self.serial.is_open:
            self.serial.close()

    async def read_serial(self):
        """Continuously read from the serial port, handle reconnections, and put data into the queue.

        Blocking serial calls run in a worker thread. When the queue is full
        the oldest entry is discarded to make room for the new line.
        """
        while self.running:
            try:
                if self.serial is None:
                    # (Re)open the port; on failure wait 0.5 s and try again
                    try:
                        self.serial = await asyncio.to_thread(
                            serial.Serial, port=self.port, baudrate=self.baudrate, timeout=self.timeout
                        )
                        logger.info(f"SerialHandler for {self.sensor_id.name} reconnected to {self.port} at {self.baudrate} baud.")

                        # Apply platform-specific optimizations
                        if platform.system() == "Linux":
                            optimize_linux_serial(self.serial)

                    except Exception as e:
                        logger.error(f"Failed to reconnect SerialHandler for {self.sensor_id.name} on {self.port}: {e}")
                        await asyncio.sleep(0.5)
                        continue

                line = await asyncio.to_thread(self.serial.readline)
                if line:
                    timestamp = time.time()

                    decoded_line = line.decode('utf-8', errors='ignore').strip()

                    if decoded_line:
                        # Drop the oldest entry rather than block on a full queue
                        if self.queue.full():
                            try:
                                self.queue.get_nowait()
                            except asyncio.QueueEmpty:
                                pass
                        await self.queue.put((self.sensor_id, decoded_line, timestamp))

            except Exception as e:
                # Any read error: close the port and reopen it on the next iteration
                logger.error(f"Error reading from serial port for {self.sensor_id}: {e}")
                if self.serial:
                    try:
                        self.serial.close()
                    except Exception as close_error:
                        logger.error(f"Error closing serial port for {self.sensor_id}: {close_error}")
                    self.serial = None
                await asyncio.sleep(1.0)

read_serial() async

Continuously read from the serial port, handle reconnections, and put data into the queue.

Source code in src/core/services/serial_handler.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
async def read_serial(self):
    """Continuously read from the serial port, handle reconnections, and put data into the queue.

    Runs while ``self.running`` is true. Blocking serial calls (opening the
    port, ``readline``) are executed in a worker thread. Every non-empty
    decoded line is queued as ``(sensor_id, line, timestamp)``.
    """
    while self.running:
        try:
            # Open the port on first use and after any error that reset it to None
            if self.serial is None:
                try:
                    self.serial = await asyncio.to_thread(
                        serial.Serial, port=self.port, baudrate=self.baudrate, timeout=self.timeout
                    )
                    logger.info(f"SerialHandler for {self.sensor_id.name} reconnected to {self.port} at {self.baudrate} baud.")

                    # Apply platform-specific optimizations
                    if platform.system() == "Linux":
                        optimize_linux_serial(self.serial)

                except Exception as e:
                    # Opening (or optimizing) failed: wait 0.5 s, then retry
                    logger.error(f"Failed to reconnect SerialHandler for {self.sensor_id.name} on {self.port}: {e}")
                    await asyncio.sleep(0.5)
                    continue

            line = await asyncio.to_thread(self.serial.readline)
            if line:
                # Timestamp taken right after the line was read
                timestamp = time.time()

                # Undecodable bytes are dropped instead of raising
                decoded_line = line.decode('utf-8', errors='ignore').strip()

                if decoded_line:
                    # Queue full: discard the oldest entry to make room
                    if self.queue.full():
                        try:
                            self.queue.get_nowait()
                        except asyncio.QueueEmpty:
                            pass
                    await self.queue.put((self.sensor_id, decoded_line, timestamp))

        except Exception as e:
            # Read failure: close the port, mark it for reopening, pause 1 s
            logger.error(f"Error reading from serial port for {self.sensor_id}: {e}")
            if self.serial:
                try:
                    self.serial.close()
                except Exception as close_error:
                    logger.error(f"Error closing serial port for {self.sensor_id}: {close_error}")
                self.serial = None
            await asyncio.sleep(1.0)

start()

Start the serial handler by launching the read loop in an asynchronous task.

Source code in src/core/services/serial_handler.py
93
94
95
96
def start(self):
    """Start the serial handler by launching the read loop in an asynchronous task.

    Must be called while an event loop is running. The created task is kept
    in ``self._task``.
    """
    self.running = True
    # Keep a strong reference to the task: the event loop only holds weak
    # references, so an unreferenced task may be garbage-collected before
    # it completes.
    self._task = asyncio.create_task(self.read_serial())

stop()

Stop the serial handler and close the serial port if open.

Source code in src/core/services/serial_handler.py
 98
 99
100
101
102
def stop(self):
    """Clear the running flag and close the serial port when it is open."""
    self.running = False
    port = self.serial
    if not port:
        return
    if port.is_open:
        port.close()

check_linux_serial_limits()

Check Linux system limits that affect serial performance.

Source code in src/core/services/serial_handler.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def check_linux_serial_limits():
    """Check Linux system limits that affect serial performance.

    Reads the usbfs memory limit from sysfs and logs a warning when it is
    below 64 MB. The check is purely diagnostic and never raises: if the
    parameter cannot be read or parsed it is reported as "Not available".

    Returns:
        dict: ``{'usb_memory': "<n> MB"}`` or ``{'usb_memory': "Not available"}``.
    """

    checks = {}

    try:
        # Check USB buffer sizes
        with open('/sys/module/usbcore/parameters/usbfs_memory_mb', 'r') as f:
            usb_memory = int(f.read().strip())
    except (OSError, ValueError):
        # Missing file (non-Linux, no usbcore), unreadable file or unexpected content
        checks['usb_memory'] = "Not available"
    else:
        checks['usb_memory'] = f"{usb_memory} MB"

        # 0 means "no limit" for this kernel parameter, so it is not a low value
        if 0 < usb_memory < 64:
            logger.warning("USB memory buffer is low. Consider increasing it: "
                         "echo 64 > /sys/module/usbcore/parameters/usbfs_memory_mb")

    return checks

optimize_linux_serial(ser)

Linux-specific optimizations for low-latency serial communication.

Source code in src/core/services/serial_handler.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def optimize_linux_serial(ser):
    """Linux-specific optimizations for low-latency serial communication.

    Every step is best-effort: a step that cannot be applied is logged at
    debug level and skipped, it never raises to the caller.

    Args:
        ser: Open serial port object. ``fileno()`` is used for the termios
            step; ``exclusive`` and ``set_buffer_size`` are used if present.

    Returns:
        list[str]: Labels of the optimizations that were applied.
    """

    optimizations = []

    # 1. Make a blocking read return as soon as one byte is available
    try:
        import termios
    except ImportError as e:
        logger.debug(f"Could not set low latency mode: {e}")
    else:
        try:
            # Get file descriptor
            fd = ser.fileno()

            # attrs[6] is the control-character array of the terminal
            attrs = termios.tcgetattr(fd)
            attrs[6][termios.VMIN] = 1      # Minimum characters to read
            attrs[6][termios.VTIME] = 0     # Timeout in deciseconds
            termios.tcsetattr(fd, termios.TCSANOW, attrs)

            optimizations.append("Low latency mode")

        # termios.error is not an OSError subclass and must be listed
        # explicitly, otherwise a tcgetattr/tcsetattr failure would escape.
        except (AttributeError, OSError, termios.error) as e:
            logger.debug(f"Could not set low latency mode: {e}")

    # 2. Disable exclusive access for performance
    try:
        ser.exclusive = False
        optimizations.append("Exclusive access disabled")
    except AttributeError:
        logger.debug("Serial port does not support exclusive mode setting")

    # 3. Set large buffer sizes
    try:
        ser.set_buffer_size(rx_size=16384, tx_size=8192)
        optimizations.append("Large buffers (16KB RX, 8KB TX)")
    except AttributeError:
        logger.debug("Serial port does not support buffer size configuration")

    if optimizations:
        logger.info(f"Applied Linux optimizations: {', '.join(optimizations)}")

    return optimizations

Test Manager

TestManager

Source code in src/core/services/test_manager.py
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
class TestManager:
    def __init__(self):
        """Initialise test state, storage, graph helpers and on-disk directories.

        Besides setting attributes, this creates TEST_DATA_DIR and ARCHIVE_DIR if
        missing, loads the test history from disk and registers two callbacks on
        ``sensor_manager`` (raw serial lines and raw sensor samples).
        """
        self.current_test: Optional[TestMetaData] = None
        self.is_running = False
        self.is_stopped = False  # Test has been stopped but not yet finalized
        self.test_history: List[TestMetaData] = []

        # Sensor data storage using efficient circular buffers
        # Indexed by SensorId.value for O(1) access
        # Point spacing is determined by DataProcessor publishing rate (PROCESSING_RATE),
        # not raw sensor frequency. If raw > processing rate, effective freq = processing rate.
        # Align buffer sampling with the processor publish rate to avoid underestimating window span
        # NOTE(review): the comments above refer to PROCESSING_RATE, but the storage is
        # constructed with SENSOR_SAMPLING_FREQ - confirm which rate is intended.
        self.data_storage = SensorDataStorage(len(list(SensorId)), SENSOR_SAMPLING_FREQ)

        # Wall-clock timestamp of the test start; 0.0 until start_test() sets it
        self.start_time = 0.0

        # File handles
        self.raw_file = None           # raw.log - raw serial input
        self.raw_csv_file = None       # raw_data.csv - uncalibrated sensor data
        self.raw_csv_writer = None     # created lazily on the first raw sample
        self.current_test_dir = None   # directory of the prepared/running/stopped test

        self.max_interpolation_gap = 0.5 # seconds - max gap between points to allow interpolation, otherwise leave blank in CSV

        # PIL Images for graphiques (DISP_1 and ARC)
        self.graphique_disp1 = Graphique(SensorId.DISP_1, SensorId.FORCE, GraphiqueConfig())
        arc_config = config_loader.get_sensor_config(SensorId.ARC)
        self.graphique_arc = Graphique(SensorId.ARC, SensorId.FORCE, GraphiqueConfig(x_min=-arc_config.max))

        # Latest (x-sensor sample, FORCE sample) pair per graph; NaN values mark
        # "no sample received yet" and suppress plotting until both are present
        self.graphique_disp1_history: tuple[SensorData, SensorData] = (SensorData(0.0, SensorId.DISP_1, math.nan), SensorData(0.0, SensorId.FORCE, math.nan))
        self.graphique_arc_history: tuple[SensorData, SensorData] = (SensorData(0.0, SensorId.ARC, math.nan), SensorData(0.0, SensorId.FORCE, math.nan))

        # Numeric output precision (decimals after the decimal point)
        # time: number of decimals for relative_time (seconds)
        # force: decimals for FORCE sensor
        # disp: decimals for displacement sensors (DISP_*, ARC)
        self.time_decimals = 3
        self.force_decimals = 2
        self.disp_decimals = 6

        # Ensure dirs
        os.makedirs(TEST_DATA_DIR, exist_ok=True)
        os.makedirs(ARCHIVE_DIR, exist_ok=True)

        # Names of files attached to the current test through add_file()
        self.files_added_to_current_test: list[str] = []

        # Emulation clock (used when no test is running)
        self.emulation_start_time: float | None = None

        # Load history
        self.reload_history()

        # Add write functions to receive data from SensorManager and processed data from DataProcessor
        sensor_manager.add_write_func(self._on_serial_data)

        sensor_manager.add_func_notify(self._on_raw_sensor_data)

    def reload_history(self):
        """Rebuild ``self.test_history`` from the test folders found on disk.

        Every sub-directory of TEST_DATA_DIR that holds a ``metadata.json`` is
        loaded into a ``TestMetaData`` whose ``test_id`` is forced to the folder
        name. The folder of the current in-flight test is skipped. Entries that
        fail to load are logged and ignored. The list ends up sorted by ``date``,
        newest first.
        """
        self.test_history = []
        logger.info(f"[RELOAD] Scanning {TEST_DATA_DIR}")

        if not os.path.exists(TEST_DATA_DIR):
            logger.info(f"[RELOAD] Directory does not exist: {TEST_DATA_DIR}")
            return

        entries = os.listdir(TEST_DATA_DIR)
        logger.info(f"[RELOAD] Found {len(entries)} items in directory")

        for folder_name in entries:
            # The prepared/running/stopped test must not appear in the history
            if self.current_test and folder_name == self.current_test.test_id:
                logger.debug(f"[RELOAD] Skipping current in-progress test {folder_name} from history")
                continue

            folder_path = os.path.join(TEST_DATA_DIR, folder_name)
            if not os.path.isdir(folder_path):
                continue

            metadata_file = os.path.join(folder_path, "metadata.json")
            if not os.path.exists(metadata_file):
                continue

            try:
                with open(metadata_file, 'r') as handle:
                    payload = json.load(handle)
                    # Naive reconstruction: JSON keys map straight onto dataclass fields
                    record = TestMetaData(**payload)
                    # The folder name is the authoritative identifier
                    record.test_id = folder_name
                    self.test_history.append(record)
                    logger.debug(f"[RELOAD] Loaded test {folder_name}")
            except Exception as e:
                logger.error(f"Failed to load test {folder_name}: {e}")

        # Newest first
        self.test_history.sort(key=lambda entry: entry.date, reverse=True)
        logger.info(f"[RELOAD] Finished loading {len(self.test_history)} tests")

    def get_test_state(self) -> TestState:
        """
        Report which phase of the test lifecycle the manager is in.

        The flags are checked in priority order: running, then stopped, then
        whether metadata has been prepared.

        Returns:
            TestState.RUNNING: A test is currently running
            TestState.STOPPED: A test has been stopped but not yet finalized
            TestState.PREPARED: No test running but metadata has been set
            TestState.NOTHING: No test running and no test prepared
        """
        if self.is_running:
            return TestState.RUNNING
        if self.is_stopped:
            return TestState.STOPPED
        if self.current_test is None:
            return TestState.NOTHING
        return TestState.PREPARED

    def prepare_test(self, metadata: TestMetaData):
        """Sets the test as current but does not start it yet. Creates the test directory and description file."""
        if self.is_running:
            raise RuntimeError("A test is already running.")

        # Generate unique test ID with timestamp
        safe_id = "".join([c for c in metadata.test_id if c.isalnum() or c in ('-','_')])
        if not safe_id: safe_id = "test"
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        final_id = f"{timestamp}_{safe_id}"

        # Update metadata with final ID
        metadata.test_id = final_id
        self.current_test = metadata

        # Create test directory
        self.current_test_dir = os.path.join(TEST_DATA_DIR, final_id)
        os.makedirs(self.current_test_dir, exist_ok=True)

        # Save metadata
        with open(os.path.join(self.current_test_dir, "metadata.json"), 'w') as f:
            json.dump(asdict(metadata), f, indent=2)

        # Create default description.md file
        description_content = f"# {metadata.test_id}\n\nDescription de l'expérience.\n\n## Informations\n- Date: {metadata.date}\n- Opérateur: {metadata.operator_name}\n- Spécimen: {metadata.specimen_code}"
        with open(os.path.join(self.current_test_dir, "description.md"), 'w', encoding='utf-8') as f:
            f.write(description_content)

    def start_test(self):
        if self.is_running:
            raise RuntimeError("A test is already running.")

        # Use prepared test
        if self.current_test is None:
            raise ValueError("No test metadata prepared. Call POST /info first.")

        if self.current_test_dir is None:
            raise RuntimeError("Test directory not initialized. This should not happen.")

        metadata = self.current_test
        # Directory and metadata already created by prepare_test()

        # Open raw file
        self.raw_file = open(os.path.join(self.current_test_dir, "raw.log"), 'w', buffering=1) # Line buffered

        # Open CSV file for raw data
        self.raw_csv_file = open(os.path.join(self.current_test_dir, "raw_data.csv"), 'w', newline='')
        self.raw_csv_writer = None

        # Initialize both graphiques (DISP_1 and ARC)
        self.graphique_disp1.reset()
        self.graphique_arc.reset()

        self.is_running = True
        # Reset emulation clock when a real/recorded test starts
        self.emulation_start_time = None
        # Clear data storage for new test
        self.data_storage.clear_all()
        self.start_time = datetime.datetime.now().timestamp()

        logger.info(f"Test started: {metadata.test_id}")

    def stop_test(self):
        """Stop recording data but keep test in memory for review/finalization."""
        if self.current_test is None:
            return

        if not self.is_running:
            return  # Already stopped or not running

        logger.info(f"Test stopped (recording ended): {self.current_test.test_id}")

        # Close files to stop recording
        if self.raw_file:
            self.raw_file.close()
            self.raw_file = None
        if self.raw_csv_file:
            self.raw_csv_file.close()
            self.raw_csv_file = None

        # Save graphiques to test directory
        self.graphique_disp1.save_graphique(self.current_test_dir, "graphique_disp1.png")
        self.graphique_arc.save_graphique(self.current_test_dir, "graphique_arc.png")

        # Mark as stopped but keep in memory
        self.is_running = False
        self.is_stopped = True

    def calculate_interpolated_data(self):

        if self.current_test is None:
            raise RuntimeError("No test in memory to calculate data for.")

        if self.current_test_dir is None:
            raise RuntimeError("Test directory not initialized. This should not happen.")

        raw_csv = open(os.path.join(self.current_test_dir, "raw_data.csv"), 'r')
        data_csv = open(os.path.join(self.current_test_dir, "data.csv"), 'w', newline='')
        headers = ["timestamp", "relative_time"] + [sensor.name for sensor in SensorId]
        data_csv_writer = csv.DictWriter(data_csv, fieldnames=headers)
        data_csv_writer.writeheader()
        raw_list: list[list[tuple[float, float]]] = [[] for _ in SensorId]
        reader = csv.DictReader(raw_csv)

        for row in reader:
            try:
                # Handle legacy 'SensorId.FORCE' or standard 'FORCE' string
                sensor_id_str = row["sensor_id"].split('.')[-1]
                sensor_id = SensorId[sensor_id_str]
                timestamp = float(row["timestamp"])
                raw_value = float(row["raw_value"])
                offset = float(row["offset"])
                raw_list[sensor_id.value].append((timestamp, raw_value - offset))
            except Exception as e:
                logger.warning(f"Error parsing raw CSV row {row}: {e}")

        valid_end_times = [raw_list[sensor_id.value][-1][0] for sensor_id in SensorId if raw_list[sensor_id.value]]
        if not valid_end_times:
            data_csv.close()
            raw_csv.close()
            return

        end_time = max(valid_end_times)
        number_of_points = int((end_time - self.start_time) * PROCESSING_RATE)

        for i in range(0, number_of_points):
            wantedTime = self.start_time + i * (1/PROCESSING_RATE)
            line = {"timestamp": f"{wantedTime:.{self.time_decimals}f}", "relative_time": f"{wantedTime - self.start_time:.{self.time_decimals}f}"}
            for sensor_id in SensorId:
                data_points = raw_list[sensor_id.value]
                if not data_points:
                    continue
                # Find two points that sandwich the wantedTime
                before = None
                after = None
                for t, val in data_points:
                    if not math.isnan(t) and not math.isnan(val) and t < wantedTime:
                        before = (t, val)
                    elif not math.isnan(t) and not math.isnan(val) and t > wantedTime and before is not None:
                        after = (t, val)
                        break

                if before is not None and after is not None and after[0] - before[0] <= self.max_interpolation_gap:
                    t1, v1 = before
                    t2, v2 = after
                    interpolated_val = v1 + (v2 - v1) * (wantedTime - t1) / (t2 - t1)
                    logger.debug(f"Interpolated {sensor_id.name} at {wantedTime:.3f}s: {interpolated_val:.3f}")
                    line[sensor_id.name] = f"{interpolated_val:.{self.force_decimals if sensor_id == SensorId.FORCE else self.disp_decimals}f}"
                else:
                    line[sensor_id.name] = ""

            data_csv_writer.writerow(line)
        data_csv.flush()
        raw_csv.close()
        data_csv.close()

    def finalize_test(self):
        """Move stopped test to history and clear current test."""
        if self.current_test is None or self.current_test_dir is None:
            raise ValueError("No test to finalize.")

        if not self.is_stopped:
            raise RuntimeError("Test is not stopped. Call PUT /stop first.")

        logger.info(f"Test finalized: {self.current_test.test_id}")

        # Clean up PIL images
        self.graphique_disp1.reset()
        self.graphique_arc.reset()

        # Clear data storage
        self.data_storage.clear_all()

        self.calculate_interpolated_data()
        self.create_export_csv()

        treatment = TreatmentModule(self.current_test_dir)
        treatment.load_metadata(self.current_test)
        treatment.load_data("data.csv")
        treatment.process_data()
        treatment.save_output()

        self.current_test = None
        self.current_test_dir = None
        self.is_stopped = False

        # Reset emulation clock to allow live time in simulation mode
        self.emulation_start_time = None

        self.files_added_to_current_test.clear()

        # Reload history now that the test is finalized and cleared from memory
        self.reload_history()

    def create_export_csv(self) -> None:
        """
        Create a comprehensive export CSV file for the current test, combining metadata, description, and data.csv content.
        """
        if self.current_test is None or self.current_test_dir is None:
            logger.error("No test in memory to create export CSV for.")
            return

        try:            
            metadata_path = os.path.join(self.current_test_dir, "metadata.json")
            description_path = os.path.join(self.current_test_dir, "description.md")
            data_csv_path = os.path.join(self.current_test_dir, "data.csv")

            # Read pieces if they exist
            metadata_text = ""
            try:
                with open(metadata_path, 'r', encoding='utf-8') as mf:
                    metadata_text = mf.read().strip()
            except Exception:
                metadata_text = ""

            description_text = ""
            try:
                with open(description_path, 'r', encoding='utf-8') as df:
                    description_text = df.read().strip()
            except Exception:
                description_text = ""

            data_csv_text = ""
            try:
                with open(data_csv_path, 'r', encoding='utf-8') as cf:
                    data_csv_text = cf.read().strip()
                    data_csv_text = data_csv_text.replace(",", "\t").replace(".", ",")
            except Exception:
                data_csv_text = ""


            # Compose CSV-like content: metadata, blank line, description, blank line, data.csv
            parts = []
            parts.append(f"\"metadata\": {metadata_text}")
            parts.append("")
            parts.append(description_text)
            parts.append("")
            parts.append(data_csv_text)

            export_content = "\n".join(parts)
            expport_csv_path = os.path.join(self.current_test_dir, "export.csv")
            with open(expport_csv_path, 'w', encoding='utf-8') as ef:
                ef.write(export_content)

        except Exception:
            logger.error("Failed to create export CSV for test %s", self.current_test.test_id)
            pass

    def archive_test(self, test_id: str):
        """Moves a test folder to the archive directory."""
        src = os.path.join(TEST_DATA_DIR, test_id)
        dst = os.path.join(ARCHIVE_DIR, test_id)
        if os.path.exists(src):
            shutil.move(src, dst)
            self.reload_history()
            logger.info(f"Archived test {test_id}")
            return True
        return False

    def delete_test(self, test_id: str):
        """Irreversibly deletes a test."""
        target = os.path.join(TEST_DATA_DIR, test_id)
        if os.path.exists(target):
            shutil.rmtree(target)
            self.reload_history()
            logger.info(f"Deleted test {test_id}")
            return True
        return False

    def _on_serial_data(self, sensor_id: SensorId, time: float, line: str):
        """Write raw serial data to raw.log file with timestamp and sensor ID."""
        if self.is_running and self.raw_file:
            self.raw_file.write(f"[{time}] {sensor_id.name} {line}\n")

    def _on_raw_sensor_data(self, sensor_data: SensorData):
        """Handle one raw (uncalibrated) sample coming from the SensorManager.

        Ignored unless a test is running and the raw CSV file is open. The
        sample updates the DISP_1/FORCE and ARC/FORCE pairs and plots a point on
        a graph whenever both values of its pair are non-NaN. It is then offered
        to the circular buffers and written as one row of ``raw_data.csv``.
        """
        if not self.is_running or not self.raw_csv_file:
            return

        def _plot_pair(graph, pair):
            # Plot only once both the x sample and the FORCE sample hold real values
            x_sample, force_sample = pair
            if not math.isnan(x_sample.value) and not math.isnan(force_sample.value):
                graph.plot_point_on_graphique(x_sample.value, force_sample.value)

        sensor_id = sensor_data.sensor_id

        if sensor_id == SensorId.DISP_1:
            self.graphique_disp1_history = (sensor_data, self.graphique_disp1_history[1])
            _plot_pair(self.graphique_disp1, self.graphique_disp1_history)

        elif sensor_id == SensorId.ARC:
            self.graphique_arc_history = (sensor_data, self.graphique_arc_history[1])
            _plot_pair(self.graphique_arc, self.graphique_arc_history)

        elif sensor_id == SensorId.FORCE:
            # FORCE is the y axis of both graphs
            self.graphique_disp1_history = (self.graphique_disp1_history[0], sensor_data)
            _plot_pair(self.graphique_disp1, self.graphique_disp1_history)

            self.graphique_arc_history = (self.graphique_arc_history[0], sensor_data)
            _plot_pair(self.graphique_arc, self.graphique_arc_history)

        self._store_sensor_data(sensor_data)

        # The writer (and the header row) is created with the first sample
        if self.raw_csv_writer is None:
            columns = ["timestamp", "relative_time", "sensor_id", "raw_value", "offset"]
            self.raw_csv_writer = csv.DictWriter(self.raw_csv_file, fieldnames=columns)
            self.raw_csv_writer.writeheader()

        def _format_number(number):
            # None becomes an empty cell; displacement sensors (DISP_*, ARC) use
            # disp_decimals, every other sensor uses force_decimals
            if number is None:
                return ""
            displacement_ids = (
                SensorId.DISP_1,
                SensorId.DISP_2,
                SensorId.DISP_3,
                SensorId.DISP_4,
                SensorId.DISP_5,
                SensorId.ARC,
            )
            if sensor_id != SensorId.FORCE and sensor_id in displacement_ids:
                return f"{number:.{self.disp_decimals}f}"
            return f"{number:.{self.force_decimals}f}"

        sample_time = sensor_data.timestamp
        elapsed = sample_time - self.start_time
        self.raw_csv_writer.writerow({
            "timestamp": f"{sample_time:.{self.time_decimals}f}",
            "relative_time": f"{elapsed:.{self.time_decimals}f}",
            "sensor_id": sensor_id.name,
            "raw_value": _format_number(sensor_data.raw_value),
            "offset": _format_number(sensor_data.offset),
        })

    def _store_sensor_data(self, data: SensorData, epsilon: float = 1e-6):
        """Store data in circular buffers
        Append (relative_time, value) tuples for each sensor
        Only append points that respect the storage sampling frequency.
        For each sensor, check the last recorded point time and ensure the
        new point's relative time is >= last_time + spacing (with small epsilon)."""
        sensor_idx = data.sensor_id.value
        val = data.value
        spacing = 1.0 / float(self.data_storage.sampling_frequency)
        rel_time = data.timestamp - self.start_time

        if not math.isnan(val):
            buffer = self.data_storage.buffers[sensor_idx]
            if buffer.size() == 0:
                # buffer empty -> always append
                self.data_storage.append(sensor_idx, rel_time, val)
            else:
                # Get last recorded time (logical index = size-1)
                last_time, _ = buffer.get(buffer.size() - 1)
                expected_time = last_time + spacing
                if rel_time + epsilon >= expected_time:
                    self.data_storage.append(sensor_idx, rel_time, val)

    def get_sensor_history(self, sensor_id: SensorId, window_seconds: int):
        """Return recent data for a sensor over the requested window (seconds)."""
        # Allow history access while a test is stopped but not yet finalized
        if not (self.is_running or self.is_stopped):
            raise RuntimeError("No test is currently running or stopped")
        return self.data_storage.get_data_for_window_seconds(sensor_id.value, window_seconds)

    def get_history(self) -> List[TestMetaData]:
        """Rescan the disk, then return the refreshed list of past tests."""
        self.reload_history()
        refreshed = self.test_history
        return refreshed

    def get_relative_time(self) -> float:
        """Get current time relative to test start, or 0.0 if no test is running."""
        if self.is_running and self.start_time > 0:
            return time.time() - self.start_time

        # In simulation mode (no test running), expose a monotonic clock so time does not stay at 0
        try:
            from core.services.sensor_manager import sensor_manager
        except Exception:
            sensor_manager = None

        if sensor_manager and sensor_manager.emulated_sensors:
            if self.emulation_start_time is None:
                self.emulation_start_time = time.time()
            return time.time() - self.emulation_start_time

        return 0.0

    def get_graphique_png(self, sensor_name: str) -> bytes:
        """Return the graphique as PNG bytes."""

        if sensor_name == 'DISP_1':
            return self.graphique_disp1.get_graphique_png()

        else:
            return self.graphique_arc.get_graphique_png()

    def get_description(self, test_id: str) -> str:
        """Return the text of ``description.md`` for a test.

        TEST_DATA_DIR is searched first, then ARCHIVE_DIR.

        Raises:
            FileNotFoundError: If neither location holds a description file.
        """
        for base_dir in (TEST_DATA_DIR, ARCHIVE_DIR):
            candidate = os.path.join(base_dir, test_id, "description.md")
            if os.path.exists(candidate):
                with open(candidate, 'r', encoding='utf-8') as handle:
                    return handle.read()

        raise FileNotFoundError(f"Description not found for test {test_id}")

    def set_description(self, test_id: str, content: str) -> bool:
        """Overwrite ``description.md`` of a test with ``content``.

        TEST_DATA_DIR is tried first, then ARCHIVE_DIR; only an existing
        description file is overwritten.

        Returns:
            True if a description file was found and rewritten, False otherwise.
        """
        for base_dir, archived in ((TEST_DATA_DIR, False), (ARCHIVE_DIR, True)):
            target = os.path.join(base_dir, test_id, "description.md")
            if not os.path.exists(target):
                continue

            with open(target, 'w', encoding='utf-8') as handle:
                handle.write(content)

            if archived:
                logger.info(f"Updated description for archived test {test_id}")
            else:
                logger.info(f"Updated description for test {test_id}")
            return True

        return False

    def add_file(self, file: bytes, filename: str) -> bool:
        """Add a file to the current test directory."""
        if self.current_test_dir is None:
            return False

        self.files_added_to_current_test.append(filename)
        file_path = os.path.join(self.current_test_dir, filename)
        with open(file_path, 'wb') as f:
            f.write(file)
        logger.info(f"Added file {filename} to test {self.current_test.test_id}")  # type: ignore
        return True

    def list_files(self) -> List[str]:
        """List files added to the current test by the API calls."""
        if self.current_test_dir is None:
            return []

        # Return the list of files added via add_file, excluding raw.log and description.md
        return self.files_added_to_current_test

    def delete_file(self, filename: str) -> bool:
        """Delete a file that was added to the current test."""
        if self.current_test_dir is None:
            return False

        if filename not in self.files_added_to_current_test:
            return False

        file_path = os.path.join(self.current_test_dir, filename)
        if os.path.exists(file_path):
            os.remove(file_path)

        self.files_added_to_current_test.remove(filename)
        logger.info(f"Deleted file {filename} from test {self.current_test.test_id}")  # type: ignore
        return True

    def get_file(self, filename: str) -> Optional[bytes]:
        """Get the content of a file added to the current test."""
        if self.current_test_dir is None:
            return None

        if filename not in self.files_added_to_current_test:
            return None

        file_path = os.path.join(self.current_test_dir, filename)
        if os.path.exists(file_path):
            with open(file_path, 'rb') as f:
                return f.read()

        return None

    def get_treatment_json(self, name: str) -> dict|None:
        """Get the content of a treatment JSON file added to the current test."""
        if self.current_test_dir is None:
            return None

        file_path = os.path.join(ARCHIVE_DIR, name,  TreatmentModule.json_name)
        if os.path.exists(file_path):
            with open(file_path, 'r') as f:
                return json.load(f)

        return None

    def get_treatment_png(self, name: str) -> bytes|None:
        """Get the content of a treatment PNG file added to the current test."""
        if self.current_test_dir is None:
            return None

        file_path = os.path.join(ARCHIVE_DIR, name, TreatmentModule.image_name)
        if os.path.exists(file_path):
            with open(file_path, 'rb') as f:
                return f.read()

        return None

    def get_treatment_png_base64(self, name: str) -> bytes|None:
        """Get the content of a treatment PNG file added to the current test, encoded in base64 for API transmission."""
        png_bytes = self.get_treatment_png(name)
        if png_bytes is not None:
            return base64.b64encode(png_bytes)
        return None

add_file(file, filename)

Add a file to the current test directory.

Source code in src/core/services/test_manager.py
587
588
589
590
591
592
593
594
595
596
597
def add_file(self, file: bytes, filename: str) -> bool:
    """Write an attachment into the current test directory.

    Args:
        file: Raw content to write.
        filename: Plain file name (no directory components).

    Returns:
        True once the file has been written and recorded; False when there
        is no current test directory or ``filename`` is empty or contains
        path components.
    """
    if self.current_test_dir is None:
        return False

    # Only plain names are accepted so an upload cannot land outside the test directory
    if not filename or filename in (".", "..") or os.path.basename(filename) != filename:
        return False

    file_path = os.path.join(self.current_test_dir, filename)
    with open(file_path, 'wb') as f:
        f.write(file)

    # Record the name only after a successful write, and only once, so
    # list_files() never shows phantom or duplicate entries
    if filename not in self.files_added_to_current_test:
        self.files_added_to_current_test.append(filename)
    logger.info(f"Added file {filename} to test {self.current_test.test_id}")  # type: ignore
    return True

archive_test(test_id)

Moves a test folder to the archive directory.

Source code in src/core/services/test_manager.py
403
404
405
406
407
408
409
410
411
412
def archive_test(self, test_id: str):
    """Move a test folder from TEST_DATA_DIR to ARCHIVE_DIR.

    Args:
        test_id: Name of the test folder. Must be a plain folder name.

    Returns:
        True if the folder existed and was moved, False otherwise
        (including when ``test_id`` is empty or contains path components).
    """
    # An empty id would resolve to TEST_DATA_DIR itself, and ".." or a path
    # separator would escape it, so only plain folder names are accepted.
    if not test_id or test_id in (".", "..") or os.path.basename(test_id) != test_id:
        return False

    src = os.path.join(TEST_DATA_DIR, test_id)
    dst = os.path.join(ARCHIVE_DIR, test_id)
    if os.path.exists(src):
        shutil.move(src, dst)
        self.reload_history()
        logger.info(f"Archived test {test_id}")
        return True
    return False

create_export_csv()

Create a comprehensive export CSV file for the current test, combining metadata, description, and data.csv content.

Source code in src/core/services/test_manager.py
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
def create_export_csv(self) -> None:
    """
    Write ``export.csv`` for the current test.

    The file is the concatenation of: a ``"metadata": <metadata.json text>``
    line, a blank line, the text of ``description.md``, a blank line, and the
    content of ``data.csv`` rewritten with tabs as separators and commas as
    decimal marks. Any piece that cannot be read is replaced by an empty
    string. Failures while composing or writing the export are logged with
    their traceback and not propagated.
    """
    if self.current_test is None or self.current_test_dir is None:
        logger.error("No test in memory to create export CSV for.")
        return

    test_dir = self.current_test_dir

    def _read_stripped(filename: str) -> str:
        """Return the stripped UTF-8 text of a file in the test directory, or "" if unreadable."""
        try:
            with open(os.path.join(test_dir, filename), 'r', encoding='utf-8') as handle:
                return handle.read().strip()
        except (OSError, UnicodeError):
            # Best effort: a missing or undecodable piece is exported as empty
            return ""

    try:
        metadata_text = _read_stripped("metadata.json")
        description_text = _read_stripped("description.md")
        # "," -> tab first, then "." -> ",": tab-separated with decimal commas
        data_csv_text = _read_stripped("data.csv").replace(",", "\t").replace(".", ",")

        # Compose CSV-like content: metadata, blank line, description, blank line, data.csv
        export_content = "\n".join([
            f"\"metadata\": {metadata_text}",
            "",
            description_text,
            "",
            data_csv_text,
        ])

        export_csv_path = os.path.join(test_dir, "export.csv")
        with open(export_csv_path, 'w', encoding='utf-8') as ef:
            ef.write(export_content)

    except Exception:
        # logger.exception keeps the traceback that logger.error dropped
        logger.exception("Failed to create export CSV for test %s", self.current_test.test_id)

delete_file(filename)

Delete a file that was added to the current test.

Source code in src/core/services/test_manager.py
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
def delete_file(self, filename: str) -> bool:
    """Remove an attachment previously added to the current test.

    Returns:
        False when there is no current test directory or ``filename`` was
        not added through ``add_file``; True otherwise, even if the file
        was already missing on disk.
    """
    if self.current_test_dir is None or filename not in self.files_added_to_current_test:
        return False

    target = os.path.join(self.current_test_dir, filename)
    if os.path.exists(target):
        os.remove(target)

    # Forget the name whether or not the file was still on disk
    self.files_added_to_current_test.remove(filename)
    logger.info(f"Deleted file {filename} from test {self.current_test.test_id}")  # type: ignore
    return True

delete_test(test_id)

Irreversibly deletes a test.

Source code in src/core/services/test_manager.py
414
415
416
417
418
419
420
421
422
def delete_test(self, test_id: str):
    """Irreversibly delete a test folder from TEST_DATA_DIR.

    Args:
        test_id: Name of the test folder. Must be a plain folder name.

    Returns:
        True if the folder existed and was removed, False otherwise
        (including when ``test_id`` is empty or contains path components).
    """
    # Guard the rmtree below: an empty id resolves to TEST_DATA_DIR itself
    # (wiping every test) and ".." or a path separator would escape it.
    if not test_id or test_id in (".", "..") or os.path.basename(test_id) != test_id:
        return False

    target = os.path.join(TEST_DATA_DIR, test_id)
    if os.path.exists(target):
        shutil.rmtree(target)
        self.reload_history()
        logger.info(f"Deleted test {test_id}")
        return True
    return False

finalize_test()

Move stopped test to history and clear current test.

Source code in src/core/services/test_manager.py
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
def finalize_test(self):
    """Move stopped test to history and clear current test.

    Resets both graphiques and the data storage, runs the interpolation,
    export-CSV and TreatmentModule steps on the current test directory, then
    clears the in-memory test state and reloads the history from disk.

    Raises:
        ValueError: If there is no current test or no current test directory.
        RuntimeError: If the test has not been stopped yet.
    """
    if self.current_test is None or self.current_test_dir is None:
        raise ValueError("No test to finalize.")

    if not self.is_stopped:
        raise RuntimeError("Test is not stopped. Call PUT /stop first.")

    # Logged before the post-processing below runs, not after it succeeds.
    logger.info(f"Test finalized: {self.current_test.test_id}")

    # Clean up PIL images
    self.graphique_disp1.reset()
    self.graphique_arc.reset()

    # Clear data storage
    # NOTE(review): this happens before calculate_interpolated_data() and
    # create_export_csv(); confirm those read from files on disk rather than
    # from the in-memory storage that was just cleared.
    self.data_storage.clear_all()

    self.calculate_interpolated_data()
    self.create_export_csv()

    # Post-process the recorded "data.csv" in the test directory.
    treatment = TreatmentModule(self.current_test_dir)
    treatment.load_metadata(self.current_test)
    treatment.load_data("data.csv")
    treatment.process_data()
    treatment.save_output()

    # State is only reset once every step above has completed; an exception
    # earlier leaves current_test / is_stopped untouched.
    self.current_test = None
    self.current_test_dir = None
    self.is_stopped = False

    # Reset emulation clock to allow live time in simulation mode
    self.emulation_start_time = None

    self.files_added_to_current_test.clear()

    # Reload history now that the test is finalized and cleared from memory
    self.reload_history()

get_description(test_id)

Get the description.md content for a test.

Source code in src/core/services/test_manager.py
552
553
554
555
556
557
558
559
560
561
562
563
564
565
def get_description(self, test_id: str) -> str:
    """Return the text of a test's description.md.

    The test-data directory is searched first, then the archive directory.

    Raises:
        FileNotFoundError: If neither location contains a description.md
            for ``test_id``.
    """
    for root in (TEST_DATA_DIR, ARCHIVE_DIR):
        candidate = os.path.join(root, test_id, "description.md")
        if not os.path.exists(candidate):
            continue
        with open(candidate, 'r', encoding='utf-8') as handle:
            return handle.read()

    raise FileNotFoundError(f"Description not found for test {test_id}")

get_file(filename)

Get the content of a file added to the current test.

Source code in src/core/services/test_manager.py
623
624
625
626
627
628
629
630
631
632
633
634
635
636
def get_file(self, filename: str) -> Optional[bytes]:
    """Read the raw bytes of a file registered on the current test.

    Returns:
        The file content, or None when there is no current test directory,
        ``filename`` is not in ``files_added_to_current_test``, or the file
        is missing on disk.
    """
    base_dir = self.current_test_dir
    if base_dir is None or filename not in self.files_added_to_current_test:
        return None

    candidate = os.path.join(base_dir, filename)
    if not os.path.exists(candidate):
        return None

    with open(candidate, 'rb') as handle:
        return handle.read()

get_graphique_png(sensor_name)

Return the graphique as PNG bytes.

Source code in src/core/services/test_manager.py
543
544
545
546
547
548
549
550
def get_graphique_png(self, sensor_name: str) -> bytes:
    """Render the graphique for ``sensor_name`` as PNG bytes.

    ``'DISP_1'`` selects the DISP_1 graphique; every other name is served by
    the arc graphique.
    """
    graphique = self.graphique_disp1 if sensor_name == 'DISP_1' else self.graphique_arc
    return graphique.get_graphique_png()

get_history()

Get list of all test histories, reloaded from disk.

Source code in src/core/services/test_manager.py
520
521
522
523
def get_history(self) -> List[TestMetaData]:
    """Rescan the disk and return the refreshed list of test histories."""
    self.reload_history()
    # Read the attribute only after the reload, which rebinds it to a new list.
    refreshed = self.test_history
    return refreshed

get_relative_time()

Get current time relative to test start, or 0.0 if no test is running.

Source code in src/core/services/test_manager.py
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
def get_relative_time(self) -> float:
    """Return the elapsed seconds on the current clock.

    While a test is running (and ``start_time`` is positive) this is the time
    since the test started. Otherwise, if the sensor manager reports emulated
    sensors, it is the time since the emulation clock was first read here.
    In every other case 0.0 is returned.
    """
    if self.is_running and self.start_time > 0:
        return time.time() - self.start_time

    # Imported lazily; any failure to import is treated as "no sensor manager".
    try:
        from core.services.sensor_manager import sensor_manager as manager
    except Exception:
        manager = None

    if not (manager and manager.emulated_sensors):
        return 0.0

    # Start the emulation clock on first use so the reported time keeps advancing.
    if self.emulation_start_time is None:
        self.emulation_start_time = time.time()
    return time.time() - self.emulation_start_time

get_sensor_history(sensor_id, window_seconds)

Return recent data for a sensor over the requested window (seconds).

Source code in src/core/services/test_manager.py
513
514
515
516
517
518
def get_sensor_history(self, sensor_id: SensorId, window_seconds: int):
    """Fetch a sensor's recent data for the last ``window_seconds`` seconds.

    Raises:
        RuntimeError: If no test is running or stopped.
    """
    # A stopped-but-not-finalized test still exposes its history.
    history_available = self.is_running or self.is_stopped
    if not history_available:
        raise RuntimeError("No test is currently running or stopped")

    storage = self.data_storage
    return storage.get_data_for_window_seconds(sensor_id.value, window_seconds)

get_test_state()

Get the current state of the test system.

Returns:

Type Description
TestState

TestState.NOTHING: No test running and no test prepared

TestState

TestState.PREPARED: No test running but metadata has been set

TestState

TestState.RUNNING: A test is currently running

TestState

TestState.STOPPED: A test has been stopped but not yet finalized

Source code in src/core/services/test_manager.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
def get_test_state(self) -> TestState:
    """
    Report which phase the test system is in.

    Returns:
        TestState.RUNNING: A test is currently running
        TestState.STOPPED: A test has been stopped but not yet finalized
        TestState.PREPARED: No test running but metadata has been set
        TestState.NOTHING: No test running and no test prepared
    """
    # Checked in priority order: running wins over stopped, stopped over prepared.
    if self.is_running:
        return TestState.RUNNING
    if self.is_stopped:
        return TestState.STOPPED
    if self.current_test is None:
        return TestState.NOTHING
    return TestState.PREPARED

get_treatment_json(name)

Get the content of a treatment JSON file added to the current test.

Source code in src/core/services/test_manager.py
638
639
640
641
642
643
644
645
646
647
648
def get_treatment_json(self, name: str) -> dict|None:
    """Load the treatment JSON stored under the archive directory for ``name``.

    Returns:
        The parsed JSON, or None when no current test directory is set or
        the file does not exist.
    """
    # NOTE(review): the lookup is under ARCHIVE_DIR/<name> yet is gated on a
    # current test directory being set; confirm this gate is intended.
    if self.current_test_dir is None:
        return None

    json_path = os.path.join(ARCHIVE_DIR, name,  TreatmentModule.json_name)
    if not os.path.exists(json_path):
        return None

    with open(json_path, 'r') as handle:
        return json.load(handle)

get_treatment_png(name)

Get the content of a treatment PNG file added to the current test.

Source code in src/core/services/test_manager.py
650
651
652
653
654
655
656
657
658
659
660
def get_treatment_png(self, name: str) -> bytes|None:
    """Read the treatment PNG stored under the archive directory for ``name``.

    Returns:
        The PNG bytes, or None when no current test directory is set or the
        file does not exist.
    """
    # NOTE(review): the lookup is under ARCHIVE_DIR/<name> yet is gated on a
    # current test directory being set; confirm this gate is intended.
    if self.current_test_dir is None:
        return None

    png_path = os.path.join(ARCHIVE_DIR, name, TreatmentModule.image_name)
    if not os.path.exists(png_path):
        return None

    with open(png_path, 'rb') as handle:
        return handle.read()

get_treatment_png_base64(name)

Get the content of a treatment PNG file added to the current test, encoded in base64 for API transmission.

Source code in src/core/services/test_manager.py
662
663
664
665
666
667
def get_treatment_png_base64(self, name: str) -> bytes|None:
    """Return the treatment PNG for ``name`` as base64-encoded bytes.

    Returns None whenever ``get_treatment_png`` returns None.
    """
    raw_png = self.get_treatment_png(name)
    return None if raw_png is None else base64.b64encode(raw_png)

list_files()

List files added to the current test by the API calls.

Source code in src/core/services/test_manager.py
599
600
601
602
603
604
605
def list_files(self) -> List[str]:
    """List files added to the current test by the API calls.

    Returns:
        A new list with the names in ``files_added_to_current_test``, or an
        empty list when there is no current test directory.
    """
    if self.current_test_dir is None:
        return []

    # Hand out a copy: the internal list is mutated in place elsewhere
    # (removed from on delete, cleared on finalize), and callers must not be
    # able to change the manager's bookkeeping through the returned value.
    return list(self.files_added_to_current_test)

prepare_test(metadata)

Sets the test as current but does not start it yet. Creates the test directory and description file.

Source code in src/core/services/test_manager.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
def prepare_test(self, metadata: TestMetaData):
    """Register ``metadata`` as the current test without starting it.

    The test id is sanitized and prefixed with a timestamp, ``metadata.test_id``
    is overwritten with that final id, and the test directory is created with
    a ``metadata.json`` and a default ``description.md``.

    Raises:
        RuntimeError: If a test is already running.
    """
    if self.is_running:
        raise RuntimeError("A test is already running.")

    # Keep only alphanumerics, '-' and '_'; fall back to "test" if nothing is left.
    allowed_punctuation = ('-', '_')
    sanitized = "".join(
        ch for ch in metadata.test_id if ch.isalnum() or ch in allowed_punctuation
    ) or "test"
    stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    final_id = f"{stamp}_{sanitized}"

    # The caller's metadata object is updated in place with the final id.
    metadata.test_id = final_id
    self.current_test = metadata

    test_dir = os.path.join(TEST_DATA_DIR, final_id)
    self.current_test_dir = test_dir
    os.makedirs(test_dir, exist_ok=True)

    with open(os.path.join(test_dir, "metadata.json"), 'w') as meta_file:
        json.dump(asdict(metadata), meta_file, indent=2)

    # Seed description.md with a default template built from the metadata.
    description_content = f"# {metadata.test_id}\n\nDescription de l'expérience.\n\n## Informations\n- Date: {metadata.date}\n- Opérateur: {metadata.operator_name}\n- Spécimen: {metadata.specimen_code}"
    with open(os.path.join(test_dir, "description.md"), 'w', encoding='utf-8') as desc_file:
        desc_file.write(description_content)

reload_history()

Scans the disk for existing tests.

Source code in src/core/services/test_manager.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
def reload_history(self):
    """Rebuild ``test_history`` from the test folders found on disk.

    Each sub-directory of ``TEST_DATA_DIR`` that contains a ``metadata.json``
    becomes one entry, except the current in-progress test. Entries that fail
    to load are logged and skipped. The result is sorted by date, newest first.
    """
    self.test_history = []
    logger.info(f"[RELOAD] Scanning {TEST_DATA_DIR}")

    if not os.path.exists(TEST_DATA_DIR):
        logger.info(f"[RELOAD] Directory does not exist: {TEST_DATA_DIR}")
        return

    entries = os.listdir(TEST_DATA_DIR)
    logger.info(f"[RELOAD] Found {len(entries)} items in directory")

    for dirname in entries:
        # The in-flight test (prepared/running/stopped) is not part of history.
        if self.current_test and dirname == self.current_test.test_id:
            logger.debug(f"[RELOAD] Skipping current in-progress test {dirname} from history")
            continue

        dirpath = os.path.join(TEST_DATA_DIR, dirname)
        if not os.path.isdir(dirpath):
            continue

        meta_path = os.path.join(dirpath, "metadata.json")
        if not os.path.exists(meta_path):
            continue

        try:
            with open(meta_path, 'r') as meta_file:
                payload = json.load(meta_file)
                record = TestMetaData(**payload)
                # The folder name is the authoritative id, whatever the file says.
                record.test_id = dirname
                self.test_history.append(record)
                logger.debug(f"[RELOAD] Loaded test {dirname}")
        except Exception as exc:
            logger.error(f"Failed to load test {dirname}: {exc}")

    # Newest first.
    self.test_history.sort(key=lambda entry: entry.date, reverse=True)
    logger.info(f"[RELOAD] Finished loading {len(self.test_history)} tests")

set_description(test_id, content)

Update the description.md content for a test.

Source code in src/core/services/test_manager.py
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
def set_description(self, test_id: str, content: str) -> bool:
    """Overwrite an existing description.md with ``content``.

    The test-data directory is tried first, then the archive directory.

    Returns:
        True if a description.md was found and rewritten, False if neither
        location has one for ``test_id``.
    """
    locations = (
        (TEST_DATA_DIR, f"Updated description for test {test_id}"),
        (ARCHIVE_DIR, f"Updated description for archived test {test_id}"),
    )
    for root, success_message in locations:
        desc_path = os.path.join(root, test_id, "description.md")
        if not os.path.exists(desc_path):
            continue
        with open(desc_path, 'w', encoding='utf-8') as handle:
            handle.write(content)
        logger.info(success_message)
        return True

    return False

stop_test()

Stop recording data but keep test in memory for review/finalization.

Source code in src/core/services/test_manager.py
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
def stop_test(self):
    """End recording while keeping the test in memory for review/finalization.

    Does nothing when there is no current test or the test is not running.
    Otherwise closes the raw output files, saves both graphiques into the test
    directory and switches the state from running to stopped.
    """
    if self.current_test is None or not self.is_running:
        return  # Nothing to stop: no test, or already stopped / never started

    logger.info(f"Test stopped (recording ended): {self.current_test.test_id}")

    # Closing the output files is what ends recording.
    for attr_name in ("raw_file", "raw_csv_file"):
        handle = getattr(self, attr_name)
        if handle:
            handle.close()
            setattr(self, attr_name, None)

    # Persist the graphiques alongside the test data.
    test_dir = self.current_test_dir
    self.graphique_disp1.save_graphique(test_dir, "graphique_disp1.png")
    self.graphique_arc.save_graphique(test_dir, "graphique_arc.png")

    # Stopped, but still held in memory until finalized.
    self.is_running = False
    self.is_stopped = True

Configuration

Config Loader

ConfigLoader

Loads and manages sensor configuration from JSON file.

Source code in src/core/config_loader.py
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
class ConfigLoader:
    """Loads and manages sensor configuration from JSON file.

    The class keeps one shared instance in ``_instance``: every call to
    ``ConfigLoader()`` returns that instance, and the configuration file is
    only loaded the first time.
    """

    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super(ConfigLoader, cls).__new__(cls)
            cls._instance._config = configData(sensors={}, emulation=[])
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        # __init__ runs on every ConfigLoader() call; only load once.
        if not self._initialized:
            self._config = self._get_default_config()
            self.load_config()
            self._initialized = True

    @staticmethod
    def get_config_path() -> Path:
        """Get the path to the sensors_config.json file."""
        # Config file should be in the project root/config directory
        config_path = Path(__file__).parent.parent.parent / "config" / "sensors_config.json"
        return config_path

    def load_config(self):
        """Load configuration from JSON file.

        The configuration always starts from the defaults. Entries found in
        the file override or extend them. If the file is missing, the defaults
        are kept; if reading or parsing fails for any reason, the error is
        logged and the configuration is reset to the defaults.
        """
        config_path = self.get_config_path()

        # Start from defaults so _config is always a populated configData
        self._config = self._get_default_config()

        if not config_path.exists():
            logger.error(f"Configuration file not found: {config_path}")
            return

        try:
            # JSON text is UTF-8; do not depend on the platform's locale encoding.
            with open(config_path, 'r', encoding='utf-8') as f:
                json_data = json.load(f)
                # Parse JSON into configData structure
                for sensor_key, sensor_cfg in json_data.get("sensors", {}).items():
                    sensor_id = SensorId[sensor_key]
                    self._config.sensors[sensor_id] = configSensorData(
                        sensor_id,
                        baud=sensor_cfg.get("baud", 9600),
                        description=sensor_cfg.get("description", ""),
                        displayName=sensor_cfg.get("display_name", ""),
                        serialId=sensor_cfg.get("serial_id", ""),
                        max=sensor_cfg.get("max", 0.0),
                        enabled=sensor_cfg.get("enabled", True)
                    )

                for sensor_key, sensor_cfg in json_data.get("calculated_sensors", {}).items():
                    sensor_id = SensorId[sensor_key]
                    calculated_sensor = calculatedConfigSensorData(
                        sensor_id,
                        description=sensor_cfg.get("description", ""),
                        displayName=sensor_cfg.get("display_name", ""),
                        max=sensor_cfg.get("max", 0.0),
                        dependencies=[]
                    )
                    for dep_key in sensor_cfg.get("dependencies", []):
                        dep_id = SensorId[dep_key]
                        if dep_id in self._config.sensors:
                            dep_sensor = self._config.sensors[dep_id]
                            # Only plain (non-calculated) sensors are kept as dependencies.
                            if isinstance(dep_sensor, configSensorData):
                                calculated_sensor.dependencies.append(
                                    dep_sensor
                                )
                        else:
                            logger.warning(f"Dependency sensor {dep_key} for {sensor_key} not found in config.")
                    self._config.sensors[sensor_id] = calculated_sensor

                # Unknown sensor names in the emulation list are ignored.
                self._config.emulation = [SensorId[s] for s in json_data.get("emulation", []) if s in SensorId.__members__]
            logger.info(f"Configuration loaded from {config_path}")

        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse configuration file: {e}")
            self._config = self._get_default_config()

        except Exception as e:
            logger.error(f"Unexpected error loading configuration: {e}")
            self._config = self._get_default_config()

    @staticmethod
    def _get_default_config() -> configData:
        """Return default configuration."""

        return configData(
            emulation=[],
            sensors={
                SensorId.FORCE: configSensorData(SensorId.FORCE, baud=115200, description="", displayName="", serialId="", max=0.0, enabled=True),
                SensorId.DISP_1: configSensorData(SensorId.DISP_1, baud=9600, description="", displayName="", serialId="", max=0.0, enabled=True),
                SensorId.DISP_2: configSensorData(SensorId.DISP_2, baud=9600, description="", displayName="", serialId="", max=0.0, enabled=False),
                SensorId.DISP_3: configSensorData(SensorId.DISP_3, baud=9600, description="", displayName="", serialId="", max=0.0, enabled=False),
                SensorId.DISP_4: configSensorData(SensorId.DISP_4, baud=9600, description="", displayName="", serialId="", max=0.0, enabled=False),
                SensorId.DISP_5: configSensorData(SensorId.DISP_5, baud=9600, description="", displayName="", serialId="", max=0.0, enabled=False),
                SensorId.ARC: calculatedConfigSensorData(SensorId.ARC, description="", displayName="", max=0.0, dependencies=[]),
            }
        )

    def get_emulation_mode(self) -> list[SensorId]:
        """Get the emulation mode setting (the list of sensor ids from the "emulation" entry)."""
        return self._config.emulation

    def get_sensor_config(self, sensor_id: SensorId) -> defaultConfigSensorData:
        """Get configuration for a specific sensor.

        Raises:
            KeyError: If ``sensor_id`` has no entry in the configuration.
        """
        return self._config.sensors[sensor_id]

    def get_sensor_baud(self, sensor_id: SensorId) -> int:
        """Get the baud rate for a specific sensor (115200 if it has no baud setting)."""
        sensor_config = self.get_sensor_config(sensor_id)
        if sensor_config and isinstance(sensor_config, configSensorData):
            return sensor_config.baud
        return 115200

    def is_sensor_enabled(self, sensor_id: SensorId) -> bool:
        """
        Check if a sensor is enabled in configuration.

        A plain sensor is enabled when its ``enabled`` flag is True. A
        calculated sensor is enabled when every one of its dependencies is
        enabled. Any other configuration type counts as disabled.
        """
        cfg = self.get_sensor_config(sensor_id)
        if isinstance(cfg, configSensorData):
            return cfg.enabled is True

        elif isinstance(cfg, calculatedConfigSensorData):
            for calc_sensor in cfg.dependencies:
                if not self.is_sensor_enabled(calc_sensor.id):
                    return False
            return True
        return False

    def get_all_sensors(self) -> Dict[SensorId, defaultConfigSensorData]:
        """Get all sensor configurations (the live mapping held by the loader)."""
        return self._config.sensors

    def get_enabled_sensors(self) -> Dict[SensorId, defaultConfigSensorData]:
        """Get only the sensors for which ``is_sensor_enabled`` is True.

        Returns:
            A new dict; the loader's own sensor mapping is left untouched.
        """
        # Build a fresh dict instead of popping from get_all_sensors(): that
        # mapping is the loader's live configuration, and removing entries
        # from it would permanently drop disabled sensors for every caller.
        return {
            sensor_id: sensor_cfg
            for sensor_id, sensor_cfg in self.get_all_sensors().items()
            if self.is_sensor_enabled(sensor_id) is True
        }

    def reload_config(self):
        """Reload configuration from file."""
        self.load_config()
        logger.info("Configuration reloaded")

get_all_sensors()

Get all sensor configurations.

Source code in src/core/config_loader.py
142
143
144
def get_all_sensors(self) -> Dict[SensorId, defaultConfigSensorData]:
    """Return the mapping of every configured sensor, enabled or not."""
    loaded_config = self._config
    return loaded_config.sensors

get_config_path() staticmethod

Get the path to the sensors_config.json file.

Source code in src/core/config_loader.py
29
30
31
32
33
34
@staticmethod
def get_config_path() -> Path:
    """Build the location of sensors_config.json.

    The file is expected in a ``config`` directory three levels above this
    module's own file.
    """
    module_dir = Path(__file__).parent
    project_root = module_dir.parent.parent
    return project_root / "config" / "sensors_config.json"

get_emulation_mode()

Get the emulation mode setting.

Source code in src/core/config_loader.py
112
113
114
def get_emulation_mode(self) -> list[SensorId]:
    """Return the ``emulation`` list held in the loaded configuration."""
    loaded_config = self._config
    return loaded_config.emulation

get_enabled_sensors()

Get only the sensors that are enabled in the configuration (as reported by is_sensor_enabled).

Source code in src/core/config_loader.py
146
147
148
149
150
151
152
def get_enabled_sensors(self) -> Dict[SensorId, defaultConfigSensorData]:
    """Get only the sensors for which ``is_sensor_enabled`` is True.

    Returns:
        A new dict; the mapping returned by ``get_all_sensors`` is left
        untouched.
    """
    # Build a fresh dict instead of popping from get_all_sensors(): that
    # mapping is the loader's live configuration, and removing entries from
    # it would permanently drop disabled sensors for every later caller.
    return {
        sensor_id: sensor_cfg
        for sensor_id, sensor_cfg in self.get_all_sensors().items()
        if self.is_sensor_enabled(sensor_id) is True
    }

get_sensor_baud(sensor_id)

Get the baud rate for a specific sensor.

Source code in src/core/config_loader.py
120
121
122
123
124
125
def get_sensor_baud(self, sensor_id: SensorId) -> int:
    """Return the configured baud rate of ``sensor_id``.

    Falls back to 115200 when the sensor's configuration is empty or is not a
    ``configSensorData`` (and therefore carries no baud setting).
    """
    cfg = self.get_sensor_config(sensor_id)
    if not cfg or not isinstance(cfg, configSensorData):
        return 115200
    return cfg.baud

get_sensor_config(sensor_id)

Get configuration for a specific sensor.

Source code in src/core/config_loader.py
116
117
118
def get_sensor_config(self, sensor_id: SensorId) -> defaultConfigSensorData:
    """Look up the configuration entry of one sensor.

    Raises:
        KeyError: If ``sensor_id`` is not present in the configuration.
    """
    sensors = self._config.sensors
    return sensors[sensor_id]

is_sensor_enabled(sensor_id)

Check if a sensor is enabled in configuration (enabled: true, except ARC).

Source code in src/core/config_loader.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
def is_sensor_enabled(self, sensor_id: SensorId) -> bool:
    """
    Tell whether a sensor counts as enabled in the configuration.

    A plain sensor is enabled when its ``enabled`` flag is True. A calculated
    sensor is enabled when all of its dependencies are enabled. Any other
    configuration type is reported as disabled.
    """
    cfg = self.get_sensor_config(sensor_id)
    if isinstance(cfg, configSensorData):
        return cfg.enabled is True

    if not isinstance(cfg, calculatedConfigSensorData):
        return False

    # Stops at the first disabled dependency.
    return all(self.is_sensor_enabled(dependency.id) for dependency in cfg.dependencies)

load_config()

Load configuration from JSON file.

Source code in src/core/config_loader.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def load_config(self):
    """Load configuration from JSON file.

    The configuration always starts from the defaults. Entries found in the
    file override or extend them. If the file is missing, the defaults are
    kept; if reading or parsing fails for any reason, the error is logged and
    the configuration is reset to the defaults.
    """
    config_path = self.get_config_path()

    # Start from defaults so _config is always a populated configData
    self._config = self._get_default_config()

    if not config_path.exists():
        logger.error(f"Configuration file not found: {config_path}")
        return

    try:
        # JSON text is UTF-8; do not depend on the platform's locale encoding.
        with open(config_path, 'r', encoding='utf-8') as f:
            json_data = json.load(f)
            # Parse JSON into configData structure
            for sensor_key, sensor_cfg in json_data.get("sensors", {}).items():
                sensor_id = SensorId[sensor_key]
                self._config.sensors[sensor_id] = configSensorData(
                    sensor_id,
                    baud=sensor_cfg.get("baud", 9600),
                    description=sensor_cfg.get("description", ""),
                    displayName=sensor_cfg.get("display_name", ""),
                    serialId=sensor_cfg.get("serial_id", ""),
                    max=sensor_cfg.get("max", 0.0),
                    enabled=sensor_cfg.get("enabled", True)
                )

            for sensor_key, sensor_cfg in json_data.get("calculated_sensors", {}).items():
                sensor_id = SensorId[sensor_key]
                calculated_sensor = calculatedConfigSensorData(
                    sensor_id,
                    description=sensor_cfg.get("description", ""),
                    displayName=sensor_cfg.get("display_name", ""),
                    max=sensor_cfg.get("max", 0.0),
                    dependencies=[]
                )
                for dep_key in sensor_cfg.get("dependencies", []):
                    dep_id = SensorId[dep_key]
                    if dep_id in self._config.sensors:
                        dep_sensor = self._config.sensors[dep_id]
                        # Only plain (non-calculated) sensors are kept as dependencies.
                        if isinstance(dep_sensor, configSensorData):
                            calculated_sensor.dependencies.append(
                                dep_sensor
                            )
                    else:
                        logger.warning(f"Dependency sensor {dep_key} for {sensor_key} not found in config.")
                self._config.sensors[sensor_id] = calculated_sensor

            # Unknown sensor names in the emulation list are ignored.
            self._config.emulation = [SensorId[s] for s in json_data.get("emulation", []) if s in SensorId.__members__]
        logger.info(f"Configuration loaded from {config_path}")

    except json.JSONDecodeError as e:
        logger.error(f"Failed to parse configuration file: {e}")
        self._config = self._get_default_config()

    except Exception as e:
        logger.error(f"Unexpected error loading configuration: {e}")
        self._config = self._get_default_config()

reload_config()

Reload configuration from file.

Source code in src/core/config_loader.py
154
155
156
157
def reload_config(self):
    """Reload configuration from file.

    Delegates to ``load_config`` and then logs that the reload happened.
    """
    self.load_config()
    # Logged unconditionally once load_config() has returned.
    logger.info("Configuration reloaded")

Development Workflow

Setup

# Create virtual environment
bash scripts/create_venv.sh

# Install with dev dependencies
pip install -e '.[dev]'

Running the Application

# Start the server
python src/main.py

Testing

# Run tests
pytest

# Run with coverage
pytest --cov=src

Type Safety

The project uses type hints throughout for better IDE support and error catching. Always include type annotations in new code:

def process_sensor_data(sensor_id: SensorId, value: float, timestamp: float) -> Point:
    """Build a Point from one raw sensor sample.

    Args:
        sensor_id: Sensor the sample belongs to (not used when building the point).
        value: Measured value.
        timestamp: Time of the sample.

    Returns:
        A Point whose ``time`` is ``timestamp`` and whose ``value`` is ``value``.
    """
    point = Point(time=timestamp, value=value)
    return point

Documentation Standards

Docstring Style

Use Google-style docstrings for consistency:

def calculate_moving_average(
    values: list[float],
    window_size: int = 10
) -> float:
    """Calculate moving average of values.

    Args:
        values: List of numeric values.
        window_size: Number of recent values to average.

    Returns:
        Moving average as float.

    Raises:
        ValueError: If window_size is not positive or exceeds list length.
    """
    # A zero window would divide by zero and a negative one would silently
    # average the wrong slice, so reject both up front.
    if window_size <= 0:
        raise ValueError("Window size must be positive")
    if window_size > len(values):
        raise ValueError("Window size exceeds list length")
    return sum(values[-window_size:]) / window_size

Module Docstrings

Every module should have a top-level docstring:

"""Module description.

Brief explanation of what this module does.
"""