Skip to content

Upload Orchestration

upload

Upload subpackage for SHM signal and sensor upload orchestration.

Re-exports all public symbols so that `from owi.metadatabase.shm.upload import X` works unchanged.

Classes

ParentSignalLookupError

ParentSignalLookupError(message)

Bases: ShmUploadError

Raised when a derived signal refers to unresolved parent signals.

Source code in .venv/lib/python3.14/site-packages/owi/metadatabase/_utils/exceptions.py
def __init__(self, message: str) -> None:
    """Keep the error text on the instance and initialise the base class with it.

    Parameters
    ----------
    message
        Error text stored as ``self.message`` and passed on to the parent
        class initialiser.
    """
    self.message = message
    super().__init__(self.message)

ShmUploadError

ShmUploadError(message)

Bases: APIException

Base exception for SHM upload orchestration failures.

Source code in .venv/lib/python3.14/site-packages/owi/metadatabase/_utils/exceptions.py
def __init__(self, message: str) -> None:
    """Keep the error text on the instance and initialise the base class with it.

    Parameters
    ----------
    message
        Error text stored as ``self.message`` and passed on to the parent
        class initialiser.
    """
    self.message = message
    super().__init__(self.message)

UploadResultError

UploadResultError(message)

Bases: ShmUploadError

Raised when a backend mutation result does not include the expected id.

Source code in .venv/lib/python3.14/site-packages/owi/metadatabase/_utils/exceptions.py
def __init__(self, message: str) -> None:
    """Keep the error text on the instance and initialise the base class with it.

    Parameters
    ----------
    message
        Error text stored as ``self.message`` and passed on to the parent
        class initialiser.
    """
    self.message = message
    super().__init__(self.message)

AssetSignalUploadRequest dataclass

AssetSignalUploadRequest(
    projectsite,
    assetlocation,
    signals,
    derived_signals=None,
    permission_group_ids=None,
    sensor_serial_numbers_by_signal=None,
    temperature_compensation_signal_ids=None,
)

Input data for uploading one asset's SHM signals.

Parameters:

Name Type Description Default
projectsite str

Parent SDK project site title.

required
assetlocation str

Parent SDK asset location title.

required
signals SignalConfigMap

Archive-compatible main signal data keyed by signal identifier.

required
derived_signals SignalConfigMap | None

Archive-compatible derived signal data keyed by derived signal identifier.

None
permission_group_ids Sequence[int] | None

Visibility groups applied to created SHM objects.

None
sensor_serial_numbers_by_signal Mapping[str, int] | None

Optional map from signal identifier to the backend SHM sensor identifier stored on signal history rows.

None
temperature_compensation_signal_ids Mapping[str, int] | None

Optional map from legacy temperature-compensation sensor token to backend SHM signal id.

None

Examples:

>>> request = AssetSignalUploadRequest(
...     projectsite="Project A",
...     assetlocation="Asset-01",
...     signals={},
... )
>>> request.result_key
'Project A/Asset-01'
Attributes
result_key property
result_key

Return a stable asset-scoped result key.

Functions
from_processing_result classmethod
from_processing_result(
    *,
    projectsite,
    assetlocation,
    processing_result,
    permission_group_ids=None,
    sensor_serial_numbers_by_signal=None,
    temperature_compensation_signal_ids=None,
)

Build an upload request from a processed signal-config result.

Parameters:

Name Type Description Default
projectsite str

Parent SDK project site title.

required
assetlocation str

Parent SDK asset location title.

required
processing_result SignalProcessingResult

Processed signal and derived-signal records emitted by a processor.

required
permission_group_ids Sequence[int] | None

Visibility groups applied to created SHM objects.

None
sensor_serial_numbers_by_signal Mapping[str, int] | None

Optional map from signal identifier to backend sensor serial number used for signal history rows.

None
temperature_compensation_signal_ids Mapping[str, int] | None

Optional map from legacy temperature-compensation sensor token to backend SHM signal id.

None

Returns:

Type Description
AssetSignalUploadRequest

Asset-scoped upload request that preserves the archive-compatible payload shape.

Examples:

>>> from owi.metadatabase.shm.processing import ProcessedSignalRecord, SignalProcessingResult
>>> signal = ProcessedSignalRecord()
>>> signal.add_status("01/01/1972 00:00", "ok")
>>> request = AssetSignalUploadRequest.from_processing_result(
...     projectsite="Project A",
...     assetlocation="Asset-01",
...     processing_result=SignalProcessingResult(signals={"SIG": signal}, derived_signals={}),
... )
>>> request.signals["SIG"]["status"][0]["status"]
'ok'
Source code in src/owi/metadatabase/shm/upload/models.py
@classmethod
def from_processing_result(
    cls,
    *,
    projectsite: str,
    assetlocation: str,
    processing_result: SignalProcessingResult,
    permission_group_ids: Sequence[int] | None = None,
    sensor_serial_numbers_by_signal: Mapping[str, int] | None = None,
    temperature_compensation_signal_ids: Mapping[str, int] | None = None,
) -> AssetSignalUploadRequest:
    """Create an upload request from the output of a signal-config processor.

    The processing result is converted with ``to_legacy_data()`` and the two
    returned mappings become the request's main and derived signals. An
    empty (falsy) derived mapping is stored as ``None``.

    Parameters
    ----------
    projectsite
        Parent SDK project site title.
    assetlocation
        Parent SDK asset location title.
    processing_result
        Processed signal and derived-signal records emitted by a processor.
    permission_group_ids
        Visibility groups applied to created SHM objects.
    sensor_serial_numbers_by_signal
        Optional map from signal identifier to backend sensor serial
        number used for signal history rows.
    temperature_compensation_signal_ids
        Optional map from legacy temperature-compensation sensor token to
        backend SHM signal id.

    Returns
    -------
    AssetSignalUploadRequest
        Asset-scoped upload request that preserves the archive-compatible
        payload shape.

    Examples
    --------
    >>> from owi.metadatabase.shm.processing import ProcessedSignalRecord, SignalProcessingResult
    >>> signal = ProcessedSignalRecord()
    >>> signal.add_status("01/01/1972 00:00", "ok")
    >>> request = AssetSignalUploadRequest.from_processing_result(
    ...     projectsite="Project A",
    ...     assetlocation="Asset-01",
    ...     processing_result=SignalProcessingResult(signals={"SIG": signal}, derived_signals={}),
    ... )
    >>> request.signals["SIG"]["status"][0]["status"]
    'ok'
    """
    legacy_signals, legacy_derived = processing_result.to_legacy_data()

    # Any falsy derived payload (e.g. an empty mapping) is passed on as None.
    if not legacy_derived:
        legacy_derived = None

    request_fields = {
        "projectsite": projectsite,
        "assetlocation": assetlocation,
        "signals": legacy_signals,
        "derived_signals": legacy_derived,
        "permission_group_ids": permission_group_ids,
        "sensor_serial_numbers_by_signal": sensor_serial_numbers_by_signal,
        "temperature_compensation_signal_ids": temperature_compensation_signal_ids,
    }
    return cls(**request_fields)

AssetSignalUploadResult dataclass

AssetSignalUploadResult(
    asset_key,
    signal_ids_by_name,
    derived_signal_ids_by_name,
    results_main,
    results_secondary,
    results_derived_main,
    results_derived_secondary,
)

Upload result for one asset.

Parameters:

Name Type Description Default
asset_key str

Stable asset-scoped result key in projectsite/assetlocation form.

required
signal_ids_by_name Mapping[str, int]

Backend ids for created main signals keyed by signal identifier.

required
derived_signal_ids_by_name Mapping[str, int]

Backend ids for created derived signals keyed by signal identifier.

required
results_main Sequence[dict[str, Any]]

Raw backend responses for main signal creation calls.

required
results_secondary Sequence[dict[str, Any]]

Raw backend responses for signal history and calibration calls.

