Skip to content

Shared Upload Helpers

json_utils

JSON loading helpers shared across SHM modules.

Functions

load_json_data

load_json_data(path_to_data)

Load JSON data from disk using pathlib.

Parameters:

Name Type Description Default
path_to_data str | Path | None

Path to the JSON document, or None.

required

Returns:

Type Description
Any | None

Parsed JSON document, or None when no path is provided.

Examples:

>>> load_json_data(None) is None
True
Source code in src/owi/metadatabase/shm/json_utils.py
def load_json_data(path_to_data: str | Path | None) -> Any | None:
    """Load JSON data from disk using ``pathlib``.

    Parameters
    ----------
    path_to_data
        Path to the JSON document, or ``None``.

    Returns
    -------
    Any | None
        Parsed JSON document, or ``None`` when no path is provided.

    Examples
    --------
    >>> load_json_data(None) is None
    True
    """
    if path_to_data is None:
        return None

    path = Path(path_to_data)
    return json.loads(path.read_text(encoding="utf-8"))

signal_ids

Typed parsing for SHM signal identifiers.

Classes

LegacySignalIdentifier dataclass

LegacySignalIdentifier(
    raw,
    parts,
    subassembly,
    signal_type,
    lateral_position,
    angular_position,
    orientation,
)

Parsed representation of an SHM signal identifier.

Functions
to_legacy_dict
to_legacy_dict()

Return the historical dict shape used by archive payload code.

Source code in src/owi/metadatabase/shm/signal_ids.py
def to_legacy_dict(self) -> dict[str, str | int | None]:
    """Return the historical dict shape used by archive payload code."""
    data: dict[str, str | int | None] = {
        "sa": self.subassembly,
        "type": self.signal_type,
        "lat": self.lateral_position,
        "deg": self.angular_position,
    }
    if len(self.parts) > 4:
        data["orientation"] = self.orientation
    return data

Functions

parse_legacy_signal_id

parse_legacy_signal_id(signal_id)

Parse an SHM signal identifier into a typed model.

Source code in src/owi/metadatabase/shm/signal_ids.py
def parse_legacy_signal_id(signal_id: str) -> LegacySignalIdentifier | None:
    """Parse an SHM signal identifier into a typed model."""
    parts = tuple(signal_id.split("_"))
    if len(parts) < 4:
        return None

    return LegacySignalIdentifier(
        raw=signal_id,
        parts=parts,
        subassembly=parts[2],
        signal_type=parts[3],
        lateral_position=_parse_position(parts, 4, "LAT"),
        angular_position=_parse_position(parts, 5, "DEG"),
        orientation=_parse_orientation(parts),
    )

upload_context

Shared context models for SHM upload workflows.

Classes

SignalUploadContext dataclass

SignalUploadContext(
    site_id,
    asset_location_id,
    model_definition_id,
    permission_group_ids,
    subassembly_ids_by_type,
)

Resolved ids shared by signal upload payload builders.

Functions
subassembly_id_for
subassembly_id_for(subassembly_type)

Return the configured subassembly id for a subassembly token.

Source code in src/owi/metadatabase/shm/upload_context.py
def subassembly_id_for(self, subassembly_type: str) -> int | None:
    """Return the configured subassembly id for a subassembly token."""
    return self.subassembly_ids_by_type.get(subassembly_type)

payloads

Non-legacy payload helpers for SHM upload workflows.

Classes

SignalPayload dataclass

SignalPayload(
    site,
    model_definition,
    asset_location,
    signal_type,
    signal_id,
    visibility_groups,
    sub_assembly=None,
    heading=None,
    level=None,
    orientation=None,
    stats=None,
    data_additional=None,
    visibility="usergroup",
)

Payload model for signal records.

SignalHistoryPayload dataclass

SignalHistoryPayload(
    signal_id,
    activity_start_timestamp,
    is_latest_status,
    status,
    sensor_serial_number=None,
    status_approval="yes",
    legacy_signal_id=None,
)

Payload model for signal history records.

LeadCorrectionPayload dataclass

LeadCorrectionPayload(t_ref, coef)

Nested payload model for signal lead correction data.

SignalCalibrationData dataclass

SignalCalibrationData(
    offset=None,
    cwl=None,
    coefficients=None,
    t_ref=None,
    gauge_correction=None,
    lead_correction=None,
)

Nested payload model for signal calibration data.

SignalCalibrationPayload dataclass

