Skip to content

Processing API

Signal processing pipeline — DAG-based, immutable processing steps.

ProcessingNode dataclass

A single node in the processing DAG.

Each node represents one processing step applied to its parent's data. The root node (step 0) holds the original unprocessed data.

Source code in `dig/processing/pipeline.py` (lines 26–50)
@dataclass
class ProcessingNode:
    """One step in a processing history.

    A node records which operation produced its data (``name`` and
    ``parameters``), when it was created, and which node it was derived
    from. The root node (step 0, ``parent_id`` None) holds the original
    unprocessed data.
    """

    # Position of this step within its pipeline (0 for the root).
    step_id: int
    # Human-readable name of the operation that produced ``data``.
    name: str
    # Keyword arguments the operation was called with (recorded for audit).
    parameters: dict[str, Any]
    # Creation time in seconds since the epoch.
    timestamp: float
    # ``step_id`` of the node this one was derived from; None for the root.
    parent_id: int | None
    # Array produced by this step; excluded from repr() since it can be large.
    data: np.ndarray | None = field(repr=False)

    def to_dict(self) -> dict[str, Any]:
        """Return this node's metadata as a plain dict for audit export.

        The ``data`` array is deliberately left out. ``parameters`` is
        shallow-copied, so adding or removing keys in the exported dict does
        not affect the node. Every entry is stamped with the module-level
        ``SOFTWARE_VERSION``.
        """
        exported = {
            key: getattr(self, key)
            for key in ("step_id", "name", "parameters", "timestamp", "parent_id")
        }
        # Replace the shared reference with a copy (keeps the key's position).
        exported["parameters"] = dict(self.parameters)
        exported["software_version"] = SOFTWARE_VERSION
        return exported

to_dict()

Serialize node metadata (excluding data array) for audit export.

Source code in `dig/processing/pipeline.py` (lines 41–50)
def to_dict(self) -> dict[str, Any]:
    """Return this node's metadata as a plain dict for audit export.

    The ``data`` array is deliberately left out. ``parameters`` is
    shallow-copied, so adding or removing keys in the exported dict does
    not affect the node. Every entry is stamped with the module-level
    ``SOFTWARE_VERSION``.
    """
    exported = {
        key: getattr(self, key)
        for key in ("step_id", "name", "parameters", "timestamp", "parent_id")
    }
    # Replace the shared reference with a copy (keeps the key's position).
    exported["parameters"] = dict(self.parameters)
    exported["software_version"] = SOFTWARE_VERSION
    return exported

ProcessingPipeline

Immutable DAG processing pipeline.

Usage:

```python
pipe = ProcessingPipeline(raw_data)

# Chain operations — each returns a new pipeline instance
pipe = pipe.process(dewow_fft, sample_rate=1000.0)
pipe = pipe.process(remove_background_global)
pipe = pipe.process(bandpass_butterworth, low_cut=100e6, high_cut=500e6)

# Branch from any previous step (non-linear undo)
branch = pipe.branch(step_id=1)
branch = branch.process(different_filter, ...)

# Export audit trail
history = pipe.export_history()
```