required
results_derived_main Sequence[dict[str, Any]]

Raw backend responses for derived signal creation calls.

required
results_derived_secondary Sequence[dict[str, Any]]

Raw backend responses for derived history, parent patch, and calibration calls.

required
This
required
migrate
required

DerivedSignalCalibrationPayload dataclass

DerivedSignalCalibrationPayload(
    derived_signal_id,
    calibration_date,
    data,
    status_approval="yes",
)

Payload model for derived signal calibration records.

DerivedSignalHistoryPayload dataclass

DerivedSignalHistoryPayload(
    derived_signal_id,
    activity_start_timestamp,
    is_latest_status,
    status,
    status_approval="yes",
)

Payload model for derived signal history records.

DerivedSignalPayload dataclass

DerivedSignalPayload(
    site,
    model_definition,
    asset_location,
    sub_assembly,
    signal_type,
    derived_signal_id,
    visibility_groups,
    heading=None,
    level=None,
    orientation=None,
    stats=None,
    data_additional=None,
    visibility="usergroup",
)

Payload model for derived signal records.

SensorCalibrationPayload dataclass

SensorCalibrationPayload(
    sensor_serial_number, calibration_date, file
)

Payload model for sensor calibration records.

SensorPayload dataclass

SensorPayload(
    sensor_type_id,
    serial_number,
    cabinet,
    visibility="usergroup",
    visibility_groups=None,
)

Payload model for sensor records.

SensorTypePayload dataclass

SensorTypePayload(
    name,
    type,
    type_extended,
    hardware_supplier,
    file=None,
    visibility="usergroup",
    visibility_groups=None,
)

Payload model for sensor type records.

SignalCalibrationPayload dataclass

SignalCalibrationPayload(
    signal_id,
    calibration_date,
    data,
    tempcomp_signal_id=None,
    status_approval="yes",
)

Payload model for signal calibration records.

SignalHistoryPayload dataclass

SignalHistoryPayload(
    signal_id,
    activity_start_timestamp,
    is_latest_status,
    status,
    sensor_serial_number=None,
    status_approval="yes",
    legacy_signal_id=None,
)

Payload model for signal history records.

SignalPayload dataclass

SignalPayload(
    site,
    model_definition,
    asset_location,
    signal_type,
    signal_id,
    visibility_groups,
    sub_assembly=None,
    heading=None,
    level=None,
    orientation=None,
    stats=None,
    data_additional=None,
    visibility="usergroup",
)

Payload model for signal records.

ShmSignalUploadClient

Bases: Protocol

Protocol describing the SHM transport methods used by the uploader.

Functions
get_sensor_type
get_sensor_type(**kwargs)

Resolve one SHM sensor type record.

Source code in src/owi/metadatabase/shm/upload/protocols.py
def get_sensor_type(self, **kwargs: Any) -> dict[str, Any]:
    """Resolve one SHM sensor type record.

    Parameters
    ----------
    **kwargs
        Lookup arguments identifying the sensor type. Their exact meaning
        is defined by the concrete client (presumably backend query
        filters -- confirm against the implementation).

    Returns
    -------
    dict[str, Any]
        Result mapping for the resolved sensor type.
    """
    ...
get_sensor
get_sensor(**kwargs)

Resolve one SHM sensor record.

Source code in src/owi/metadatabase/shm/upload/protocols.py
def get_sensor(self, **kwargs: Any) -> dict[str, Any]:
    """Resolve one SHM sensor record.

    Parameters
    ----------
    **kwargs
        Lookup arguments identifying the sensor. Their exact meaning is
        defined by the concrete client (presumably backend query
        filters -- confirm against the implementation).

    Returns
    -------
    dict[str, Any]
        Result mapping for the resolved sensor.
    """
    ...
create_signal
create_signal(payload)

Create a signal record.

Source code in src/owi/metadatabase/shm/upload/protocols.py
def create_signal(self, payload: Mapping[str, Any]) -> dict[str, Any]:
    """Create a signal record.

    Parameters
    ----------
    payload
        Field values for the signal to create.

    Returns
    -------
    dict[str, Any]
        Result mapping returned for the creation call.
    """
    ...
get_signal
get_signal(signal_id, **kwargs)

Resolve a signal record by backend identifier.

Source code in src/owi/metadatabase/shm/upload/protocols.py
def get_signal(self, signal_id: str, **kwargs: Any) -> dict[str, Any]:
    """Resolve a signal record by backend identifier.

    Parameters
    ----------
    signal_id
        Identifier of the signal to look up.
    **kwargs
        Additional lookup arguments; their meaning is defined by the
        concrete client.

    Returns
    -------
    dict[str, Any]
        Result mapping for the resolved signal.
    """
    ...
create_signal_history
create_signal_history(payload)

Create a signal history record.

Source code in src/owi/metadatabase/shm/upload/protocols.py
def create_signal_history(self, payload: Mapping[str, Any]) -> dict[str, Any]:
    """Create a signal history record.

    Parameters
    ----------
    payload
        Field values for the signal history row to create.

    Returns
    -------
    dict[str, Any]
        Result mapping returned for the creation call.
    """
    ...
create_signal_calibration
create_signal_calibration(payload)

Create a signal calibration record.

Source code in src/owi/metadatabase/shm/upload/protocols.py
def create_signal_calibration(self, payload: Mapping[str, Any]) -> dict[str, Any]:
    """Create a signal calibration record.

    Parameters
    ----------
    payload
        Field values for the signal calibration to create.

    Returns
    -------
    dict[str, Any]
        Result mapping returned for the creation call.
    """
    ...
create_derived_signal
create_derived_signal(payload)

Create a derived signal record.

Source code in src/owi/metadatabase/shm/upload/protocols.py
def create_derived_signal(self, payload: Mapping[str, Any]) -> dict[str, Any]:
    """Create a derived signal record.

    Parameters
    ----------
    payload
        Field values for the derived signal to create.

    Returns
    -------
    dict[str, Any]
        Result mapping returned for the creation call.
    """
    ...
create_derived_signal_history
create_derived_signal_history(payload)

Create a derived signal history record.

Source code in src/owi/metadatabase/shm/upload/protocols.py
def create_derived_signal_history(self, payload: Mapping[str, Any]) -> dict[str, Any]:
    """Create a derived signal history record.

    Parameters
    ----------
    payload
        Field values for the derived signal history row to create.

    Returns
    -------
    dict[str, Any]
        Result mapping returned for the creation call.
    """
    ...
patch_derived_signal_history
patch_derived_signal_history(history_id, payload)

Patch a derived signal history record.

Source code in src/owi/metadatabase/shm/upload/protocols.py
def patch_derived_signal_history(
    self,
    history_id: int,
    payload: Mapping[str, Any],
) -> dict[str, Any]:
    """Patch a derived signal history record.

    Parameters
    ----------
    history_id
        Identifier of the derived signal history row to modify.
    payload
        Field values to apply to that row.

    Returns
    -------
    dict[str, Any]
        Result mapping returned for the patch call.
    """
    ...
create_derived_signal_calibration
create_derived_signal_calibration(payload)

Create a derived signal calibration record.

Source code in src/owi/metadatabase/shm/upload/protocols.py
def create_derived_signal_calibration(
    self,
    payload: Mapping[str, Any],
) -> dict[str, Any]:
    """Create a derived signal calibration record.

    Parameters
    ----------
    payload
        Field values for the derived signal calibration to create.

    Returns
    -------
    dict[str, Any]
        Result mapping returned for the creation call.
    """
    ...

SignalConfigUploadSource

Bases: Protocol

Protocol for processors that feed turbine-scoped upload data.

Functions
signals_process_data
signals_process_data()

Populate turbine-scoped signal dictionaries.

Source code in src/owi/metadatabase/shm/upload/protocols.py
def signals_process_data(self) -> None:
    """Populate turbine-scoped signal dictionaries.

    Nothing is returned, so implementations are presumably expected to
    keep the populated dictionaries on the instance -- confirm against the
    concrete processor classes.
    """
    ...