SignalCalibrationPayload(
    signal_id,
    calibration_date,
    data,
    tempcomp_signal_id=None,
    status_approval="yes",
)

Payload model for signal calibration records.

DerivedSignalPayload dataclass

DerivedSignalPayload(
    site,
    model_definition,
    asset_location,
    sub_assembly,
    signal_type,
    derived_signal_id,
    visibility_groups,
    heading=None,
    level=None,
    orientation=None,
    stats=None,
    data_additional=None,
    visibility="usergroup",
)

Payload model for derived signal records.

DerivedSignalHistoryPayload dataclass

DerivedSignalHistoryPayload(
    derived_signal_id,
    activity_start_timestamp,
    is_latest_status,
    status,
    status_approval="yes",
)

Payload model for derived signal history records.

DerivedSignalHistoryParentSignalsPatch dataclass

DerivedSignalHistoryParentSignalsPatch(parent_signals)

Patch payload for linking parent signals to a derived signal history.

DerivedSignalCalibrationData dataclass

DerivedSignalCalibrationData(
    yaw_parameter, yaw_offset, measurement_location=None
)

Nested payload model for derived signal calibration data.

DerivedSignalCalibrationPayload dataclass

DerivedSignalCalibrationPayload(
    derived_signal_id,
    calibration_date,
    data,
    status_approval="yes",
)

Payload model for derived signal calibration records.

SensorTypePayload dataclass

SensorTypePayload(
    name,
    type,
    type_extended,
    hardware_supplier,
    file=None,
    visibility="usergroup",
    visibility_groups=None,
)

Payload model for sensor type records.

SensorPayload dataclass

SensorPayload(
    sensor_type_id,
    serial_number,
    cabinet,
    visibility="usergroup",
    visibility_groups=None,
)

Payload model for sensor records.

SensorCalibrationPayload dataclass

SensorCalibrationPayload(
    sensor_serial_number, calibration_date, file
)

Payload model for sensor calibration records.

Functions

build_sensor_payloads

build_sensor_payloads(
    sensor_type_id,
    serial_numbers,
    cabinets,
    visibility_groups,
    visibility="usergroup",
)

Build sensor payload models from parallel columns.

Source code in src/owi/metadatabase/shm/upload/payloads.py
def build_sensor_payloads(
    sensor_type_id: int,
    serial_numbers: Sequence[str | None],
    cabinets: Sequence[str | int | None],
    visibility_groups: Sequence[int] | None,
    visibility: str = "usergroup",
) -> list[SensorPayload]:
    """Build sensor payload models from parallel columns."""
    rows = _expand_columns({"serial_number": serial_numbers, "cabinet": cabinets})
    return [
        SensorPayload(
            sensor_type_id=sensor_type_id,
            serial_number=row["serial_number"],
            cabinet=row["cabinet"],
            visibility=visibility,
            visibility_groups=visibility_groups,
        )
        for row in rows
    ]

build_sensor_type_payloads

build_sensor_type_payloads(
    sensor_types_data,
    visibility_groups,
    path_to_images=None,
    visibility="usergroup",
)

Build sensor type payload models from raw records.

Source code in src/owi/metadatabase/shm/upload/payloads.py
def build_sensor_type_payloads(
    sensor_types_data: Sequence[Mapping[str, Any]],
    visibility_groups: Sequence[int] | None,
    path_to_images: str | Path | None = None,
    visibility: str = "usergroup",
) -> list[SensorTypePayload]:
    """Build sensor type payload models from raw records."""
    payloads: list[SensorTypePayload] = []
    for entry in sensor_types_data:
        file_path: Path | None = None
        filename = entry.get("file")
        if filename is not None and path_to_images is not None:
            file_path = Path(path_to_images) / str(filename)
        payloads.append(
            SensorTypePayload(
                name=str(entry["name"]),
                type=str(entry["type"]),
                type_extended=str(entry["type_extended"]),
                hardware_supplier=str(entry["hardware_supplier"]),
                file=file_path,
                visibility=visibility,
                visibility_groups=visibility_groups,
            )
        )
    return payloads

build_sensor_calibration_payloads

build_sensor_calibration_payloads(
    signal_sensor_map,
    signal_calibration_map,
    path_to_datasheets,
)

Build sensor calibration payload models for one turbine.