Source code in `dig/processing/pipeline.py` (lines 56–221)
class ProcessingPipeline:
    """Immutable DAG processing pipeline.

    A pipeline holds the path of ``ProcessingNode`` objects from the root
    (original data, step 0) to its current step. Every operation returns a
    new pipeline and leaves the receiver untouched. Derived pipelines share
    their ancestors' node objects instead of copying them.

    Usage:
        pipe = ProcessingPipeline(raw_data)
        # Chain operations — each returns a new pipeline instance
        pipe = pipe.process(dewow_fft, sample_rate=1000.0)
        pipe = pipe.process(remove_background_global)
        pipe = pipe.process(bandpass_butterworth, low_cut=100e6, high_cut=500e6)

        # Branch from any previous step (non-linear undo)
        branch = pipe.branch(step_id=1)
        branch = branch.process(different_filter, ...)

        # Export audit trail
        history = pipe.export_history()
    """

    def __init__(
        self,
        data: np.ndarray | None,
        nodes: list[ProcessingNode] | None = None,
        survey_metadata: dict[str, Any] | None = None,
    ):
        """Create a pipeline.

        Args:
            data: Raw input array. When ``nodes`` is None it is converted to
                a float64 copy and stored as the root node; otherwise it is
                ignored.
            nodes: Existing node path (root first) to adopt instead of
                building a root from ``data``. Must not be empty.
            survey_metadata: Optional metadata dict, passed by reference to
                every pipeline derived from this one.

        Raises:
            ValueError: If ``nodes`` is given but empty.
        """
        self._survey_metadata = survey_metadata or {}

        if nodes is None:
            # Root node — original unprocessed data. Copied so later changes
            # to the caller's array cannot leak into the pipeline.
            root = ProcessingNode(
                step_id=0,
                name="original",
                parameters={},
                timestamp=time.time(),
                parent_id=None,
                data=np.asarray(data, dtype=np.float64).copy(),
            )
            self._nodes: list[ProcessingNode] = [root]
        else:
            self._nodes = list(nodes)
            if not self._nodes:
                # data / current_step / original_data all index this list.
                raise ValueError("nodes must contain at least the root node")

    @property
    def data(self) -> np.ndarray:
        """Current (latest) data array (the stored array itself, not a copy)."""
        return self._nodes[-1].data

    @property
    def current_step(self) -> ProcessingNode:
        """The most recent processing node."""
        return self._nodes[-1]

    @property
    def original_data(self) -> np.ndarray:
        """The original unprocessed data (root node)."""
        return self._nodes[0].data

    @property
    def num_steps(self) -> int:
        """Number of nodes in this pipeline, including the root."""
        return len(self._nodes)

    @property
    def steps(self) -> list[ProcessingNode]:
        """All processing steps in order (a new list on each access)."""
        return list(self._nodes)

    def process(
        self,
        func: Callable[..., np.ndarray],
        /,
        **kwargs: Any,
    ) -> ProcessingPipeline:
        """Apply a processing function and return a new pipeline instance.

        The function receives a copy of the current data as its first
        argument. A function that modifies its input in place therefore
        cannot alter data held by earlier steps. Additional keyword arguments
        are passed through and recorded in the audit trail.

        Args:
            func: Processing function (data, **kwargs) -> np.ndarray
            **kwargs: Additional arguments passed to the function

        Returns:
            New ProcessingPipeline with the step appended. The function's
            result is stored as a float64 array.
        """
        current_data = self.data
        func_name = self._resolve_func_name(func)

        # Nodes are shared with other pipelines (parents and branches), so the
        # stored array must never be handed to user code directly.
        func_input = None if current_data is None else current_data.copy()
        result = np.asarray(func(func_input, **kwargs), dtype=np.float64)

        new_node = ProcessingNode(
            step_id=len(self._nodes),
            name=func_name,
            parameters=kwargs,
            timestamp=time.time(),
            parent_id=self.current_step.step_id,
            data=result,
        )

        return ProcessingPipeline(
            data=None,  # not used when nodes provided
            nodes=self._nodes + [new_node],
            survey_metadata=self._survey_metadata,
        )

    def branch(self, step_id: int) -> ProcessingPipeline:
        """Create a branch from a previous step (non-linear undo).

        The new pipeline keeps the history from the root up to and including
        ``step_id``. Its audit trail and ``original_data`` therefore still
        describe where the data came from, and later ``process`` calls append
        after ``step_id``. The receiver is not modified.

        Args:
            step_id: The step ID to branch from

        Returns:
            New pipeline whose current step is ``step_id``

        Raises:
            ValueError: If ``step_id`` is outside ``[0, num_steps)``.
        """
        if step_id < 0 or step_id >= len(self._nodes):
            raise ValueError(f"Step {step_id} not found. Pipeline has {len(self._nodes)} steps.")

        return ProcessingPipeline(
            data=None,  # not used when nodes provided
            nodes=self._nodes[: step_id + 1],
            survey_metadata=self._survey_metadata,
        )

    def export_history(self, include_data: bool = False) -> list[dict[str, Any]]:
        """Export the processing history as a list of serializable dicts.

        Args:
            include_data: If True, add ``data_shape`` and ``data_dtype`` to
                every entry whose node holds an array. The array contents
                are never exported.

        Returns:
            List of step dicts, root first, suitable for JSON serialization
        """

        def _entry(node):
            record = node.to_dict()
            array = node.data
            if include_data and array is not None:
                record["data_shape"] = list(array.shape)
                record["data_dtype"] = str(array.dtype)
            return record

        return [_entry(node) for node in self._nodes]

    def export_history_json(self, indent: int = 2) -> str:
        """Export processing history (including array shapes) as a JSON string.

        NumPy scalars and arrays recorded as step parameters (e.g. an
        ``np.int64`` window length) are converted to native Python values.
        Any other non-JSON type still raises ``TypeError``.
        """

        def _to_builtin(obj: Any) -> Any:
            # json only calls this for objects it cannot serialize natively.
            if isinstance(obj, np.generic):
                return obj.item()
            if isinstance(obj, np.ndarray):
                return obj.tolist()
            raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")

        return json.dumps(
            self.export_history(include_data=True),
            indent=indent,
            default=_to_builtin,
        )

    def get_step(self, step_id: int) -> ProcessingNode:
        """Get a specific processing step by ID.

        Raises:
            ValueError: If ``step_id`` is outside ``[0, num_steps)``.
        """
        total = len(self._nodes)
        if not 0 <= step_id < total:
            raise ValueError(f"Step {step_id} not found. Pipeline has {total} steps.")
        return self._nodes[step_id]

    def _resolve_func_name(self, func: Callable) -> str:
        """Get a human-readable name for a function.

        Uses ``func.__name__`` when present. Otherwise (e.g. callable objects
        without ``__name__``) falls back to ``"<module>.<TypeName>"``.
        """
        name = getattr(func, "__name__", None)
        if name:
            return name
        module = getattr(func, "__module__", "")
        return f"{module}.{type(func).__name__}"

    def __repr__(self) -> str:
        return (
            f"ProcessingPipeline(steps={self.num_steps}, "
            f"shape={self.data.shape}, "
            f"current={self.current_step.name})"
        )