ShmSensorUploadClient

Bases: Protocol

Protocol describing the SHM transport methods used by the sensor uploader.

Functions
get_sensor_type
get_sensor_type(**kwargs)

Resolve one SHM sensor type record.

Source code in src/owi/metadatabase/shm/upload/sensors.py
def get_sensor_type(self, **kwargs: Any) -> dict[str, Any]:
    """Resolve one SHM sensor type record.

    Parameters
    ----------
    **kwargs
        Lookup arguments identifying the sensor type. Their exact meaning
        is defined by the concrete client (presumably backend query
        filters -- confirm against the implementation).

    Returns
    -------
    dict[str, Any]
        Result mapping for the resolved sensor type.
    """
    ...
get_sensor
get_sensor(**kwargs)

Resolve one SHM sensor record.

Source code in src/owi/metadatabase/shm/upload/sensors.py
def get_sensor(self, **kwargs: Any) -> dict[str, Any]:
    """Resolve one SHM sensor record.

    Parameters
    ----------
    **kwargs
        Lookup arguments identifying the sensor. Their exact meaning is
        defined by the concrete client (presumably backend query
        filters -- confirm against the implementation).

    Returns
    -------
    dict[str, Any]
        Result mapping for the resolved sensor.
    """
    ...
create_sensor_type
create_sensor_type(payload, files=None)

Create a sensor type record.

Source code in src/owi/metadatabase/shm/upload/sensors.py
def create_sensor_type(self, payload: Mapping[str, Any], files: Mapping[str, Any] | None = None) -> dict[str, Any]:
    """Create a sensor type record.

    Parameters
    ----------
    payload
        Field values for the sensor type to create.
    files
        Optional file attachments to send with the record. The expected
        key/value layout is defined by the concrete client.

    Returns
    -------
    dict[str, Any]
        Result mapping returned for the creation call.
    """
    ...
create_sensor
create_sensor(payload)

Create a sensor record.

Source code in src/owi/metadatabase/shm/upload/sensors.py
def create_sensor(self, payload: Mapping[str, Any]) -> dict[str, Any]:
    """Create a sensor record.

    Parameters
    ----------
    payload
        Field values for the sensor to create.

    Returns
    -------
    dict[str, Any]
        Result mapping returned for the creation call.
    """
    ...
create_sensor_calibration
create_sensor_calibration(payload, files=None)

Create a sensor calibration record.

Source code in src/owi/metadatabase/shm/upload/sensors.py
def create_sensor_calibration(
    self, payload: Mapping[str, Any], files: Mapping[str, Any] | None = None
) -> dict[str, Any]:
    """Create a sensor calibration record.

    Parameters
    ----------
    payload
        Field values for the sensor calibration to create.
    files
        Optional file attachments to send with the record. The expected
        key/value layout is defined by the concrete client.

    Returns
    -------
    dict[str, Any]
        Result mapping returned for the creation call.
    """
    ...

ShmSensorUploader

ShmSensorUploader(shm_api)

Upload sensor types, sensors, and sensor calibrations for SHM assets.

Parameters:

Name Type Description Default
shm_api ShmSensorUploadClient

SHM transport client that satisfies the `ShmSensorUploadClient` protocol.

required
Source code in src/owi/metadatabase/shm/upload/sensors.py
def __init__(self, shm_api: ShmSensorUploadClient) -> None:
    """Keep a reference to the SHM client on the uploader.

    Parameters
    ----------
    shm_api
        SHM transport client, stored as ``self.shm_api``.
    """
    self.shm_api = shm_api
Functions
upload_sensor_types
upload_sensor_types(
    sensor_types_data,
    permission_group_ids,
    path_to_images=None,
)

Upload sensor type records, optionally with image attachments.

Parameters:

Name Type Description Default
sensor_types_data Sequence[Mapping[str, Any]]

List of sensor type records (e.g. loaded from sensor_types.json).

required
permission_group_ids Sequence[int] | None

Permission groups applied to every sensor type.

required
path_to_images str | Path | None

Optional directory containing sensor type image files.

None

Returns:

Type Description
list[dict[str, Any]]

Raw backend responses for each created sensor type.

Source code in src/owi/metadatabase/shm/upload/sensors.py
def upload_sensor_types(
    self,
    sensor_types_data: Sequence[Mapping[str, Any]],
    permission_group_ids: Sequence[int] | None,
    path_to_images: str | Path | None = None,
) -> list[dict[str, Any]]:
    """Build sensor type payloads and upload them one by one.

    Parameters
    ----------
    sensor_types_data
        List of sensor type records (e.g. loaded from ``sensor_types.json``).
    permission_group_ids
        Permission groups applied to every sensor type; passed to the
        payload builder as ``visibility_groups``.
    path_to_images
        Optional directory containing sensor type image files.

    Returns
    -------
    list[dict[str, Any]]
        Raw backend responses for each created sensor type, in payload
        order.
    """
    responses: list[dict[str, Any]] = []
    for type_payload in build_sensor_type_payloads(
        sensor_types_data,
        visibility_groups=permission_group_ids,
        path_to_images=path_to_images,
    ):
        responses.append(self._upload_sensor_type(type_payload))
    return responses
upload_sensors
upload_sensors(
    sensor_type_name,
    sensor_type_params,
    sensors_data,
    permission_group_ids,
    turbines=None,
)

Upload sensor records for a single sensor category across turbines.

Parameters:

Name Type Description Default
sensor_type_name str

Key identifying the sensor category within each turbine's data (e.g. "accelerometers").

required
sensor_type_params Mapping[str, str]

Query parameters used to resolve the backend sensor type id (e.g. {"name": "393B04"}).

required
sensors_data SensorsDataByTurbine

Per-turbine sensor data keyed by turbine identifier. Each turbine has categories mapping to {"serial_numbers": [...], "cabinets": [...]}.

required
permission_group_ids Sequence[int] | None

Permission groups applied to every sensor.

required
turbines Sequence[str] | None

Optional filter to upload only specific turbines. When None, all turbines in sensors_data are processed.

None

Returns:

Type Description
list[dict[str, Any]]

Raw backend responses for each created sensor.

Source code in src/owi/metadatabase/shm/upload/sensors.py
def upload_sensors(
    self,
    sensor_type_name: str,
    sensor_type_params: Mapping[str, str],
    sensors_data: SensorsDataByTurbine,
    permission_group_ids: Sequence[int] | None,
    turbines: Sequence[str] | None = None,
) -> list[dict[str, Any]]:
    """Create sensor records of one category for the selected turbines.

    The backend sensor type is resolved first; serial numbers and cabinets
    are then gathered across turbines and uploaded as individual sensors.

    Parameters
    ----------
    sensor_type_name
        Key identifying the sensor category within each turbine's data
        (e.g. ``"accelerometers"``).
    sensor_type_params
        Query parameters used to resolve the backend sensor type id
        (e.g. ``{"name": "393B04"}``).
    sensors_data
        Per-turbine sensor data keyed by turbine identifier. Each turbine
        has categories mapping to ``{"serial_numbers": [...], "cabinets": [...]}``.
    permission_group_ids
        Permission groups applied to every sensor.
    turbines
        Optional filter to upload only specific turbines. When *None*,
        all turbines in ``sensors_data`` are processed.

    Returns
    -------
    list[dict[str, Any]]
        Raw backend responses for each created sensor. Empty when no
        serial numbers and no cabinets were collected.
    """
    resolved_type = self.shm_api.get_sensor_type(**dict(sensor_type_params))
    type_id = self._require_existing_result_id(
        resolved_type,
        label=f"sensor type '{sensor_type_name}'",
    )

    selected = list(sensors_data.keys()) if turbines is None else turbines

    # Both lists are filled in place by the collector helper.
    collected_serials: list[str | None] = []
    collected_cabinets: list[str | int | None] = []
    for key in selected:
        self._collect_sensor_columns(
            sensors_data.get(key),
            sensor_type_name,
            collected_serials,
            collected_cabinets,
            key,
        )

    if not (collected_serials or collected_cabinets):
        return []

    sensor_payloads = build_sensor_payloads(
        sensor_type_id=type_id,
        serial_numbers=collected_serials,
        cabinets=collected_cabinets,
        visibility_groups=permission_group_ids,
    )
    responses: list[dict[str, Any]] = []
    for sensor_payload in sensor_payloads:
        responses.append(self.shm_api.create_sensor(sensor_payload.to_payload()))
    return responses
