Skip to content

Parsers API

Format parsers for geophysical instruments.

DT1File

Sensors & Software .DT1/.HD format handler.

The .HD ASCII header provides survey-level metadata (samples per trace, time window, channels). The .DT1 binary file contains interleaved 128-byte trace headers and trace data.

Usage

dt1 = DT1File("survey.dt1")
data = dt1.traces              # (num_traces, samples_per_trace) array
pos = dt1.trace_positions      # odometer position per trace (m)

Source code in dig/parsers/dt1.py
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
class DT1File:
    """Sensors & Software .DT1/.HD format handler.

    The .HD ASCII header provides survey-level metadata (samples per trace,
    time window, channels). The .DT1 binary file contains interleaved
    128-byte trace headers and trace data.

    Usage:
        dt1 = DT1File("survey.dt1")
        data = dt1.traces              # (num_traces, samples_per_trace) array
        pos = dt1.trace_positions      # odometer position per trace (m)
    """

    def __init__(self, dt1_path: str | Path, hd_path: str | Path | None = None):
        self.dt1_path = Path(dt1_path)
        if not self.dt1_path.exists():
            raise FileNotFoundError(f"DT1 file not found: {dt1_path}")

        # Auto-discover .HD file
        if hd_path is None:
            candidates = [
                self.dt1_path.with_suffix(".HD"),
                self.dt1_path.with_suffix(".hd"),
            ]
            for c in candidates:
                if c.exists():
                    hd_path = c
                    break
        self.hd_path = Path(hd_path) if hd_path and Path(hd_path).exists() else None

        # Read .HD content for Rust parser
        hd_content: str | None = None
        self._hd_metadata: dict[str, str] = {}
        if self.hd_path:
            hd_content = self.hd_path.read_text()
            self._parse_hd(hd_content)

        # Parse via Rust backend (passes .HD content for metadata)
        self._survey: PySurvey = parse_dt1(str(self.dt1_path), hd_content)

        # Convert trace data from Rust to numpy array
        self._build_traces()

    def _parse_hd(self, content: str) -> None:
        """Parse .HD key=value pairs into a dict."""
        for line in content.splitlines():
            line = line.strip()
            if not line or line.startswith("#") or line.startswith(";"):
                continue
            if "=" in line:
                key, _, val = line.partition("=")
                self._hd_metadata[key.strip()] = val.strip()

    def _build_traces(self) -> None:
        """Convert raw trace data bytes to numpy array."""
        raw = bytes(self._survey.trace_data)
        dtype = self._numpy_dtype
        expected_bytes = self.num_traces * self.samples_per_trace * dtype.itemsize

        if len(raw) == 0:
            self._traces = np.empty((0, 0), dtype=dtype)
        elif len(raw) >= expected_bytes:
            self._traces = np.frombuffer(raw[:expected_bytes], dtype=dtype).reshape(
                self.num_traces, self.samples_per_trace
            )
        else:
            # Partial data — pad with zeros
            arr = np.frombuffer(raw, dtype=dtype)
            total = self.num_traces * self.samples_per_trace
            if len(arr) < total:
                arr = np.pad(arr, (0, total - len(arr)))
            self._traces = arr.reshape(self.num_traces, self.samples_per_trace)

    @property
    def _numpy_dtype(self) -> np.dtype:
        mapping = {8: np.dtype(np.uint8), 16: np.dtype(np.int16), 32: np.dtype(np.int32)}
        return mapping.get(self._survey.bits_per_sample, np.dtype(np.int16))

    @property
    def traces(self) -> np.ndarray:
        """2D array of shape (num_traces, samples_per_trace)."""
        return self._traces

    @property
    def num_traces(self) -> int:
        return self._survey.num_traces

    @property
    def samples_per_trace(self) -> int:
        return self._survey.samples_per_trace

    @property
    def bits_per_sample(self) -> int:
        return self._survey.bits_per_sample

    @property
    def channels(self) -> int:
        return self._survey.channels

    @property
    def time_window_ns(self) -> float:
        return self._survey.time_window_ns

    @property
    def sample_interval_ns(self) -> float:
        return self._survey.sample_interval_ns

    @property
    def time_zero_ns(self) -> float:
        return self._survey.time_zero_ns

    @property
    def trace_positions(self) -> list[float]:
        """Odometer position (m) for each trace."""
        return list(self._survey.trace_positions)

    @property
    def trace_time_zeros(self) -> list[float]:
        """Time-zero offset (ns) for each trace."""
        return list(self._survey.trace_time_zeros)

    @property
    def trace_elevations(self) -> list[float]:
        """GPS elevation (m) for each trace."""
        return list(self._survey.trace_elevations)

    @property
    def hd_metadata(self) -> dict[str, str]:
        """Raw .HD key-value pairs."""
        return dict(self._hd_metadata)

    def get_trace(self, index: int) -> np.ndarray:
        """Return a single trace as a 1D array."""
        if index < 0 or index >= self.num_traces:
            raise IndexError(f"Trace index {index} out of range (0-{self.num_traces - 1})")
        return self._traces[index, :]

    def __repr__(self) -> str:
        return (
            f"DT1File(traces={self.num_traces}, "
            f"samples={self.samples_per_trace}, "
            f"bits={self.bits_per_sample})"
        )