current_step property

The most recent processing node.

data property

Current (latest) data array.

original_data property

The original unprocessed data (root node).

steps property

All processing steps in order.

branch(step_id)

Create a branch from a previous step (non-linear undo).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `step_id` | `int` | The step ID to branch from | *required* |

Returns:

| Type | Description |
| --- | --- |
| `ProcessingPipeline` | New pipeline starting from that step's data |

Source code in `dig/processing/pipeline.py` (lines 162–178)
def branch(self, step_id: int) -> ProcessingPipeline:
    """Create a branch from a previous step (non-linear undo).

    The new pipeline keeps the history from the root up to and including
    ``step_id``. Its audit trail and ``original_data`` therefore still
    describe where the data came from, and later ``process`` calls append
    after ``step_id``. The receiver is not modified.

    Args:
        step_id: The step ID to branch from

    Returns:
        New pipeline whose current step is ``step_id``

    Raises:
        ValueError: If ``step_id`` is outside ``[0, num_steps)``.
    """
    if step_id < 0 or step_id >= len(self._nodes):
        raise ValueError(f"Step {step_id} not found. Pipeline has {len(self._nodes)} steps.")

    return ProcessingPipeline(
        data=None,  # not used when nodes provided
        nodes=self._nodes[: step_id + 1],
        survey_metadata=self._survey_metadata,
    )

export_history(include_data=False)

Export the processing history as a list of serializable dicts.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `include_data` | `bool` | If True, include array shapes and dtypes in output | `False` |