upload_sensor_calibrations
upload_sensor_calibrations(
    signal_sensor_map_data,
    signal_calibration_map_data,
    path_to_datasheets,
    turbines=None,
)

Upload sensor calibration records with optional PDF attachments.

Parameters:

Name Type Description Default
signal_sensor_map_data Mapping[str, Mapping[str, Mapping[str, Any]]]

Per-turbine signal-to-sensor mapping (keyed by turbine, then signal name, with sensor lookup params including sensor_type_id).

required
signal_calibration_map_data Mapping[str, Mapping[str, Mapping[str, str]]]

Per-turbine calibration data (keyed by turbine, then signal name, with date and filename fields).

required
path_to_datasheets str | Path

Directory containing calibration PDF files.

required
turbines Sequence[str] | None

Optional turbine filter. When None, all turbines are processed.

None

Returns:

Type Description
list[dict[str, Any]]

Raw backend responses for each created calibration.

Source code in src/owi/metadatabase/shm/upload/sensors.py
def upload_sensor_calibrations(
    self,
    signal_sensor_map_data: Mapping[str, Mapping[str, Mapping[str, Any]]],
    signal_calibration_map_data: Mapping[str, Mapping[str, Mapping[str, str]]],
    path_to_datasheets: str | Path,
    turbines: Sequence[str] | None = None,
) -> list[dict[str, Any]]:
    """Create sensor calibration records turbine by turbine.

    Parameters
    ----------
    signal_sensor_map_data
        Per-turbine signal-to-sensor mapping (keyed by turbine, then signal
        name, with sensor lookup params including ``sensor_type_id``).
    signal_calibration_map_data
        Per-turbine calibration data (keyed by turbine, then signal name,
        with ``date`` and ``filename`` fields).
    path_to_datasheets
        Directory containing calibration PDF files.
    turbines
        Optional turbine filter. When *None*, all turbines present in
        ``signal_sensor_map_data`` are processed.

    Returns
    -------
    list[dict[str, Any]]
        Raw backend responses for each created calibration.
    """
    responses: list[dict[str, Any]] = []
    selected = list(signal_sensor_map_data.keys()) if turbines is None else turbines

    for key in selected:
        sensor_map = signal_sensor_map_data.get(key)
        calibration_map = signal_calibration_map_data.get(key)

        # Turbines without a signal-to-sensor mapping are skipped entirely.
        if sensor_map is None:
            continue

        # Sensor ids are resolved even when the turbine has no calibration
        # entry; only the upload step depends on the calibration map.
        sensor_ids = self._resolve_sensor_ids_for_turbine(sensor_map, key)
        if calibration_map is None:
            continue

        responses.extend(
            self._upload_sensor_calibration(calibration_payload)
            for calibration_payload in build_sensor_calibration_payloads(
                signal_sensor_map=sensor_ids,
                signal_calibration_map=calibration_map,
                path_to_datasheets=path_to_datasheets,
            )
        )

    return responses

ShmSignalUploader

ShmSignalUploader(shm_api, lookup_service)

Upload archive-compatible SHM signal data for arbitrary wind-farm assets.

Parameters:

Name Type Description Default
shm_api ShmSignalUploadClient

SHM transport client, typically an instance of `owi.metadatabase.shm.ShmAPI`.

required
lookup_service ParentSDKLookupService

Parent SDK lookup service used to resolve site, asset, and subassembly ids.

required
Source code in src/owi/metadatabase/shm/upload/signals.py
def __init__(
    self,
    shm_api: ShmSignalUploadClient,
    lookup_service: ParentSDKLookupService,
) -> None:
    """Keep references to the SHM client and the parent lookup service.

    Parameters
    ----------
    shm_api
        SHM transport client, stored as ``self.shm_api``.
    lookup_service
        Parent SDK lookup service, stored as ``self.lookup_service``.
    """
    self.shm_api = shm_api
    self.lookup_service = lookup_service
Functions
from_clients classmethod
from_clients(shm_api, locations_client, geometry_client)

Construct the uploader from SHM and parent SDK clients.

Parameters:

Name Type Description Default
shm_api ShmSignalUploadClient

SHM transport client used for backend mutations.

required
locations_client ParentLocationsLookupClient

Parent SDK client that resolves project and asset locations.

required
geometry_client ParentGeometryLookupClient

Parent SDK client that resolves subassemblies and model definitions.

required

Returns:

Type Description
ShmSignalUploader

Uploader wired to the canonical SHM lookup service.

Source code in src/owi/metadatabase/shm/upload/signals.py
@classmethod
def from_clients(
    cls,
    shm_api: ShmSignalUploadClient,
    locations_client: ParentLocationsLookupClient,
    geometry_client: ParentGeometryLookupClient,
) -> ShmSignalUploader:
    """Build an uploader, wrapping the parent SDK clients in a lookup service.

    Parameters
    ----------
    shm_api
        SHM transport client used for backend mutations.
    locations_client
        Parent SDK client that resolves project and asset locations.
    geometry_client
        Parent SDK client that resolves subassemblies and model
        definitions.

    Returns
    -------
    ShmSignalUploader
        Uploader wired to a ``ParentSDKLookupService`` built from the two
        parent SDK clients.
    """
    parent_lookup = ParentSDKLookupService(
        locations_client=locations_client,
        geometry_client=geometry_client,
    )
    return cls(shm_api=shm_api, lookup_service=parent_lookup)
upload_asset
upload_asset(request)

Upload main and secondary SHM records for one asset.

Parameters:

Name Type Description Default
request AssetSignalUploadRequest

Asset-scoped upload request containing the archive-compatible main and derived signal mappings.

required

Returns:

Type Description
AssetSignalUploadResult

Created backend ids plus raw backend responses grouped by upload phase.

Source code in src/owi/metadatabase/shm/upload/signals.py
def upload_asset(self, request: AssetSignalUploadRequest) -> AssetSignalUploadResult:
    """Run the full signal upload sequence for a single asset.

    The upload context is resolved through the lookup service, then main
    signals and their secondary data are uploaded. Derived signals and
    their secondary data follow only when the request carries a non-empty
    derived mapping.

    Parameters
    ----------
    request
        Asset-scoped upload request containing the archive-compatible main
        and derived signal mappings.

    Returns
    -------
    AssetSignalUploadResult
        Created backend ids plus raw backend responses grouped by upload
        phase. The derived entries are empty when no derived signals were
        uploaded.
    """
    context = self.lookup_service.get_signal_upload_context(
        projectsite=request.projectsite,
        assetlocation=request.assetlocation,
        permission_group_ids=request.permission_group_ids,
    )

    # Main signals come first: their ids feed every later phase.
    main_ids, main_responses = self._upload_main_signals(request.signals, context)
    secondary_responses = self._upload_signal_secondary_data(
        request.signals,
        signal_ids_by_name=main_ids,
        sensor_serial_numbers_by_signal=request.sensor_serial_numbers_by_signal,
        temperature_compensation_signal_ids=request.temperature_compensation_signal_ids,
    )

    if request.derived_signals:
        derived_ids, derived_main_responses = self._upload_main_derived_signals(
            request.derived_signals,
            context,
        )
        derived_secondary_responses = self._upload_derived_signal_secondary_data(
            request.derived_signals,
            signal_ids_by_name=main_ids,
            derived_signal_ids_by_name=derived_ids,
        )
    else:
        derived_ids = {}
        derived_main_responses = []
        derived_secondary_responses = []

    return AssetSignalUploadResult(
        asset_key=request.result_key,
        signal_ids_by_name=main_ids,
        derived_signal_ids_by_name=derived_ids,
        results_main=main_responses,
        results_secondary=secondary_responses,
        results_derived_main=derived_main_responses,
        results_derived_secondary=derived_secondary_responses,
    )
