# Generic Component Implementations
Generic "ready-to-use" implementations of common components.
Other generic and largely reusable components can be added to this submodule.
**Note:** Numpy (and jaxtyping) are the only dependencies; to keep the `abstract_dataloader`'s dependencies lightweight and flexible, components should only be added here if they do not require any additional dependencies.
## abstract_dataloader.generic.ComposedPipeline

Bases: `Pipeline[TRaw, TTransformed, TCollated, TProcessed]`, `Generic[TRaw, TRawInner, TTransformed, TCollated, TProcessedInner, TProcessed]`
Compose transforms sequentially with pre and post transforms.
Type Parameters

- `TRaw`, `TTransformed`, `TCollated`, `TProcessed`: see `Pipeline`.
- `TRawInner`, `TProcessedInner`: raw and processed types accepted by the inner (composed) pipeline.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `transform` | `Pipeline[TRawInner, TTransformed, TCollated, TProcessedInner]` | pipeline to compose. | *required* |
| `pre` | `Transform[TRaw, TRawInner] \| None` | pre-transform to apply on the CPU side; skipped if `None`. | `None` |
| `post` | `Transform[TProcessedInner, TProcessed] \| None` | post-transform to apply on the GPU side; skipped if `None`. | `None` |
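A minimal usage sketch, following the placeholder style of the other examples on this page (`inner`, `normalize`, and `to_device` are hypothetical names, not part of the library):

```python
inner = ...      # implements spec.Pipeline[TRawInner, ..., TProcessedInner]
normalize = ...  # implements spec.Transform[TRaw, TRawInner]
to_device = ...  # implements spec.Transform[TProcessedInner, TProcessed]

composed = generic.ComposedPipeline(
    transform=inner, pre=normalize, post=to_device)
# composed.sample(raw) applies `normalize`, then `inner.sample`;
# composed.batch(collated) applies `inner.batch`, then `to_device`.
```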
Source code in src/abstract_dataloader/generic/composition.py
### `batch`

Transform data batch.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `data` | `TCollated` | A `TCollated` batch of data. | *required* |

Returns:

| Type | Description |
|---|---|
| `TProcessed` | The `TProcessed` batch of data. |
Source code in src/abstract_dataloader/generic/composition.py
### `sample`

Transform a single sample.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `data` | `TRaw` | A single `TRaw` sample. | *required* |

Returns:

| Type | Description |
|---|---|
| `TTransformed` | A single `TTransformed` sample. |
Source code in src/abstract_dataloader/generic/composition.py
## abstract_dataloader.generic.ParallelPipelines

Bases: `Pipeline[PRaw, PTransformed, PCollated, PProcessed]`
Compose multiple transforms in parallel.

For example, with transforms `{"radar": radar_tf, "lidar": lidar_tf, ...}`, the composed transform applies each sub-pipeline to the value at its corresponding key.
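As a sketch of the assumed per-sample semantics (inferred from the type bounds; `radar_tf` and `lidar_tf` are the placeholder pipelines from the example above):

```python
# Assumed semantics: each sub-pipeline is applied to its own subkey.
def sample(data: dict) -> dict:
    return {
        "radar": radar_tf.sample(data["radar"]),
        "lidar": lidar_tf.sample(data["lidar"]),
        # ... one entry per configured subkey
    }
```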
**Note:** This implies that the type parameters must be `dict[str, Any]`, so this class is parameterized by a separate set of `Composed(Raw|Transformed|Collated|Processed)` types with this bound.
**Tip:** See `torch.ParallelPipelines` for an implementation which is compatible with `nn.Module`-based pipelines.
Type Parameters

- `PRaw`, `PTransformed`, `PCollated`, `PProcessed`: see `Pipeline`.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `transforms` | `Pipeline` | transforms to compose. The key indicates the subkey to apply each transform to. | `{}` |
Source code in src/abstract_dataloader/generic/composition.py
## abstract_dataloader.generic.SequencePipeline

Bases: `Pipeline[Sequence[TRaw], Sequence[TTransformed], Sequence[TCollated], Sequence[TProcessed]]`
Transform which passes an additional sequence axis through.

The given `Pipeline` is modified to accept `Sequence[...]` for each data type in its pipeline, and to return a `list[...]` across the additional axis, thus "passing through" the axis.
For example, suppose a sequence dataloader reads

```
[
    [Raw[s=0, t=0], Raw[s=0, t=1], ... Raw[s=0, t=n]],
    [Raw[s=1, t=0], Raw[s=1, t=1], ... Raw[s=1, t=n]],
    ...
    [Raw[s=b, t=0], Raw[s=b, t=1], ... Raw[s=b, t=n]],
]
```

for sequence index `t = 0...n` and batch sample `s = 0...b`. At each sequence index `t`, the output of the transforms will be batched, with the sequence on the outside:
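A sketch of the resulting layout, reconstructed from the description above:

```
[
    Processed[t=0, s=0...b],
    Processed[t=1, s=0...b],
    ...
    Processed[t=n, s=0...b],
]
```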
Type Parameters

- `TRaw`, `TTransformed`, `TCollated`, `TProcessed`: see `Pipeline`.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `transform` | `Pipeline[TRaw, TTransformed, TCollated, TProcessed]` | input transform. | *required* |
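A minimal usage sketch (`inner` is a hypothetical per-sample pipeline):

```python
inner = ...  # implements spec.Pipeline[TRaw, TTransformed, TCollated, TProcessed]
seq = generic.SequencePipeline(inner)
# Each stage now maps a sequence of inputs to a list of outputs:
transformed = seq.sample(raw_sequence)  # Sequence[TRaw] -> list[TTransformed]
```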
Source code in src/abstract_dataloader/generic/sequence.py
## abstract_dataloader.generic.Metadata `dataclass`
## abstract_dataloader.generic.Window

Bases: `Sensor[SampleStack, Metadata]`, `Generic[SampleStack, Sample, TMetadata]`
Load sensor data across a time window using a sensor transform.

Use this class as a generic transform to give time history to any sensor:

```python
sensor = ...  # implements spec.Sensor
with_history = generic.Window(sensor, past=5, future=1, parallel=7)
```

In this example, 5 past samples, the current sample, and 1 future sample are loaded on every index:

```
with_history[i] = [
    sensor[i], sensor[i + 1], ... sensor[i + 5], sensor[i + 6]]
                                  ^
                                  # timestamp for synchronization
```
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `sensor` | `Sensor[Sample, TMetadata]` | sensor to wrap. | *required* |
| `collate_fn` | `Callable[[list[Sample]], SampleStack] \| None` | collate function for aggregating a list of samples; if not specified, the samples are simply returned as a list. | `None` |
| `past` | `int` | number of past samples, in addition to the current sample. Set to `0` to disable. | `0` |
| `future` | `int` | number of future samples, in addition to the current sample. Set to `0` to disable. | `0` |
| `parallel` | `int \| None` | maximum number of samples to load in parallel; if `None`, load sequentially. | `None` |
Type Parameters
SampleStack
: a collated series of consecutive samples. Can simply belist[Sample]
.Sample
: single observation sample type.TMetadata
: metadata type for the underlying sensor. Note that theWindow
wrapper doesn't actually have metadata typeTMetadata
; this type is just passed through from the sensor which is wrapped.
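For instance, a `collate_fn` which stacks array-valued samples along a new leading time axis (a sketch which assumes `np.ndarray` samples, so that `SampleStack = np.ndarray`):

```python
import numpy as np

sensor = ...  # implements spec.Sensor, yielding np.ndarray samples
with_history = generic.Window(
    sensor, past=5, future=1,
    # Stack the 7 samples in each window along a new leading (time) axis.
    collate_fn=lambda samples: np.stack(samples, axis=0))
```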
Source code in src/abstract_dataloader/generic/sequence.py
### `__getitem__`

Fetch measurements from this sensor, by index.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `index` | `int \| integer` | sample index; note that the valid index range is reduced by the window size (`past + future`). | *required* |
Returns:

| Type | Description |
|---|---|
| `SampleStack` | A set of `past + future + 1` consecutive samples. |
Source code in src/abstract_dataloader/generic/sequence.py
### `from_partial_sensor` `classmethod`

```python
from_partial_sensor(
    sensor: Callable[[str], Sensor[Sample, TMetadata]],
    collate_fn: Callable[[list[Sample]], SampleStack] | None = None,
    past: int = 0,
    future: int = 0,
    parallel: int | None = None,
) -> Callable[[str], Window[SampleStack, Sample, TMetadata]]
```
Partially initialize a `Window` from a partially initialized sensor.
Use this to create windowed sensor constructors which can be applied to different traces to construct a dataset. For example, if you have a `sensor_constructor`:

```python
sensor_constructor = ...
windowed_sensor_constructor = Window.from_partial_sensor(
    sensor_constructor, ...)

# ... somewhere inside the dataset constructor
sensor_instance = windowed_sensor_constructor(path_to_trace)
```
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `sensor` | `Callable[[str], Sensor[Sample, TMetadata]]` | sensor constructor to wrap. | *required* |
| `collate_fn` | `Callable[[list[Sample]], SampleStack] \| None` | collate function for aggregating a list of samples; if not specified, the samples are simply returned as a list. | `None` |
| `past` | `int` | number of past samples, in addition to the current sample. Set to `0` to disable. | `0` |
| `future` | `int` | number of future samples, in addition to the current sample. Set to `0` to disable. | `0` |
| `parallel` | `int \| None` | maximum number of samples to load in parallel; if `None`, load sequentially. | `None` |
Source code in src/abstract_dataloader/generic/sequence.py
## abstract_dataloader.generic.Empty

Bases: `Synchronization`
Dummy synchronization which does not synchronize sensor pairs.
No samples will be registered, and the trace can only be used as a collection of sensors.
Source code in src/abstract_dataloader/generic/sync.py
## abstract_dataloader.generic.Nearest

Bases: `Synchronization`
Nearest sample synchronization, with respect to a reference sensor.
Applies the following:

- Compute the midpoints between successive observations for each sensor.
- Find which bin the reference sensor timestamps fall into.
- Calculate the resulting time delta between matched timestamps. If this exceeds `tol` for any sensor-reference pair, remove this match.
See `Synchronization` for protocol details.
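A per-sensor sketch of this matching procedure in numpy (assumed semantics, not the library's exact code; `ref` and `ts` are sorted timestamp arrays for the reference and a second sensor):

```python
import numpy as np

def match_nearest(ref, ts, tol=0.1):
    """Match each reference timestamp to the nearest sample in `ts`."""
    mids = (ts[:-1] + ts[1:]) / 2          # midpoints between observations
    idx = np.searchsorted(mids, ref)       # bin each reference timestamp
    valid = np.abs(ts[idx] - ref) <= tol   # enforce the time tolerance
    # The full procedure removes matches where `valid` is False for any
    # sensor-reference pair.
    return idx.astype(np.uint32), valid
```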
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `reference` | `str` | reference sensor to synchronize to. | *required* |
| `tol` | `float` | synchronization time tolerance, in seconds. | `0.1` |
Source code in src/abstract_dataloader/generic/sync.py
### `__call__`

Apply synchronization protocol.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `timestamps` | `dict[str, Float64[ndarray, _N]]` | input sensor timestamps. | *required* |

Returns:

| Type | Description |
|---|---|
| `dict[str, UInt32[ndarray, M]]` | Synchronized index map. |
Source code in src/abstract_dataloader/generic/sync.py
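For example, with two hypothetical sensors:

```python
import numpy as np

sync = generic.Nearest(reference="lidar", tol=0.1)
indices = sync({
    "lidar": np.array([0.00, 0.10, 0.20]),
    "radar": np.array([0.01, 0.11, 0.19]),
})
# indices["radar"][k] gives the radar sample matched to synchronized
# sample k; indices["lidar"] indexes the reference sensor itself.
```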
## abstract_dataloader.generic.Next

Bases: `Synchronization`
Next sample synchronization, with respect to a reference sensor.
Applies the following:
- Find the start time, defined by the earliest time which is observed by all sensors, and the end time, defined by the last time which is observed by all sensors.
- Truncate the reference sensor's timestamps to this start and end time, and use this as the query timestamps.
- For each time in the query, find the first sample from each sensor which is after this time.
See `Synchronization` for protocol details.
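A numpy sketch of this procedure (assumed semantics, not the library's exact code; `timestamps` maps sensor names to sorted timestamp arrays):

```python
import numpy as np

def match_next(timestamps: dict, reference: str) -> dict:
    """For each query time, take each sensor's first sample at/after it."""
    start = max(ts[0] for ts in timestamps.values())   # seen by all sensors
    end = min(ts[-1] for ts in timestamps.values())
    ref = timestamps[reference]
    query = ref[(ref >= start) & (ref <= end)]         # truncated reference
    return {
        k: np.searchsorted(ts, query, side="left").astype(np.uint32)
        for k, ts in timestamps.items()}
```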
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `reference` | `str` | reference sensor to synchronize to. | *required* |
Source code in src/abstract_dataloader/generic/sync.py
### `__call__`

Apply synchronization protocol.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `timestamps` | `dict[str, Float64[ndarray, _N]]` | input sensor timestamps. | *required* |

Returns:

| Type | Description |
|---|---|
| `dict[str, UInt32[ndarray, M]]` | Synchronized index map. |