hd_metadata property

Raw .HD key-value pairs.

trace_elevations property

GPS elevation (m) for each trace.

trace_positions property

Odometer position (m) for each trace.

trace_time_zeros property

Time-zero offset (ns) for each trace.

traces property

2D array of shape (num_traces, samples_per_trace).

get_trace(index)

Return a single trace as a 1D array.

Source code in dig/parsers/dt1.py
141
142
143
144
145
def get_trace(self, index: int) -> np.ndarray:
    """Fetch one trace, i.e. row ``index`` of the trace matrix, as a 1D array.

    Negative indices are not interpreted relative to the end; any index
    outside ``0 .. num_traces - 1`` raises ``IndexError``.
    """
    out_of_bounds = index < 0 or index >= self.num_traces
    if out_of_bounds:
        raise IndexError(f"Trace index {index} out of range (0-{self.num_traces - 1})")
    row = self._traces[index, :]
    return row

DZTFile

GSSI .DZT format handler.

Wraps the Rust DZT parser and provides NumPy array access to trace data via memory-mapped I/O. Supports .DZG sidecar GPS files for georeferencing.

Usage

dzt = DZTFile("survey.dzt")
data = dzt.traces          # (num_traces, samples_per_trace) array
gps = dzt.gps_positions    # [(trace_idx, lat, lon, alt), ...]

Source code in dig/parsers/dzt.py
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
class DZTFile:
    """GSSI .DZT format handler.

    Wraps the Rust DZT parser and provides NumPy array access
    to trace data via memory-mapped I/O. Supports .DZG sidecar
    GPS files for georeferencing.

    Usage:
        dzt = DZTFile("survey.dzt")
        data = dzt.traces          # (num_traces, samples_per_trace) array
        gps = dzt.gps_positions    # [(trace_idx, lat, lon, alt), ...]
    """

    def __init__(self, path: str | Path, dzg_path: str | Path | None = None):
        self.path = Path(path)
        if not self.path.exists():
            raise FileNotFoundError(f"DZT file not found: {path}")

        # Parse header via Rust backend
        self._survey: PySurvey = parse_dzt(str(self.path))

        # Memory-map the raw data for trace access
        self._data = np.memmap(
            self.path,
            dtype=self._numpy_dtype,
            mode="r",
            offset=self._survey.header_offset,
        )
        self._data = self._data.reshape((self._survey.num_traces, self._survey.samples_per_trace))

        # Parse optional .DZG sidecar GPS file
        self._gps_positions: list[tuple[int, float, float, float]] = []
        if dzg_path:
            self._load_dzg(dzg_path)
        else:
            # Auto-discover .DZG sidecar (same stem, .dzg extension)
            auto_dzg = self.path.with_suffix(".dzg")
            if auto_dzg.exists():
                self._load_dzg(auto_dzg)

    def _load_dzg(self, dzg_path: str | Path) -> None:
        """Load GPS positions from a .DZG sidecar file."""
        self._gps_positions = parse_dzg(str(dzg_path))

    @property
    def _numpy_dtype(self) -> np.dtype:
        mapping = {8: np.dtype(np.uint8), 16: np.dtype(np.int16), 32: np.dtype(np.int32)}
        return mapping.get(self._survey.bits_per_sample, np.dtype(np.int16))

    @property
    def traces(self) -> np.ndarray:
        """2D array of shape (num_traces, samples_per_trace)."""
        return self._data

    @property
    def num_traces(self) -> int:
        return self._survey.num_traces

    @property
    def samples_per_trace(self) -> int:
        return self._survey.samples_per_trace

    @property
    def bits_per_sample(self) -> int:
        return self._survey.bits_per_sample

    @property
    def channels(self) -> int:
        return self._survey.channels

    @property
    def header_offset(self) -> int:
        return self._survey.header_offset

    @property
    def time_window_ns(self) -> float:
        return self._survey.time_window_ns

    @property
    def sample_interval_ns(self) -> float:
        return self._survey.sample_interval_ns

    @property
    def traces_per_second(self) -> float:
        return self._survey.traces_per_second

    @property
    def traces_per_meter(self) -> float:
        return self._survey.traces_per_meter

    @property
    def time_zero_ns(self) -> float:
        return self._survey.time_zero_ns

    @property
    def gps_positions(self) -> list[tuple[int, float, float, float]]:
        """GPS positions from .DZG sidecar: [(trace_idx, lat, lon, alt), ...]."""
        return list(self._gps_positions)

    @property
    def has_gps(self) -> bool:
        return len(self._gps_positions) > 0

    def get_trace(self, index: int) -> np.ndarray:
        """Return a single trace as a 1D array."""
        if index < 0 or index >= self.num_traces:
            raise IndexError(f"Trace index {index} out of range (0-{self.num_traces - 1})")
        return self._data[index, :]

    def __repr__(self) -> str:
        gps_info = f", gps={len(self._gps_positions)} fixes" if self.has_gps else ""
        return (
            f"DZTFile(traces={self.num_traces}, "
            f"samples={self.samples_per_trace}, "
            f"bits={self.bits_per_sample}"
            f"{gps_info})"
        )