upload_assets
upload_assets(requests)

Upload SHM signal data for multiple assets.

Parameters:

Name Type Description Default
requests Sequence[AssetSignalUploadRequest]

Asset-scoped upload requests to execute in order.

required

Returns:

Type Description
dict[str, AssetSignalUploadResult]

Upload results keyed by each request's stable result key.

Source code in src/owi/metadatabase/shm/upload/signals.py
def upload_assets(
    self,
    requests: Sequence[AssetSignalUploadRequest],
) -> dict[str, AssetSignalUploadResult]:
    """Upload each request in turn and collect the results by result key.

    Parameters
    ----------
    requests
        Asset-scoped upload requests to execute in order.

    Returns
    -------
    dict[str, AssetSignalUploadResult]
        Upload results keyed by each request's stable result key. When two
        requests share a key, the later result replaces the earlier one.
    """
    outcome: dict[str, AssetSignalUploadResult] = {}
    for asset_request in requests:
        # Read the key before uploading, matching key-then-value evaluation.
        key = asset_request.result_key
        outcome[key] = self.upload_asset(asset_request)
    return outcome
upload_turbines
upload_turbines(
    *,
    projectsite,
    signals_by_turbine,
    derived_signals_by_turbine=None,
    assetlocations_by_turbine=None,
    permission_group_ids=None,
    sensor_serial_numbers_by_turbine=None,
    temperature_compensation_signal_ids_by_turbine=None,
)

Upload SHM signal data for multiple turbine-scoped config bundles.

Parameters:

Name Type Description Default
projectsite str

Parent SDK project site title shared by the turbine batch.

required
signals_by_turbine SignalConfigMapByTurbine

Main signal mappings keyed by turbine identifier.

required
derived_signals_by_turbine SignalConfigMapByTurbine | None

Optional derived signal mappings keyed by turbine identifier.

None
assetlocations_by_turbine Mapping[str, str] | None

Optional turbine-to-asset-location override mapping.

None
permission_group_ids Sequence[int] | None

Visibility groups applied to created SHM objects.

None
sensor_serial_numbers_by_turbine Mapping[str, Mapping[str, int]] | None

Optional per-turbine mapping of signal identifiers to sensor serial numbers used for signal history rows.

None
temperature_compensation_signal_ids_by_turbine Mapping[str, Mapping[str, int]] | None

Optional per-turbine mapping of temperature-compensation tokens to backend SHM signal ids.

None

Returns:

Type Description
dict[str, AssetSignalUploadResult]

Upload results keyed by turbine identifier.

This keeps the response keyed by turbine while parent lookups use the
corresponding asset-location title.
Source code in src/owi/metadatabase/shm/upload/signals.py
def upload_turbines(
    self,
    *,
    projectsite: str,
    signals_by_turbine: SignalConfigMapByTurbine,
    derived_signals_by_turbine: SignalConfigMapByTurbine | None = None,
    assetlocations_by_turbine: Mapping[str, str] | None = None,
    permission_group_ids: Sequence[int] | None = None,
    sensor_serial_numbers_by_turbine: Mapping[str, Mapping[str, int]] | None = None,
    temperature_compensation_signal_ids_by_turbine: Mapping[str, Mapping[str, int]] | None = None,
) -> dict[str, AssetSignalUploadResult]:
    """Upload one asset-scoped request per turbine and collect the results.

    Parameters
    ----------
    projectsite
        Parent SDK project site title shared by every turbine in the batch.
    signals_by_turbine
        Main signal mappings keyed by turbine identifier. Its keys drive the
        iteration; the other per-turbine mappings are only looked up by key.
    derived_signals_by_turbine
        Optional derived signal mappings keyed by turbine identifier.
    assetlocations_by_turbine
        Optional turbine-to-asset-location override mapping. A turbine
        missing from the mapping falls back to its own identifier.
    permission_group_ids
        Visibility groups applied to created SHM objects.
    sensor_serial_numbers_by_turbine
        Optional per-turbine mapping of signal identifiers to sensor serial
        numbers used for signal history rows.
    temperature_compensation_signal_ids_by_turbine
        Optional per-turbine mapping of temperature-compensation tokens to
        backend SHM signal ids.

    Returns
    -------
    dict[str, AssetSignalUploadResult]
        Upload results keyed by turbine identifier.

    Notes
    -----
    The response stays keyed by turbine even when the request itself is
    built with an overridden asset-location title.
    """
    uploads: dict[str, AssetSignalUploadResult] = {}
    for turbine_key, main_signals in signals_by_turbine.items():
        # Every optional per-turbine mapping degrades to ``None`` (or, for the
        # asset location, to the turbine identifier itself) when absent.
        request = AssetSignalUploadRequest(
            projectsite=projectsite,
            assetlocation=(
                turbine_key
                if assetlocations_by_turbine is None
                else assetlocations_by_turbine.get(turbine_key, turbine_key)
            ),
            signals=main_signals,
            derived_signals=(
                None if derived_signals_by_turbine is None else derived_signals_by_turbine.get(turbine_key)
            ),
            permission_group_ids=permission_group_ids,
            sensor_serial_numbers_by_signal=(
                None
                if sensor_serial_numbers_by_turbine is None
                else sensor_serial_numbers_by_turbine.get(turbine_key)
            ),
            temperature_compensation_signal_ids=(
                None
                if temperature_compensation_signal_ids_by_turbine is None
                else temperature_compensation_signal_ids_by_turbine.get(turbine_key)
            ),
        )
        uploads[turbine_key] = self.upload_asset(request)
    return uploads
upload_from_processor
upload_from_processor(
    *,
    projectsite,
    processor,
    assetlocations_by_turbine=None,
    permission_group_ids=None,
    sensor_serial_numbers_by_turbine=None,
    temperature_compensation_signal_ids_by_turbine=None,
)

Process turbine configs and upload them through the generic SHM seam.

Parameters:

Name Type Description Default
projectsite str

Parent SDK project site title shared by the processor output.

required
processor SignalConfigUploadSource

Processor instance that populates signals_data and signals_derived_data.

required
assetlocations_by_turbine Mapping[str, str] | None

Optional turbine-to-asset-location override mapping.

None
permission_group_ids Sequence[int] | None

Visibility groups applied to created SHM objects.

None
sensor_serial_numbers_by_turbine Mapping[str, Mapping[str, int]] | None

Optional per-turbine mapping of signal identifiers to sensor serial numbers used for signal history rows.

None
temperature_compensation_signal_ids_by_turbine Mapping[str, Mapping[str, int]] | None

Optional per-turbine mapping of temperature-compensation tokens to backend SHM signal ids.

None

Returns:

Type Description
dict[str, AssetSignalUploadResult]

Upload results keyed by turbine identifier.