Source code in src/owi/metadatabase/shm/upload/payloads.py
def build_sensor_calibration_payloads(
    signal_sensor_map: Mapping[str, int],
    signal_calibration_map: Mapping[str, Mapping[str, str]],
    path_to_datasheets: str | Path,
) -> list[SensorCalibrationPayload]:
    """Build sensor calibration payload models for one turbine."""
    payloads: list[SensorCalibrationPayload] = []
    for signal_name, calibration in signal_calibration_map.items():
        sensor_id = signal_sensor_map.get(signal_name)
        if sensor_id is None:
            continue
        payloads.append(
            SensorCalibrationPayload(
                sensor_serial_number=sensor_id,
                calibration_date=calibration["date"],
                file=Path(path_to_datasheets) / calibration["filename"],
            )
        )
    return payloads

build_signal_main_payload

build_signal_main_payload(signal, signal_data, context)

Build the main signal payload from archive-style signal data.

Source code in src/owi/metadatabase/shm/upload/payloads.py
def build_signal_main_payload(
    signal: LegacySignalIdentifier,
    signal_data: Mapping[str, Any],
    context: SignalUploadContext,
) -> dict[str, Any] | None:
    """Build the main signal payload from archive-style signal data."""
    if len(signal_data) <= 1:
        return None

    payload = SignalPayload(
        site=context.site_id,
        model_definition=context.model_definition_id,
        asset_location=context.asset_location_id,
        signal_type=signal.signal_type,
        signal_id=signal.raw,
        sub_assembly=(
            context.subassembly_id_for(signal.subassembly) if signal.subassembly in {"TP", "TW", "MP"} else None
        ),
        heading=signal_data.get("heading"),
        level=signal_data.get("level"),
        orientation=signal_data.get("orientation"),
        stats=signal_data.get("stats"),
        data_additional=_legacy_signal_misc_data(signal_data),
        visibility_groups=context.permission_group_ids,
    )
    return payload.to_payload()

build_signal_status_payloads

build_signal_status_payloads(
    signal_id, signal_data, sensor_serial_number=None
)

Build signal status payloads from archive-style status rows.

Source code in src/owi/metadatabase/shm/upload/payloads.py
def build_signal_status_payloads(
    signal_id: int,
    signal_data: Mapping[str, Any],
    sensor_serial_number: int | None = None,
) -> list[dict[str, Any]]:
    """Build signal status payloads from archive-style status rows."""
    statuses = signal_data.get("status")
    if not isinstance(statuses, Sequence) or isinstance(statuses, (str, bytes)):
        return []

    payloads: list[dict[str, Any]] = []
    for index, status in enumerate(statuses):
        if not isinstance(status, Mapping):
            continue
        status_row = cast(Mapping[str, Any], status)
        payloads.append(
            SignalHistoryPayload(
                signal_id=signal_id,
                activity_start_timestamp=cast(TimestampValue, status_row["time"]),
                is_latest_status=index == len(statuses) - 1,
                status=cast(str, status_row["status"]),
                sensor_serial_number=sensor_serial_number,
                legacy_signal_id=cast(Optional[str], status_row.get("name")),
            ).to_payload()
        )
    return payloads

build_signal_calibration_payloads

build_signal_calibration_payloads(
    signal_id, signal_data, tempcomp_signal_ids=None
)

Build signal calibration payloads from archive-style offset and CWL data.

Source code in src/owi/metadatabase/shm/upload/payloads.py
def build_signal_calibration_payloads(
    signal_id: int,
    signal_data: Mapping[str, Any],
    tempcomp_signal_ids: Mapping[str, int] | None = None,
) -> list[dict[str, Any]]:
    """Build signal calibration payloads from archive-style offset and CWL data."""
    payloads: list[dict[str, Any]] = []

    offsets = signal_data.get("offset")
    if isinstance(offsets, Sequence) and not isinstance(offsets, (str, bytes)):
        for offset in offsets:
            if not isinstance(offset, Mapping):
                continue
            lead_correction = offset.get("lead_correction")
            tc_sensor = offset.get("TCSensor")
            payloads.append(
                SignalCalibrationPayload.from_offset(
                    signal_id=signal_id,
                    calibration_date=offset["time"],
                    offset=offset["offset"],
                    tempcomp_signal_id=(
                        tempcomp_signal_ids.get(tc_sensor)
                        if tempcomp_signal_ids is not None and isinstance(tc_sensor, str)
                        else None
                    ),
                    coefficients=offset.get("Coefficients"),
                    t_ref=offset.get("t_ref"),
                    gauge_correction=offset.get("gauge_correction"),
                    lead_correction=(
                        LeadCorrectionPayload(
                            t_ref=lead_correction["t_ref"],
                            coef=lead_correction["coef"],
                        )
                        if isinstance(lead_correction, Mapping)
                        else None
                    ),
                ).to_payload()
            )

    cwl_rows = signal_data.get("cwl")
    if isinstance(cwl_rows, Sequence) and not isinstance(cwl_rows, (str, bytes)):
        for cwl in cwl_rows:
            if not isinstance(cwl, Mapping):
                continue
            payloads.append(
                SignalCalibrationPayload.from_cwl(
                    signal_id=signal_id,
                    calibration_date=cwl["time"],
                    cwl=cwl["cwl"],
                ).to_payload()
            )

    return payloads