gps_positions property

GPS positions from .DZG sidecar: [(trace_idx, lat, lon, alt), ...].

traces property

2D array of shape (num_traces, samples_per_trace).

get_trace(index)

Return a single trace as a 1D array.

Source code in dig/parsers/dzt.py
113
114
115
116
117
def get_trace(self, index: int) -> np.ndarray:
    """Fetch one trace, i.e. row ``index`` of the mapped data, as a 1D array.

    Negative indices are not interpreted relative to the end; any index
    outside ``0 .. num_traces - 1`` raises ``IndexError``.
    """
    out_of_bounds = index < 0 or index >= self.num_traces
    if out_of_bounds:
        raise IndexError(f"Trace index {index} out of range (0-{self.num_traces - 1})")
    row = self._data[index, :]
    return row

MagnetometryFile

Bartington/Geoscan .dat/.grd format handler.

Parses the .grd ASCII header for grid metadata and the .dat binary file for magnetic gradient measurements. Handles zig-zag traverse reversal and void value detection.

Usage

mag = MagnetometryFile("survey.dat", "survey.grd")
data = mag.data              # (rows, cols) int16 array
grid = mag.grid_metadata     # dict of .grd parameters

Source code in dig/parsers/magnetometry.py
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
class MagnetometryFile:
    """Bartington/Geoscan .dat/.grd format handler.

    Parses the .grd ASCII header for grid metadata and the .dat
    binary file for magnetic gradient measurements. Handles zig-zag
    traverse reversal and void value detection.

    Usage:
        mag = MagnetometryFile("survey.dat", "survey.grd")
        data = mag.data              # (rows, cols) int16 array
        grid = mag.grid_metadata     # dict of .grd parameters
    """

    def __init__(self, dat_path: str | Path, grd_path: str | Path | None = None):
        self.dat_path = Path(dat_path)
        if not self.dat_path.exists():
            raise FileNotFoundError(f"DAT file not found: {dat_path}")

        # Auto-discover .grd file
        if grd_path is None:
            candidates = [
                self.dat_path.with_suffix(".grd"),
                self.dat_path.with_suffix(".GRD"),
            ]
            grd_path = next((p for p in candidates if p.exists()), None)

        self.grd_path = Path(grd_path) if grd_path else None
        if not self.grd_path or not self.grd_path.exists():
            raise FileNotFoundError(f"GRD file not found for {dat_path}")

        # Parse via Rust backend
        self._survey: PySurvey = parse_magnetometry(str(self.dat_path), str(self.grd_path))

        # Build numpy grid from trace_data
        self._build_grid()

        # Parse raw .grd metadata for dict access
        self._raw_metadata: dict[str, str] = {}
        self._parse_raw_grd()

    def _build_grid(self) -> None:
        """Convert raw trace data bytes to 2D numpy array."""
        raw = bytes(self._survey.trace_data)
        rows = self._survey.num_traces
        cols = self._survey.samples_per_trace
        expected = rows * cols * 2  # int16

        if len(raw) >= expected:
            self._data = np.frombuffer(raw[:expected], dtype=np.int16).reshape(rows, cols)
        else:
            self._data = np.zeros((rows, cols), dtype=np.int16)

    def _parse_raw_grd(self) -> None:
        """Parse .grd file for raw metadata dict."""
        content = self.grd_path.read_text()
        for line in content.splitlines():
            line = line.strip()
            if not line or line.startswith(("#", "//")):
                continue
            for sep in (" ", "=", ":"):
                if sep in line:
                    parts = line.split(sep, 1)
                    if len(parts) == 2:
                        key, val = parts[0].strip().upper(), parts[1].strip()
                        self._raw_metadata[key] = val
                    break

    @property
    def data(self) -> np.ndarray:
        """2D array of shape (rows, cols) with magnetic gradient values (int16).

        Void values (-32768) indicate missing/no-data cells.
        Use np.ma.masked_equal(mag.data, VOID_VALUE) for masked arrays.
        """
        return self._data

    @property
    def shape(self) -> tuple[int, int]:
        """Grid dimensions (rows, cols)."""
        return (self._survey.num_traces, self._survey.samples_per_trace)

    @property
    def rows(self) -> int:
        return self._survey.num_traces

    @property
    def cols(self) -> int:
        return self._survey.samples_per_trace

    @property
    def cell_size(self) -> float:
        """Grid cell size in meters."""
        if len(self._survey.trace_positions) >= 3:
            return self._survey.trace_positions[2]
        return 0.5

    @property
    def origin_easting(self) -> float:
        """Grid origin easting coordinate."""
        if len(self._survey.trace_elevations) >= 1:
            return self._survey.trace_elevations[0]
        return 0.0

    @property
    def origin_northing(self) -> float:
        """Grid origin northing coordinate."""
        if len(self._survey.trace_elevations) >= 2:
            return self._survey.trace_elevations[1]
        return 0.0

    @property
    def rotation_deg(self) -> float:
        """Grid rotation in degrees."""
        if len(self._survey.trace_positions) >= 4:
            return self._survey.trace_positions[3]
        return 0.0

    @property
    def grid_metadata(self) -> dict[str, str]:
        """Raw .grd key-value pairs."""
        return dict(self._raw_metadata)

    @property
    def void_mask(self) -> np.ndarray:
        """Boolean mask: True where data is void/missing."""
        return self._data == VOID_VALUE

    def __repr__(self) -> str:
        return f"MagnetometryFile(shape=({self.rows}, {self.cols}), cell_size={self.cell_size})"