Source code in src/owi/metadatabase/shm/upload/signals.py
def upload_from_processor(
    self,
    *,
    projectsite: str,
    processor: SignalConfigUploadSource,
    assetlocations_by_turbine: Mapping[str, str] | None = None,
    permission_group_ids: Sequence[int] | None = None,
    sensor_serial_numbers_by_turbine: Mapping[str, Mapping[str, int]] | None = None,
    temperature_compensation_signal_ids_by_turbine: Mapping[str, Mapping[str, int]] | None = None,
) -> dict[str, AssetSignalUploadResult]:
    """Run the processor, then upload its turbine-keyed output.

    Parameters
    ----------
    projectsite
        Parent SDK project site title shared by the processor output.
    processor
        Processor instance whose ``signals_process_data()`` is invoked first
        and whose ``signals_data`` / ``signals_derived_data`` attributes are
        read afterwards.
    assetlocations_by_turbine
        Optional turbine-to-asset-location override mapping.
    permission_group_ids
        Visibility groups applied to created SHM objects.
    sensor_serial_numbers_by_turbine
        Optional per-turbine mapping of signal identifiers to sensor serial
        numbers used for signal history rows.
    temperature_compensation_signal_ids_by_turbine
        Optional per-turbine mapping of temperature-compensation tokens to
        backend SHM signal ids.

    Returns
    -------
    dict[str, AssetSignalUploadResult]
        Upload results keyed by turbine identifier, as returned by
        ``upload_turbines``.
    """
    # The processor attributes are only read after processing has run.
    processor.signals_process_data()
    main_signals = processor.signals_data
    derived_signals = processor.signals_derived_data
    return self.upload_turbines(
        projectsite=projectsite,
        signals_by_turbine=main_signals,
        derived_signals_by_turbine=derived_signals,
        assetlocations_by_turbine=assetlocations_by_turbine,
        permission_group_ids=permission_group_ids,
        sensor_serial_numbers_by_turbine=sensor_serial_numbers_by_turbine,
        temperature_compensation_signal_ids_by_turbine=temperature_compensation_signal_ids_by_turbine,
    )
upload_from_processor_files
upload_from_processor_files(
    *,
    projectsite,
    processor,
    path_signal_sensor_map=None,
    path_sensor_tc_map=None,
    assetlocations_by_turbine=None,
    permission_group_ids=None,
)

Process configs, resolve optional file maps, and upload by turbine.

Parameters:

Name Type Description Default
projectsite str

Parent SDK project site title shared by the batch.

required
processor SignalConfigUploadSource

Processor that populates turbine-scoped signal mappings.

required
path_signal_sensor_map str | Path | None

Optional JSON file keyed by turbine and signal id with SHM sensor lookup parameters. When sensor_type_id is itself a mapping, the uploader resolves it through get_sensor_type() before the final sensor lookup.

None
path_sensor_tc_map str | Path | None

Optional JSON file keyed by turbine with temperature-compensation signal identifiers to resolve through get_signal().

None
assetlocations_by_turbine Mapping[str, str] | None

Optional turbine-to-asset-location override mapping.

None
permission_group_ids Sequence[int] | None

Visibility groups applied to created SHM objects.

None

Returns:

Type Description
dict[str, AssetSignalUploadResult]

Upload results keyed by turbine identifier.

Examples:

>>> from unittest.mock import Mock
>>> uploader = ShmSignalUploader(shm_api=Mock(), lookup_service=Mock())
>>> processor = Mock()
>>> processor.signals_data = {}
>>> processor.signals_derived_data = {}
>>> uploader.upload_from_processor_files(projectsite="Project A", processor=processor)
{}
Source code in src/owi/metadatabase/shm/upload/signals.py
def upload_from_processor_files(
    self,
    *,
    projectsite: str,
    processor: SignalConfigUploadSource,
    path_signal_sensor_map: str | Path | None = None,
    path_sensor_tc_map: str | Path | None = None,
    assetlocations_by_turbine: Mapping[str, str] | None = None,
    permission_group_ids: Sequence[int] | None = None,
) -> dict[str, AssetSignalUploadResult]:
    """Run the processor, resolve the optional file maps, and upload by turbine.

    Parameters
    ----------
    projectsite
        Parent SDK project site title shared by the batch.
    processor
        Processor that populates turbine-scoped signal mappings.
    path_signal_sensor_map
        Optional JSON file keyed by turbine and signal id with SHM sensor
        lookup parameters. When ``sensor_type_id`` is itself a mapping,
        the uploader resolves it through ``get_sensor_type()`` before the
        final sensor lookup.
    path_sensor_tc_map
        Optional JSON file keyed by turbine with temperature-compensation
        signal identifiers to resolve through ``get_signal()``.
    assetlocations_by_turbine
        Optional turbine-to-asset-location override mapping.
    permission_group_ids
        Visibility groups applied to created SHM objects.

    Returns
    -------
    dict[str, AssetSignalUploadResult]
        Upload results keyed by turbine identifier.

    Examples
    --------
    >>> from unittest.mock import Mock
    >>> uploader = ShmSignalUploader(shm_api=Mock(), lookup_service=Mock())
    >>> processor = Mock()
    >>> processor.signals_data = {}
    >>> processor.signals_derived_data = {}
    >>> uploader.upload_from_processor_files(projectsite="Project A", processor=processor)
    {}
    """
    processor.signals_process_data()
    # Both paths are handed to the private resolvers as given (possibly None);
    # the results are forwarded unchanged to ``upload_turbines``.
    serials_by_turbine = self._resolve_sensor_serial_numbers_by_turbine(path_signal_sensor_map)
    tempcomp_ids_by_turbine = self._resolve_temperature_compensation_signal_ids_by_turbine(path_sensor_tc_map)
    return self.upload_turbines(
        projectsite=projectsite,
        signals_by_turbine=processor.signals_data,
        derived_signals_by_turbine=processor.signals_derived_data,
        assetlocations_by_turbine=assetlocations_by_turbine,
        permission_group_ids=permission_group_ids,
        sensor_serial_numbers_by_turbine=serials_by_turbine,
        temperature_compensation_signal_ids_by_turbine=tempcomp_ids_by_turbine,
    )

Functions

build_derived_signal_calibration_payloads

build_derived_signal_calibration_payloads(
    derived_signal_id, signal_data
)

Build derived-signal calibration payloads from archive-style data.

Source code in src/owi/metadatabase/shm/upload/payloads.py
def build_derived_signal_calibration_payloads(
    derived_signal_id: int,
    signal_data: Mapping[str, Any],
) -> list[dict[str, Any]]:
    """Turn archive-style ``"calibration"`` rows into calibration payloads.

    Parameters
    ----------
    derived_signal_id
        Identifier passed through to every generated payload.
    signal_data
        Archive-style mapping; only its ``"calibration"`` entry is read.

    Returns
    -------
    list[dict[str, Any]]
        One payload per mapping row, in input order. Empty when
        ``"calibration"`` is missing, is a ``str``/``bytes`` value, or is
        not a sequence; rows that are not mappings are skipped.

    Raises
    ------
    KeyError
        If a mapping row lacks ``"time"``, ``"yaw_parameter"`` or
        ``"yaw_offset"``.
    """
    rows = signal_data.get("calibration")
    # str and bytes are Sequences too, but never a list of calibration rows.
    if isinstance(rows, (str, bytes)) or not isinstance(rows, Sequence):
        return []

    return [
        DerivedSignalCalibrationPayload.from_yaw_offset(
            derived_signal_id=derived_signal_id,
            calibration_date=row["time"],
            yaw_parameter=row["yaw_parameter"],
            yaw_offset=row["yaw_offset"],
            measurement_location=row.get("measurement_location"),
        ).to_payload()
        for row in rows
        if isinstance(row, Mapping)
    ]

build_derived_signal_main_payload

build_derived_signal_main_payload(
    signal, signal_data, context
)

Build the main derived-signal payload from archive-style data.

