Data Models

Core data structures and models used throughout the application.

Configuration

Configuration data models for sensors and application settings.

This module defines dataclasses for storing sensor configuration, including default settings, serial communication parameters, calculated dependencies, and global application configuration.

calculatedConfigSensorData dataclass

Bases: defaultConfigSensorData

Configuration for calculated/derived sensors.

Extends defaultConfigSensorData for sensors that compute values based on other sensor measurements rather than direct serial input.

Attributes:

Name          Type                     Description
dependencies  list[configSensorData]   List of source sensors required for calculations.

Source code in src/core/models/config_data.py
@dataclass
class calculatedConfigSensorData(defaultConfigSensorData):
    """Configuration for calculated/derived sensors.

    Extends defaultConfigSensorData for sensors that compute values based on
    other sensor measurements rather than direct serial input.

    Attributes:
        dependencies: List of source sensors required for calculations.
    """
    dependencies: list[configSensorData] = field(default_factory=list[configSensorData])

configData dataclass

Global application configuration.

Attributes:

Name       Type                                      Description
sensors    Dict[SensorId, defaultConfigSensorData]   Dictionary mapping sensor IDs to their configuration.
emulation  list[SensorId]                            List of sensor IDs to run in emulation/simulation mode.

Source code in src/core/models/config_data.py
@dataclass
class configData:
    """Global application configuration.

    Attributes:
        sensors: Dictionary mapping sensor IDs to their configuration.
        emulation: List of sensor IDs to run in emulation/simulation mode.
    """
    sensors : Dict[SensorId, defaultConfigSensorData]
    emulation : list[SensorId] = field(default_factory=list)

configSensorData dataclass

Bases: defaultConfigSensorData

Sensor configuration with serial communication parameters.

Extends defaultConfigSensorData with physical serial connection details.

Attributes:

Name      Type   Description
baud      int    Serial port baud rate (default: 115200).
serialId  str    Serial port identifier/device path.
enabled   bool   Whether this sensor is actively monitored.

Source code in src/core/models/config_data.py
@dataclass
class configSensorData(defaultConfigSensorData):
    """Sensor configuration with serial communication parameters.

    Extends defaultConfigSensorData with physical serial connection details.

    Attributes:
        baud: Serial port baud rate (default: 115200).
        serialId: Serial port identifier/device path.
        enabled: Whether this sensor is actively monitored.
    """
    baud : int = 115200
    serialId : str = ""
    enabled: bool = True

defaultConfigSensorData dataclass

Base sensor configuration with default values.

Attributes:

Name         Type      Description
id           SensorId  Unique sensor identifier from the SensorId enum.
description  str       Human-readable description of the sensor.
displayName  str       Name to display in user interfaces.
max          float     Maximum measurement value for the sensor.

Source code in src/core/models/config_data.py
@dataclass
class defaultConfigSensorData:
    """Base sensor configuration with default values.

    Attributes:
        id: Unique sensor identifier from SensorId enum.
        description: Human-readable description of the sensor.
        displayName: Name to display in user interfaces.
        max: Maximum measurement value for the sensor.
    """
    id: SensorId
    description : str = "No description"
    displayName : str = "Unnamed Sensor"
    max: float = 5.0
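
Putting the configuration dataclasses together, a minimal usage sketch (with an abbreviated `SensorId` stand-in; the real enum lives in src/core/models/sensor_enum.py, and the serial path below is illustrative, not a value from this project):

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List

class SensorId(Enum):          # abbreviated stand-in for the real enum
    FORCE = 0
    DISP_1 = 1

@dataclass
class defaultConfigSensorData:
    id: SensorId
    description: str = "No description"
    displayName: str = "Unnamed Sensor"
    max: float = 5.0

@dataclass
class configSensorData(defaultConfigSensorData):
    baud: int = 115200
    serialId: str = ""
    enabled: bool = True

@dataclass
class configData:
    sensors: Dict[SensorId, defaultConfigSensorData]
    emulation: List[SensorId] = field(default_factory=list)

# Build a configuration: one physical force sensor, one emulated sensor.
force = configSensorData(id=SensorId.FORCE, displayName="Load Cell",
                         serialId="/dev/ttyUSB0")
config = configData(sensors={SensorId.FORCE: force},
                    emulation=[SensorId.DISP_1])
print(config.sensors[SensorId.FORCE].baud)   # default baud: 115200
```

Because each subclass only adds fields with defaults, the dataclass inheritance chain stays valid: `id` remains the single required argument throughout.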

Sensor Models

Sensor ID enumeration for type-safe sensor references.

SensorId

Bases: Enum

Enumeration of all available sensors.

Source code in src/core/models/sensor_enum.py
class SensorId(Enum):
    """Enumeration of all available sensors."""
    FORCE = 0
    DISP_1 = 1
    DISP_2 = 2
    DISP_3 = 3
    DISP_4 = 4
    DISP_5 = 5
    ARC = 6

Sensor Data

Sensor data model.

SensorData dataclass

Data class representing a single sensor reading.

Source code in src/core/models/sensor_data.py
@dataclass
class SensorData:
    """
    Data class representing a single sensor reading.
    """
    timestamp: float
    sensor_id: SensorId
    value: float
    raw_value: float = 0.0
    offset: float = 0.0
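
A sketch of constructing a single reading (with an abbreviated `SensorId` stand-in). Treating `value` as `raw_value` minus a zeroing `offset` is an assumption about the calibration convention, not something this dataclass enforces:

```python
from dataclasses import dataclass
from enum import Enum

class SensorId(Enum):   # abbreviated stand-in for src/core/models/sensor_enum.py
    FORCE = 0

@dataclass
class SensorData:       # mirrors the dataclass above
    timestamp: float
    sensor_id: SensorId
    value: float
    raw_value: float = 0.0
    offset: float = 0.0

# One force reading at t=0.5s; value = raw_value - offset is an assumed
# convention for illustration only.
reading = SensorData(timestamp=0.5, sensor_id=SensorId.FORCE,
                     raw_value=2.37, offset=0.12, value=2.37 - 0.12)
print(reading.value)
```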

Test State

Test state enumeration for tracking test session status.

TestState

Bases: Enum

Enumeration of all possible test states.

Source code in src/core/models/test_state.py
class TestState(Enum):
    """Enumeration of all possible test states."""
    NOTHING = "nothing"
    PREPARED = "prepared"
    RUNNING = "running"
    STOPPED = "stopped"  # Test stopped (recording ended) but not yet finalized
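
A plausible lifecycle for these states (the transition order is an assumption inferred from the state names and the STOPPED comment; the actual state machine lives elsewhere in the application):

```python
from enum import Enum

class TestState(Enum):   # mirrors src/core/models/test_state.py
    NOTHING = "nothing"
    PREPARED = "prepared"
    RUNNING = "running"
    STOPPED = "stopped"  # Test stopped (recording ended) but not yet finalized

# Assumed happy-path order: nothing -> prepared -> running -> stopped
lifecycle = [TestState.NOTHING, TestState.PREPARED,
             TestState.RUNNING, TestState.STOPPED]
print(" -> ".join(s.value for s in lifecycle))
# nothing -> prepared -> running -> stopped
```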

Utilities

Circular Buffer

CircularBuffer for efficient time-series data storage with O(1) access and insertion. Optimized for speed: precomputed indices, vectorized operations where possible. Supports reference arrays for different time windows with uniform point spacing.

CircularBuffer

Efficient circular buffer for storing (time, value) tuples.

- O(1) insertion at the end
- O(1) random access
- Fixed capacity, overwrites oldest when full
- Optimized for speed: pre-allocated buffer, direct indexing

Source code in src/core/models/circular_buffer.py
class CircularBuffer:
    """
    Efficient circular buffer for storing (time, value) tuples.
    - O(1) insertion at the end
    - O(1) random access
    - Fixed capacity, overwrites oldest when full
    - Optimized for speed: pre-allocated buffer, direct indexing
    """

    __slots__ = ('capacity', 'buffer', 'write_index', 'count', '_mask')

    def __init__(self, capacity: int):
        """
        Initialize circular buffer.

        Args:
            capacity: Maximum number of (time, value) tuples to store
        """
        self.capacity = capacity
        self.buffer: List[Tuple[float, float]] = [(0.0, 0.0)] * capacity
        self.write_index = 0  # Next position to write
        self.count = 0  # Number of valid entries (0 to capacity)
        # Precompute mask for power-of-2 capacities (faster modulo)
        self._mask = capacity - 1 if (capacity & (capacity - 1)) == 0 else None

    def append(self, time: float, value: float) -> None:
        """Add a (time, value) tuple to the buffer. O(1)."""
        self.buffer[self.write_index] = (time, value)
        # Fast modulo if power of 2; else standard
        if self._mask is not None:
            self.write_index = (self.write_index + 1) & self._mask
        else:
            self.write_index = (self.write_index + 1) % self.capacity
        if self.count < self.capacity:
            self.count += 1

    def get(self, index: int) -> Tuple[float, float]:
        """
        Get item at logical index (0 = oldest, count-1 = newest).
        O(1) access.
        """
        if index < 0 or index >= self.count:
            raise IndexError(f"Index {index} out of range [0, {self.count})")
        # Direct computation: no loop, no extra calculations
        if self._mask is not None:
            physical_index = (self.write_index - self.count + index) & self._mask
        else:
            physical_index = (self.write_index - self.count + index) % self.capacity
        return self.buffer[physical_index]

    def get_all(self) -> List[Tuple[float, float]]:
        """Get all valid entries in chronological order. Optimized for bulk retrieval."""
        if self.count == 0:
            return []

        # Pre-allocate result list with placeholder tuples
        result: List[Tuple[float, float]] = [(0.0, 0.0)] * self.count

        # If buffer is not wrapped, direct slice is faster
        if self.write_index >= self.count:
            # Simple case: all data is contiguous
            start = self.write_index - self.count
            for i in range(self.count):
                result[i] = self.buffer[start + i]
        else:
            # Wrapped case: split into two contiguous parts
            # First part: oldest entries, from start_idx to the end of the buffer
            start_idx = self.capacity - (self.count - self.write_index)
            first_part_size = self.count - self.write_index
            for i in range(first_part_size):
                result[i] = self.buffer[start_idx + i]
            # Second part: newest entries, from index 0 up to write_index
            for i in range(self.count - first_part_size):
                result[first_part_size + i] = self.buffer[i]

        return result

    def get_range(self, start_index: int, end_index: int) -> List[Tuple[float, float]]:
        """Get entries from start_index to end_index (exclusive). Optimized retrieval."""
        if start_index < 0 or end_index > self.count or start_index > end_index:
            raise IndexError(f"Invalid range [{start_index}, {end_index}) for buffer of size {self.count}")

        range_size = end_index - start_index
        if range_size == 0:
            return []

        result: List[Tuple[float, float]] = [(0.0, 0.0)] * range_size

        # Compute physical indices for start and end
        if self._mask is not None:
            phys_start = (self.write_index - self.count + start_index) & self._mask
            phys_end = (self.write_index - self.count + end_index) & self._mask
        else:
            phys_start = (self.write_index - self.count + start_index) % self.capacity
            phys_end = (self.write_index - self.count + end_index) % self.capacity

        # Does the requested range wrap around the end of the physical buffer?
        if phys_start < phys_end:
            # No wrap: contiguous in buffer
            for i in range(range_size):
                result[i] = self.buffer[phys_start + i]
        else:
            # Wrapped: first part from phys_start to the end, second from 0 to phys_end
            first_part = self.capacity - phys_start
            for i in range(first_part):
                result[i] = self.buffer[phys_start + i]
            for i in range(phys_end):
                result[first_part + i] = self.buffer[i]

        return result

    def is_full(self) -> bool:
        """Check if buffer is at capacity."""
        return self.count == self.capacity

    def size(self) -> int:
        """Get number of valid entries."""
        return self.count

    def clear(self) -> None:
        """Clear all entries."""
        self.write_index = 0
        self.count = 0
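
The overwrite behaviour can be seen with a condensed re-statement of the buffer's core logic (for illustration only; it omits the power-of-two mask fast path and bounds checks of the real class):

```python
from typing import List, Tuple

class MiniBuffer:
    """Condensed re-statement of CircularBuffer's append/get logic."""
    def __init__(self, capacity: int):
        self.capacity = capacity
        self.buffer: List[Tuple[float, float]] = [(0.0, 0.0)] * capacity
        self.write_index = 0   # next physical slot to write
        self.count = 0         # number of valid entries

    def append(self, time: float, value: float) -> None:
        self.buffer[self.write_index] = (time, value)
        self.write_index = (self.write_index + 1) % self.capacity
        if self.count < self.capacity:
            self.count += 1

    def get(self, index: int) -> Tuple[float, float]:
        # logical index 0 = oldest entry, count-1 = newest
        return self.buffer[(self.write_index - self.count + index) % self.capacity]

buf = MiniBuffer(4)
for t in range(6):                 # 6 appends into capacity 4
    buf.append(float(t), float(t) * 10.0)

# The two oldest samples (t=0, t=1) were overwritten; t=2 is now the oldest.
print([buf.get(i) for i in range(buf.count)])
# [(2.0, 20.0), (3.0, 30.0), (4.0, 40.0), (5.0, 50.0)]
```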

__init__(capacity)

Initialize circular buffer.

Parameters:

Name      Type  Description                                      Default
capacity  int   Maximum number of (time, value) tuples to store  required
Source code in src/core/models/circular_buffer.py
def __init__(self, capacity: int):
    """
    Initialize circular buffer.

    Args:
        capacity: Maximum number of (time, value) tuples to store
    """
    self.capacity = capacity
    self.buffer: List[Tuple[float, float]] = [(0.0, 0.0)] * capacity
    self.write_index = 0  # Next position to write
    self.count = 0  # Number of valid entries (0 to capacity)
    # Precompute mask for power-of-2 capacities (faster modulo)
    self._mask = capacity - 1 if (capacity & (capacity - 1)) == 0 else None

append(time, value)

Add a (time, value) tuple to the buffer. O(1).

Source code in src/core/models/circular_buffer.py
def append(self, time: float, value: float) -> None:
    """Add a (time, value) tuple to the buffer. O(1)."""
    self.buffer[self.write_index] = (time, value)
    # Fast modulo if power of 2; else standard
    if self._mask is not None:
        self.write_index = (self.write_index + 1) & self._mask
    else:
        self.write_index = (self.write_index + 1) % self.capacity
    if self.count < self.capacity:
        self.count += 1

clear()

Clear all entries.

Source code in src/core/models/circular_buffer.py
def clear(self) -> None:
    """Clear all entries."""
    self.write_index = 0
    self.count = 0

get(index)

Get item at logical index (0 = oldest, count-1 = newest). O(1) access.

Source code in src/core/models/circular_buffer.py
def get(self, index: int) -> Tuple[float, float]:
    """
    Get item at logical index (0 = oldest, count-1 = newest).
    O(1) access.
    """
    if index < 0 or index >= self.count:
        raise IndexError(f"Index {index} out of range [0, {self.count})")
    # Direct computation: no loop, no extra calculations
    if self._mask is not None:
        physical_index = (self.write_index - self.count + index) & self._mask
    else:
        physical_index = (self.write_index - self.count + index) % self.capacity
    return self.buffer[physical_index]

get_all()

Get all valid entries in chronological order. Optimized for bulk retrieval.

Source code in src/core/models/circular_buffer.py
def get_all(self) -> List[Tuple[float, float]]:
    """Get all valid entries in chronological order. Optimized for bulk retrieval."""
    if self.count == 0:
        return []

    # Pre-allocate result list with placeholder tuples
    result: List[Tuple[float, float]] = [(0.0, 0.0)] * self.count

    # If buffer is not wrapped, direct slice is faster
    if self.write_index >= self.count:
        # Simple case: all data is contiguous
        start = self.write_index - self.count
        for i in range(self.count):
            result[i] = self.buffer[start + i]
    else:
        # Wrapped case: split into two contiguous parts
        # First part: oldest entries, from start_idx to the end of the buffer
        start_idx = self.capacity - (self.count - self.write_index)
        first_part_size = self.count - self.write_index
        for i in range(first_part_size):
            result[i] = self.buffer[start_idx + i]
        # Second part: newest entries, from index 0 up to write_index
        for i in range(self.count - first_part_size):
            result[first_part_size + i] = self.buffer[i]

    return result

get_range(start_index, end_index)

Get entries from start_index to end_index (exclusive). Optimized retrieval.

Source code in src/core/models/circular_buffer.py
def get_range(self, start_index: int, end_index: int) -> List[Tuple[float, float]]:
    """Get entries from start_index to end_index (exclusive). Optimized retrieval."""
    if start_index < 0 or end_index > self.count or start_index > end_index:
        raise IndexError(f"Invalid range [{start_index}, {end_index}) for buffer of size {self.count}")

    range_size = end_index - start_index
    if range_size == 0:
        return []

    result: List[Tuple[float, float]] = [(0.0, 0.0)] * range_size

    # Compute physical indices for start and end
    if self._mask is not None:
        phys_start = (self.write_index - self.count + start_index) & self._mask
        phys_end = (self.write_index - self.count + end_index) & self._mask
    else:
        phys_start = (self.write_index - self.count + start_index) % self.capacity
        phys_end = (self.write_index - self.count + end_index) % self.capacity

    # Does the requested range wrap around the end of the physical buffer?
    if phys_start < phys_end:
        # No wrap: contiguous in buffer
        for i in range(range_size):
            result[i] = self.buffer[phys_start + i]
    else:
        # Wrapped: first part from phys_start to the end, second from 0 to phys_end
        first_part = self.capacity - phys_start
        for i in range(first_part):
            result[i] = self.buffer[phys_start + i]
        for i in range(phys_end):
            result[first_part + i] = self.buffer[i]

    return result

is_full()

Check if buffer is at capacity.

Source code in src/core/models/circular_buffer.py
def is_full(self) -> bool:
    """Check if buffer is at capacity."""
    return self.count == self.capacity

size()

Get number of valid entries.

Source code in src/core/models/circular_buffer.py
def size(self) -> int:
    """Get number of valid entries."""
    return self.count

DisplayDuration

Bases: Enum

Enum for display durations in seconds

Source code in src/core/models/circular_buffer.py
class DisplayDuration(Enum):
    """Enum for display durations in seconds"""
    DURATION_30S = 30
    DURATION_1MIN = 60
    DURATION_2MIN = 120
    DURATION_5MIN = 300
    DURATION_10MIN = 600

    def value_seconds(self) -> int:
        """Get duration in seconds"""
        return self.value

value_seconds()

Get duration in seconds

Source code in src/core/models/circular_buffer.py
def value_seconds(self) -> int:
    """Get duration in seconds"""
    return self.value

SensorDataStorage

Stores time-series data for all sensors with circular buffers and reference arrays. Heavily optimized for speed with precomputed offsets and direct indexing.

Each sensor gets indexed by sensor_id.value for O(1) access. All reference arrays have the SAME number of points but different spacing.

Source code in src/core/models/circular_buffer.py
class SensorDataStorage:
    """
    Stores time-series data for all sensors with circular buffers and reference arrays.
    Heavily optimized for speed with precomputed offsets and direct indexing.

    Each sensor gets indexed by sensor_id.value for O(1) access.
    All reference arrays have the SAME number of points but different spacing.
    """

    __slots__ = ('sensor_count', 'sampling_frequency', 'total_capacity', 'buffers',
                 'reference_points_count', 'reference_arrays', 'reference_offsets',
                 '_duration_to_enum', '_precomputed_windows')

    def __init__(self, sensor_count: int, sampling_frequency: float):
        """
        Initialize sensor data storage with precomputed structures for speed.

        Args:
            sensor_count: Number of sensors (e.g., 7 for FORCE, DISP_1, DISP_2, DISP_3, DISP_4, DISP_5, ARC)
            sampling_frequency: Sampling frequency in Hz (points per second)
        """
        self.sensor_count = sensor_count
        self.sampling_frequency = sampling_frequency

        # Calculate capacity: frequency × 30s × 20 = 10 minutes of data
        points_per_30s = int(sampling_frequency * 30)
        total_capacity = points_per_30s * 20
        self.total_capacity = total_capacity

        # Create circular buffers for each sensor
        self.buffers: List[CircularBuffer] = [
            CircularBuffer(total_capacity) for _ in range(sensor_count)
        ]

        # Number of reference points (same for all durations)
        self.reference_points_count = points_per_30s

        # Precompute all window information at init time
        self.reference_arrays = {}
        self.reference_offsets = {}
        self._duration_to_enum = {}
        self._precomputed_windows = {}

        for duration in DisplayDuration:
            duration_seconds = duration.value_seconds()
            window_seconds = duration_seconds

            # Store duration -> seconds mapping for O(1) lookup
            self._duration_to_enum[window_seconds] = duration

            # Spacing info
            spacing = duration_seconds / self.reference_points_count
            self.reference_arrays[duration] = {
                "duration": duration_seconds,
                "points": self.reference_points_count,
                "spacing": spacing,
            }

            # Precompute offsets and window info
            max_points_in_window = int(self.sampling_frequency * duration_seconds)
            target_points = self.reference_points_count

            if target_points <= 0 or max_points_in_window <= 0:
                self.reference_offsets[duration] = []
                self._precomputed_windows[window_seconds] = None
            else:
                # Precompute offsets
                step = max_points_in_window / target_points
                offsets: List[int] = []
                for i in range(target_points):
                    idx = int(i * step)
                    if i == target_points - 1:
                        idx = max_points_in_window - 1
                    offsets.append(idx)

                self.reference_offsets[duration] = offsets

                # Store precomputed window info
                self._precomputed_windows[window_seconds] = {
                    "max_points": max_points_in_window,
                    "offsets": offsets,
                    "duration_enum": duration,
                }

    def append(self, sensor_idx: int, time: float, value: float) -> None:
        """Add a data point to a sensor's buffer. O(1)."""
        if sensor_idx < 0 or sensor_idx >= self.sensor_count:
            raise ValueError(f"Invalid sensor index {sensor_idx}")
        self.buffers[sensor_idx].append(time, value)

    def get_data(self, sensor_idx: int) -> List[Tuple[float, float]]:
        """Get all data points for a sensor."""
        if sensor_idx < 0 or sensor_idx >= self.sensor_count:
            raise ValueError(f"Invalid sensor index {sensor_idx}")
        return self.buffers[sensor_idx].get_all()

    def get_data_for_duration(
        self, sensor_idx: int, duration: DisplayDuration
    ) -> List[Tuple[float, float]]:
        """
        Get data points for a specific display duration with uniform spacing.
        Returns up to reference_points_count points evenly spaced across the duration.
        """
        if duration not in DisplayDuration:
            raise ValueError(f"Invalid duration: {duration}")
        return self.get_data_for_window_seconds(sensor_idx, duration.value_seconds())

    def get_data_for_window_seconds(self, sensor_idx: int, window_seconds: int) -> List[Tuple[float, float]]:
        """
        Retrieve data for a given time window (in seconds) with uniform spacing.
        Optimized: uses precomputed offsets and direct indexing, no dynamic computation.
        """
        if sensor_idx < 0 or sensor_idx >= self.sensor_count:
            raise ValueError(f"Invalid sensor index {sensor_idx}")

        # Fast lookup with precomputed window info
        window_info = self._precomputed_windows.get(window_seconds)
        if window_info is None:
            raise ValueError(f"Unsupported window_seconds: {window_seconds}")

        buffer = self.buffers[sensor_idx]
        if buffer.size() == 0:
            return []

        max_points_in_window = window_info["max_points"]
        offsets = window_info["offsets"]

        available_points = min(buffer.size(), max_points_in_window)
        target_points = self.reference_points_count

        # Case 1: Not enough points yet - fallback to partial sampling
        if available_points <= target_points:
            start_idx = buffer.size() - available_points
            return buffer.get_range(start_idx, buffer.size())

        # Case 2: Full window available - use precomputed offsets (fast path)
        if available_points >= max_points_in_window:
            window_start_idx = buffer.size() - max_points_in_window
            result: List[Tuple[float, float]] = [(0.0, 0.0)] * len(offsets)

            # Direct indexed access using precomputed offsets
            for i, off in enumerate(offsets):
                result[i] = buffer.get(window_start_idx + off)
            return result

        # Case 3: Partial window - subsample available points
        step = available_points / target_points
        start_idx = buffer.size() - available_points
        result: List[Tuple[float, float]] = [(0.0, 0.0)] * target_points

        for i in range(target_points):
            idx = int(start_idx + i * step)
            if i == target_points - 1:
                idx = start_idx + available_points - 1
            result[i] = buffer.get(idx)

        return result

    def clear_sensor(self, sensor_idx: int) -> None:
        """Clear data for a specific sensor."""
        if sensor_idx < 0 or sensor_idx >= self.sensor_count:
            raise ValueError(f"Invalid sensor index {sensor_idx}")
        self.buffers[sensor_idx].clear()

    def clear_all(self) -> None:
        """Clear all sensor data."""
        for buffer in self.buffers:
            buffer.clear()

    def get_sensor_buffer_stats(self, sensor_idx: int) -> dict:
        """Get statistics about a sensor's buffer."""
        if sensor_idx < 0 or sensor_idx >= self.sensor_count:
            raise ValueError(f"Invalid sensor index {sensor_idx}")
        buffer = self.buffers[sensor_idx]
        return {
            "capacity": buffer.capacity,
            "current_count": buffer.count,
            "is_full": buffer.is_full(),
            "utilization": buffer.count / buffer.capacity,
        }
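
The capacity and downsampling arithmetic from `__init__` can be checked in isolation. With a hypothetical 100 Hz sampling rate (not a value from this project), the buffer holds 10 minutes of data and every display window is thinned to the same 3000 reference points:

```python
sampling_frequency = 100.0                       # hypothetical rate, for illustration
points_per_30s = int(sampling_frequency * 30)    # 3000 reference points
total_capacity = points_per_30s * 20             # 60000 samples = 10 minutes

# Offsets for a 2-minute window: 12000 raw samples thinned to 3000 points
duration_seconds = 120
max_points = int(sampling_frequency * duration_seconds)   # 12000
step = max_points / points_per_30s                        # 4.0 -> keep every 4th sample
offsets = [int(i * step) for i in range(points_per_30s)]
offsets[-1] = max_points - 1                              # pin the last point to the newest sample
print(total_capacity, offsets[:3], offsets[-1])           # 60000 [0, 4, 8] 11999
```

This matches the precomputation done per `DisplayDuration` in the constructor: longer windows keep the same point count but wider spacing between kept samples.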

__init__(sensor_count, sampling_frequency)

Initialize sensor data storage with precomputed structures for speed.

Parameters:

Name                Type   Description                                                              Default
sensor_count        int    Number of sensors (e.g., 7 for FORCE, DISP_1 through DISP_5, and ARC)    required
sampling_frequency  float  Sampling frequency in Hz (points per second)                             required
Source code in src/core/models/circular_buffer.py
def __init__(self, sensor_count: int, sampling_frequency: float):
    """
    Initialize sensor data storage with precomputed structures for speed.

    Args:
        sensor_count: Number of sensors (e.g., 7 for FORCE, DISP_1, DISP_2, DISP_3, DISP_4, DISP_5, ARC)
        sampling_frequency: Sampling frequency in Hz (points per second)
    """
    self.sensor_count = sensor_count
    self.sampling_frequency = sampling_frequency

    # Calculate capacity: frequency × 30s × 20 = 10 minutes of data
    points_per_30s = int(sampling_frequency * 30)
    total_capacity = points_per_30s * 20
    self.total_capacity = total_capacity

    # Create circular buffers for each sensor
    self.buffers: List[CircularBuffer] = [
        CircularBuffer(total_capacity) for _ in range(sensor_count)
    ]

    # Number of reference points (same for all durations)
    self.reference_points_count = points_per_30s

    # Precompute all window information at init time
    self.reference_arrays = {}
    self.reference_offsets = {}
    self._duration_to_enum = {}
    self._precomputed_windows = {}

    for duration in DisplayDuration:
        duration_seconds = duration.value_seconds()
        window_seconds = duration_seconds

        # Store duration -> seconds mapping for O(1) lookup
        self._duration_to_enum[window_seconds] = duration

        # Spacing info
        spacing = duration_seconds / self.reference_points_count
        self.reference_arrays[duration] = {
            "duration": duration_seconds,
            "points": self.reference_points_count,
            "spacing": spacing,
        }

        # Precompute offsets and window info
        max_points_in_window = int(self.sampling_frequency * duration_seconds)
        target_points = self.reference_points_count

        if target_points <= 0 or max_points_in_window <= 0:
            self.reference_offsets[duration] = []
            self._precomputed_windows[window_seconds] = None
        else:
            # Precompute offsets
            step = max_points_in_window / target_points
            offsets: List[int] = []
            for i in range(target_points):
                idx = int(i * step)
                if i == target_points - 1:
                    idx = max_points_in_window - 1
                offsets.append(idx)

            self.reference_offsets[duration] = offsets

            # Store precomputed window info
            self._precomputed_windows[window_seconds] = {
                "max_points": max_points_in_window,
                "offsets": offsets,
                "duration_enum": duration,
            }

append(sensor_idx, time, value)

Add a data point to a sensor's buffer. O(1).

Source code in src/core/models/circular_buffer.py
def append(self, sensor_idx: int, time: float, value: float) -> None:
    """Add a data point to a sensor's buffer. O(1)."""
    if sensor_idx < 0 or sensor_idx >= self.sensor_count:
        raise ValueError(f"Invalid sensor index {sensor_idx}")
    self.buffers[sensor_idx].append(time, value)

clear_all()

Clear all sensor data.

Source code in src/core/models/circular_buffer.py
def clear_all(self) -> None:
    """Clear all sensor data."""
    for buffer in self.buffers:
        buffer.clear()

clear_sensor(sensor_idx)

Clear data for a specific sensor.

Source code in src/core/models/circular_buffer.py
def clear_sensor(self, sensor_idx: int) -> None:
    """Clear data for a specific sensor."""
    if sensor_idx < 0 or sensor_idx >= self.sensor_count:
        raise ValueError(f"Invalid sensor index {sensor_idx}")
    self.buffers[sensor_idx].clear()

get_data(sensor_idx)

Get all data points for a sensor.

Source code in src/core/models/circular_buffer.py
def get_data(self, sensor_idx: int) -> List[Tuple[float, float]]:
    """Get all data points for a sensor."""
    if sensor_idx < 0 or sensor_idx >= self.sensor_count:
        raise ValueError(f"Invalid sensor index {sensor_idx}")
    return self.buffers[sensor_idx].get_all()

get_data_for_duration(sensor_idx, duration)

Get data points for a specific display duration with uniform spacing. Returns up to reference_points_count points evenly spaced across the duration.

Source code in src/core/models/circular_buffer.py
def get_data_for_duration(
    self, sensor_idx: int, duration: DisplayDuration
) -> List[Tuple[float, float]]:
    """
    Get data points for a specific display duration with uniform spacing.
    Returns up to reference_points_count points evenly spaced across the duration.
    """
    if duration not in DisplayDuration:
        raise ValueError(f"Invalid duration: {duration}")
    return self.get_data_for_window_seconds(sensor_idx, duration.value_seconds())

get_data_for_window_seconds(sensor_idx, window_seconds)

Retrieve data for a given time window (in seconds) with uniform spacing. Optimized: uses precomputed offsets and direct indexing, no dynamic computation.

Source code in src/core/models/circular_buffer.py
def get_data_for_window_seconds(self, sensor_idx: int, window_seconds: int) -> List[Tuple[float, float]]:
    """
    Retrieve data for a given time window (in seconds) with uniform spacing.
    Optimized: uses precomputed offsets and direct indexing, no dynamic computation.
    """
    if sensor_idx < 0 or sensor_idx >= self.sensor_count:
        raise ValueError(f"Invalid sensor index {sensor_idx}")

    # Fast lookup with precomputed window info
    window_info = self._precomputed_windows.get(window_seconds)
    if window_info is None:
        raise ValueError(f"Unsupported window_seconds: {window_seconds}")

    buffer = self.buffers[sensor_idx]
    if buffer.size() == 0:
        return []

    max_points_in_window = window_info["max_points"]
    offsets = window_info["offsets"]

    available_points = min(buffer.size(), max_points_in_window)
    target_points = self.reference_points_count

    # Case 1: Fewer points than target - return all available points as-is
    if available_points <= target_points:
        start_idx = buffer.size() - available_points
        return buffer.get_range(start_idx, buffer.size())

    # Case 2: Full window available - use precomputed offsets (fast path)
    if available_points >= max_points_in_window:
        window_start_idx = buffer.size() - max_points_in_window
        result: List[Tuple[float, float]] = [(0.0, 0.0)] * len(offsets)

        # Direct indexed access using precomputed offsets
        for i, off in enumerate(offsets):
            result[i] = buffer.get(window_start_idx + off)
        return result

    # Case 3: Partial window - subsample available points
    step = available_points / target_points
    start_idx = buffer.size() - available_points
    result: List[Tuple[float, float]] = [(0.0, 0.0)] * target_points

    for i in range(target_points):
        idx = int(start_idx + i * step)
        if i == target_points - 1:
            idx = start_idx + available_points - 1
        result[i] = buffer.get(idx)

    return result
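
The partial-window path (Case 3) boils down to uniform subsampling that always keeps the most recent point. A self-contained sketch of that step, with a hypothetical helper name:

```python
def subsample(points: list[tuple[float, float]], target: int) -> list[tuple[float, float]]:
    """Evenly pick `target` samples from `points`, pinning the newest one.

    Illustrative sketch of the Case 3 logic above; returns all points
    unchanged when there are no more than `target` of them.
    """
    if len(points) <= target:
        return list(points)
    step = len(points) / target
    out = [points[int(i * step)] for i in range(target)]
    out[-1] = points[-1]  # guarantee the last sample is the most recent point
    return out
```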

get_sensor_buffer_stats(sensor_idx)

Get statistics about a sensor's buffer.

Source code in src/core/models/circular_buffer.py
def get_sensor_buffer_stats(self, sensor_idx: int) -> dict:
    """Get statistics about a sensor's buffer."""
    if sensor_idx < 0 or sensor_idx >= self.sensor_count:
        raise ValueError(f"Invalid sensor index {sensor_idx}")
    buffer = self.buffers[sensor_idx]
    return {
        "capacity": buffer.capacity,
        "current_count": buffer.count,
        "is_full": buffer.is_full(),
        "utilization": buffer.count / buffer.capacity,
    }

Test Data

TestMetaData dataclass

Data class representing metadata for a test. Compatible with both dataclass operations and Pydantic validation.

Source code in src/core/models/test_data.py
@dataclass
class TestMetaData:
    """
    Data class representing metadata for a test.
    Compatible with both dataclass operations and Pydantic validation.
    """
    test_id: str
    date: str
    operator_name: str
    specimen_code: str
    dim_length: float = 0.0
    dim_height: float = 0.0
    dim_width: float = 0.0
    loading_mode: str = ""
    sensor_spacing: float = 0.0
    ext_sensor_spacing: float = 0.0
    ext_support_spacing: float = 0.0
    load_point_spacing: float = 0.0
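
A brief usage sketch, redeclaring a trimmed copy of the dataclass for illustration (the example values are made up); `dataclasses.asdict` yields a plain dict suitable for JSON export or Pydantic validation:

```python
from dataclasses import dataclass, asdict

@dataclass
class TestMetaData:  # trimmed copy of the source dataclass, for illustration
    test_id: str
    date: str
    operator_name: str
    specimen_code: str
    dim_length: float = 0.0

meta = TestMetaData(test_id="T-001", date="2024-01-01",
                    operator_name="A. Operator", specimen_code="S-42")
record = asdict(meta)  # plain dict of field names to values
```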