cell_size property

Grid cell size in meters.

data property

2D array of shape (rows, cols) with magnetic gradient values (int16).

Void values (-32768) indicate missing/no-data cells. Use np.ma.masked_equal(mag.data, VOID_VALUE) for masked arrays.

grid_metadata property

Raw .grd key-value pairs.

origin_easting property

Grid origin easting coordinate.

origin_northing property

Grid origin northing coordinate.

rotation_deg property

Grid rotation in degrees.

shape property

Grid dimensions (rows, cols).

void_mask property

Boolean mask: True where data is void/missing.

SEGYFile

SEG-Y format handler.

SEG-Y is the Society of Exploration Geophysicists interchange format, widely used as a universal interoperability standard.

Source code in dig/parsers/segy.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class SEGYFile:
    """SEG-Y format handler.

    SEG-Y is the Society of Exploration Geophysicists interchange
    format, widely used as a universal interoperability standard.
    """

    def __init__(self, path: str | Path):
        self.path = Path(path)
        if not self.path.exists():
            raise FileNotFoundError(f"SEG-Y file not found: {path}")

        self._text_header: str = ""
        self._binary_header: bytes = b""
        self._parse()

    def _parse(self) -> None:
        """Parse the 3200-byte EBCDIC text header and 400-byte binary header."""
        data = self.path.read_bytes()
        if len(data) < 3600:
            raise ValueError("File too small for SEG-Y header")

        # EBCDIC text header (bytes 0-3199)
        try:
            self._text_header = data[:3200].decode("cp037", errors="replace")
        except LookupError:
            self._text_header = data[:3200].decode("latin-1", errors="replace")

        self._binary_header = data[3200:3600]

    @property
    def text_header(self) -> str:
        return self._text_header

    def __repr__(self) -> str:
        return f"SEGYFile(path={self.path.name})"