Source code in src/owi/metadatabase/shm/upload/payloads.py
def build_derived_signal_main_payload(
    signal: LegacySignalIdentifier,
    signal_data: Mapping[str, Any],
    context: SignalUploadContext,
) -> dict[str, Any] | None:
    """Assemble the main derived-signal payload from archive-style data.

    Parameters
    ----------
    signal
        Parsed signal identifier; its ``subassembly``, ``signal_type`` and
        ``raw`` attributes are read.
    signal_data
        Archive-style mapping supplying ``heading``, ``level``,
        ``orientation``, ``stats`` and ``data``.
    context
        Upload context providing site, model-definition and asset-location
        ids, the sub-assembly lookup, and permission groups.

    Returns
    -------
    dict[str, Any] | None
        The serialized payload, or ``None`` when ``signal_data`` holds at
        most one entry.

    Raises
    ------
    KeyError
        If the context has no sub-assembly id for ``signal.subassembly``.
    """
    # NOTE(review): a mapping with fewer than two keys is treated as carrying
    # nothing worth a main record - confirm against the archive format.
    if len(signal_data) < 2:
        return None

    sub_assembly_id = context.subassembly_id_for(signal.subassembly)
    if sub_assembly_id is None:
        raise KeyError(f"Missing sub-assembly id for {signal.subassembly!r}")

    payload = DerivedSignalPayload(
        site=context.site_id,
        model_definition=context.model_definition_id,
        asset_location=context.asset_location_id,
        sub_assembly=sub_assembly_id,
        signal_type=signal.signal_type,
        derived_signal_id=signal.raw,
        heading=signal_data.get("heading"),
        level=signal_data.get("level"),
        orientation=signal_data.get("orientation"),
        stats=signal_data.get("stats"),
        data_additional=signal_data.get("data"),
        visibility_groups=context.permission_group_ids,
    )
    return payload.to_payload()

build_derived_signal_parent_patch

build_derived_signal_parent_patch(parent_signal_ids)

Build the parent-signals patch payload for derived signal status rows.

Source code in src/owi/metadatabase/shm/upload/payloads.py
def build_derived_signal_parent_patch(parent_signal_ids: Sequence[int]) -> dict[str, list[int]]:
    """Serialize parent signal ids into the patch body for derived signal status rows.

    Parameters
    ----------
    parent_signal_ids
        Parent signal ids handed to ``DerivedSignalHistoryParentSignalsPatch``.

    Returns
    -------
    dict[str, list[int]]
        The patch model's ``to_payload()`` output.
    """
    patch = DerivedSignalHistoryParentSignalsPatch(parent_signal_ids)
    return patch.to_payload()

build_derived_signal_status_payload

build_derived_signal_status_payload(
    derived_signal_id, signal_data
)

Build the derived-signal status payload used before parent patching.

Source code in src/owi/metadatabase/shm/upload/payloads.py
def build_derived_signal_status_payload(
    derived_signal_id: int,
    signal_data: Mapping[str, Any],
) -> dict[str, Any]:
    """Create the derived-signal status payload that is sent before parent patching.

    Parameters
    ----------
    derived_signal_id
        Identifier stored on the status payload.
    signal_data
        Archive-style mapping; only the first ``"calibration"`` row is used,
        and only its ``"time"`` value.

    Returns
    -------
    dict[str, Any]
        Serialized status payload with ``status="ok"`` and
        ``is_latest_status=True``.

    Raises
    ------
    ValueError
        If ``"calibration"`` is missing, empty, a ``str``/``bytes`` value,
        or not a sequence, or if its first row is not a mapping.
    KeyError
        If the first row has no ``"time"`` entry.
    """
    rows = signal_data.get("calibration")
    # str/bytes satisfy Sequence but are not row collections.
    is_row_sequence = isinstance(rows, Sequence) and not isinstance(rows, (str, bytes))
    if not (is_row_sequence and rows):
        raise ValueError("Derived signal calibration rows are required to build a status payload.")

    head_row = rows[0]
    if not isinstance(head_row, Mapping):
        raise ValueError("Derived signal calibration rows must be mappings.")

    history = DerivedSignalHistoryPayload(
        derived_signal_id=derived_signal_id,
        activity_start_timestamp=head_row["time"],
        is_latest_status=True,
        status="ok",
    )
    return history.to_payload()

build_sensor_calibration_payloads

build_sensor_calibration_payloads(
    signal_sensor_map,
    signal_calibration_map,
    path_to_datasheets,
)

Build sensor calibration payload models for one turbine.

Source code in src/owi/metadatabase/shm/upload/payloads.py
def build_sensor_calibration_payloads(
    signal_sensor_map: Mapping[str, int],
    signal_calibration_map: Mapping[str, Mapping[str, str]],
    path_to_datasheets: str | Path,
) -> list[SensorCalibrationPayload]:
    """Create sensor calibration payload models for one turbine.

    Parameters
    ----------
    signal_sensor_map
        Signal name to sensor identifier; the value is passed as
        ``sensor_serial_number``.
    signal_calibration_map
        Signal name to a calibration record with ``"date"`` and
        ``"filename"`` entries.
    path_to_datasheets
        Directory that each record's ``"filename"`` is joined onto.

    Returns
    -------
    list[SensorCalibrationPayload]
        One payload per calibration record whose signal has a sensor in
        ``signal_sensor_map``; signals without one are skipped.

    Raises
    ------
    KeyError
        If a used calibration record lacks ``"date"`` or ``"filename"``.
    """
    return [
        SensorCalibrationPayload(
            sensor_serial_number=sensor_ref,
            calibration_date=record["date"],
            file=Path(path_to_datasheets) / record["filename"],
        )
        for name, record in signal_calibration_map.items()
        # Signals with no (or a None) sensor mapping produce no payload.
        if (sensor_ref := signal_sensor_map.get(name)) is not None
    ]

build_sensor_payloads

build_sensor_payloads(
    sensor_type_id,
    serial_numbers,
    cabinets,
    visibility_groups,
    visibility="usergroup",
)

Build sensor payload models from parallel columns.

Source code in src/owi/metadatabase/shm/upload/payloads.py
def build_sensor_payloads(
    sensor_type_id: int,
    serial_numbers: Sequence[str | None],
    cabinets: Sequence[str | int | None],
    visibility_groups: Sequence[int] | None,
    visibility: str = "usergroup",
) -> list[SensorPayload]:
    """Create sensor payload models from parallel serial-number and cabinet columns.

    Parameters
    ----------
    sensor_type_id
        Sensor type id shared by every generated payload.
    serial_numbers
        Column of serial numbers.
    cabinets
        Column of cabinet values.
    visibility_groups
        Visibility groups stored on each payload.
    visibility
        Visibility mode stored on each payload.

    Returns
    -------
    list[SensorPayload]
        One payload per row produced by ``_expand_columns``.
    """
    # NOTE(review): how columns of unequal length are aligned is decided by
    # ``_expand_columns`` - confirm there.
    columns = {"serial_number": serial_numbers, "cabinet": cabinets}
    payloads: list[SensorPayload] = []
    for record in _expand_columns(columns):
        payloads.append(
            SensorPayload(
                sensor_type_id=sensor_type_id,
                serial_number=record["serial_number"],
                cabinet=record["cabinet"],
                visibility=visibility,
                visibility_groups=visibility_groups,
            )
        )
    return payloads

build_sensor_type_payloads

build_sensor_type_payloads(
    sensor_types_data,
    visibility_groups,
    path_to_images=None,
    visibility="usergroup",
)

Build sensor type payload models from raw records.

Source code in src/owi/metadatabase/shm/upload/payloads.py
def build_sensor_type_payloads(
    sensor_types_data: Sequence[Mapping[str, Any]],
    visibility_groups: Sequence[int] | None,
    path_to_images: str | Path | None = None,
    visibility: str = "usergroup",
) -> list[SensorTypePayload]:
    """Create sensor type payload models from raw records.

    Parameters
    ----------
    sensor_types_data
        Raw records with ``"name"``, ``"type"``, ``"type_extended"`` and
        ``"hardware_supplier"`` entries and an optional ``"file"`` entry.
    visibility_groups
        Visibility groups stored on each payload.
    path_to_images
        Directory that a record's ``"file"`` value is joined onto.
    visibility
        Visibility mode stored on each payload.

    Returns
    -------
    list[SensorTypePayload]
        One payload per record, in input order. ``file`` is ``None`` unless
        both the record's ``"file"`` and ``path_to_images`` are given.

    Raises
    ------
    KeyError
        If a record lacks one of the four required entries.
    """
    built: list[SensorTypePayload] = []
    for record in sensor_types_data:
        image_name = record.get("file")
        # An image path needs both a filename and a base directory.
        image_path: Path | None = (
            Path(path_to_images) / str(image_name)
            if image_name is not None and path_to_images is not None
            else None
        )
        built.append(
            SensorTypePayload(
                name=str(record["name"]),
                type=str(record["type"]),
                type_extended=str(record["type_extended"]),
                hardware_supplier=str(record["hardware_supplier"]),
                file=image_path,
                visibility=visibility,
                visibility_groups=visibility_groups,
            )
        )
    return built