Returns:

| Type | Description |
| --- | --- |
| `list[dict[str, Any]]` | List of step dicts suitable for JSON serialization |

Source code in `dig/processing/pipeline.py` (lines 180–196)
def export_history(self, include_data: bool = False) -> list[dict[str, Any]]:
    """Serialize every step of the pipeline, root first.

    Args:
        include_data: When True, each entry whose node holds an array also
            gets ``data_shape`` (list of ints) and ``data_dtype`` (str).
            The array contents themselves are never exported.

    Returns:
        One metadata dict per step, in pipeline order, suitable for JSON
        serialization.
    """

    def _entry(node):
        record = node.to_dict()
        array = node.data
        if include_data and array is not None:
            record["data_shape"] = list(array.shape)
            record["data_dtype"] = str(array.dtype)
        return record

    return [_entry(node) for node in self._nodes]

export_history_json(indent=2)

Export processing history as a JSON string.

Source code in `dig/processing/pipeline.py` (lines 198–200)
def export_history_json(self, indent: int = 2) -> str:
    """Export processing history (including array shapes) as a JSON string.

    NumPy scalars and arrays recorded as step parameters (e.g. an
    ``np.int64`` window length) are converted to native Python values.
    Any other non-JSON type still raises ``TypeError``.
    """

    def _to_builtin(obj: Any) -> Any:
        # json only calls this for objects it cannot serialize natively.
        if isinstance(obj, np.generic):
            return obj.item()
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")

    return json.dumps(
        self.export_history(include_data=True),
        indent=indent,
        default=_to_builtin,
    )

get_step(step_id)

Get a specific processing step by ID.

Source code in `dig/processing/pipeline.py` (lines 202–206)
def get_step(self, step_id: int) -> ProcessingNode:
    """Return the node stored at position ``step_id``.

    Raises:
        ValueError: If ``step_id`` is outside ``[0, num_steps)``.
    """
    total = len(self._nodes)
    if not 0 <= step_id < total:
        raise ValueError(f"Step {step_id} not found. Pipeline has {total} steps.")
    return self._nodes[step_id]

process(func, /, **kwargs)

Apply a processing function and return a new pipeline instance.

The function receives the current data as its first argument. Additional keyword arguments are passed through and recorded in the audit trail.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `func` | `Callable[..., ndarray]` | Processing function `(data, **kwargs) -> np.ndarray` | *required* |
| `**kwargs` | `Any` | Additional arguments passed to the function | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `ProcessingPipeline` | New ProcessingPipeline with the step appended |

Source code in `dig/processing/pipeline.py` (lines 120–160)
def process(
    self,
    func: Callable[..., np.ndarray],
    /,
    **kwargs: Any,
) -> ProcessingPipeline:
    """Apply a processing function and return a new pipeline instance.

    The function receives a copy of the current data as its first
    argument. A function that modifies its input in place therefore
    cannot alter data held by earlier steps. Additional keyword arguments
    are passed through and recorded in the audit trail.

    Args:
        func: Processing function (data, **kwargs) -> np.ndarray
        **kwargs: Additional arguments passed to the function

    Returns:
        New ProcessingPipeline with the step appended. The function's
        result is stored as a float64 array.
    """
    current_data = self.data
    func_name = self._resolve_func_name(func)

    # Nodes are shared with other pipelines (parents and branches), so the
    # stored array must never be handed to user code directly.
    func_input = None if current_data is None else current_data.copy()
    result = np.asarray(func(func_input, **kwargs), dtype=np.float64)

    new_node = ProcessingNode(
        step_id=len(self._nodes),
        name=func_name,
        parameters=kwargs,
        timestamp=time.time(),
        parent_id=self.current_step.step_id,
        data=result,
    )

    return ProcessingPipeline(
        data=None,  # not used when nodes provided
        nodes=self._nodes + [new_node],
        survey_metadata=self._survey_metadata,
    )