build_derived_signal_main_payload

build_derived_signal_main_payload(
    signal, signal_data, context
)

Build the main derived-signal payload from archive-style data.

Source code in src/owi/metadatabase/shm/upload/payloads.py
def build_derived_signal_main_payload(
    signal: LegacySignalIdentifier,
    signal_data: Mapping[str, Any],
    context: SignalUploadContext,
) -> dict[str, Any] | None:
    """Build the main derived-signal payload from archive-style data."""
    if len(signal_data) <= 1:
        return None

    sub_assembly = context.subassembly_id_for(signal.subassembly)
    if sub_assembly is None:
        raise KeyError(f"Missing sub-assembly id for {signal.subassembly!r}")

    return DerivedSignalPayload(
        site=context.site_id,
        model_definition=context.model_definition_id,
        asset_location=context.asset_location_id,
        sub_assembly=sub_assembly,
        signal_type=signal.signal_type,
        derived_signal_id=signal.raw,
        heading=signal_data.get("heading"),
        level=signal_data.get("level"),
        orientation=signal_data.get("orientation"),
        stats=signal_data.get("stats"),
        data_additional=signal_data.get("data"),
        visibility_groups=context.permission_group_ids,
    ).to_payload()

build_derived_signal_status_payload

build_derived_signal_status_payload(
    derived_signal_id, signal_data
)

Build the derived-signal status payload used before parent patching.

Source code in src/owi/metadatabase/shm/upload/payloads.py
def build_derived_signal_status_payload(
    derived_signal_id: int,
    signal_data: Mapping[str, Any],
) -> dict[str, Any]:
    """Build the derived-signal status payload used before parent patching."""
    calibrations = signal_data.get("calibration")
    if not isinstance(calibrations, Sequence) or isinstance(calibrations, (str, bytes)) or not calibrations:
        raise ValueError("Derived signal calibration rows are required to build a status payload.")

    first = calibrations[0]
    if not isinstance(first, Mapping):
        raise ValueError("Derived signal calibration rows must be mappings.")

    return DerivedSignalHistoryPayload(
        derived_signal_id=derived_signal_id,
        activity_start_timestamp=first["time"],
        is_latest_status=True,
        status="ok",
    ).to_payload()

build_derived_signal_parent_patch

build_derived_signal_parent_patch(parent_signal_ids)

Build the parent-signals patch payload for derived signal status rows.

Source code in src/owi/metadatabase/shm/upload/payloads.py
def build_derived_signal_parent_patch(parent_signal_ids: Sequence[int]) -> dict[str, list[int]]:
    """Build the parent-signals patch payload for derived signal status rows."""
    return DerivedSignalHistoryParentSignalsPatch(parent_signal_ids).to_payload()

build_derived_signal_calibration_payloads

build_derived_signal_calibration_payloads(
    derived_signal_id, signal_data
)

Build derived-signal calibration payloads from archive-style data.

Source code in src/owi/metadatabase/shm/upload/payloads.py
def build_derived_signal_calibration_payloads(
    derived_signal_id: int,
    signal_data: Mapping[str, Any],
) -> list[dict[str, Any]]:
    """Build derived-signal calibration payloads from archive-style data."""
    calibrations = signal_data.get("calibration")
    if not isinstance(calibrations, Sequence) or isinstance(calibrations, (str, bytes)):
        return []

    payloads: list[dict[str, Any]] = []
    for calibration in calibrations:
        if not isinstance(calibration, Mapping):
            continue
        payloads.append(
            DerivedSignalCalibrationPayload.from_yaw_offset(
                derived_signal_id=derived_signal_id,
                calibration_date=calibration["time"],
                yaw_parameter=calibration["yaw_parameter"],
                yaw_offset=calibration["yaw_offset"],
                measurement_location=calibration.get("measurement_location"),
            ).to_payload()
        )
    return payloads