build_signal_calibration_payloads

build_signal_calibration_payloads(
    signal_id, signal_data, tempcomp_signal_ids=None
)

Build signal calibration payloads from archive-style offset and CWL data.

Source code in src/owi/metadatabase/shm/upload/payloads.py
def build_signal_calibration_payloads(
    signal_id: int,
    signal_data: Mapping[str, Any],
    tempcomp_signal_ids: Mapping[str, int] | None = None,
) -> list[dict[str, Any]]:
    """Turn archive-style ``"offset"`` and ``"cwl"`` rows into calibration payloads.

    Parameters
    ----------
    signal_id
        Identifier stored on every generated payload.
    signal_data
        Archive-style mapping; its ``"offset"`` and ``"cwl"`` entries are read.
    tempcomp_signal_ids
        Optional lookup from an offset row's ``"TCSensor"`` token to a
        temperature-compensation signal id.

    Returns
    -------
    list[dict[str, Any]]
        Offset payloads first, then CWL payloads, each in input order. An
        entry that is missing, a ``str``/``bytes`` value, or not a sequence
        contributes nothing; rows that are not mappings are skipped.

    Raises
    ------
    KeyError
        If a used row lacks a required key (``"time"``/``"offset"`` for
        offsets, ``"time"``/``"cwl"`` for CWL rows, ``"t_ref"``/``"coef"``
        inside a ``"lead_correction"`` mapping).
    """

    def _mapping_rows(key: str):
        # Lazily yield the usable rows stored under ``key``.
        rows = signal_data.get(key)
        if isinstance(rows, Sequence) and not isinstance(rows, (str, bytes)):
            for row in rows:
                if isinstance(row, Mapping):
                    yield row

    payloads: list[dict[str, Any]] = []

    for row in _mapping_rows("offset"):
        lead_raw = row.get("lead_correction")
        tc_token = row.get("TCSensor")

        calibration_date = row["time"]
        offset_value = row["offset"]

        # The token is resolved only when a lookup table exists and the token
        # is a string; a token missing from the table also yields None.
        tempcomp_id = None
        if tempcomp_signal_ids is not None and isinstance(tc_token, str):
            tempcomp_id = tempcomp_signal_ids.get(tc_token)

        coefficients = row.get("Coefficients")
        t_ref = row.get("t_ref")
        gauge_correction = row.get("gauge_correction")

        lead_payload = None
        if isinstance(lead_raw, Mapping):
            lead_payload = LeadCorrectionPayload(
                t_ref=lead_raw["t_ref"],
                coef=lead_raw["coef"],
            )

        offset_model = SignalCalibrationPayload.from_offset(
            signal_id=signal_id,
            calibration_date=calibration_date,
            offset=offset_value,
            tempcomp_signal_id=tempcomp_id,
            coefficients=coefficients,
            t_ref=t_ref,
            gauge_correction=gauge_correction,
            lead_correction=lead_payload,
        )
        payloads.append(offset_model.to_payload())

    for row in _mapping_rows("cwl"):
        cwl_model = SignalCalibrationPayload.from_cwl(
            signal_id=signal_id,
            calibration_date=row["time"],
            cwl=row["cwl"],
        )
        payloads.append(cwl_model.to_payload())

    return payloads

build_signal_main_payload

build_signal_main_payload(signal, signal_data, context)

Build the main signal payload from archive-style signal data.

Source code in src/owi/metadatabase/shm/upload/payloads.py
def build_signal_main_payload(
    signal: LegacySignalIdentifier,
    signal_data: Mapping[str, Any],
    context: SignalUploadContext,
) -> dict[str, Any] | None:
    """Assemble the main signal payload from archive-style signal data.

    Parameters
    ----------
    signal
        Parsed signal identifier; its ``signal_type``, ``raw`` and
        ``subassembly`` attributes are read.
    signal_data
        Archive-style mapping supplying ``heading``, ``level``,
        ``orientation`` and ``stats``; the additional data comes from
        ``_legacy_signal_misc_data(signal_data)``.
    context
        Upload context providing site, model-definition and asset-location
        ids, the sub-assembly lookup, and permission groups.

    Returns
    -------
    dict[str, Any] | None
        The serialized payload, or ``None`` when ``signal_data`` holds at
        most one entry.
    """
    # NOTE(review): a mapping with fewer than two keys is treated as carrying
    # nothing worth a main record - confirm against the archive format.
    if len(signal_data) < 2:
        return None

    return SignalPayload(
        site=context.site_id,
        model_definition=context.model_definition_id,
        asset_location=context.asset_location_id,
        signal_type=signal.signal_type,
        signal_id=signal.raw,
        # Only the TP / TW / MP sub-assemblies are resolved to an id; any
        # other value leaves the field unset.
        sub_assembly=(
            None if signal.subassembly not in {"TP", "TW", "MP"} else context.subassembly_id_for(signal.subassembly)
        ),
        heading=signal_data.get("heading"),
        level=signal_data.get("level"),
        orientation=signal_data.get("orientation"),
        stats=signal_data.get("stats"),
        data_additional=_legacy_signal_misc_data(signal_data),
        visibility_groups=context.permission_group_ids,
    ).to_payload()

build_signal_status_payloads

build_signal_status_payloads(
    signal_id, signal_data, sensor_serial_number=None
)

Build signal status payloads from archive-style status rows.

Source code in src/owi/metadatabase/shm/upload/payloads.py
def build_signal_status_payloads(
    signal_id: int,
    signal_data: Mapping[str, Any],
    sensor_serial_number: int | None = None,
) -> list[dict[str, Any]]:
    """Build signal status payloads from archive-style status rows.

    Parameters
    ----------
    signal_id
        Identifier stored on every generated payload.
    signal_data
        Archive-style mapping; only its ``"status"`` entry is read.
    sensor_serial_number
        Optional sensor serial number stored on every generated payload.

    Returns
    -------
    list[dict[str, Any]]
        One payload per mapping row, in input order. Empty when ``"status"``
        is missing, is a ``str``/``bytes`` value, or is not a sequence; rows
        that are not mappings are skipped. The last emitted payload is the
        only one with ``is_latest_status`` set.

    Raises
    ------
    KeyError
        If a mapping row lacks ``"time"`` or ``"status"``.
    """
    statuses = signal_data.get("status")
    if not isinstance(statuses, Sequence) or isinstance(statuses, (str, bytes)):
        return []

    # Filter before numbering: the "latest" flag must land on the last row
    # that is actually emitted. Comparing against the unfiltered length would
    # leave no payload flagged whenever trailing rows are skipped.
    rows = [cast(Mapping[str, Any], status) for status in statuses if isinstance(status, Mapping)]
    last_index = len(rows) - 1

    return [
        SignalHistoryPayload(
            signal_id=signal_id,
            activity_start_timestamp=cast(TimestampValue, row["time"]),
            is_latest_status=index == last_index,
            status=cast(str, row["status"]),
            sensor_serial_number=sensor_serial_number,
            legacy_signal_id=cast(Optional[str], row.get("name")),
        ).to_payload()
        for index, row in enumerate(rows)
    ]