
API Reference

Auto-generated documentation pulled from source code docstrings:

io

API client for the shm extension.

This module exposes :class:`ShmAPI` as the low-level entry point for SHM transport and persistence helpers.

Examples:

>>> from owi.metadatabase.shm import ShmAPI
>>> isinstance(ShmAPI(token="dummy"), ShmAPI)
True

Classes

ShmEndpoints dataclass

ShmEndpoints(
    api_subdir="/shm/routes/",
    sensor_type="sensortype",
    sensor="sensor",
    sensor_calibration="sensorcalibration",
    signal="signal",
    signal_history="signalhistory",
    signal_calibration="signalcalibration",
    derived_signal="derivedsignal",
    derived_signal_history="derivedsignalhistory",
    derived_signal_calibration="derivedsignalcalibration",
)

Centralized route names for the SHM backend.
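The route table above can be mirrored with a small standalone dataclass. The `Endpoints` class and staging sub-path below are illustrative stand-ins, not part of the SDK:

```python
from dataclasses import dataclass

# Standalone sketch of ShmEndpoints-style route centralization:
# every route name gets a default, and callers override per deployment.
@dataclass(frozen=True)
class Endpoints:
    api_subdir: str = "/shm/routes/"
    signal: str = "signal"
    signal_history: str = "signalhistory"

default = Endpoints()
custom = Endpoints(api_subdir="/staging/shm/routes/")  # hypothetical override
assert custom.signal == default.signal == "signal"
```

Overriding only the sub-path leaves every route name at its default, which is what `ShmAPI` relies on when it pops an `endpoints` keyword.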

Functions
mutation_path
mutation_path(endpoint)

Return a collection endpoint path with a trailing slash.

Source code in src/owi/metadatabase/shm/io.py
def mutation_path(self, endpoint: str) -> str:
    """Return a collection endpoint path with a trailing slash."""
    return endpoint.rstrip("/") + "/"
detail_path
detail_path(endpoint, object_id)

Return a detail endpoint path with a trailing slash.

Source code in src/owi/metadatabase/shm/io.py
def detail_path(self, endpoint: str, object_id: int) -> str:
    """Return a detail endpoint path with a trailing slash."""
    return f"{endpoint.rstrip('/')}/{object_id}/"
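Both helpers normalize trailing slashes the same way. A standalone sketch (plain functions in place of the methods above) shows the behavior:

```python
def mutation_path(endpoint: str) -> str:
    # collection route: strip any trailing slashes, then add exactly one
    return endpoint.rstrip("/") + "/"

def detail_path(endpoint: str, object_id: int) -> str:
    # detail route: collection name plus the object id, trailing slash kept
    return f"{endpoint.rstrip('/')}/{object_id}/"

assert mutation_path("signal") == "signal/"
assert mutation_path("signal///") == "signal/"
assert detail_path("signal", 7) == "signal/7/"
```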

ShmAPI

ShmAPI(api_subdir=api_subdir, **kwargs)

Bases: API

Low-level API client for the SHM extension.

Parameters:

api_subdir : str, default "/shm/routes/"
    API sub-path appended to the base root.
**kwargs : Any
    Forwarded to :class:`owi.metadatabase.io.API`.

Examples:

>>> api = ShmAPI(token="dummy")
>>> api.ping()
'ok'
Source code in src/owi/metadatabase/shm/io.py
def __init__(self, api_subdir: str = DEFAULT_SHM_ENDPOINTS.api_subdir, **kwargs: Any) -> None:
    self.endpoints: ShmEndpoints = kwargs.pop("endpoints", DEFAULT_SHM_ENDPOINTS)
    super().__init__(**kwargs)
    self.base_api_root = self.api_root
    self.api_root = self.api_root + api_subdir
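The constructor keeps the parent root and appends the SHM sub-path on top of it. A minimal standalone sketch of that composition (the base URL below is hypothetical):

```python
# Hypothetical base root; in the real client this comes from the parent API.
base_api_root = "https://metadata.example.org/api/v1"
api_subdir = "/shm/routes/"

# Mirrors __init__: remember the parent root, then extend it for SHM routes.
api_root = base_api_root + api_subdir
assert api_root == "https://metadata.example.org/api/v1/shm/routes/"
```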
Functions
ping
ping()

Return a basic health response.

Examples:

>>> api = ShmAPI(token="dummy")
>>> api.ping()
'ok'
Source code in src/owi/metadatabase/shm/io.py
def ping(self) -> str:
    """Return a basic health response.

    Examples
    --------
    >>> api = ShmAPI(token="dummy")
    >>> api.ping()
    'ok'
    """
    return "ok"
get_signal
get_signal(signal_id, **kwargs)

Return a single SHM signal by its backend signal identifier.

Parameters:

signal_id : str
    Backend-facing SHM signal identifier.
**kwargs : QueryValue
    Additional query parameters forwarded to the SHM route.

Returns:

dict[str, Any]
    Parent-SDK-style result dictionary containing ``data``, ``exists``,
    ``id``, and ``response``.

Examples:

>>> import pandas as pd
>>> from unittest.mock import patch
>>> api = ShmAPI(token="dummy")
>>> with patch.object(
...     ShmAPI,
...     "process_data",
...     return_value=(pd.DataFrame([{"id": 7, "signal_id": "SG-01"}]), {"existance": True, "id": 7}),
... ):
...     result = api.get_signal("SG-01")
>>> result["id"]
7
Source code in src/owi/metadatabase/shm/io.py
def get_signal(self, signal_id: str, **kwargs: QueryValue) -> dict[str, Any]:
    """Return a single SHM signal by its backend signal identifier.

    Parameters
    ----------
    signal_id
        Backend-facing SHM signal identifier.
    **kwargs
        Additional query parameters forwarded to the SHM route.

    Returns
    -------
    dict[str, Any]
        Parent-SDK-style result dictionary containing ``data``,
        ``exists``, ``id``, and ``response``.

    Examples
    --------
    >>> import pandas as pd
    >>> from unittest.mock import patch
    >>> api = ShmAPI(token="dummy")
    >>> with patch.object(
    ...     ShmAPI,
    ...     "process_data",
    ...     return_value=(pd.DataFrame([{"id": 7, "signal_id": "SG-01"}]), {"existance": True, "id": 7}),
    ... ):
    ...     result = api.get_signal("SG-01")
    >>> result["id"]
    7
    """
    return self._get_resource(self.endpoints.signal, signal_id=signal_id, **kwargs)
get_sensor_type
get_sensor_type(**kwargs)

Return a single SHM sensor type by query parameters.

Parameters:

**kwargs : QueryValue
    Query parameters forwarded to the SHM sensor-type route.

Returns:

dict[str, Any]
    Parent-SDK-style result dictionary containing ``data``, ``exists``,
    ``id``, and ``response``.

Source code in src/owi/metadatabase/shm/io.py
def get_sensor_type(self, **kwargs: QueryValue) -> dict[str, Any]:
    """Return a single SHM sensor type by query parameters.

    Parameters
    ----------
    **kwargs
        Query parameters forwarded to the SHM sensor-type route.

    Returns
    -------
    dict[str, Any]
        Parent-SDK-style result dictionary containing ``data``,
        ``exists``, ``id``, and ``response``.
    """
    return self._get_resource(self.endpoints.sensor_type, **kwargs)
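All the `get_*` helpers return the same parent-SDK-style dictionary. A hedged sketch of consuming one (the record values below are illustrative, not real API output):

```python
# Illustrative result shape, keyed as documented above.
result = {
    "data": [{"id": 3, "title": "strain gauge"}],  # made-up record
    "exists": True,
    "id": 3,
    "response": None,
}

# Typical consumer pattern: check existence before trusting the id.
record_id = result["id"] if result["exists"] else None
assert record_id == 3
```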
get_sensor
get_sensor(**kwargs)

Return a single SHM sensor by query parameters.

Parameters:

**kwargs : QueryValue
    Query parameters forwarded to the SHM sensor route.

Returns:

dict[str, Any]
    Parent-SDK-style result dictionary containing ``data``, ``exists``,
    ``id``, and ``response``.

Source code in src/owi/metadatabase/shm/io.py
def get_sensor(self, **kwargs: QueryValue) -> dict[str, Any]:
    """Return a single SHM sensor by query parameters.

    Parameters
    ----------
    **kwargs
        Query parameters forwarded to the SHM sensor route.

    Returns
    -------
    dict[str, Any]
        Parent-SDK-style result dictionary containing ``data``,
        ``exists``, ``id``, and ``response``.
    """
    return self._get_resource(self.endpoints.sensor, **kwargs)
get_sensor_calibration
get_sensor_calibration(**kwargs)

Return a single SHM sensor calibration by query parameters.

Source code in src/owi/metadatabase/shm/io.py
def get_sensor_calibration(self, **kwargs: QueryValue) -> dict[str, Any]:
    """Return a single SHM sensor calibration by query parameters."""
    return self._get_resource(self.endpoints.sensor_calibration, **kwargs)
get_signal_history
get_signal_history(**kwargs)

Return a single SHM signal history row by query parameters.

Source code in src/owi/metadatabase/shm/io.py
def get_signal_history(self, **kwargs: QueryValue) -> dict[str, Any]:
    """Return a single SHM signal history row by query parameters."""
    return self._get_resource(self.endpoints.signal_history, **kwargs)
get_signal_calibration
get_signal_calibration(**kwargs)

Return a single SHM signal calibration by query parameters.

Source code in src/owi/metadatabase/shm/io.py
def get_signal_calibration(self, **kwargs: QueryValue) -> dict[str, Any]:
    """Return a single SHM signal calibration by query parameters."""
    return self._get_resource(self.endpoints.signal_calibration, **kwargs)
get_derived_signal
get_derived_signal(**kwargs)

Return a single SHM derived signal by query parameters.

Source code in src/owi/metadatabase/shm/io.py
def get_derived_signal(self, **kwargs: QueryValue) -> dict[str, Any]:
    """Return a single SHM derived signal by query parameters."""
    return self._get_resource(self.endpoints.derived_signal, **kwargs)
get_derived_signal_history
get_derived_signal_history(**kwargs)

Return a single SHM derived signal history row by query parameters.

Source code in src/owi/metadatabase/shm/io.py
def get_derived_signal_history(self, **kwargs: QueryValue) -> dict[str, Any]:
    """Return a single SHM derived signal history row by query parameters."""
    return self._get_resource(self.endpoints.derived_signal_history, **kwargs)
get_derived_signal_calibration
get_derived_signal_calibration(**kwargs)

Return a single SHM derived signal calibration by query parameters.

Source code in src/owi/metadatabase/shm/io.py
def get_derived_signal_calibration(self, **kwargs: QueryValue) -> dict[str, Any]:
    """Return a single SHM derived signal calibration by query parameters."""
    return self._get_resource(self.endpoints.derived_signal_calibration, **kwargs)
create_signal
create_signal(payload)

Create a signal record.

Examples:

>>> from unittest.mock import patch
>>> api = ShmAPI(token="dummy")
>>> with patch.object(ShmAPI, "_mutate_resource", return_value={"id": 12, "exists": True}) as mocker:
...     result = api.create_signal({"signal_id": "SG-01"})
>>> mocker.assert_called_once_with(api.endpoints.signal, {"signal_id": "SG-01"})
>>> result["id"]
12
Source code in src/owi/metadatabase/shm/io.py
def create_signal(self, payload: Mapping[str, Any]) -> dict[str, Any]:
    """Create a signal record.

    Examples
    --------
    >>> from unittest.mock import patch
    >>> api = ShmAPI(token="dummy")
    >>> with patch.object(ShmAPI, "_mutate_resource", return_value={"id": 12, "exists": True}) as mocker:
    ...     result = api.create_signal({"signal_id": "SG-01"})
    >>> mocker.assert_called_once_with(api.endpoints.signal, {"signal_id": "SG-01"})
    >>> result["id"]
    12
    """
    return self._mutate_resource(self.endpoints.signal, payload)
create_signal_history
create_signal_history(payload)

Create a signal history record.

Source code in src/owi/metadatabase/shm/io.py
def create_signal_history(self, payload: Mapping[str, Any]) -> dict[str, Any]:
    """Create a signal history record."""
    return self._mutate_resource(self.endpoints.signal_history, payload)
create_signal_calibration
create_signal_calibration(payload)

Create a signal calibration record.

Source code in src/owi/metadatabase/shm/io.py
def create_signal_calibration(self, payload: Mapping[str, Any]) -> dict[str, Any]:
    """Create a signal calibration record."""
    return self._mutate_resource(self.endpoints.signal_calibration, payload)
create_derived_signal
create_derived_signal(payload)

Create a derived signal record.

Source code in src/owi/metadatabase/shm/io.py
def create_derived_signal(self, payload: Mapping[str, Any]) -> dict[str, Any]:
    """Create a derived signal record."""
    return self._mutate_resource(self.endpoints.derived_signal, payload)
create_derived_signal_history
create_derived_signal_history(payload)

Create a derived signal history record.

Source code in src/owi/metadatabase/shm/io.py
def create_derived_signal_history(self, payload: Mapping[str, Any]) -> dict[str, Any]:
    """Create a derived signal history record."""
    return self._mutate_resource(self.endpoints.derived_signal_history, payload)
patch_derived_signal_history
patch_derived_signal_history(history_id, payload)

Patch a derived signal history record by id.

Source code in src/owi/metadatabase/shm/io.py
def patch_derived_signal_history(self, history_id: int, payload: Mapping[str, Any]) -> dict[str, Any]:
    """Patch a derived signal history record by id."""
    return self._mutate_resource(
        self.endpoints.derived_signal_history,
        payload,
        object_id=history_id,
        method="patch",
    )
create_derived_signal_calibration
create_derived_signal_calibration(payload)

Create a derived signal calibration record.

Source code in src/owi/metadatabase/shm/io.py
def create_derived_signal_calibration(self, payload: Mapping[str, Any]) -> dict[str, Any]:
    """Create a derived signal calibration record."""
    return self._mutate_resource(self.endpoints.derived_signal_calibration, payload)
list_sensor_types
list_sensor_types(**kwargs)

Return all SHM sensor types matching the query parameters.

Parameters:

**kwargs : QueryValue
    Query parameters forwarded to the sensor-type list route.

Returns:

dict[str, Any]
    Parent-SDK-style result dictionary containing ``data``, ``exists``,
    and ``response``.

Source code in src/owi/metadatabase/shm/io.py
def list_sensor_types(self, **kwargs: QueryValue) -> dict[str, Any]:
    """Return all SHM sensor types matching the query parameters.

    Parameters
    ----------
    **kwargs
        Query parameters forwarded to the sensor-type list route.

    Returns
    -------
    dict[str, Any]
        Parent-SDK-style result dictionary containing ``data``,
        ``exists``, and ``response``.
    """
    return self._list_resource(self.endpoints.sensor_type, **kwargs)
list_sensors
list_sensors(**kwargs)

Return all SHM sensors matching the query parameters.

Parameters:

**kwargs : QueryValue
    Query parameters forwarded to the sensor list route.

Returns:

dict[str, Any]
    Parent-SDK-style result dictionary containing ``data``, ``exists``,
    and ``response``.

Source code in src/owi/metadatabase/shm/io.py
def list_sensors(self, **kwargs: QueryValue) -> dict[str, Any]:
    """Return all SHM sensors matching the query parameters.

    Parameters
    ----------
    **kwargs
        Query parameters forwarded to the sensor list route.

    Returns
    -------
    dict[str, Any]
        Parent-SDK-style result dictionary containing ``data``,
        ``exists``, and ``response``.
    """
    return self._list_resource(self.endpoints.sensor, **kwargs)
list_sensor_calibrations
list_sensor_calibrations(**kwargs)

Return all SHM sensor calibrations matching the query parameters.

Source code in src/owi/metadatabase/shm/io.py
def list_sensor_calibrations(self, **kwargs: QueryValue) -> dict[str, Any]:
    """Return all SHM sensor calibrations matching the query parameters."""
    return self._list_resource(self.endpoints.sensor_calibration, **kwargs)
list_signals
list_signals(**kwargs)

Return all SHM signals matching the query parameters.

Source code in src/owi/metadatabase/shm/io.py
def list_signals(self, **kwargs: QueryValue) -> dict[str, Any]:
    """Return all SHM signals matching the query parameters."""
    return self._list_resource(self.endpoints.signal, **kwargs)
list_signal_history
list_signal_history(**kwargs)

Return all SHM signal history rows matching the query parameters.

Source code in src/owi/metadatabase/shm/io.py
def list_signal_history(self, **kwargs: QueryValue) -> dict[str, Any]:
    """Return all SHM signal history rows matching the query parameters."""
    return self._list_resource(self.endpoints.signal_history, **kwargs)
list_signal_calibrations
list_signal_calibrations(**kwargs)

Return all SHM signal calibrations matching the query parameters.

Source code in src/owi/metadatabase/shm/io.py
def list_signal_calibrations(self, **kwargs: QueryValue) -> dict[str, Any]:
    """Return all SHM signal calibrations matching the query parameters."""
    return self._list_resource(self.endpoints.signal_calibration, **kwargs)
list_derived_signals
list_derived_signals(**kwargs)

Return all SHM derived signals matching the query parameters.

Source code in src/owi/metadatabase/shm/io.py
def list_derived_signals(self, **kwargs: QueryValue) -> dict[str, Any]:
    """Return all SHM derived signals matching the query parameters."""
    return self._list_resource(self.endpoints.derived_signal, **kwargs)
list_derived_signal_history
list_derived_signal_history(**kwargs)

Return all SHM derived signal history rows matching the query parameters.

Source code in src/owi/metadatabase/shm/io.py
def list_derived_signal_history(self, **kwargs: QueryValue) -> dict[str, Any]:
    """Return all SHM derived signal history rows matching the query parameters."""
    return self._list_resource(self.endpoints.derived_signal_history, **kwargs)
list_derived_signal_calibrations
list_derived_signal_calibrations(**kwargs)

Return all SHM derived signal calibrations matching the query parameters.

Source code in src/owi/metadatabase/shm/io.py
def list_derived_signal_calibrations(self, **kwargs: QueryValue) -> dict[str, Any]:
    """Return all SHM derived signal calibrations matching the query parameters."""
    return self._list_resource(self.endpoints.derived_signal_calibration, **kwargs)
create_sensor_type
create_sensor_type(payload, files=None)

Create a sensor type record, optionally with an image attachment.

Parameters:

payload : Mapping[str, Any]
    Form fields for the sensor type resource.
files : Mapping[str, Any] | None, default None
    Optional file mapping (e.g. ``{"photo": open_file}``).

Returns:

dict[str, Any]
    Parent-SDK-style result dictionary.

Source code in src/owi/metadatabase/shm/io.py
def create_sensor_type(
    self,
    payload: Mapping[str, Any],
    files: Mapping[str, Any] | None = None,
) -> dict[str, Any]:
    """Create a sensor type record, optionally with an image attachment.

    Parameters
    ----------
    payload
        Form fields for the sensor type resource.
    files
        Optional file mapping (e.g. ``{"photo": open_file}``).

    Returns
    -------
    dict[str, Any]
        Parent-SDK-style result dictionary.
    """
    if files:
        return self._mutate_multipart_resource(self.endpoints.sensor_type, payload, files=files)
    return self._mutate_resource(self.endpoints.sensor_type, payload)
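The file-vs-JSON dispatch above can be sketched standalone; `send_json` and `send_multipart` are placeholders for the private `_mutate_resource` and `_mutate_multipart_resource` helpers, not real SDK names:

```python
def send_json(endpoint, payload):
    return {"mode": "json", "endpoint": endpoint}

def send_multipart(endpoint, payload, files):
    return {"mode": "multipart", "endpoint": endpoint, "files": sorted(files)}

def create_sensor_type(payload, files=None):
    # any non-empty file mapping switches the call to the multipart path
    if files:
        return send_multipart("sensortype", payload, files)
    return send_json("sensortype", payload)

assert create_sensor_type({"title": "accelerometer"})["mode"] == "json"
assert create_sensor_type(
    {"title": "accelerometer"}, files={"photo": b"\x89PNG"}
)["files"] == ["photo"]
```

Note that an empty mapping is falsy, so `files={}` falls through to the plain JSON path, mirroring the `if files:` check in the source.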
create_sensor
create_sensor(payload)

Create a sensor record.

Parameters:

payload : Mapping[str, Any]
    JSON payload for the sensor resource.

Returns:

dict[str, Any]
    Parent-SDK-style result dictionary.

Source code in src/owi/metadatabase/shm/io.py
def create_sensor(self, payload: Mapping[str, Any]) -> dict[str, Any]:
    """Create a sensor record.

    Parameters
    ----------
    payload
        JSON payload for the sensor resource.

    Returns
    -------
    dict[str, Any]
        Parent-SDK-style result dictionary.
    """
    return self._mutate_resource(self.endpoints.sensor, payload)
create_sensor_calibration
create_sensor_calibration(payload, files=None)

Create a sensor calibration record, optionally with a PDF attachment.

Parameters:

payload : Mapping[str, Any]
    Form fields for the sensor calibration resource.
files : Mapping[str, Any] | None, default None
    Optional file mapping (e.g. ``{"datasheet": open_file}``).

Returns:

dict[str, Any]
    Parent-SDK-style result dictionary.

Source code in src/owi/metadatabase/shm/io.py
def create_sensor_calibration(
    self,
    payload: Mapping[str, Any],
    files: Mapping[str, Any] | None = None,
) -> dict[str, Any]:
    """Create a sensor calibration record, optionally with a PDF attachment.

    Parameters
    ----------
    payload
        Form fields for the sensor calibration resource.
    files
        Optional file mapping (e.g. ``{"datasheet": open_file}``).

    Returns
    -------
    dict[str, Any]
        Parent-SDK-style result dictionary.
    """
    if files:
        return self._mutate_multipart_resource(self.endpoints.sensor_calibration, payload, files=files)
    return self._mutate_resource(self.endpoints.sensor_calibration, payload)

lookup

Parent-SDK lookup services for SHM workflows.

This module centralizes the parent SDK lookups required by SHM upload and orchestration flows while keeping transport concerns outside the workflows themselves.

Classes

ParentLocationsLookupClient

Bases: Protocol

Protocol for parent SDK location lookups used by SHM services.

Functions
get_projectsite_detail
get_projectsite_detail(projectsite, **kwargs)

Return a single project site lookup response.

Source code in src/owi/metadatabase/shm/lookup.py
def get_projectsite_detail(self, projectsite: str, **kwargs: Any) -> dict[str, Any]:
    """Return a single project site lookup response."""
get_assetlocation_detail
get_assetlocation_detail(
    assetlocation, projectsite=None, **kwargs
)

Return a single asset location lookup response.

Source code in src/owi/metadatabase/shm/lookup.py
def get_assetlocation_detail(
    self,
    assetlocation: str,
    projectsite: str | None = None,
    **kwargs: Any,
) -> dict[str, Any]:
    """Return a single asset location lookup response."""

ParentGeometryLookupClient

Bases: Protocol

Protocol for parent SDK geometry lookups used by SHM services.

Functions
get_subassemblies
get_subassemblies(
    projectsite=None,
    assetlocation=None,
    subassembly_type=None,
    model_definition=None,
)

Return a subassembly lookup response.

Source code in src/owi/metadatabase/shm/lookup.py
def get_subassemblies(
    self,
    projectsite: str | None = None,
    assetlocation: str | None = None,
    subassembly_type: str | None = None,
    model_definition: str | None = None,
) -> dict[str, Any]:
    """Return a subassembly lookup response."""

LookupRecord dataclass

LookupRecord(data, record_id=None)

Normalized lookup record returned by SHM services.

AssetLookupContext dataclass

AssetLookupContext(
    site, asset, subassemblies, model_definition
)

Resolved lookup context for an SHM asset workflow.

ShmLookupError

ShmLookupError(message)

Bases: APIException

Base exception for SHM lookup service failures.

Source code in .venv/lib/python3.14/site-packages/owi/metadatabase/_utils/exceptions.py
def __init__(self, message: str) -> None:
    self.message = message
    super().__init__(self.message)

ProjectSiteLookupError

ProjectSiteLookupError(message)

Bases: ShmLookupError

Raised when a project site lookup cannot be resolved.

Source code in .venv/lib/python3.14/site-packages/owi/metadatabase/_utils/exceptions.py
def __init__(self, message: str) -> None:
    self.message = message
    super().__init__(self.message)

AssetLocationLookupError

AssetLocationLookupError(message)

Bases: ShmLookupError

Raised when an asset location lookup cannot be resolved.

Source code in .venv/lib/python3.14/site-packages/owi/metadatabase/_utils/exceptions.py
def __init__(self, message: str) -> None:
    self.message = message
    super().__init__(self.message)

SubassembliesLookupError

SubassembliesLookupError(message)

Bases: ShmLookupError

Raised when a subassembly lookup cannot be resolved.

Source code in .venv/lib/python3.14/site-packages/owi/metadatabase/_utils/exceptions.py
def __init__(self, message: str) -> None:
    self.message = message
    super().__init__(self.message)

ModelDefinitionLookupError

ModelDefinitionLookupError(message)

Bases: ShmLookupError

Raised when an SHM model definition cannot be derived from subassemblies.

Source code in .venv/lib/python3.14/site-packages/owi/metadatabase/_utils/exceptions.py
def __init__(self, message: str) -> None:
    self.message = message
    super().__init__(self.message)

SignalUploadContextError

SignalUploadContextError(message)

Bases: ShmLookupError

Raised when parent lookup data cannot be translated into upload ids.

Source code in .venv/lib/python3.14/site-packages/owi/metadatabase/_utils/exceptions.py
def __init__(self, message: str) -> None:
    self.message = message
    super().__init__(self.message)
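All the lookup errors above share one base, so callers can catch `ShmLookupError` to handle any resolution failure uniformly. A standalone sketch of the hierarchy (stand-in classes, not the SDK's own):

```python
class ShmLookupError(Exception):
    """Base for lookup failures (stand-in for the SDK class)."""
    def __init__(self, message: str) -> None:
        self.message = message
        super().__init__(self.message)

class ProjectSiteLookupError(ShmLookupError):
    """Raised when a project site lookup cannot be resolved."""

try:
    raise ProjectSiteLookupError("project site 'Project X' not found")
except ShmLookupError as exc:  # the base class catches every specific error
    caught = exc.message

assert caught == "project site 'Project X' not found"
```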

ParentSDKLookupService

ParentSDKLookupService(locations_client, geometry_client)

Resolve parent-SDK lookup data for SHM workflows.

Parameters:

locations_client : ParentLocationsLookupClient
    Parent SDK client that resolves project and asset location details.
geometry_client : ParentGeometryLookupClient
    Parent SDK client that resolves geometry subassemblies.
Source code in src/owi/metadatabase/shm/lookup.py
def __init__(
    self,
    locations_client: ParentLocationsLookupClient,
    geometry_client: ParentGeometryLookupClient,
) -> None:
    self.locations_client = locations_client
    self.geometry_client = geometry_client
Functions
get_projectsite
get_projectsite(projectsite, **kwargs)

Resolve a project site detail lookup.

Source code in src/owi/metadatabase/shm/lookup.py
def get_projectsite(self, projectsite: str, **kwargs: Any) -> LookupRecord:
    """Resolve a project site detail lookup."""
    result = self.locations_client.get_projectsite_detail(projectsite=projectsite, **kwargs)
    return self._build_record(
        result=result,
        label=f"project site '{projectsite}'",
        error_type=ProjectSiteLookupError,
    )
get_assetlocation
get_assetlocation(
    assetlocation, projectsite=None, **kwargs
)

Resolve an asset location detail lookup.

Source code in src/owi/metadatabase/shm/lookup.py
def get_assetlocation(
    self,
    assetlocation: str,
    projectsite: str | None = None,
    **kwargs: Any,
) -> LookupRecord:
    """Resolve an asset location detail lookup."""
    result = self.locations_client.get_assetlocation_detail(
        assetlocation=assetlocation,
        projectsite=projectsite,
        **kwargs,
    )
    label = f"asset location '{assetlocation}'"
    if projectsite is not None:
        label += f" in project site '{projectsite}'"
    return self._build_record(
        result=result,
        label=label,
        error_type=AssetLocationLookupError,
    )
get_subassemblies
get_subassemblies(
    assetlocation, projectsite=None, **kwargs
)

Resolve a subassembly lookup.

Source code in src/owi/metadatabase/shm/lookup.py
def get_subassemblies(
    self,
    assetlocation: str,
    projectsite: str | None = None,
    **kwargs: Any,
) -> LookupRecord:
    """Resolve a subassembly lookup."""
    result = self.geometry_client.get_subassemblies(
        projectsite=projectsite,
        assetlocation=assetlocation,
        **kwargs,
    )
    label = f"subassemblies for asset location '{assetlocation}'"
    if projectsite is not None:
        label += f" in project site '{projectsite}'"
    return self._build_record(
        result=result,
        label=label,
        error_type=SubassembliesLookupError,
    )
get_asset_context
get_asset_context(projectsite, assetlocation)

Resolve the lookup context needed for an SHM asset workflow.

Parameters:

projectsite : str | None
    Parent SDK project site title. When omitted, the service derives it
    from the asset-location lookup data.
assetlocation : str
    Parent SDK asset location title.

Returns:

AssetLookupContext
    Typed lookup records plus the resolved model definition.

Examples:

>>> import pandas as pd
>>> from unittest.mock import Mock
>>> locations_client = Mock()
>>> geometry_client = Mock()
>>> locations_client.get_assetlocation_detail.return_value = {
...     "data": pd.DataFrame([{"id": 11, "projectsite_name": "Project A"}]),
...     "exists": True,
...     "id": 11,
... }
>>> locations_client.get_projectsite_detail.return_value = {
...     "data": pd.DataFrame([{"id": 10, "title": "Project A"}]),
...     "exists": True,
...     "id": 10,
... }
>>> geometry_client.get_subassemblies.return_value = {
...     "data": pd.DataFrame([{"id": 40, "subassembly_type": "TP", "model_definition": "MD-01"}]),
...     "exists": True,
... }
>>> geometry_client.get_modeldefinition_id.return_value = {"id": 99}
>>> service = ParentSDKLookupService(locations_client=locations_client, geometry_client=geometry_client)
>>> context = service.get_asset_context(projectsite=None, assetlocation="Asset-01")
>>> (context.site.record_id, context.asset.record_id, context.model_definition)
(10, 11, 99)
Source code in src/owi/metadatabase/shm/lookup.py
def get_asset_context(
    self,
    projectsite: str | None,
    assetlocation: str,
) -> AssetLookupContext:
    """Resolve the lookup context needed for an SHM asset workflow.

    Parameters
    ----------
    projectsite
        Parent SDK project site title. When omitted, the service
        derives it from the asset-location lookup data.
    assetlocation
        Parent SDK asset location title.

    Returns
    -------
    AssetLookupContext
        Typed lookup records plus the resolved model definition.

    Examples
    --------
    >>> import pandas as pd
    >>> from unittest.mock import Mock
    >>> locations_client = Mock()
    >>> geometry_client = Mock()
    >>> locations_client.get_assetlocation_detail.return_value = {
    ...     "data": pd.DataFrame([{"id": 11, "projectsite_name": "Project A"}]),
    ...     "exists": True,
    ...     "id": 11,
    ... }
    >>> locations_client.get_projectsite_detail.return_value = {
    ...     "data": pd.DataFrame([{"id": 10, "title": "Project A"}]),
    ...     "exists": True,
    ...     "id": 10,
    ... }
    >>> geometry_client.get_subassemblies.return_value = {
    ...     "data": pd.DataFrame([{"id": 40, "subassembly_type": "TP", "model_definition": "MD-01"}]),
    ...     "exists": True,
    ... }
    >>> geometry_client.get_modeldefinition_id.return_value = {"id": 99}
    >>> service = ParentSDKLookupService(locations_client=locations_client, geometry_client=geometry_client)
    >>> context = service.get_asset_context(projectsite=None, assetlocation="Asset-01")
    >>> (context.site.record_id, context.asset.record_id, context.model_definition)
    (10, 11, 99)
    """
    asset = self.get_assetlocation(assetlocation=assetlocation, projectsite=projectsite)
    resolved_projectsite = projectsite or self._resolve_projectsite_name(asset, assetlocation)
    site = self.get_projectsite(projectsite=resolved_projectsite)
    subassemblies = self.get_subassemblies(assetlocation=assetlocation, projectsite=resolved_projectsite)
    model_definition = self.get_model_definition(
        subassemblies=subassemblies,
        assetlocation=assetlocation,
        projectsite=resolved_projectsite,
    )
    return AssetLookupContext(
        site=site,
        asset=asset,
        subassemblies=subassemblies,
        model_definition=model_definition,
    )
get_signal_upload_context
get_signal_upload_context(
    projectsite, assetlocation, permission_group_ids=None
)

Resolve the payload-builder context for SHM signal uploads.

Parameters:

projectsite : str | None
    Parent SDK project site title. When omitted, the service derives it
    from the asset-location lookup data.
assetlocation : str
    Parent SDK asset location title.
permission_group_ids : Sequence[int] | None, default None
    Visibility groups applied to created SHM records.

Returns:

SignalUploadContext
    Upload context compatible with legacy payload builders.

Examples:

>>> import pandas as pd
>>> from unittest.mock import Mock
>>> locations_client = Mock()
>>> geometry_client = Mock()
>>> locations_client.get_projectsite_detail.return_value = {
...     "data": pd.DataFrame([{"id": 10, "title": "Project A"}]),
...     "exists": True,
...     "id": 10,
... }
>>> locations_client.get_assetlocation_detail.return_value = {
...     "data": pd.DataFrame([{"id": 11, "title": "Asset-01"}]),
...     "exists": True,
...     "id": 11,
... }
>>> geometry_client.get_subassemblies.return_value = {
...     "data": pd.DataFrame(
...         [
...             {"id": 40, "subassembly_type": "TP", "model_definition": "MD-01"},
...             {"id": 41, "subassembly_type": "TW", "model_definition": "MD-01"},
...         ]
...     ),
...     "exists": True,
... }
>>> service = ParentSDKLookupService(locations_client=locations_client, geometry_client=geometry_client)
>>> context = service.get_signal_upload_context("Project A", "Asset-01", permission_group_ids=[7])
>>> context.site_id, context.asset_location_id, context.subassembly_id_for("TP")
(10, 11, 40)
Source code in src/owi/metadatabase/shm/lookup.py
def get_signal_upload_context(
    self,
    projectsite: str | None,
    assetlocation: str,
    permission_group_ids: Sequence[int] | None = None,
) -> SignalUploadContext:
    """Resolve the payload-builder context for SHM signal uploads.

    Parameters
    ----------
    projectsite
        Parent SDK project site title. When omitted, the service
        derives it from the asset-location lookup data.
    assetlocation
        Parent SDK asset location title.
    permission_group_ids
        Visibility groups applied to created SHM records.

    Returns
    -------
    SignalUploadContext
        Upload context compatible with legacy payload builders.

    Examples
    --------
    >>> import pandas as pd
    >>> from unittest.mock import Mock
    >>> locations_client = Mock()
    >>> geometry_client = Mock()
    >>> locations_client.get_projectsite_detail.return_value = {
    ...     "data": pd.DataFrame([{"id": 10, "title": "Project A"}]),
    ...     "exists": True,
    ...     "id": 10,
    ... }
    >>> locations_client.get_assetlocation_detail.return_value = {
    ...     "data": pd.DataFrame([{"id": 11, "title": "Asset-01"}]),
    ...     "exists": True,
    ...     "id": 11,
    ... }
    >>> geometry_client.get_subassemblies.return_value = {
    ...     "data": pd.DataFrame(
    ...         [
    ...             {"id": 40, "subassembly_type": "TP", "model_definition": "MD-01"},
    ...             {"id": 41, "subassembly_type": "TW", "model_definition": "MD-01"},
    ...         ]
    ...     ),
    ...     "exists": True,
    ... }
    >>> service = ParentSDKLookupService(locations_client=locations_client, geometry_client=geometry_client)
    >>> context = service.get_signal_upload_context("Project A", "Asset-01", permission_group_ids=[7])
    >>> context.site_id, context.asset_location_id, context.subassembly_id_for("TP")
    (10, 11, 40)
    """
    asset_context = self.get_asset_context(
        projectsite=projectsite,
        assetlocation=assetlocation,
    )
    return self.build_signal_upload_context(
        asset_context=asset_context,
        permission_group_ids=permission_group_ids,
    )
build_signal_upload_context staticmethod
build_signal_upload_context(
    asset_context, permission_group_ids=None
)

Translate parent lookup records into upload payload ids.

Parameters:

Name Type Description Default
asset_context AssetLookupContext

Normalized parent SDK lookup context.

required
permission_group_ids Sequence[int] | None

Visibility groups applied to created SHM records.

None

Returns:

Type Description
SignalUploadContext

Upload context compatible with legacy payload builders.

Raises:

Type Description
SignalUploadContextError

If required parent lookup ids or subassembly columns are missing.

Examples:

>>> asset_context = AssetLookupContext(
...     site=LookupRecord(pd.DataFrame([{"id": 10}]), record_id=10),
...     asset=LookupRecord(pd.DataFrame([{"id": 11}]), record_id=11),
...     subassemblies=LookupRecord(
...         pd.DataFrame(
...             [
...                 {"id": 40, "subassembly_type": "TP", "model_definition": "MD-01"},
...                 {"id": 41, "subassembly_type": "TW", "model_definition": "MD-01"},
...             ]
...         )
...     ),
...     model_definition="MD-01",
... )
>>> upload_context = ParentSDKLookupService.build_signal_upload_context(asset_context, [3, 5])
>>> upload_context.permission_group_ids
[3, 5]
Source code in src/owi/metadatabase/shm/lookup.py
@staticmethod
def build_signal_upload_context(
    asset_context: AssetLookupContext,
    permission_group_ids: Sequence[int] | None = None,
) -> SignalUploadContext:
    """Translate parent lookup records into upload payload ids.

    Parameters
    ----------
    asset_context
        Normalized parent SDK lookup context.
    permission_group_ids
        Visibility groups applied to created SHM records.

    Returns
    -------
    SignalUploadContext
        Upload context compatible with legacy payload builders.

    Raises
    ------
    SignalUploadContextError
        If required parent lookup ids or subassembly columns are missing.

    Examples
    --------
    >>> asset_context = AssetLookupContext(
    ...     site=LookupRecord(pd.DataFrame([{"id": 10}]), record_id=10),
    ...     asset=LookupRecord(pd.DataFrame([{"id": 11}]), record_id=11),
    ...     subassemblies=LookupRecord(
    ...         pd.DataFrame(
    ...             [
    ...                 {"id": 40, "subassembly_type": "TP", "model_definition": "MD-01"},
    ...                 {"id": 41, "subassembly_type": "TW", "model_definition": "MD-01"},
    ...             ]
    ...         )
    ...     ),
    ...     model_definition="MD-01",
    ... )
    >>> upload_context = ParentSDKLookupService.build_signal_upload_context(asset_context, [3, 5])
    >>> upload_context.permission_group_ids
    [3, 5]
    """
    if asset_context.site.record_id is None:
        raise SignalUploadContextError("Project site lookup did not provide a record id.")
    if asset_context.asset.record_id is None:
        raise SignalUploadContextError("Asset location lookup did not provide a record id.")

    return SignalUploadContext(
        site_id=int(asset_context.site.record_id),
        asset_location_id=int(asset_context.asset.record_id),
        model_definition_id=asset_context.model_definition,
        permission_group_ids=(list(permission_group_ids) if permission_group_ids is not None else None),
        subassembly_ids_by_type=ParentSDKLookupService._build_subassembly_ids_by_type(asset_context.subassemblies),
    )
get_model_definition
get_model_definition(
    subassemblies, assetlocation, projectsite
)

Resolve the model definition reference used by SHM payload builders.

The lookup prefers the transition-piece model definition present on the subassembly rows and, when the parent geometry client exposes get_modeldefinition_id(), upgrades a model-definition title into the corresponding backend id.

Source code in src/owi/metadatabase/shm/lookup.py
def get_model_definition(
    self,
    subassemblies: LookupRecord,
    assetlocation: str,
    projectsite: str,
) -> int | str:
    """Resolve the model definition reference used by SHM payload builders.

    The lookup prefers the transition-piece model definition present on
    the subassembly rows and, when the parent geometry client exposes
    ``get_modeldefinition_id()``, upgrades a model-definition title into
    the corresponding backend id.
    """
    model_definition = self.get_transition_piece_model_definition(subassemblies=subassemblies)
    if isinstance(model_definition, int):
        return model_definition

    get_modeldefinition_id = getattr(self.geometry_client, "get_modeldefinition_id", None)
    if not callable(get_modeldefinition_id):
        return model_definition

    try:
        result = get_modeldefinition_id(
            assetlocation=assetlocation,
            projectsite=projectsite,
            model_definition=model_definition,
        )
    except ValueError as exc:
        raise ModelDefinitionLookupError(str(exc)) from exc

    if not isinstance(result, Mapping):
        return model_definition

    record_id = result.get("id")
    normalized_record_id = self._normalize_model_definition(record_id)
    if normalized_record_id is None:
        return model_definition

    if isinstance(normalized_record_id, int):
        return normalized_record_id

    try:
        return int(normalized_record_id)
    except (TypeError, ValueError):
        return model_definition
get_transition_piece_model_definition staticmethod
get_transition_piece_model_definition(subassemblies)

Extract the transition-piece model definition from subassemblies.

Source code in src/owi/metadatabase/shm/lookup.py
@staticmethod
def get_transition_piece_model_definition(
    subassemblies: LookupRecord,
) -> int | str:
    """Extract the transition-piece model definition from subassemblies."""
    if "subassembly_type" not in subassemblies.data or "model_definition" not in subassemblies.data:
        raise ModelDefinitionLookupError(
            "Subassembly lookup data must contain 'subassembly_type' and 'model_definition' columns."
        )

    transition_pieces = subassemblies.data[subassemblies.data["subassembly_type"] == "TP"]
    if transition_pieces.empty:
        raise ModelDefinitionLookupError("No transition-piece subassembly found in lookup result.")

    model_definitions = [
        normalized
        for value in transition_pieces["model_definition"].tolist()
        for normalized in [ParentSDKLookupService._normalize_model_definition(value)]
        if normalized is not None
    ]
    unique_definitions = list(dict.fromkeys(model_definitions))
    if not unique_definitions:
        raise ModelDefinitionLookupError("Transition-piece subassemblies do not define a model definition.")
    if len(unique_definitions) > 1:
        raise ModelDefinitionLookupError(
            "Transition-piece subassemblies map to multiple model definitions; the backend data is ambiguous."
        )
    return unique_definitions[0]
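The single-definition check above hinges on an order-preserving dedup via `dict.fromkeys`. A minimal sketch of just that validation rule, with `ValueError` standing in for `ModelDefinitionLookupError`:

```python
def unique_model_definition(values):
    """Sketch of the validation rules: drop missing values, dedup while
    keeping first-seen order, then demand exactly one definition."""
    definitions = [value for value in values if value is not None]
    unique = list(dict.fromkeys(definitions))  # order-preserving dedup
    if not unique:
        raise ValueError("no model definition defined")
    if len(unique) > 1:
        raise ValueError("ambiguous model definitions")
    return unique[0]


unique_model_definition(["MD-01", None, "MD-01"])  # 'MD-01'
```

Passing rows that map to two distinct definitions (for example `["MD-01", "MD-02"]`) raises, mirroring the "backend data is ambiguous" error.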

models

Pydantic models for typed SHM resources.

Classes

ShmEntityName

Bases: str, Enum

Supported SHM entity names.

ShmBaseModel

Bases: BaseModel

Base Pydantic configuration for SHM models.

ShmResourceRecord

Bases: ShmBaseModel

Base resource model shared by SHM entity records.

ShmQuery

Bases: BaseModel

Validated wrapper for backend filter payloads.

Functions
to_backend_filters
to_backend_filters()

Return backend-compatible filter arguments.

Source code in src/owi/metadatabase/shm/models.py
def to_backend_filters(self) -> dict[str, Any]:
    """Return backend-compatible filter arguments."""
    return dict(self.backend_filters)
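Because `to_backend_filters` returns a fresh `dict`, callers can mutate the result without touching the validated query. A stand-in class (not the real Pydantic model) illustrates the defensive-copy behavior:

```python
class QuerySketch:
    """Illustrative stand-in for ShmQuery: stores validated filters and
    hands out shallow copies."""

    def __init__(self, backend_filters: dict) -> None:
        self.backend_filters = dict(backend_filters)

    def to_backend_filters(self) -> dict:
        # dict(...) builds a shallow copy, protecting the stored filters
        return dict(self.backend_filters)


query = QuerySketch({"projectsite__title": "Project A"})
filters = query.to_backend_filters()
filters["extra"] = True  # does not leak back into the query
```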

SensorTypeRecord

Bases: ShmResourceRecord

Typed SHM sensor-type record.

SensorRecord

Bases: ShmResourceRecord

Typed SHM sensor record.

SensorCalibrationRecord

Bases: ShmResourceRecord

Typed SHM sensor calibration record.

SignalRecord

Bases: ShmResourceRecord

Typed SHM signal record.

SignalHistoryRecord

Bases: ShmResourceRecord

Typed SHM signal history record.

SignalCalibrationRecord

Bases: ShmResourceRecord

Typed SHM signal calibration record.

DerivedSignalRecord

Bases: ShmResourceRecord

Typed SHM derived-signal record.

DerivedSignalHistoryRecord

Bases: ShmResourceRecord

Typed SHM derived-signal history record.

DerivedSignalCalibrationRecord

Bases: ShmResourceRecord

Typed SHM derived-signal calibration record.

serializers

Serializers for typed SHM resources.

Classes

ShmEntitySerializer

ShmEntitySerializer(record_model, *, json_fields=())

Bases: Generic[TShmRecord]

Generic serializer for a single SHM entity type.

Source code in src/owi/metadatabase/shm/serializers.py
def __init__(self, record_model: type[TShmRecord], *, json_fields: tuple[str, ...] = ()) -> None:
    self.record_model = record_model
    self.json_fields = json_fields
Functions
to_payload
to_payload(obj)

Serialize a resource model or mapping into a backend payload.

Source code in src/owi/metadatabase/shm/serializers.py
def to_payload(self, obj: TShmRecord | BaseModel | Mapping[str, Any]) -> dict[str, Any]:
    """Serialize a resource model or mapping into a backend payload."""
    if isinstance(obj, BaseModel):
        return obj.model_dump(mode="json", exclude_none=True)
    return {key: value for key, value in _normalize_mapping(dict(obj)).items() if value is not None}
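Both branches of `to_payload` strip `None` values so the backend only receives fields that were actually set. For plain mappings the effect reduces to one comprehension; this sketch omits the private `_normalize_mapping` step, treating normalization as a pass-through:

```python
def payload_from_mapping(obj: dict) -> dict:
    """Sketch of the mapping branch: unset (None) fields are dropped
    before the payload is sent to the backend."""
    return {key: value for key, value in obj.items() if value is not None}


payload_from_mapping({"name": "acc_x", "unit": None, "sensor": 4})
# {'name': 'acc_x', 'sensor': 4}
```

Pydantic inputs take the other branch, where `model_dump(mode="json", exclude_none=True)` performs the same filtering.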
from_mapping
from_mapping(mapping)

Deserialize a backend row into a typed resource model.

Source code in src/owi/metadatabase/shm/serializers.py
def from_mapping(self, mapping: Mapping[str, Any]) -> TShmRecord:
    """Deserialize a backend row into a typed resource model."""
    normalized = _normalize_mapping(dict(mapping))
    for field_name in self.json_fields:
        normalized[field_name] = _normalize_json_field(normalized.get(field_name))
    return self.record_model.model_validate(normalized)

registry

Entity registry for typed SHM resources.

Classes

ShmEntityDefinition dataclass

ShmEntityDefinition(
    name, endpoint, record_model, serializer
)

Typed description of one SHM resource.

ShmEntityRegistry

ShmEntityRegistry()

Simple in-process registry for SHM entities.

Source code in src/owi/metadatabase/shm/registry.py
def __init__(self) -> None:
    self._registry: dict[ShmEntityName, ShmEntityDefinition] = {}
Functions
register
register(definition)

Register one SHM entity definition.

Source code in src/owi/metadatabase/shm/registry.py
def register(self, definition: ShmEntityDefinition) -> ShmEntityDefinition:
    """Register one SHM entity definition."""
    self._registry[definition.name] = definition
    return definition
get
get(entity_name)

Return the configured entity definition.

Source code in src/owi/metadatabase/shm/registry.py
def get(self, entity_name: ShmEntityName | str) -> ShmEntityDefinition:
    """Return the configured entity definition."""
    resolved_name = entity_name if isinstance(entity_name, ShmEntityName) else ShmEntityName(entity_name)
    try:
        return self._registry[resolved_name]
    except KeyError as exc:
        raise KeyError(f"Unknown SHM entity: {resolved_name}") from exc
names
names()

Return registered entity names.

Source code in src/owi/metadatabase/shm/registry.py
def names(self) -> list[str]:
    """Return registered entity names."""
    return sorted(name.value for name in self._registry)
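The registry is a thin dict keyed by the entity enum. A runnable sketch with a trimmed-down enum and definition class (illustrative stand-ins for `ShmEntityName` and `ShmEntityDefinition`) shows the register/get/names round-trip, including how string keys resolve through the enum:

```python
from dataclasses import dataclass
from enum import Enum


class EntityName(str, Enum):  # trimmed stand-in for ShmEntityName
    SENSOR = "sensor"
    SIGNAL = "signal"


@dataclass
class EntityDefinition:  # trimmed stand-in for ShmEntityDefinition
    name: EntityName
    endpoint: str


class Registry:
    """Sketch of the in-process registry shown above."""

    def __init__(self) -> None:
        self._registry = {}

    def register(self, definition: EntityDefinition) -> EntityDefinition:
        self._registry[definition.name] = definition
        return definition

    def get(self, entity_name) -> EntityDefinition:
        # strings are coerced through the enum, so "signal" and
        # EntityName.SIGNAL resolve to the same entry
        resolved = entity_name if isinstance(entity_name, EntityName) else EntityName(entity_name)
        try:
            return self._registry[resolved]
        except KeyError as exc:
            raise KeyError(f"Unknown entity: {resolved}") from exc

    def names(self) -> list:
        return sorted(name.value for name in self._registry)


registry = Registry()
registry.register(EntityDefinition(EntityName.SIGNAL, "signal"))
registry.get("signal").endpoint  # 'signal'
```

An unregistered name fails twice over: an unknown string raises `ValueError` from the enum coercion, and a known enum member without a definition raises the wrapped `KeyError`.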

services

Service facades for typed SHM entities.

Classes

ApiShmRepository

ApiShmRepository(api=None)

Repository adapter built on top of :class:ShmAPI.

Source code in src/owi/metadatabase/shm/services/core.py
def __init__(self, api: ShmAPI | None = None) -> None:
    self.api = api or ShmAPI(token="dummy")
    self._list_methods = {
        ShmEntityName.SENSOR_TYPE: self.api.list_sensor_types,
        ShmEntityName.SENSOR: self.api.list_sensors,
        ShmEntityName.SENSOR_CALIBRATION: self.api.list_sensor_calibrations,
        ShmEntityName.SIGNAL: self.api.list_signals,
        ShmEntityName.SIGNAL_HISTORY: self.api.list_signal_history,
        ShmEntityName.SIGNAL_CALIBRATION: self.api.list_signal_calibrations,
        ShmEntityName.DERIVED_SIGNAL: self.api.list_derived_signals,
        ShmEntityName.DERIVED_SIGNAL_HISTORY: self.api.list_derived_signal_history,
        ShmEntityName.DERIVED_SIGNAL_CALIBRATION: self.api.list_derived_signal_calibrations,
    }
    self._get_methods = {
        ShmEntityName.SENSOR_TYPE: self.api.get_sensor_type,
        ShmEntityName.SENSOR: self.api.get_sensor,
        ShmEntityName.SENSOR_CALIBRATION: self.api.get_sensor_calibration,
        ShmEntityName.SIGNAL: self.api.get_signal,
        ShmEntityName.SIGNAL_HISTORY: self.api.get_signal_history,
        ShmEntityName.SIGNAL_CALIBRATION: self.api.get_signal_calibration,
        ShmEntityName.DERIVED_SIGNAL: self.api.get_derived_signal,
        ShmEntityName.DERIVED_SIGNAL_HISTORY: self.api.get_derived_signal_history,
        ShmEntityName.DERIVED_SIGNAL_CALIBRATION: self.api.get_derived_signal_calibration,
    }
    self._create_methods = {
        ShmEntityName.SENSOR_TYPE: self.api.create_sensor_type,
        ShmEntityName.SENSOR: self.api.create_sensor,
        ShmEntityName.SENSOR_CALIBRATION: self.api.create_sensor_calibration,
        ShmEntityName.SIGNAL: self.api.create_signal,
        ShmEntityName.SIGNAL_HISTORY: self.api.create_signal_history,
        ShmEntityName.SIGNAL_CALIBRATION: self.api.create_signal_calibration,
        ShmEntityName.DERIVED_SIGNAL: self.api.create_derived_signal,
        ShmEntityName.DERIVED_SIGNAL_HISTORY: self.api.create_derived_signal_history,
        ShmEntityName.DERIVED_SIGNAL_CALIBRATION: self.api.create_derived_signal_calibration,
    }
Functions
list_records
list_records(entity_name, **filters)

Return backend rows for a collection query.

Source code in src/owi/metadatabase/shm/services/core.py
def list_records(self, entity_name: ShmEntityName | str, **filters: Any) -> pd.DataFrame:
    """Return backend rows for a collection query."""
    resolved_name = self._resolve_name(entity_name)
    return cast(pd.DataFrame, self._list_methods[resolved_name](**filters)["data"])
get_record
get_record(entity_name, **filters)

Return the raw backend response for a single-resource query.

Source code in src/owi/metadatabase/shm/services/core.py
def get_record(self, entity_name: ShmEntityName | str, **filters: Any) -> Mapping[str, Any]:
    """Return the raw backend response for a single-resource query."""
    resolved_name = self._resolve_name(entity_name)
    if resolved_name is ShmEntityName.SIGNAL and "signal_id" in filters:
        signal_id = str(filters.pop("signal_id"))
        return self.api.get_signal(signal_id, **filters)
    return self._get_methods[resolved_name](**filters)
create_record
create_record(entity_name, payload, files=None)

Create a resource through the configured SHM API client.

Source code in src/owi/metadatabase/shm/services/core.py
def create_record(
    self,
    entity_name: ShmEntityName | str,
    payload: Mapping[str, Any],
    files: Mapping[str, Any] | None = None,
) -> Mapping[str, Any]:
    """Create a resource through the configured SHM API client."""
    resolved_name = self._resolve_name(entity_name)
    if files is not None:
        if resolved_name is ShmEntityName.SENSOR_TYPE:
            return self.api.create_sensor_type(dict(payload), files=files)
        if resolved_name is ShmEntityName.SENSOR_CALIBRATION:
            return self.api.create_sensor_calibration(dict(payload), files=files)
    create_method = self._create_methods[resolved_name]
    return create_method(dict(payload))
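`ApiShmRepository` routes each entity to a bound API method through the dispatch tables built in `__init__`. A minimal stdlib sketch of that pattern, with a hypothetical `FakeAPI` in place of `ShmAPI` and only two entities:

```python
from enum import Enum


class Entity(str, Enum):  # trimmed stand-in for ShmEntityName
    SENSOR = "sensor"
    SIGNAL = "signal"


class FakeAPI:
    """Illustrative stand-in for ShmAPI returning canned rows."""

    def list_sensors(self, **filters):
        return [{"id": 1, "entity": "sensor", **filters}]

    def list_signals(self, **filters):
        return [{"id": 2, "entity": "signal", **filters}]


class Repository:
    def __init__(self, api: FakeAPI) -> None:
        # one dispatch table per operation, keyed by the entity enum;
        # values are bound methods, so no if/elif chain is needed later
        self._list_methods = {
            Entity.SENSOR: api.list_sensors,
            Entity.SIGNAL: api.list_signals,
        }

    def list_records(self, entity, **filters):
        resolved = entity if isinstance(entity, Entity) else Entity(entity)
        return self._list_methods[resolved](**filters)


Repository(FakeAPI()).list_records("signal", title="acc_x")
```

The dispatch-table shape keeps `list_records`, `get_record`, and `create_record` uniform while still allowing per-entity special cases (like the `signal_id` and file-upload branches above) before falling through to the table.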

SensorService

SensorService(entity_service=None)

Convenience service for sensor-domain SHM entities.

Source code in src/owi/metadatabase/shm/services/core.py
def __init__(self, entity_service: ShmEntityService | None = None) -> None:
    self.entity_service = entity_service or ShmEntityService()

ShmEntityService

ShmEntityService(repository=None, registry=None)

Facade for typed SHM retrieval and creation.

Source code in src/owi/metadatabase/shm/services/core.py
def __init__(
    self,
    repository: ShmRepositoryProtocol | None = None,
    registry: EntityRegistryProtocol | None = None,
) -> None:
    self.repository = repository or ApiShmRepository()
    self.registry = registry or default_registry
Functions
list_records
list_records(entity_name, filters=None)

Return typed resources for a collection query.

Source code in src/owi/metadatabase/shm/services/core.py
def list_records(
    self,
    entity_name: ShmEntityName | str,
    filters: ShmQuery | Mapping[str, Any] | None = None,
) -> list[ShmResourceRecord]:
    """Return typed resources for a collection query."""
    query = self._coerce_query(entity_name, filters)
    definition = self.registry.get(query.entity or self._resolve_name(entity_name))
    frame = self.repository.list_records(definition.name, **query.to_backend_filters())
    return [
        cast(
            ShmResourceRecord,
            definition.serializer.from_mapping(cast(dict[str, Any], row)),
        )
        for row in frame.to_dict(orient="records")
    ]
get_record
get_record(entity_name, filters=None)

Return a single typed resource when it exists.

Source code in src/owi/metadatabase/shm/services/core.py
def get_record(
    self,
    entity_name: ShmEntityName | str,
    filters: ShmQuery | Mapping[str, Any] | None = None,
) -> ShmResourceRecord | None:
    """Return a single typed resource when it exists."""
    query = self._coerce_query(entity_name, filters)
    definition = self.registry.get(query.entity or self._resolve_name(entity_name))
    result = self.repository.get_record(definition.name, **query.to_backend_filters())
    frame = result.get("data")
    if not result.get("exists") or not isinstance(frame, pd.DataFrame) or frame.empty:
        return None
    return cast(ShmResourceRecord, definition.serializer.from_mapping(frame.iloc[0].to_dict()))
create_record
create_record(entity_name, payload, files=None)

Create and deserialize one SHM resource.

Source code in src/owi/metadatabase/shm/services/core.py
def create_record(
    self,
    entity_name: ShmEntityName | str,
    payload: Mapping[str, Any] | ShmResourceRecord,
    files: Mapping[str, Any] | None = None,
) -> ShmResourceRecord | None:
    """Create and deserialize one SHM resource."""
    resolved_name = self._resolve_name(entity_name)
    definition = self.registry.get(resolved_name)
    serialized_payload = definition.serializer.to_payload(payload)
    result = self.repository.create_record(resolved_name, serialized_payload, files=files)
    frame = result.get("data")
    if not result.get("exists") or not isinstance(frame, pd.DataFrame) or frame.empty:
        return None
    return cast(ShmResourceRecord, definition.serializer.from_mapping(frame.iloc[0].to_dict()))

SignalService

SignalService(entity_service=None)

Convenience service for signal-domain SHM entities.

Source code in src/owi/metadatabase/shm/services/core.py
def __init__(self, entity_service: ShmEntityService | None = None) -> None:
    self.entity_service = entity_service or ShmEntityService()

processing

Signal processing subpackage.

Re-exports all public symbols so from owi.metadatabase.shm.processing import X continues to work unchanged.

Classes

ConfigDiscovery

Bases: ABC

Discover farm configuration files from a filesystem path.

Functions
discover abstractmethod
discover(path_configs, turbines=None)

Return a turbine-to-config-path mapping.

Parameters:

Name Type Description Default
path_configs str | Path

Filesystem path to a directory of configuration files or a single configuration file.

required
turbines Sequence[str] | None

Optional subset of turbine identifiers to retain from the discovered files.

None

Returns:

Type Description
dict[str, Path]

Mapping from turbine identifier to configuration file path.

Source code in src/owi/metadatabase/shm/processing/discovery.py
@abstractmethod
def discover(
    self,
    path_configs: str | Path,
    turbines: Sequence[str] | None = None,
) -> dict[str, Path]:
    """Return a turbine-to-config-path mapping.

    Parameters
    ----------
    path_configs
        Filesystem path to a directory of configuration files or a single
        configuration file.
    turbines
        Optional subset of turbine identifiers to retain from the
        discovered files.

    Returns
    -------
    dict[str, Path]
        Mapping from turbine identifier to configuration file path.
    """

JsonStemConfigDiscovery dataclass

JsonStemConfigDiscovery(suffix='.json')

Bases: ConfigDiscovery

Discover JSON configuration files by stem name.

Parameters:

Name Type Description Default
suffix str

File suffix treated as a valid configuration file.

'.json'

Examples:

>>> JsonStemConfigDiscovery().suffix
'.json'
Functions
discover
discover(path_configs, turbines=None)

Return available JSON config files keyed by turbine name.

Parameters:

Name Type Description Default
path_configs str | Path

Directory containing configuration files or a single JSON config path.

required
turbines Sequence[str] | None

Optional subset of turbine stems to retain from the discovered files.

None

Returns:

Type Description
dict[str, Path]

Mapping from turbine stem to configuration path.

Raises:

Type Description
ValueError

If the path does not resolve to usable JSON files or the requested turbine subset is empty.

Source code in src/owi/metadatabase/shm/processing/discovery.py
def discover(
    self,
    path_configs: str | Path,
    turbines: Sequence[str] | None = None,
) -> dict[str, Path]:
    """Return available JSON config files keyed by turbine name.

    Parameters
    ----------
    path_configs
        Directory containing configuration files or a single JSON config
        path.
    turbines
        Optional subset of turbine stems to retain from the discovered
        files.

    Returns
    -------
    dict[str, Path]
        Mapping from turbine stem to configuration path.

    Raises
    ------
    ValueError
        If the path does not resolve to usable JSON files or the requested
        turbine subset is empty.
    """
    root = Path(path_configs)
    if root.is_dir():
        available = {
            path.stem: path for path in sorted(root.iterdir()) if path.is_file() and path.suffix == self.suffix
        }
    elif root.is_file() and root.suffix == self.suffix:
        available = {root.stem: root}
    else:
        raise ValueError(f"Could not discover configuration files from {root}.")

    if turbines is None:
        return available

    selected = {turbine: available[turbine] for turbine in turbines if turbine in available}
    missing = [turbine for turbine in turbines if turbine not in available]
    if missing:
        warnings.warn(
            "Some turbines from the provided list are not found in the "
            f"configurations directory. Using available turbines: {list(selected)}",
            stacklevel=2,
        )
    if not selected:
        raise ValueError("No valid turbines found in the provided list.")
    return selected
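The stem-based discovery maps each matching file to its stem, so `WTG01.json` becomes the key `"WTG01"`. This free-function sketch of the directory/file branches (without the turbine-subset filtering) runs against a temporary directory:

```python
import json
import tempfile
from pathlib import Path


def discover_json_configs(root: Path, suffix: str = ".json") -> dict:
    """Sketch of stem-based discovery: one entry per file with the
    configured suffix, keyed by the file stem."""
    if root.is_dir():
        return {p.stem: p for p in sorted(root.iterdir()) if p.is_file() and p.suffix == suffix}
    if root.is_file() and root.suffix == suffix:
        return {root.stem: root}
    raise ValueError(f"Could not discover configuration files from {root}.")


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    for turbine in ("WTG01", "WTG02"):
        (root / f"{turbine}.json").write_text(json.dumps([]))
    (root / "notes.txt").write_text("ignored")  # wrong suffix, skipped
    configs = discover_json_configs(root)

sorted(configs)  # ['WTG01', 'WTG02']
```

Passing a single `.json` path yields a one-entry mapping, and anything else (a missing path, a file with the wrong suffix) raises, matching the `ValueError` documented above.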

DelimitedSignalKeyParser dataclass

DelimitedSignalKeyParser(signal_prefixes, separator='/')

Parse delimited signal-property keys.

Parameters:

Name Type Description Default
signal_prefixes tuple[str, ...]

Raw key prefixes that belong to direct signal properties.

required
separator str

Separator between the signal identifier and the property name.

'/'

Examples:

>>> parser = DelimitedSignalKeyParser(signal_prefixes=("WF", "X/", "Y/", "Z/"))
>>> parser.parse("WF_WTG_TP_STRAIN/status")
SignalEventKey(signal_name='WF_WTG_TP_STRAIN', property_name='status')
>>> parser.parse("acceleration/yaw_transformation") is None
True
Functions
matches
matches(raw_key)

Return True when the raw key belongs to a direct signal.

Parameters:

Name Type Description Default
raw_key str

Raw configuration key to test.

required

Returns:

Type Description
bool

Whether the key starts with one of the configured signal prefixes.

Source code in src/owi/metadatabase/shm/processing/parsing.py
def matches(self, raw_key: str) -> bool:
    """Return ``True`` when the raw key belongs to a direct signal.

    Parameters
    ----------
    raw_key
        Raw configuration key to test.

    Returns
    -------
    bool
        Whether the key starts with one of the configured signal prefixes.
    """
    return raw_key.startswith(self.signal_prefixes)
parse
parse(raw_key)

Parse a raw key into a signal/property pair.

Parameters:

Name Type Description Default
raw_key str

Raw configuration key containing a signal name and property name separated by :attr:separator.

required

Returns:

Type Description
SignalEventKey or None

Parsed key, or None when the key does not match or lacks a separator.

Source code in src/owi/metadatabase/shm/processing/parsing.py
def parse(self, raw_key: str) -> SignalEventKey | None:
    """Parse a raw key into a signal/property pair.

    Parameters
    ----------
    raw_key
        Raw configuration key containing a signal name and property name
        separated by :attr:`separator`.

    Returns
    -------
    SignalEventKey or None
        Parsed key, or *None* when the key does not match or lacks a
        separator.
    """
    if not self.matches(raw_key) or self.separator not in raw_key:
        return None

    signal_name, property_name = raw_key.split(self.separator, maxsplit=1)
    if not signal_name or not property_name:
        return None
    return SignalEventKey(signal_name=signal_name, property_name=property_name)
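Because the split uses `maxsplit=1`, only the first separator delimits the signal name; any further separators remain part of the property name. A stand-alone sketch of the parse rules, returning a plain tuple instead of `SignalEventKey`:

```python
def parse_key(raw_key: str, prefixes: tuple, separator: str = "/"):
    """Sketch of the delimited parse: the first separator wins, and the
    remainder stays attached to the property name."""
    if not raw_key.startswith(prefixes) or separator not in raw_key:
        return None
    signal_name, property_name = raw_key.split(separator, maxsplit=1)
    if not signal_name or not property_name:
        return None  # reject empty halves such as "WF_SIG/"
    return signal_name, property_name


parse_key("WF_SIG/calibration/offset", ("WF_",))
# ('WF_SIG', 'calibration/offset') — nested separators stay in the property
```

Keys with a trailing separator (`"WF_SIG/"`) or an unmatched prefix return `None`, so the processor silently hands them to the derived-signal strategies instead.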

SignalEventKey dataclass

SignalEventKey(signal_name, property_name)

Parsed signal-property key.

Parameters:

Name Type Description Default
signal_name str

Canonical signal identifier.

required
property_name str

Property name carried by the raw configuration key.

required

ConfiguredSignalConfigProcessor

ConfiguredSignalConfigProcessor(
    path_configs, processor_spec, turbines=None
)

Bases: SignalConfigProcessor

Signal processor backed by an explicit farm spec.

Parameters:

Name Type Description Default
path_configs str | Path

Directory or JSON file containing farm signal configuration events.

required
processor_spec SignalProcessorSpec

Explicit processor specification that defines parsing, derivation, and discovery behavior.

required
turbines Sequence[str] | None

Optional subset of turbine stems to process during discovery.

None

Examples:

>>> from owi.metadatabase.shm.processing import DelimitedSignalKeyParser, SignalProcessorSpec
>>> spec = SignalProcessorSpec(
...     farm_name="Demo",
...     signal_key_parser=DelimitedSignalKeyParser(signal_prefixes=("WF_",)),
...     derived_signal_strategies={},
... )
>>> processor = ConfiguredSignalConfigProcessor(path_configs='.', processor_spec=spec)
>>> result = processor.process_events([{"WF_SIG/status": "ok"}])
>>> result.to_legacy_data()[0]["WF_SIG"]["status"][0]["status"]
'ok'
Source code in src/owi/metadatabase/shm/processing/processor.py
def __init__(
    self,
    path_configs: str | Path,
    processor_spec: SignalProcessorSpec,
    turbines: Sequence[str] | None = None,
) -> None:
    self._processor_spec = processor_spec
    super().__init__(path_configs=path_configs, turbines=turbines)
Functions
process_events
process_events(events)

Transform raw configuration events into typed signal records.

Parameters:

Name Type Description Default
events Sequence[Mapping[str, Any]]

Ordered raw configuration events loaded from one farm config.

required

Returns:

Type Description
SignalProcessingResult

Typed signal and derived-signal records that can be converted to the archive-compatible uploader payload shape.

Examples:

>>> from owi.metadatabase.shm.processing import (
...     ConfiguredSignalConfigProcessor,
...     DelimitedSignalKeyParser,
...     SignalProcessorSpec,
... )
>>> spec = SignalProcessorSpec(
...     farm_name="Demo",
...     signal_key_parser=DelimitedSignalKeyParser(signal_prefixes=("WF_",)),
...     derived_signal_strategies={},
... )
>>> processor = ConfiguredSignalConfigProcessor(path_configs='.', processor_spec=spec)
>>> result = processor.process_events([{"WF_SIG/status": "ok"}])
>>> result.to_legacy_data()[0]["WF_SIG"]["status"][0]["status"]
'ok'
Source code in src/owi/metadatabase/shm/processing/processor.py
def process_events(self, events: Sequence[Mapping[str, Any]]) -> SignalProcessingResult:
    """Transform raw configuration events into typed signal records.

    Parameters
    ----------
    events
        Ordered raw configuration events loaded from one farm config.

    Returns
    -------
    SignalProcessingResult
        Typed signal and derived-signal records that can be converted to
        the archive-compatible uploader payload shape.

    Examples
    --------
    >>> from owi.metadatabase.shm.processing import (
    ...     ConfiguredSignalConfigProcessor,
    ...     DelimitedSignalKeyParser,
    ...     SignalProcessorSpec,
    ... )
    >>> spec = SignalProcessorSpec(
    ...     farm_name="Demo",
    ...     signal_key_parser=DelimitedSignalKeyParser(signal_prefixes=("WF_",)),
    ...     derived_signal_strategies={},
    ... )
    >>> processor = ConfiguredSignalConfigProcessor(path_configs='.', processor_spec=spec)
    >>> result = processor.process_events([{"WF_SIG/status": "ok"}])
    >>> result.to_legacy_data()[0]["WF_SIG"]["status"][0]["status"]
    'ok'
    """
    signals: dict[str, ProcessedSignalRecord] = {}
    derived_signals: dict[str, ProcessedDerivedSignalRecord] = {}
    current_time = self.processor_spec.default_initial_time

    for index, event in enumerate(events):
        current_time = self._resolve_event_time(
            event,
            index=index,
            current_time=current_time,
        )
        for raw_key, value in event.items():
            signal_key = self.processor_spec.signal_key_parser.parse(raw_key)
            if signal_key is not None:
                self._apply_signal_property(
                    signals=signals,
                    signal_key=signal_key,
                    value=value,
                    timestamp=current_time,
                )
                continue

            strategy = self.processor_spec.derived_signal_strategies.get(raw_key)
            if strategy is None:
                continue
            payload = _coerce_mapping(value, context=raw_key)
            self._apply_derived_updates(
                derived_signals=derived_signals,
                event_key=raw_key,
                updates=strategy.emit_updates(raw_key, payload),
                timestamp=current_time,
            )

    self._postprocess_signals(signals)
    return SignalProcessingResult(signals=signals, derived_signals=derived_signals)
signal_preprocess_data
signal_preprocess_data(path_config)

Process one configuration file into archive-compatible mappings.

Parameters:

Name Type Description Default
path_config str | Path

JSON configuration file to load and process.

required

Returns:

Type Description
tuple[LegacySignalMap, LegacySignalMap]

Main-signal and derived-signal mappings ready for uploader seams.

Source code in src/owi/metadatabase/shm/processing/processor.py
def signal_preprocess_data(
    self,
    path_config: str | Path,
) -> tuple[LegacySignalMap, LegacySignalMap]:
    """Process one configuration file into archive-compatible mappings.

    Parameters
    ----------
    path_config
        JSON configuration file to load and process.

    Returns
    -------
    tuple[LegacySignalMap, LegacySignalMap]
        Main-signal and derived-signal mappings ready for uploader seams.
    """
    events = self._load_events(path_config)
    return self.process_events(events).to_legacy_data()
signals_process_data
signals_process_data()

Process all discovered configuration files under path_configs.

The processed results are stored on signals_data and signals_derived_data, keyed by turbine stem.


Source code in src/owi/metadatabase/shm/processing/processor.py
def signals_process_data(self) -> None:
    """Process all discovered configuration files under ``path_configs``.

    The processed results are stored on :attr:`signals_data` and
    :attr:`signals_derived_data`, keyed by turbine stem.
    """
    config_paths = self.processor_spec.config_discovery.discover(
        self.path_configs,
        turbines=self.turbines,
    )
    self.turbines = list(config_paths)
    for turbine, config_path in config_paths.items():
        signals_data, derived_data = self.signal_preprocess_data(config_path)
        self.signals_data[turbine] = signals_data
        self.signals_derived_data[turbine] = derived_data
build_processor_spec
build_processor_spec()

Return the explicit processor spec passed to the constructor.

Returns:

Type Description
SignalProcessorSpec

The specification supplied at construction time.

Source code in src/owi/metadatabase/shm/processing/processor.py
def build_processor_spec(self) -> SignalProcessorSpec:
    """Return the explicit processor spec passed to the constructor.

    Returns
    -------
    SignalProcessorSpec
        The specification supplied at construction time.
    """
    return self._processor_spec
from_yaml_spec classmethod
from_yaml_spec(
    *, path_configs, processor_spec_path, turbines=None
)

Construct a configured processor from a YAML-backed processor spec.

Parameters:

Name Type Description Default
path_configs str | Path

Directory or JSON file containing farm configuration events.

required
processor_spec_path str | Path

Path to a YAML processor specification file.

required
turbines Sequence[str] | None

Optional subset of turbine stems to process during discovery.

None

Returns:

Type Description
ConfiguredSignalConfigProcessor

Processor loaded with the given YAML spec.

Source code in src/owi/metadatabase/shm/processing/processor.py
@classmethod
def from_yaml_spec(
    cls,
    *,
    path_configs: str | Path,
    processor_spec_path: str | Path,
    turbines: Sequence[str] | None = None,
) -> ConfiguredSignalConfigProcessor:
    """Construct a configured processor from a YAML-backed processor spec.

    Parameters
    ----------
    path_configs
        Directory or JSON file containing farm configuration events.
    processor_spec_path
        Path to a YAML processor specification file.
    turbines
        Optional subset of turbine stems to process during discovery.

    Returns
    -------
    ConfiguredSignalConfigProcessor
        Processor loaded with the given YAML spec.
    """
    return cls(
        path_configs=path_configs,
        processor_spec=load_signal_processor_spec(processor_spec_path),
        turbines=turbines,
    )

DefaultSignalConfigProcessor

DefaultSignalConfigProcessor(
    path_configs, turbines=None, processor_spec=None
)

Bases: ConfiguredSignalConfigProcessor

Built-in specialization of the generic signal processor.

Parameters:

Name Type Description Default
path_configs str | Path

Directory or JSON file containing default wind-farm configuration events.

required
turbines Sequence[str] | None

Optional subset of turbine stems to process during discovery.

None
processor_spec SignalProcessorSpec | None

Optional override for the built-in default processor specification.

None
Source code in src/owi/metadatabase/shm/processing/processor.py
def __init__(
    self,
    path_configs: str | Path,
    turbines: Sequence[str] | None = None,
    processor_spec: SignalProcessorSpec | None = None,
) -> None:
    super().__init__(
        path_configs=path_configs,
        turbines=turbines,
        processor_spec=processor_spec or load_default_signal_processor_spec(),
    )
Functions
build_processor_spec
build_processor_spec()

Return the explicit processor spec passed to the constructor.

Returns:

Type Description
SignalProcessorSpec

The specification supplied at construction time.

Source code in src/owi/metadatabase/shm/processing/processor.py
def build_processor_spec(self) -> SignalProcessorSpec:
    """Return the explicit processor spec passed to the constructor.

    Returns
    -------
    SignalProcessorSpec
        The specification supplied at construction time.
    """
    return self._processor_spec
process_events
process_events(events)

Transform raw configuration events into typed signal records.

Parameters:

Name Type Description Default
events Sequence[Mapping[str, Any]]

Ordered raw configuration events loaded from one farm config.

required

Returns:

Type Description
SignalProcessingResult

Typed signal and derived-signal records that can be converted to the archive-compatible uploader payload shape.

Examples:

>>> from owi.metadatabase.shm.processing import (
...     ConfiguredSignalConfigProcessor,
...     DelimitedSignalKeyParser,
...     SignalProcessorSpec,
... )
>>> spec = SignalProcessorSpec(
...     farm_name="Demo",
...     signal_key_parser=DelimitedSignalKeyParser(signal_prefixes=("WF_",)),
...     derived_signal_strategies={},
... )
>>> processor = ConfiguredSignalConfigProcessor(path_configs='.', processor_spec=spec)
>>> result = processor.process_events([{"WF_SIG/status": "ok"}])
>>> result.to_legacy_data()[0]["WF_SIG"]["status"][0]["status"]
'ok'
Source code in src/owi/metadatabase/shm/processing/processor.py
def process_events(self, events: Sequence[Mapping[str, Any]]) -> SignalProcessingResult:
    """Transform raw configuration events into typed signal records.

    Parameters
    ----------
    events
        Ordered raw configuration events loaded from one farm config.

    Returns
    -------
    SignalProcessingResult
        Typed signal and derived-signal records that can be converted to
        the archive-compatible uploader payload shape.

    Examples
    --------
    >>> from owi.metadatabase.shm.processing import (
    ...     ConfiguredSignalConfigProcessor,
    ...     DelimitedSignalKeyParser,
    ...     SignalProcessorSpec,
    ... )
    >>> spec = SignalProcessorSpec(
    ...     farm_name="Demo",
    ...     signal_key_parser=DelimitedSignalKeyParser(signal_prefixes=("WF_",)),
    ...     derived_signal_strategies={},
    ... )
    >>> processor = ConfiguredSignalConfigProcessor(path_configs='.', processor_spec=spec)
    >>> result = processor.process_events([{"WF_SIG/status": "ok"}])
    >>> result.to_legacy_data()[0]["WF_SIG"]["status"][0]["status"]
    'ok'
    """
    signals: dict[str, ProcessedSignalRecord] = {}
    derived_signals: dict[str, ProcessedDerivedSignalRecord] = {}
    current_time = self.processor_spec.default_initial_time

    for index, event in enumerate(events):
        current_time = self._resolve_event_time(
            event,
            index=index,
            current_time=current_time,
        )
        for raw_key, value in event.items():
            signal_key = self.processor_spec.signal_key_parser.parse(raw_key)
            if signal_key is not None:
                self._apply_signal_property(
                    signals=signals,
                    signal_key=signal_key,
                    value=value,
                    timestamp=current_time,
                )
                continue

            strategy = self.processor_spec.derived_signal_strategies.get(raw_key)
            if strategy is None:
                continue
            payload = _coerce_mapping(value, context=raw_key)
            self._apply_derived_updates(
                derived_signals=derived_signals,
                event_key=raw_key,
                updates=strategy.emit_updates(raw_key, payload),
                timestamp=current_time,
            )

    self._postprocess_signals(signals)
    return SignalProcessingResult(signals=signals, derived_signals=derived_signals)
signal_preprocess_data
signal_preprocess_data(path_config)

Process one configuration file into archive-compatible mappings.

Parameters:

Name Type Description Default
path_config str | Path

JSON configuration file to load and process.

required

Returns:

Type Description
tuple[LegacySignalMap, LegacySignalMap]

Main-signal and derived-signal mappings ready for uploader seams.

Source code in src/owi/metadatabase/shm/processing/processor.py
def signal_preprocess_data(
    self,
    path_config: str | Path,
) -> tuple[LegacySignalMap, LegacySignalMap]:
    """Process one configuration file into archive-compatible mappings.

    Parameters
    ----------
    path_config
        JSON configuration file to load and process.

    Returns
    -------
    tuple[LegacySignalMap, LegacySignalMap]
        Main-signal and derived-signal mappings ready for uploader seams.
    """
    events = self._load_events(path_config)
    return self.process_events(events).to_legacy_data()
signals_process_data
signals_process_data()

Process all discovered configuration files under path_configs.

The processed results are stored on signals_data and signals_derived_data, keyed by turbine stem.

Source code in src/owi/metadatabase/shm/processing/processor.py
def signals_process_data(self) -> None:
    """Process all discovered configuration files under ``path_configs``.

    The processed results are stored on :attr:`signals_data` and
    :attr:`signals_derived_data`, keyed by turbine stem.
    """
    config_paths = self.processor_spec.config_discovery.discover(
        self.path_configs,
        turbines=self.turbines,
    )
    self.turbines = list(config_paths)
    for turbine, config_path in config_paths.items():
        signals_data, derived_data = self.signal_preprocess_data(config_path)
        self.signals_data[turbine] = signals_data
        self.signals_derived_data[turbine] = derived_data
from_yaml_spec classmethod
from_yaml_spec(
    *, path_configs, processor_spec_path, turbines=None
)

Construct a configured processor from a YAML-backed processor spec.

Parameters:

Name Type Description Default
path_configs str | Path

Directory or JSON file containing farm configuration events.

required
processor_spec_path str | Path

Path to a YAML processor specification file.

required
turbines Sequence[str] | None

Optional subset of turbine stems to process during discovery.

None

Returns:

Type Description
ConfiguredSignalConfigProcessor

Processor loaded with the given YAML spec.

Source code in src/owi/metadatabase/shm/processing/processor.py
@classmethod
def from_yaml_spec(
    cls,
    *,
    path_configs: str | Path,
    processor_spec_path: str | Path,
    turbines: Sequence[str] | None = None,
) -> ConfiguredSignalConfigProcessor:
    """Construct a configured processor from a YAML-backed processor spec.

    Parameters
    ----------
    path_configs
        Directory or JSON file containing farm configuration events.
    processor_spec_path
        Path to a YAML processor specification file.
    turbines
        Optional subset of turbine stems to process during discovery.

    Returns
    -------
    ConfiguredSignalConfigProcessor
        Processor loaded with the given YAML spec.
    """
    return cls(
        path_configs=path_configs,
        processor_spec=load_signal_processor_spec(processor_spec_path),
        turbines=turbines,
    )

SignalConfigProcessor

SignalConfigProcessor(path_configs, turbines=None)

Bases: ABC

ABC-backed base class for wind-farm signal config processors.

Parameters:

Name Type Description Default
path_configs str | Path

Directory or JSON file containing farm signal configuration events.

required
turbines Sequence[str] | None

Optional subset of turbine stems to process during discovery.

None
Notes

Subclasses provide the farm-specific SignalProcessorSpec used by the generic processing pipeline.

Source code in src/owi/metadatabase/shm/processing/processor.py
def __init__(
    self,
    path_configs: str | Path,
    turbines: Sequence[str] | None = None,
) -> None:
    self.path_configs = Path(path_configs)
    self.turbines = list(turbines) if turbines is not None else None
    self.signals_data: dict[str, LegacySignalMap] = {}
    self.signals_derived_data: dict[str, LegacySignalMap] = {}
    self.processor_spec = self.build_processor_spec()
Functions
build_processor_spec abstractmethod
build_processor_spec()

Return the farm-specific processor specification.

Returns:

Type Description
SignalProcessorSpec

Specification controlling signal key parsing, derived-signal strategies, and postprocessors.

Source code in src/owi/metadatabase/shm/processing/processor.py
@abstractmethod
def build_processor_spec(self) -> SignalProcessorSpec:
    """Return the farm-specific processor specification.

    Returns
    -------
    SignalProcessorSpec
        Specification controlling signal key parsing, derived-signal
        strategies, and postprocessors.
    """
process_events
process_events(events)

Transform raw configuration events into typed signal records.

Parameters:

Name Type Description Default
events Sequence[Mapping[str, Any]]

Ordered raw configuration events loaded from one farm config.

required

Returns:

Type Description
SignalProcessingResult

Typed signal and derived-signal records that can be converted to the archive-compatible uploader payload shape.

Examples:

>>> from owi.metadatabase.shm.processing import (
...     ConfiguredSignalConfigProcessor,
...     DelimitedSignalKeyParser,
...     SignalProcessorSpec,
... )
>>> spec = SignalProcessorSpec(
...     farm_name="Demo",
...     signal_key_parser=DelimitedSignalKeyParser(signal_prefixes=("WF_",)),
...     derived_signal_strategies={},
... )
>>> processor = ConfiguredSignalConfigProcessor(path_configs='.', processor_spec=spec)
>>> result = processor.process_events([{"WF_SIG/status": "ok"}])
>>> result.to_legacy_data()[0]["WF_SIG"]["status"][0]["status"]
'ok'
Source code in src/owi/metadatabase/shm/processing/processor.py
def process_events(self, events: Sequence[Mapping[str, Any]]) -> SignalProcessingResult:
    """Transform raw configuration events into typed signal records.

    Parameters
    ----------
    events
        Ordered raw configuration events loaded from one farm config.

    Returns
    -------
    SignalProcessingResult
        Typed signal and derived-signal records that can be converted to
        the archive-compatible uploader payload shape.

    Examples
    --------
    >>> from owi.metadatabase.shm.processing import (
    ...     ConfiguredSignalConfigProcessor,
    ...     DelimitedSignalKeyParser,
    ...     SignalProcessorSpec,
    ... )
    >>> spec = SignalProcessorSpec(
    ...     farm_name="Demo",
    ...     signal_key_parser=DelimitedSignalKeyParser(signal_prefixes=("WF_",)),
    ...     derived_signal_strategies={},
    ... )
    >>> processor = ConfiguredSignalConfigProcessor(path_configs='.', processor_spec=spec)
    >>> result = processor.process_events([{"WF_SIG/status": "ok"}])
    >>> result.to_legacy_data()[0]["WF_SIG"]["status"][0]["status"]
    'ok'
    """
    signals: dict[str, ProcessedSignalRecord] = {}
    derived_signals: dict[str, ProcessedDerivedSignalRecord] = {}
    current_time = self.processor_spec.default_initial_time

    for index, event in enumerate(events):
        current_time = self._resolve_event_time(
            event,
            index=index,
            current_time=current_time,
        )
        for raw_key, value in event.items():
            signal_key = self.processor_spec.signal_key_parser.parse(raw_key)
            if signal_key is not None:
                self._apply_signal_property(
                    signals=signals,
                    signal_key=signal_key,
                    value=value,
                    timestamp=current_time,
                )
                continue

            strategy = self.processor_spec.derived_signal_strategies.get(raw_key)
            if strategy is None:
                continue
            payload = _coerce_mapping(value, context=raw_key)
            self._apply_derived_updates(
                derived_signals=derived_signals,
                event_key=raw_key,
                updates=strategy.emit_updates(raw_key, payload),
                timestamp=current_time,
            )

    self._postprocess_signals(signals)
    return SignalProcessingResult(signals=signals, derived_signals=derived_signals)
signal_preprocess_data
signal_preprocess_data(path_config)

Process one configuration file into archive-compatible mappings.

Parameters:

Name Type Description Default
path_config str | Path

JSON configuration file to load and process.

required

Returns:

Type Description
tuple[LegacySignalMap, LegacySignalMap]

Main-signal and derived-signal mappings ready for uploader seams.

Source code in src/owi/metadatabase/shm/processing/processor.py
def signal_preprocess_data(
    self,
    path_config: str | Path,
) -> tuple[LegacySignalMap, LegacySignalMap]:
    """Process one configuration file into archive-compatible mappings.

    Parameters
    ----------
    path_config
        JSON configuration file to load and process.

    Returns
    -------
    tuple[LegacySignalMap, LegacySignalMap]
        Main-signal and derived-signal mappings ready for uploader seams.
    """
    events = self._load_events(path_config)
    return self.process_events(events).to_legacy_data()
signals_process_data
signals_process_data()

Process all discovered configuration files under path_configs.

The processed results are stored on signals_data and signals_derived_data, keyed by turbine stem.

Source code in src/owi/metadatabase/shm/processing/processor.py
def signals_process_data(self) -> None:
    """Process all discovered configuration files under ``path_configs``.

    The processed results are stored on :attr:`signals_data` and
    :attr:`signals_derived_data`, keyed by turbine stem.
    """
    config_paths = self.processor_spec.config_discovery.discover(
        self.path_configs,
        turbines=self.turbines,
    )
    self.turbines = list(config_paths)
    for turbine, config_path in config_paths.items():
        signals_data, derived_data = self.signal_preprocess_data(config_path)
        self.signals_data[turbine] = signals_data
        self.signals_derived_data[turbine] = derived_data

ProcessedDerivedSignalRecord dataclass

ProcessedDerivedSignalRecord(
    data_fields=dict(),
    calibration_rows=list(),
    parent_signals=(),
)

Typed in-memory representation of one processed derived signal.

Examples:

>>> record = ProcessedDerivedSignalRecord()
>>> record.ensure_source_name("strain/bending_moment", {"suffix": "N"})
>>> record.set_parent_signals(["SIG_A", "SIG_B"])
>>> record.add_calibration("01/01/1972 00:00", {"yaw_offset": 2.0})
>>> sorted(record.to_legacy_dict())
['calibration', 'data', 'parent_signals']
Functions
ensure_source_name
ensure_source_name(source_name, extra_fields=None)

Initialize immutable source metadata for the derived signal.

Parameters:

Name Type Description Default
source_name str

Event key that produced the derived signal.

required
extra_fields Mapping[str, Any] | None

Optional metadata merged into the legacy data mapping the first time the source name is set.

None
Source code in src/owi/metadatabase/shm/processing/records.py
def ensure_source_name(
    self,
    source_name: str,
    extra_fields: Mapping[str, Any] | None = None,
) -> None:
    """Initialize immutable source metadata for the derived signal.

    Parameters
    ----------
    source_name
        Event key that produced the derived signal.
    extra_fields
        Optional metadata merged into the legacy ``data`` mapping the first
        time the source name is set.
    """
    if not self.data_fields:
        self.data_fields = {"name": source_name}
        if extra_fields:
            self.data_fields.update(extra_fields)
set_parent_signals
set_parent_signals(parent_signals)

Set parent signals when they are first known.

Parameters:

Name Type Description Default
parent_signals Sequence[str]

Ordered parent signal identifiers for the derived signal.

required
Source code in src/owi/metadatabase/shm/processing/records.py
def set_parent_signals(self, parent_signals: Sequence[str]) -> None:
    """Set parent signals when they are first known.

    Parameters
    ----------
    parent_signals
        Ordered parent signal identifiers for the derived signal.
    """
    if not self.parent_signals:
        self.parent_signals = tuple(parent_signals)
add_calibration
add_calibration(timestamp, calibration_fields)

Append a derived-signal calibration row.

Parameters:

Name Type Description Default
timestamp str

Event timestamp associated with the calibration.

required
calibration_fields Mapping[str, Any]

Calibration fields emitted by the derived-signal strategy.

required
Source code in src/owi/metadatabase/shm/processing/records.py
def add_calibration(self, timestamp: str, calibration_fields: Mapping[str, Any]) -> None:
    """Append a derived-signal calibration row.

    Parameters
    ----------
    timestamp
        Event timestamp associated with the calibration.
    calibration_fields
        Calibration fields emitted by the derived-signal strategy.
    """
    row = {"time": timestamp}
    row.update(calibration_fields)
    self.calibration_rows.append(row)
to_legacy_dict
to_legacy_dict()

Return the uploader-facing legacy mapping.

Returns:

Type Description
LegacyRecord

Archive-compatible mapping consumed by uploader payload builders.

Source code in src/owi/metadatabase/shm/processing/records.py
def to_legacy_dict(self) -> LegacyRecord:
    """Return the uploader-facing legacy mapping.

    Returns
    -------
    LegacyRecord
        Archive-compatible mapping consumed by uploader payload builders.
    """
    data: LegacyRecord = {}
    if self.data_fields:
        data["data"] = dict(self.data_fields)
    if self.calibration_rows:
        data["calibration"] = [dict(row) for row in self.calibration_rows]
    if self.parent_signals:
        data["parent_signals"] = list(self.parent_signals)
    return data

ProcessedSignalRecord dataclass

ProcessedSignalRecord(
    scalar_fields=dict(),
    status_rows=list(),
    offset_rows=list(),
    cwl_rows=list(),
)

Typed in-memory representation of one processed signal.

Parameters:

Name Type Description Default
scalar_fields dict[str, Any]

Arbitrary scalar properties stored on the signal.

dict()
status_rows list[dict[str, Any]]

Collected status event rows.

list()
offset_rows list[dict[str, Any]]

Collected offset event rows.

list()
cwl_rows list[dict[str, Any]]

Collected CWL event rows.

list()

Examples:

>>> record = ProcessedSignalRecord()
>>> record.add_status("01/01/1972 00:00", "ok")
>>> record.to_legacy_dict()["status"][0]["status"]
'ok'
Functions
set_scalar
set_scalar(property_name, value)

Store a scalar property on the signal.

Parameters:

Name Type Description Default
property_name str

Scalar field name from the raw configuration event.

required
value Any

Value to persist on the signal record.

required
Source code in src/owi/metadatabase/shm/processing/records.py
def set_scalar(self, property_name: str, value: Any) -> None:
    """Store a scalar property on the signal.

    Parameters
    ----------
    property_name
        Scalar field name from the raw configuration event.
    value
        Value to persist on the signal record.
    """
    self.scalar_fields[property_name] = value
add_status
add_status(timestamp, status)

Append a status row.

Parameters:

Name Type Description Default
timestamp str

Event timestamp associated with the status.

required
status Any

Status value to store.

required
Source code in src/owi/metadatabase/shm/processing/records.py
def add_status(self, timestamp: str, status: Any) -> None:
    """Append a status row.

    Parameters
    ----------
    timestamp
        Event timestamp associated with the status.
    status
        Status value to store.
    """
    self.status_rows.append({"time": timestamp, "status": status})
add_status_alias
add_status_alias(timestamp, alias_name)

Append a status row that carries a legacy alias name.

Parameters:

Name Type Description Default
timestamp str

Event timestamp associated with the alias.

required
alias_name str

Legacy signal name that points at this record.

required
Source code in src/owi/metadatabase/shm/processing/records.py
def add_status_alias(self, timestamp: str, alias_name: str) -> None:
    """Append a status row that carries a legacy alias name.

    Parameters
    ----------
    timestamp
        Event timestamp associated with the alias.
    alias_name
        Legacy signal name that points at this record.
    """
    self.status_rows.append({"time": timestamp, "name": alias_name})
add_offset
add_offset(timestamp, offset)

Append an offset row.

Parameters:

Name Type Description Default
timestamp str

Event timestamp associated with the offset.

required
offset Any

Offset value to store.

required
Source code in src/owi/metadatabase/shm/processing/records.py
def add_offset(self, timestamp: str, offset: Any) -> None:
    """Append an offset row.

    Parameters
    ----------
    timestamp
        Event timestamp associated with the offset.
    offset
        Offset value to store.
    """
    self.offset_rows.append({"time": timestamp, "offset": offset})
add_cwl
add_cwl(timestamp, cwl)

Append a CWL row.

Parameters:

Name Type Description Default
timestamp str

Event timestamp associated with the CWL value.

required
cwl Any

CWL value to store.

required
Source code in src/owi/metadatabase/shm/processing/records.py
def add_cwl(self, timestamp: str, cwl: Any) -> None:
    """Append a CWL row.

    Parameters
    ----------
    timestamp
        Event timestamp associated with the CWL value.
    cwl
        CWL value to store.
    """
    self.cwl_rows.append({"time": timestamp, "cwl": cwl})
to_legacy_dict
to_legacy_dict()

Return the uploader-facing legacy mapping.

Returns:

Type Description
LegacyRecord

Archive-compatible mapping consumed by uploader payload builders.

Source code in src/owi/metadatabase/shm/processing/records.py
def to_legacy_dict(self) -> LegacyRecord:
    """Return the uploader-facing legacy mapping.

    Returns
    -------
    LegacyRecord
        Archive-compatible mapping consumed by uploader payload builders.
    """
    data = dict(self.scalar_fields)
    if self.status_rows:
        data["status"] = [dict(row) for row in self.status_rows]
    if self.offset_rows:
        data["offset"] = [dict(row) for row in self.offset_rows]
    if self.cwl_rows:
        data["cwl"] = [dict(row) for row in self.cwl_rows]
    return data
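Based on the source above, the returned mapping merges the scalar fields with one list per collected row kind, omitting empty lists. A free-function restatement of that logic, with plain dicts standing in for the dataclass fields, makes the expected shape concrete:

```python
from typing import Any


def to_legacy_dict(scalar_fields: dict[str, Any],
                   status_rows: list[dict[str, Any]],
                   offset_rows: list[dict[str, Any]],
                   cwl_rows: list[dict[str, Any]]) -> dict[str, Any]:
    """Restatement of ProcessedSignalRecord.to_legacy_dict shown above."""
    data = dict(scalar_fields)
    if status_rows:
        data["status"] = [dict(row) for row in status_rows]
    if offset_rows:
        data["offset"] = [dict(row) for row in offset_rows]
    if cwl_rows:
        data["cwl"] = [dict(row) for row in cwl_rows]
    return data


legacy = to_legacy_dict(
    {"unit": "g"},
    [{"time": "01/01/1972 00:00", "status": "ok"}],
    [],  # no offset rows, so no "offset" key in the result
    [{"time": "01/01/1972 00:00", "cwl": 12.5}],
)
print(sorted(legacy))  # ['cwl', 'status', 'unit']
```

Note that row lists are shallow-copied, so mutating the returned mapping does not affect the record.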

SignalProcessingResult dataclass

SignalProcessingResult(signals, derived_signals)

Processed signal and derived-signal records.

Examples:

>>> signal = ProcessedSignalRecord()
>>> signal.add_status("01/01/1972 00:00", "ok")
>>> result = SignalProcessingResult(signals={"SIG": signal}, derived_signals={})
>>> result.to_legacy_data()[0]["SIG"]["status"][0]["status"]
'ok'
Functions
to_legacy_data
to_legacy_data()

Return archive-compatible dicts for uploader seams.

Returns:

Type Description
tuple[LegacySignalMap, LegacySignalMap]

Main-signal and derived-signal mappings in the uploader-facing archive shape.

Source code in src/owi/metadatabase/shm/processing/records.py
def to_legacy_data(self) -> tuple[LegacySignalMap, LegacySignalMap]:
    """Return archive-compatible dicts for uploader seams.

    Returns
    -------
    tuple[LegacySignalMap, LegacySignalMap]
        Main-signal and derived-signal mappings in the uploader-facing
        archive shape.
    """
    return (
        {name: record.to_legacy_dict() for name, record in self.signals.items()},
        {name: record.to_legacy_dict() for name, record in self.derived_signals.items()},
    )

SignalProcessorSpec dataclass

SignalProcessorSpec(
    farm_name,
    signal_key_parser,
    derived_signal_strategies,
    config_discovery=JsonStemConfigDiscovery(),
    postprocessors=(),
    time_field="time",
    default_initial_time="01/01/1972 00:00",
)

Farm-specific configuration for signal processing.

The YAML structure expected by this spec is designed to be flexible enough to cover a wide range of use cases while remaining human-friendly and avoiding excessive nesting. The top-level keys are:

  • farm_name: A human-readable farm identifier used by the caller.
  • signal_key_parser: A configuration for the signal key parser, which recognizes direct signal-property keys in the input data.
  • derived_signal_strategies: A mapping from raw event keys to derived-signal strategies, which define how to generate derived signals based on specific events in the input data.
  • config_discovery: A strategy used to discover configuration files on disk, allowing the processor to locate and load necessary configurations for processing signals.
  • postprocessors: A list of pure normalization hooks applied after all events are processed, enabling additional transformations or clean-up steps on the processed signals.
  • time_field: The event field used to update the active timestamp during processing.
  • default_initial_time: The timestamp assigned to the first event when the payload omits one, ensuring that all events have a valid timestamp for processing.
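The interplay of time_field and default_initial_time described above can be sketched as a carry-forward loop. This is a stand-in for the processor's internal time resolution, whose exact behavior may differ:

```python
from typing import Any, Mapping, Sequence


def resolve_event_times(events: Sequence[Mapping[str, Any]],
                        time_field: str = "time",
                        default_initial_time: str = "01/01/1972 00:00") -> list[str]:
    """Return the active timestamp for each event, carrying the last one forward."""
    current = default_initial_time
    times = []
    for event in events:
        # An explicit time field replaces the active timestamp;
        # otherwise the previous timestamp is reused.
        current = event.get(time_field, current)
        times.append(current)
    return times


timeline = resolve_event_times([
    {},                                    # no time field: default applies
    {"time": "05/03/2020 10:00", "x": 1},  # explicit time becomes active
    {"y": 2},                              # omitted: previous time carried forward
])
print(timeline)
```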

Parameters:

Name Type Description Default
farm_name str

Human-readable farm identifier used by the caller.

required
signal_key_parser DelimitedSignalKeyParser

Parser that recognizes direct signal-property keys.

required
derived_signal_strategies Mapping[str, DerivedSignalStrategy]

Mapping from raw event keys to derived-signal strategies.

required
config_discovery ConfigDiscovery

Strategy used to discover configuration files on disk.

JsonStemConfigDiscovery()
postprocessors tuple[SignalPostprocessor, ...]

Pure normalization hooks applied after all events are processed.

()
time_field str

Event field used to update the active timestamp.

'time'
default_initial_time str

Timestamp assigned to the first event when the payload omits one.

'01/01/1972 00:00'

Examples:

>>> spec = SignalProcessorSpec(
...     farm_name="Demo Farm",
...     signal_key_parser=DelimitedSignalKeyParser(signal_prefixes=("WF_",)),
...     derived_signal_strategies={},
... )
>>> spec.default_initial_time
'01/01/1972 00:00'

YAML example

farm_name: Demo Farm
signal_key_parser:
  kind: delimited
  signal_prefixes: ["WF_"]
derived_signal_strategies: {}
config_discovery:
  kind: json_stem
postprocessors: []
time_field: time
default_initial_time: '01/01/1972 00:00'

DerivedSignalStrategy

Bases: ABC

Strategy for translating one event into derived-signal updates.

Implementations keep farm-specific derived-signal semantics outside the generic processor loop.

Functions
emit_updates abstractmethod
emit_updates(event_key, payload)

Build derived-signal updates for one event payload.

Parameters:

Name Type Description Default
event_key str

Raw event key that selected the strategy.

required
payload Mapping[str, Any]

Mapping stored under the raw event key.

required

Returns:

Type Description
list[DerivedSignalUpdate]

Derived-signal mutations emitted for the event.

Source code in src/owi/metadatabase/shm/processing/strategies.py
@abstractmethod
def emit_updates(
    self,
    event_key: str,
    payload: Mapping[str, Any],
) -> list[DerivedSignalUpdate]:
    """Build derived-signal updates for one event payload.

    Parameters
    ----------
    event_key
        Raw event key that selected the strategy.
    payload
        Mapping stored under the raw event key.

    Returns
    -------
    list[DerivedSignalUpdate]
        Derived-signal mutations emitted for the event.
    """

LevelBasedDerivedSignalStrategy dataclass

LevelBasedDerivedSignalStrategy(
    suffixes,
    signal_name_builder=_default_level_signal_name,
    parent_signals_builder=_parent_signals_from_level,
    calibration_fields_builder=_yaw_calibration_fields,
    data_builder=None,
    levels_key="levels",
)

Bases: DerivedSignalStrategy

Expand a level-based event into derived signals.

Parameters:

Name Type Description Default
suffixes tuple[str, ...]

Suffixes appended to each level identifier.

required
signal_name_builder SignalNameBuilder

Callback used to derive the final signal name for a level/suffix pair.

_default_level_signal_name
parent_signals_builder ParentSignalsBuilder

Callback that returns parent signal identifiers for a level.

_parent_signals_from_level
calibration_fields_builder CalibrationFieldsBuilder

Callback that returns calibration data for a level.

_yaw_calibration_fields
data_builder DerivedDataBuilder | None

Optional callback for extra metadata stored under data.

None

Examples:

>>> strategy = LevelBasedDerivedSignalStrategy(
...     suffixes=("FA",),
...     parent_signals_builder=lambda payload, level: tuple(payload[level]),
...     calibration_fields_builder=lambda payload, level: {"yaw_offset": payload["yaw_offset"]},
... )
>>> updates = strategy.emit_updates(
...     "acceleration/yaw_transformation",
...     {"levels": ["SIG_A"], "yaw_offset": 2.0, "SIG_A": ["PARENT_1", "PARENT_2"]},
... )
>>> updates[0].signal_name
'SIG_A_FA'
>>> updates[0].parent_signals
('PARENT_1', 'PARENT_2')
Functions
emit_updates
emit_updates(event_key, payload)

Build derived-signal updates for a level-based payload.

Parameters:

Name Type Description Default
event_key str

Raw event key that triggered the strategy. The value is accepted for interface parity and is not used directly by the default implementation.

required
payload Mapping[str, Any]

Mapping that must contain the configured levels_key plus the fields required by the configured callbacks.

required

Returns:

Type Description
list[DerivedSignalUpdate]

One update per level and configured suffix.

Source code in src/owi/metadatabase/shm/processing/strategies.py
def emit_updates(
    self,
    event_key: str,
    payload: Mapping[str, Any],
) -> list[DerivedSignalUpdate]:
    """Build derived-signal updates for a level-based payload.

    Parameters
    ----------
    event_key
        Raw event key that triggered the strategy. The value is accepted
        for interface parity and is not used directly by the default
        implementation.
    payload
        Mapping that must contain the configured ``levels_key`` plus the
        fields required by the configured callbacks.

    Returns
    -------
    list[DerivedSignalUpdate]
        One update per level and configured suffix.
    """
    del event_key
    levels = _coerce_string_sequence(payload.get(self.levels_key), context=self.levels_key)
    updates: list[DerivedSignalUpdate] = []
    for level in levels:
        for suffix in self.suffixes:
            updates.append(
                DerivedSignalUpdate(
                    signal_name=self.signal_name_builder(level, suffix),
                    parent_signals=tuple(self.parent_signals_builder(payload, level)),
                    calibration_fields=dict(self.calibration_fields_builder(payload, level)),
                    data_fields=(dict(self.data_builder(payload, level)) if self.data_builder is not None else {}),
                )
            )
    return updates
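The core of the loop above is the level-by-suffix expansion. A self-contained sketch of just that naming step, assuming the default `"<level>_<suffix>"` join shown in the class example:

```python
def expand_level_signals(levels: list[str], suffixes: tuple[str, ...]) -> list[str]:
    # One derived-signal name per (level, suffix) pair, levels outermost,
    # matching the nested loop in emit_updates.
    return [f"{level}_{suffix}" for level in levels for suffix in suffixes]
```

For `levels=["SIG_A"]` and `suffixes=("FA",)` this yields the `SIG_A_FA` name seen in the class-level doctest.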

Functions

default_signal_processor_spec

default_signal_processor_spec()

Return the built-in default processor spec for wind-farm signal configs.

Returns:

Type Description
SignalProcessorSpec

Pre-loaded default specification.

Examples:

>>> spec = default_signal_processor_spec()
>>> tuple(spec.derived_signal_strategies)
('acceleration/yaw_transformation', 'strain/bending_moment')
Source code in src/owi/metadatabase/shm/processing/spec.py
def default_signal_processor_spec() -> SignalProcessorSpec:
    """Return the built-in default processor spec for wind-farm signal configs.

    Returns
    -------
    SignalProcessorSpec
        Pre-loaded default specification.

    Examples
    --------
    >>> spec = default_signal_processor_spec()
    >>> tuple(spec.derived_signal_strategies)
    ('acceleration/yaw_transformation', 'strain/bending_moment')
    """
    return load_default_signal_processor_spec()

get_default_signal_processor_spec_path

get_default_signal_processor_spec_path()

Return the packaged YAML path for the built-in default processor spec.

Returns:

Type Description
Path

Absolute path to default_signal_processor.yaml shipped with the package.

Source code in src/owi/metadatabase/shm/processing/spec.py
def get_default_signal_processor_spec_path() -> Path:
    """Return the packaged YAML path for the built-in default processor spec.

    Returns
    -------
    Path
        Absolute path to ``default_signal_processor.yaml`` shipped with the
        package.
    """
    return Path(__file__).parent.parent / "config" / "default_signal_processor.yaml"

load_default_signal_processor_spec

load_default_signal_processor_spec()

Load the built-in default processor spec from its YAML document.

Returns:

Type Description
SignalProcessorSpec

Processor specification loaded from the packaged YAML file.

Source code in src/owi/metadatabase/shm/processing/spec.py
def load_default_signal_processor_spec() -> SignalProcessorSpec:
    """Load the built-in default processor spec from its YAML document.

    Returns
    -------
    SignalProcessorSpec
        Processor specification loaded from the packaged YAML file.
    """
    return load_signal_processor_spec(get_default_signal_processor_spec_path())

load_signal_processor_spec

load_signal_processor_spec(path)

Load a signal processor spec from a YAML document.

The YAML document must conform to the structure expected by SignalProcessorSpec.

Parameters:

Name Type Description Default
path str | Path

Path to the YAML document describing the processor spec.

required

Returns:

Type Description
SignalProcessorSpec

Parsed processor specification.

Source code in src/owi/metadatabase/shm/processing/spec.py
def load_signal_processor_spec(path: str | Path) -> SignalProcessorSpec:
    """Load a signal processor spec from a YAML document.

    The YAML document must conform to the structure expected by
    ``SignalProcessorSpec``.

    Parameters
    ----------
    path
        Path to the YAML document describing the processor spec.

    Returns
    -------
    SignalProcessorSpec
        Parsed processor specification.
    """
    raw_data = yaml.safe_load(Path(path).read_text(encoding="utf-8"))
    config = _coerce_mapping(raw_data, context=str(path))

    raw_signal_key_parser = _coerce_mapping(
        config.get("signal_key_parser"),
        context="signal_key_parser",
    )
    raw_derived_signal_strategies = _coerce_mapping(
        config.get("derived_signal_strategies", {}),
        context="derived_signal_strategies",
    )
    raw_config_discovery = config.get("config_discovery", {"kind": "json_stem"})
    raw_postprocessors = config.get("postprocessors", ())

    return SignalProcessorSpec(
        farm_name=_coerce_string(config.get("farm_name"), context="farm_name"),
        signal_key_parser=_build_signal_key_parser_from_config(raw_signal_key_parser),
        derived_signal_strategies={
            event_key: _build_derived_signal_strategy_from_config(
                event_key,
                _coerce_mapping(
                    raw_strategy,
                    context=f"derived_signal_strategies.{event_key}",
                ),
            )
            for event_key, raw_strategy in raw_derived_signal_strategies.items()
        },
        config_discovery=_build_config_discovery_from_config(
            _coerce_mapping(raw_config_discovery, context="config_discovery")
        ),
        postprocessors=tuple(
            _resolve_registry_value(
                registry=_SIGNAL_POSTPROCESSORS,
                raw_name=postprocessor_name,
                context="postprocessors",
            )
            for postprocessor_name in _coerce_string_sequence(
                raw_postprocessors,
                context="postprocessors",
            )
        ),
        time_field=_coerce_string(config.get("time_field", "time"), context="time_field"),
        default_initial_time=_coerce_string(
            config.get("default_initial_time", "01/01/1972 00:00"),
            context="default_initial_time",
        ),
    )
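The loader leans on small coercion helpers that validate the YAML's shape and fail with a contextual message. The helpers below are simplified sketches of that pattern (the real `_coerce_mapping` / `_coerce_string` may differ):

```python
from collections.abc import Mapping
from typing import Any


def coerce_mapping(value: Any, *, context: str) -> dict[str, Any]:
    # Fail fast with a contextual message instead of a bare TypeError later.
    if not isinstance(value, Mapping):
        raise TypeError(f"{context}: expected a mapping, got {type(value).__name__}")
    return dict(value)


def coerce_string(value: Any, *, context: str) -> str:
    if not isinstance(value, str):
        raise TypeError(f"{context}: expected a string, got {type(value).__name__}")
    return value
```

Threading a `context` string through every coercion is what turns a malformed spec file into an error naming the offending key, e.g. `derived_signal_strategies.acceleration/yaw_transformation`.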

upload

Upload subpackage for SHM signal and sensor upload orchestration.

Re-exports all public symbols so from owi.metadatabase.shm.upload import X works unchanged.

Classes

ParentSignalLookupError

ParentSignalLookupError(message)

Bases: ShmUploadError

Raised when a derived signal refers to unresolved parent signals.

Source code in .venv/lib/python3.14/site-packages/owi/metadatabase/_utils/exceptions.py
def __init__(self, message: str) -> None:
    self.message = message
    super().__init__(self.message)

ShmUploadError

ShmUploadError(message)

Bases: APIException

Base exception for SHM upload orchestration failures.

Source code in .venv/lib/python3.14/site-packages/owi/metadatabase/_utils/exceptions.py
def __init__(self, message: str) -> None:
    self.message = message
    super().__init__(self.message)

UploadResultError

UploadResultError(message)

Bases: ShmUploadError

Raised when a backend mutation result does not include the expected id.

Source code in .venv/lib/python3.14/site-packages/owi/metadatabase/_utils/exceptions.py
def __init__(self, message: str) -> None:
    self.message = message
    super().__init__(self.message)
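Since both `ParentSignalLookupError` and `UploadResultError` derive from `ShmUploadError`, callers can catch the whole upload-failure family with one handler. A sketch using a simplified stand-in hierarchy (`APIException` reduced to a bare `Exception` subclass for illustration):

```python
class APIException(Exception):
    """Stand-in for the package's base API exception (simplified)."""


class ShmUploadError(APIException):
    def __init__(self, message: str) -> None:
        self.message = message
        super().__init__(self.message)


class ParentSignalLookupError(ShmUploadError):
    """Raised when a derived signal refers to unresolved parent signals."""


class UploadResultError(ShmUploadError):
    """Raised when a mutation result does not include the expected id."""


def classify(exc: Exception) -> str:
    # Catching the base class covers every SHM upload failure.
    if isinstance(exc, ShmUploadError):
        return "upload"
    return "other"
```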

AssetSignalUploadRequest dataclass

AssetSignalUploadRequest(
    projectsite,
    assetlocation,
    signals,
    derived_signals=None,
    permission_group_ids=None,
    sensor_serial_numbers_by_signal=None,
    temperature_compensation_signal_ids=None,
)

Input data for uploading one asset's SHM signals.

Parameters:

Name Type Description Default
projectsite str

Parent SDK project site title.

required
assetlocation str

Parent SDK asset location title.

required
signals SignalConfigMap

Archive-compatible main signal data keyed by signal identifier.

required
derived_signals SignalConfigMap | None

Archive-compatible derived signal data keyed by derived signal identifier.

None
permission_group_ids Sequence[int] | None

Visibility groups applied to created SHM objects.

None
sensor_serial_numbers_by_signal Mapping[str, int] | None

Optional map from signal identifier to the backend SHM sensor identifier stored on signal history rows.

None
temperature_compensation_signal_ids Mapping[str, int] | None

Optional map from legacy temperature-compensation sensor token to backend SHM signal id.

None

Examples:

>>> request = AssetSignalUploadRequest(
...     projectsite="Project A",
...     assetlocation="Asset-01",
...     signals={},
... )
>>> request.result_key
'Project A/Asset-01'
Attributes
result_key property
result_key

Return a stable asset-scoped result key.

Functions
from_processing_result classmethod
from_processing_result(
    *,
    projectsite,
    assetlocation,
    processing_result,
    permission_group_ids=None,
    sensor_serial_numbers_by_signal=None,
    temperature_compensation_signal_ids=None,
)

Build an upload request from a processed signal-config result.

Parameters:

Name Type Description Default
projectsite str

Parent SDK project site title.

required
assetlocation str

Parent SDK asset location title.

required
processing_result SignalProcessingResult

Processed signal and derived-signal records emitted by a processor.

required
permission_group_ids Sequence[int] | None

Visibility groups applied to created SHM objects.

None
sensor_serial_numbers_by_signal Mapping[str, int] | None

Optional map from signal identifier to backend sensor serial number used for signal history rows.

None
temperature_compensation_signal_ids Mapping[str, int] | None

Optional map from legacy temperature-compensation sensor token to backend SHM signal id.

None

Returns:

Type Description
AssetSignalUploadRequest

Asset-scoped upload request that preserves the archive-compatible payload shape.

Examples:

>>> from owi.metadatabase.shm.processing import ProcessedSignalRecord, SignalProcessingResult
>>> signal = ProcessedSignalRecord()
>>> signal.add_status("01/01/1972 00:00", "ok")
>>> request = AssetSignalUploadRequest.from_processing_result(
...     projectsite="Project A",
...     assetlocation="Asset-01",
...     processing_result=SignalProcessingResult(signals={"SIG": signal}, derived_signals={}),
... )
>>> request.signals["SIG"]["status"][0]["status"]
'ok'
Source code in src/owi/metadatabase/shm/upload/models.py
@classmethod
def from_processing_result(
    cls,
    *,
    projectsite: str,
    assetlocation: str,
    processing_result: SignalProcessingResult,
    permission_group_ids: Sequence[int] | None = None,
    sensor_serial_numbers_by_signal: Mapping[str, int] | None = None,
    temperature_compensation_signal_ids: Mapping[str, int] | None = None,
) -> AssetSignalUploadRequest:
    """Build an upload request from a processed signal-config result.

    Parameters
    ----------
    projectsite
        Parent SDK project site title.
    assetlocation
        Parent SDK asset location title.
    processing_result
        Processed signal and derived-signal records emitted by a processor.
    permission_group_ids
        Visibility groups applied to created SHM objects.
    sensor_serial_numbers_by_signal
        Optional map from signal identifier to backend sensor serial
        number used for signal history rows.
    temperature_compensation_signal_ids
        Optional map from legacy temperature-compensation sensor token to
        backend SHM signal id.

    Returns
    -------
    AssetSignalUploadRequest
        Asset-scoped upload request that preserves the archive-compatible
        payload shape.

    Examples
    --------
    >>> from owi.metadatabase.shm.processing import ProcessedSignalRecord, SignalProcessingResult
    >>> signal = ProcessedSignalRecord()
    >>> signal.add_status("01/01/1972 00:00", "ok")
    >>> request = AssetSignalUploadRequest.from_processing_result(
    ...     projectsite="Project A",
    ...     assetlocation="Asset-01",
    ...     processing_result=SignalProcessingResult(signals={"SIG": signal}, derived_signals={}),
    ... )
    >>> request.signals["SIG"]["status"][0]["status"]
    'ok'
    """
    signals, derived_signals = processing_result.to_legacy_data()
    return cls(
        projectsite=projectsite,
        assetlocation=assetlocation,
        signals=signals,
        derived_signals=derived_signals or None,
        permission_group_ids=permission_group_ids,
        sensor_serial_numbers_by_signal=sensor_serial_numbers_by_signal,
        temperature_compensation_signal_ids=temperature_compensation_signal_ids,
    )

AssetSignalUploadResult dataclass

AssetSignalUploadResult(
    asset_key,
    signal_ids_by_name,
    derived_signal_ids_by_name,
    results_main,
    results_secondary,
    results_derived_main,
    results_derived_secondary,
)

Upload result for one asset.

Parameters:

Name Type Description Default
asset_key str

Stable asset-scoped result key in projectsite/assetlocation form.

required
signal_ids_by_name Mapping[str, int]

Backend ids for created main signals keyed by signal identifier.

required
derived_signal_ids_by_name Mapping[str, int]

Backend ids for created derived signals keyed by signal identifier.

required
results_main Sequence[dict[str, Any]]

Raw backend responses for main signal creation calls.

required
results_secondary Sequence[dict[str, Any]]

Raw backend responses for signal history and calibration calls.

required
results_derived_main Sequence[dict[str, Any]]

Raw backend responses for derived signal creation calls.

required
results_derived_secondary Sequence[dict[str, Any]]

Raw backend responses for derived history, parent patch, and calibration calls.

required

DerivedSignalCalibrationPayload dataclass

DerivedSignalCalibrationPayload(
    derived_signal_id,
    calibration_date,
    data,
    status_approval="yes",
)

Payload model for derived signal calibration records.

DerivedSignalHistoryPayload dataclass

DerivedSignalHistoryPayload(
    derived_signal_id,
    activity_start_timestamp,
    is_latest_status,
    status,
    status_approval="yes",
)

Payload model for derived signal history records.

DerivedSignalPayload dataclass

DerivedSignalPayload(
    site,
    model_definition,
    asset_location,
    sub_assembly,
    signal_type,
    derived_signal_id,
    visibility_groups,
    heading=None,
    level=None,
    orientation=None,
    stats=None,
    data_additional=None,
    visibility="usergroup",
)

Payload model for derived signal records.

SensorCalibrationPayload dataclass

SensorCalibrationPayload(
    sensor_serial_number, calibration_date, file
)

Payload model for sensor calibration records.

SensorPayload dataclass

SensorPayload(
    sensor_type_id,
    serial_number,
    cabinet,
    visibility="usergroup",
    visibility_groups=None,
)

Payload model for sensor records.

SensorTypePayload dataclass

SensorTypePayload(
    name,
    type,
    type_extended,
    hardware_supplier,
    file=None,
    visibility="usergroup",
    visibility_groups=None,
)

Payload model for sensor type records.

SignalCalibrationPayload dataclass

SignalCalibrationPayload(
    signal_id,
    calibration_date,
    data,
    tempcomp_signal_id=None,
    status_approval="yes",
)

Payload model for signal calibration records.

SignalHistoryPayload dataclass

SignalHistoryPayload(
    signal_id,
    activity_start_timestamp,
    is_latest_status,
    status,
    sensor_serial_number=None,
    status_approval="yes",
    legacy_signal_id=None,
)

Payload model for signal history records.

SignalPayload dataclass

SignalPayload(
    site,
    model_definition,
    asset_location,
    signal_type,
    signal_id,
    visibility_groups,
    sub_assembly=None,
    heading=None,
    level=None,
    orientation=None,
    stats=None,
    data_additional=None,
    visibility="usergroup",
)

Payload model for signal records.

ShmSignalUploadClient

Bases: Protocol

Protocol describing the SHM transport methods used by the uploader.

Functions
get_sensor_type
get_sensor_type(**kwargs)

Resolve one SHM sensor type record.

Source code in src/owi/metadatabase/shm/upload/protocols.py
def get_sensor_type(self, **kwargs: Any) -> dict[str, Any]:
    """Resolve one SHM sensor type record."""
    ...
get_sensor
get_sensor(**kwargs)

Resolve one SHM sensor record.

Source code in src/owi/metadatabase/shm/upload/protocols.py
def get_sensor(self, **kwargs: Any) -> dict[str, Any]:
    """Resolve one SHM sensor record."""
    ...
create_signal
create_signal(payload)

Create a signal record.

Source code in src/owi/metadatabase/shm/upload/protocols.py
def create_signal(self, payload: Mapping[str, Any]) -> dict[str, Any]:
    """Create a signal record."""
    ...
get_signal
get_signal(signal_id, **kwargs)

Resolve a signal record by backend identifier.

Source code in src/owi/metadatabase/shm/upload/protocols.py
def get_signal(self, signal_id: str, **kwargs: Any) -> dict[str, Any]:
    """Resolve a signal record by backend identifier."""
    ...
create_signal_history
create_signal_history(payload)

Create a signal history record.

Source code in src/owi/metadatabase/shm/upload/protocols.py
def create_signal_history(self, payload: Mapping[str, Any]) -> dict[str, Any]:
    """Create a signal history record."""
    ...
create_signal_calibration
create_signal_calibration(payload)

Create a signal calibration record.

Source code in src/owi/metadatabase/shm/upload/protocols.py
def create_signal_calibration(self, payload: Mapping[str, Any]) -> dict[str, Any]:
    """Create a signal calibration record."""
    ...
create_derived_signal
create_derived_signal(payload)

Create a derived signal record.

Source code in src/owi/metadatabase/shm/upload/protocols.py
def create_derived_signal(self, payload: Mapping[str, Any]) -> dict[str, Any]:
    """Create a derived signal record."""
    ...
create_derived_signal_history
create_derived_signal_history(payload)

Create a derived signal history record.

Source code in src/owi/metadatabase/shm/upload/protocols.py
def create_derived_signal_history(self, payload: Mapping[str, Any]) -> dict[str, Any]:
    """Create a derived signal history record."""
    ...
patch_derived_signal_history
patch_derived_signal_history(history_id, payload)

Patch a derived signal history record.

Source code in src/owi/metadatabase/shm/upload/protocols.py
def patch_derived_signal_history(
    self,
    history_id: int,
    payload: Mapping[str, Any],
) -> dict[str, Any]:
    """Patch a derived signal history record."""
    ...
create_derived_signal_calibration
create_derived_signal_calibration(payload)

Create a derived signal calibration record.

Source code in src/owi/metadatabase/shm/upload/protocols.py
def create_derived_signal_calibration(
    self,
    payload: Mapping[str, Any],
) -> dict[str, Any]:
    """Create a derived signal calibration record."""
    ...

SignalConfigUploadSource

Bases: Protocol

Protocol for processors that feed turbine-scoped upload data.

Functions
signals_process_data
signals_process_data()

Populate turbine-scoped signal dictionaries.

Source code in src/owi/metadatabase/shm/upload/protocols.py
def signals_process_data(self) -> None:
    """Populate turbine-scoped signal dictionaries."""
    ...

ShmSensorUploadClient

Bases: Protocol

Protocol describing the SHM transport methods used by the sensor uploader.

Functions
get_sensor_type
get_sensor_type(**kwargs)

Resolve one SHM sensor type record.

Source code in src/owi/metadatabase/shm/upload/sensors.py
def get_sensor_type(self, **kwargs: Any) -> dict[str, Any]:
    """Resolve one SHM sensor type record."""
    ...
get_sensor
get_sensor(**kwargs)

Resolve one SHM sensor record.

Source code in src/owi/metadatabase/shm/upload/sensors.py
def get_sensor(self, **kwargs: Any) -> dict[str, Any]:
    """Resolve one SHM sensor record."""
    ...
create_sensor_type
create_sensor_type(payload, files=None)

Create a sensor type record.

Source code in src/owi/metadatabase/shm/upload/sensors.py
def create_sensor_type(self, payload: Mapping[str, Any], files: Mapping[str, Any] | None = None) -> dict[str, Any]:
    """Create a sensor type record."""
    ...
create_sensor
create_sensor(payload)

Create a sensor record.

Source code in src/owi/metadatabase/shm/upload/sensors.py
def create_sensor(self, payload: Mapping[str, Any]) -> dict[str, Any]:
    """Create a sensor record."""
    ...
create_sensor_calibration
create_sensor_calibration(payload, files=None)

Create a sensor calibration record.

Source code in src/owi/metadatabase/shm/upload/sensors.py
def create_sensor_calibration(
    self, payload: Mapping[str, Any], files: Mapping[str, Any] | None = None
) -> dict[str, Any]:
    """Create a sensor calibration record."""
    ...

ShmSensorUploader

ShmSensorUploader(shm_api)

Upload sensor types, sensors, and sensor calibrations for SHM assets.

Parameters:

Name Type Description Default
shm_api ShmSensorUploadClient

SHM transport client that satisfies :class:ShmSensorUploadClient.

required
Source code in src/owi/metadatabase/shm/upload/sensors.py
def __init__(self, shm_api: ShmSensorUploadClient) -> None:
    self.shm_api = shm_api
Functions
upload_sensor_types
upload_sensor_types(
    sensor_types_data,
    permission_group_ids,
    path_to_images=None,
)

Upload sensor type records, optionally with image attachments.

Parameters:

Name Type Description Default
sensor_types_data Sequence[Mapping[str, Any]]

List of sensor type records (e.g. loaded from sensor_types.json).

required
permission_group_ids Sequence[int] | None

Permission groups applied to every sensor type.

required
path_to_images str | Path | None

Optional directory containing sensor type image files.

None

Returns:

Type Description
list[dict[str, Any]]

Raw backend responses for each created sensor type.

Source code in src/owi/metadatabase/shm/upload/sensors.py
def upload_sensor_types(
    self,
    sensor_types_data: Sequence[Mapping[str, Any]],
    permission_group_ids: Sequence[int] | None,
    path_to_images: str | Path | None = None,
) -> list[dict[str, Any]]:
    """Upload sensor type records, optionally with image attachments.

    Parameters
    ----------
    sensor_types_data
        List of sensor type records (e.g. loaded from ``sensor_types.json``).
    permission_group_ids
        Permission groups applied to every sensor type.
    path_to_images
        Optional directory containing sensor type image files.

    Returns
    -------
    list[dict[str, Any]]
        Raw backend responses for each created sensor type.
    """
    payloads = build_sensor_type_payloads(
        sensor_types_data,
        visibility_groups=permission_group_ids,
        path_to_images=path_to_images,
    )
    return [self._upload_sensor_type(payload) for payload in payloads]
upload_sensors
upload_sensors(
    sensor_type_name,
    sensor_type_params,
    sensors_data,
    permission_group_ids,
    turbines=None,
)

Upload sensor records for a single sensor category across turbines.

Parameters:

Name Type Description Default
sensor_type_name str

Key identifying the sensor category within each turbine's data (e.g. "accelerometers").

required
sensor_type_params Mapping[str, str]

Query parameters used to resolve the backend sensor type id (e.g. {"name": "393B04"}).

required
sensors_data SensorsDataByTurbine

Per-turbine sensor data keyed by turbine identifier. Each turbine has categories mapping to {"serial_numbers": [...], "cabinets": [...]}.

required
permission_group_ids Sequence[int] | None

Permission groups applied to every sensor.

required
turbines Sequence[str] | None

Optional filter to upload only specific turbines. When None, all turbines in sensors_data are processed.

None

Returns:

Type Description
list[dict[str, Any]]

Raw backend responses for each created sensor.

Source code in src/owi/metadatabase/shm/upload/sensors.py
def upload_sensors(
    self,
    sensor_type_name: str,
    sensor_type_params: Mapping[str, str],
    sensors_data: SensorsDataByTurbine,
    permission_group_ids: Sequence[int] | None,
    turbines: Sequence[str] | None = None,
) -> list[dict[str, Any]]:
    """Upload sensor records for a single sensor category across turbines.

    Parameters
    ----------
    sensor_type_name
        Key identifying the sensor category within each turbine's data
        (e.g. ``"accelerometers"``).
    sensor_type_params
        Query parameters used to resolve the backend sensor type id
        (e.g. ``{"name": "393B04"}``).
    sensors_data
        Per-turbine sensor data keyed by turbine identifier. Each turbine
        has categories mapping to ``{"serial_numbers": [...], "cabinets": [...]}``.
    permission_group_ids
        Permission groups applied to every sensor.
    turbines
        Optional filter to upload only specific turbines. When *None*,
        all turbines in ``sensors_data`` are processed.

    Returns
    -------
    list[dict[str, Any]]
        Raw backend responses for each created sensor.
    """
    sensor_type_result = self.shm_api.get_sensor_type(**dict(sensor_type_params))
    sensor_type_id = self._require_existing_result_id(
        sensor_type_result,
        label=f"sensor type '{sensor_type_name}'",
    )

    serial_numbers: list[str | None] = []
    cabinets: list[str | int | None] = []
    turbine_keys = turbines if turbines is not None else list(sensors_data.keys())

    for turbine in turbine_keys:
        data_turbine = sensors_data.get(turbine)
        self._collect_sensor_columns(
            data_turbine,
            sensor_type_name,
            serial_numbers,
            cabinets,
            turbine,
        )

    if not serial_numbers and not cabinets:
        return []

    payloads = build_sensor_payloads(
        sensor_type_id=sensor_type_id,
        serial_numbers=serial_numbers,
        cabinets=cabinets,
        visibility_groups=permission_group_ids,
    )
    return [self.shm_api.create_sensor(p.to_payload()) for p in payloads]
upload_sensor_calibrations
upload_sensor_calibrations(
    signal_sensor_map_data,
    signal_calibration_map_data,
    path_to_datasheets,
    turbines=None,
)

Upload sensor calibration records with optional PDF attachments.

Parameters:

Name Type Description Default
signal_sensor_map_data Mapping[str, Mapping[str, Mapping[str, Any]]]

Per-turbine signal-to-sensor mapping (keyed by turbine, then signal name, with sensor lookup params including sensor_type_id).

required
signal_calibration_map_data Mapping[str, Mapping[str, Mapping[str, str]]]

Per-turbine calibration data (keyed by turbine, then signal name, with date and filename fields).

required
path_to_datasheets str | Path

Directory containing calibration PDF files.

required
turbines Sequence[str] | None

Optional turbine filter. When None, all turbines are processed.

None

Returns:

Type Description
list[dict[str, Any]]

Raw backend responses for each created calibration.

Source code in src/owi/metadatabase/shm/upload/sensors.py
def upload_sensor_calibrations(
    self,
    signal_sensor_map_data: Mapping[str, Mapping[str, Mapping[str, Any]]],
    signal_calibration_map_data: Mapping[str, Mapping[str, Mapping[str, str]]],
    path_to_datasheets: str | Path,
    turbines: Sequence[str] | None = None,
) -> list[dict[str, Any]]:
    """Upload sensor calibration records with optional PDF attachments.

    Parameters
    ----------
    signal_sensor_map_data
        Per-turbine signal-to-sensor mapping (keyed by turbine, then signal
        name, with sensor lookup params including ``sensor_type_id``).
    signal_calibration_map_data
        Per-turbine calibration data (keyed by turbine, then signal name,
        with ``date`` and ``filename`` fields).
    path_to_datasheets
        Directory containing calibration PDF files.
    turbines
        Optional turbine filter. When *None*, all turbines are processed.

    Returns
    -------
    list[dict[str, Any]]
        Raw backend responses for each created calibration.
    """
    results: list[dict[str, Any]] = []
    turbine_keys = turbines if turbines is not None else list(signal_sensor_map_data.keys())

    for turbine in turbine_keys:
        turbine_ss_map = signal_sensor_map_data.get(turbine)
        turbine_sc_map = signal_calibration_map_data.get(turbine)
        if turbine_ss_map is None:
            continue

        resolved_sensor_ids = self._resolve_sensor_ids_for_turbine(turbine_ss_map, turbine)

        if turbine_sc_map is not None:
            payloads = build_sensor_calibration_payloads(
                signal_sensor_map=resolved_sensor_ids,
                signal_calibration_map=turbine_sc_map,
                path_to_datasheets=path_to_datasheets,
            )
            for payload in payloads:
                results.append(self._upload_sensor_calibration(payload))

    return results
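The per-turbine loop above honours an explicit turbine filter, silently skips turbines without a signal-to-sensor mapping, and only builds payloads when calibration data exists for the turbine. A minimal self-contained sketch of that selection logic (plain dicts stand in for the real mapping types):

```python
def select_turbines(signal_sensor_map, signal_calibration_map, turbines=None):
    """Mirror the filtering in upload_sensor_calibrations: honour an explicit
    turbine filter, skip turbines without sensor mappings, and pair a turbine
    with calibration data only when both maps contain it."""
    keys = turbines if turbines is not None else list(signal_sensor_map)
    pairs = []
    for turbine in keys:
        ss = signal_sensor_map.get(turbine)
        if ss is None:
            continue  # no sensor mapping for this turbine -> nothing to upload
        sc = signal_calibration_map.get(turbine)
        if sc is not None:
            pairs.append((turbine, ss, sc))
    return pairs

sensor_map = {"T01": {"sigA": 11}, "T02": {"sigB": 22}}
calib_map = {"T01": {"sigA": {"date": "2021-01-01", "filename": "a.pdf"}}}
print(select_turbines(sensor_map, calib_map))  # only T01 has both maps
```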

ShmSignalUploader

ShmSignalUploader(shm_api, lookup_service)

Upload archive-compatible SHM signal data for arbitrary wind-farm assets.

Parameters:

Name Type Description Default
shm_api ShmSignalUploadClient

SHM transport client, typically :class:owi.metadatabase.shm.ShmAPI.

required
lookup_service ParentSDKLookupService

Parent SDK lookup service used to resolve site, asset, and subassembly ids.

required
Source code in src/owi/metadatabase/shm/upload/signals.py
def __init__(
    self,
    shm_api: ShmSignalUploadClient,
    lookup_service: ParentSDKLookupService,
) -> None:
    self.shm_api = shm_api
    self.lookup_service = lookup_service
Functions
from_clients classmethod
from_clients(shm_api, locations_client, geometry_client)

Construct the uploader from SHM and parent SDK clients.

Parameters:

Name Type Description Default
shm_api ShmSignalUploadClient

SHM transport client used for backend mutations.

required
locations_client ParentLocationsLookupClient

Parent SDK client that resolves project and asset locations.

required
geometry_client ParentGeometryLookupClient

Parent SDK client that resolves subassemblies and model definitions.

required

Returns:

Type Description
ShmSignalUploader

Uploader wired to the canonical SHM lookup service.

Source code in src/owi/metadatabase/shm/upload/signals.py
@classmethod
def from_clients(
    cls,
    shm_api: ShmSignalUploadClient,
    locations_client: ParentLocationsLookupClient,
    geometry_client: ParentGeometryLookupClient,
) -> ShmSignalUploader:
    """Construct the uploader from SHM and parent SDK clients.

    Parameters
    ----------
    shm_api
        SHM transport client used for backend mutations.
    locations_client
        Parent SDK client that resolves project and asset locations.
    geometry_client
        Parent SDK client that resolves subassemblies and model
        definitions.

    Returns
    -------
    ShmSignalUploader
        Uploader wired to the canonical SHM lookup service.
    """
    return cls(
        shm_api=shm_api,
        lookup_service=ParentSDKLookupService(
            locations_client=locations_client,
            geometry_client=geometry_client,
        ),
    )
upload_asset
upload_asset(request)

Upload main and secondary SHM records for one asset.

Parameters:

Name Type Description Default
request AssetSignalUploadRequest

Asset-scoped upload request containing the archive-compatible main and derived signal mappings.

required

Returns:

Type Description
AssetSignalUploadResult

Created backend ids plus raw backend responses grouped by upload phase.

Source code in src/owi/metadatabase/shm/upload/signals.py
def upload_asset(self, request: AssetSignalUploadRequest) -> AssetSignalUploadResult:
    """Upload main and secondary SHM records for one asset.

    Parameters
    ----------
    request
        Asset-scoped upload request containing the archive-compatible main
        and derived signal mappings.

    Returns
    -------
    AssetSignalUploadResult
        Created backend ids plus raw backend responses grouped by upload
        phase.
    """
    upload_context = self.lookup_service.get_signal_upload_context(
        projectsite=request.projectsite,
        assetlocation=request.assetlocation,
        permission_group_ids=request.permission_group_ids,
    )
    signal_ids_by_name, results_main = self._upload_main_signals(
        request.signals,
        upload_context,
    )
    results_secondary = self._upload_signal_secondary_data(
        request.signals,
        signal_ids_by_name=signal_ids_by_name,
        sensor_serial_numbers_by_signal=request.sensor_serial_numbers_by_signal,
        temperature_compensation_signal_ids=request.temperature_compensation_signal_ids,
    )

    derived_signal_ids_by_name: dict[str, int] = {}
    results_derived_main: list[dict[str, Any]] = []
    results_derived_secondary: list[dict[str, Any]] = []
    if request.derived_signals:
        derived_signal_ids_by_name, results_derived_main = self._upload_main_derived_signals(
            request.derived_signals,
            upload_context,
        )
        results_derived_secondary = self._upload_derived_signal_secondary_data(
            request.derived_signals,
            signal_ids_by_name=signal_ids_by_name,
            derived_signal_ids_by_name=derived_signal_ids_by_name,
        )

    return AssetSignalUploadResult(
        asset_key=request.result_key,
        signal_ids_by_name=signal_ids_by_name,
        derived_signal_ids_by_name=derived_signal_ids_by_name,
        results_main=results_main,
        results_secondary=results_secondary,
        results_derived_main=results_derived_main,
        results_derived_secondary=results_derived_secondary,
    )
upload_assets
upload_assets(requests)

Upload SHM signal data for multiple assets.

Parameters:

Name Type Description Default
requests Sequence[AssetSignalUploadRequest]

Asset-scoped upload requests to execute in order.

required

Returns:

Type Description
dict[str, AssetSignalUploadResult]

Upload results keyed by each request's stable result key.

Source code in src/owi/metadatabase/shm/upload/signals.py
def upload_assets(
    self,
    requests: Sequence[AssetSignalUploadRequest],
) -> dict[str, AssetSignalUploadResult]:
    """Upload SHM signal data for multiple assets.

    Parameters
    ----------
    requests
        Asset-scoped upload requests to execute in order.

    Returns
    -------
    dict[str, AssetSignalUploadResult]
        Upload results keyed by each request's stable result key.
    """
    return {request.result_key: self.upload_asset(request) for request in requests}
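The batch method is a thin dict comprehension: requests execute in order and each result is stored under the request's stable result key. A runnable sketch with a hypothetical stand-in request type (the real `AssetSignalUploadRequest` carries many more fields):

```python
from dataclasses import dataclass

@dataclass
class FakeRequest:          # hypothetical stand-in for AssetSignalUploadRequest
    result_key: str
    assetlocation: str

def upload_assets(requests, upload_asset):
    # Same shape as ShmSignalUploader.upload_assets: execute in order,
    # keyed by each request's stable result key.
    return {request.result_key: upload_asset(request) for request in requests}

reqs = [FakeRequest("T01", "LOC-T01"), FakeRequest("T02", "LOC-T02")]
results = upload_assets(reqs, lambda r: f"uploaded:{r.assetlocation}")
print(results)  # {'T01': 'uploaded:LOC-T01', 'T02': 'uploaded:LOC-T02'}
```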
upload_turbines
upload_turbines(
    *,
    projectsite,
    signals_by_turbine,
    derived_signals_by_turbine=None,
    assetlocations_by_turbine=None,
    permission_group_ids=None,
    sensor_serial_numbers_by_turbine=None,
    temperature_compensation_signal_ids_by_turbine=None,
)

Upload SHM signal data for multiple turbine-scoped config bundles.

Parameters:

Name Type Description Default
projectsite str

Parent SDK project site title shared by the turbine batch.

required
signals_by_turbine SignalConfigMapByTurbine

Main signal mappings keyed by turbine identifier.

required
derived_signals_by_turbine SignalConfigMapByTurbine | None

Optional derived signal mappings keyed by turbine identifier.

None
assetlocations_by_turbine Mapping[str, str] | None

Optional turbine-to-asset-location override mapping.

None
permission_group_ids Sequence[int] | None

Visibility groups applied to created SHM objects.

None
sensor_serial_numbers_by_turbine Mapping[str, Mapping[str, int]] | None

Optional per-turbine mapping of signal identifiers to sensor serial numbers used for signal history rows.

None
temperature_compensation_signal_ids_by_turbine Mapping[str, Mapping[str, int]] | None

Optional per-turbine mapping of temperature-compensation tokens to backend SHM signal ids.

None

Returns:

Type Description
dict[str, AssetSignalUploadResult]

Upload results keyed by turbine identifier.

This keeps the response keyed by turbine while parent lookups use the
corresponding asset-location title.

Source code in src/owi/metadatabase/shm/upload/signals.py
def upload_turbines(
    self,
    *,
    projectsite: str,
    signals_by_turbine: SignalConfigMapByTurbine,
    derived_signals_by_turbine: SignalConfigMapByTurbine | None = None,
    assetlocations_by_turbine: Mapping[str, str] | None = None,
    permission_group_ids: Sequence[int] | None = None,
    sensor_serial_numbers_by_turbine: Mapping[str, Mapping[str, int]] | None = None,
    temperature_compensation_signal_ids_by_turbine: Mapping[str, Mapping[str, int]] | None = None,
) -> dict[str, AssetSignalUploadResult]:
    """Upload SHM signal data for multiple turbine-scoped config bundles.

    Parameters
    ----------
    projectsite
        Parent SDK project site title shared by the turbine batch.
    signals_by_turbine
        Main signal mappings keyed by turbine identifier.
    derived_signals_by_turbine
        Optional derived signal mappings keyed by turbine identifier.
    assetlocations_by_turbine
        Optional turbine-to-asset-location override mapping.
    permission_group_ids
        Visibility groups applied to created SHM objects.
    sensor_serial_numbers_by_turbine
        Optional per-turbine mapping of signal identifiers to sensor serial
        numbers used for signal history rows.
    temperature_compensation_signal_ids_by_turbine
        Optional per-turbine mapping of temperature-compensation tokens to
        backend SHM signal ids.

    Returns
    -------
    dict[str, AssetSignalUploadResult]
        Upload results keyed by turbine identifier.

    This keeps the response keyed by turbine while parent lookups use the
    corresponding asset-location title.
    """
    results: dict[str, AssetSignalUploadResult] = {}
    for turbine, signals in signals_by_turbine.items():
        assetlocation = turbine
        if assetlocations_by_turbine is not None:
            assetlocation = assetlocations_by_turbine.get(turbine, turbine)

        derived_signals = None
        if derived_signals_by_turbine is not None:
            derived_signals = derived_signals_by_turbine.get(turbine)

        sensor_serial_numbers_by_signal = None
        if sensor_serial_numbers_by_turbine is not None:
            sensor_serial_numbers_by_signal = sensor_serial_numbers_by_turbine.get(turbine)

        temperature_compensation_signal_ids = None
        if temperature_compensation_signal_ids_by_turbine is not None:
            temperature_compensation_signal_ids = temperature_compensation_signal_ids_by_turbine.get(turbine)

        results[turbine] = self.upload_asset(
            AssetSignalUploadRequest(
                projectsite=projectsite,
                assetlocation=assetlocation,
                signals=signals,
                derived_signals=derived_signals,
                permission_group_ids=permission_group_ids,
                sensor_serial_numbers_by_signal=sensor_serial_numbers_by_signal,
                temperature_compensation_signal_ids=temperature_compensation_signal_ids,
            )
        )
    return results
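The loop body resolves each optional per-turbine map the same way: the asset location falls back to the turbine identifier itself, and absent turbines simply yield `None`. A self-contained sketch of that resolution step:

```python
def resolve_per_turbine(turbine, assetlocations=None, derived=None):
    """Mirror the per-turbine option resolution in upload_turbines: the asset
    location falls back to the turbine id when no override mapping (or no
    entry) exists, and optional maps yield None for absent turbines."""
    assetlocation = turbine
    if assetlocations is not None:
        assetlocation = assetlocations.get(turbine, turbine)
    derived_signals = None if derived is None else derived.get(turbine)
    return assetlocation, derived_signals

print(resolve_per_turbine("T01"))                                 # ('T01', None)
print(resolve_per_turbine("T01", {"T01": "BBA01"}, {"T01": {}}))  # ('BBA01', {})
```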
upload_from_processor
upload_from_processor(
    *,
    projectsite,
    processor,
    assetlocations_by_turbine=None,
    permission_group_ids=None,
    sensor_serial_numbers_by_turbine=None,
    temperature_compensation_signal_ids_by_turbine=None,
)

Process turbine configs and upload them through the generic SHM seam.

Parameters:

Name Type Description Default
projectsite str

Parent SDK project site title shared by the processor output.

required
processor SignalConfigUploadSource

Processor instance that populates signals_data and signals_derived_data.

required
assetlocations_by_turbine Mapping[str, str] | None

Optional turbine-to-asset-location override mapping.

None
permission_group_ids Sequence[int] | None

Visibility groups applied to created SHM objects.

None
sensor_serial_numbers_by_turbine Mapping[str, Mapping[str, int]] | None

Optional per-turbine mapping of signal identifiers to sensor serial numbers used for signal history rows.

None
temperature_compensation_signal_ids_by_turbine Mapping[str, Mapping[str, int]] | None

Optional per-turbine mapping of temperature-compensation tokens to backend SHM signal ids.

None

Returns:

Type Description
dict[str, AssetSignalUploadResult]

Upload results keyed by turbine identifier.

Source code in src/owi/metadatabase/shm/upload/signals.py
def upload_from_processor(
    self,
    *,
    projectsite: str,
    processor: SignalConfigUploadSource,
    assetlocations_by_turbine: Mapping[str, str] | None = None,
    permission_group_ids: Sequence[int] | None = None,
    sensor_serial_numbers_by_turbine: Mapping[str, Mapping[str, int]] | None = None,
    temperature_compensation_signal_ids_by_turbine: Mapping[str, Mapping[str, int]] | None = None,
) -> dict[str, AssetSignalUploadResult]:
    """Process turbine configs and upload them through the generic SHM seam.

    Parameters
    ----------
    projectsite
        Parent SDK project site title shared by the processor output.
    processor
        Processor instance that populates ``signals_data`` and
        ``signals_derived_data``.
    assetlocations_by_turbine
        Optional turbine-to-asset-location override mapping.
    permission_group_ids
        Visibility groups applied to created SHM objects.
    sensor_serial_numbers_by_turbine
        Optional per-turbine mapping of signal identifiers to sensor serial
        numbers used for signal history rows.
    temperature_compensation_signal_ids_by_turbine
        Optional per-turbine mapping of temperature-compensation tokens to
        backend SHM signal ids.

    Returns
    -------
    dict[str, AssetSignalUploadResult]
        Upload results keyed by turbine identifier.
    """
    processor.signals_process_data()
    return self.upload_turbines(
        projectsite=projectsite,
        signals_by_turbine=processor.signals_data,
        derived_signals_by_turbine=processor.signals_derived_data,
        assetlocations_by_turbine=assetlocations_by_turbine,
        permission_group_ids=permission_group_ids,
        sensor_serial_numbers_by_turbine=sensor_serial_numbers_by_turbine,
        temperature_compensation_signal_ids_by_turbine=temperature_compensation_signal_ids_by_turbine,
    )
upload_from_processor_files
upload_from_processor_files(
    *,
    projectsite,
    processor,
    path_signal_sensor_map=None,
    path_sensor_tc_map=None,
    assetlocations_by_turbine=None,
    permission_group_ids=None,
)

Process configs, resolve optional file maps, and upload by turbine.

Parameters:

Name Type Description Default
projectsite str

Parent SDK project site title shared by the batch.

required
processor SignalConfigUploadSource

Processor that populates turbine-scoped signal mappings.

required
path_signal_sensor_map str | Path | None

Optional JSON file keyed by turbine and signal id with SHM sensor lookup parameters. When sensor_type_id is itself a mapping, the uploader resolves it through get_sensor_type() before the final sensor lookup.

None
path_sensor_tc_map str | Path | None

Optional JSON file keyed by turbine with temperature-compensation signal identifiers to resolve through get_signal().

None
assetlocations_by_turbine Mapping[str, str] | None

Optional turbine-to-asset-location override mapping.

None
permission_group_ids Sequence[int] | None

Visibility groups applied to created SHM objects.

None

Returns:

Type Description
dict[str, AssetSignalUploadResult]

Upload results keyed by turbine identifier.

Examples:

>>> from unittest.mock import Mock
>>> uploader = ShmSignalUploader(shm_api=Mock(), lookup_service=Mock())
>>> processor = Mock()
>>> processor.signals_data = {}
>>> processor.signals_derived_data = {}
>>> uploader.upload_from_processor_files(projectsite="Project A", processor=processor)
{}
Source code in src/owi/metadatabase/shm/upload/signals.py
def upload_from_processor_files(
    self,
    *,
    projectsite: str,
    processor: SignalConfigUploadSource,
    path_signal_sensor_map: str | Path | None = None,
    path_sensor_tc_map: str | Path | None = None,
    assetlocations_by_turbine: Mapping[str, str] | None = None,
    permission_group_ids: Sequence[int] | None = None,
) -> dict[str, AssetSignalUploadResult]:
    """Process configs, resolve optional file maps, and upload by turbine.

    Parameters
    ----------
    projectsite
        Parent SDK project site title shared by the batch.
    processor
        Processor that populates turbine-scoped signal mappings.
    path_signal_sensor_map
        Optional JSON file keyed by turbine and signal id with SHM sensor
        lookup parameters. When ``sensor_type_id`` is itself a mapping,
        the uploader resolves it through ``get_sensor_type()`` before the
        final sensor lookup.
    path_sensor_tc_map
        Optional JSON file keyed by turbine with temperature-compensation
        signal identifiers to resolve through ``get_signal()``.
    assetlocations_by_turbine
        Optional turbine-to-asset-location override mapping.
    permission_group_ids
        Visibility groups applied to created SHM objects.

    Returns
    -------
    dict[str, AssetSignalUploadResult]
        Upload results keyed by turbine identifier.

    Examples
    --------
    >>> from unittest.mock import Mock
    >>> uploader = ShmSignalUploader(shm_api=Mock(), lookup_service=Mock())
    >>> processor = Mock()
    >>> processor.signals_data = {}
    >>> processor.signals_derived_data = {}
    >>> uploader.upload_from_processor_files(projectsite="Project A", processor=processor)
    {}
    """
    processor.signals_process_data()
    sensor_serial_numbers_by_turbine = self._resolve_sensor_serial_numbers_by_turbine(path_signal_sensor_map)
    temperature_compensation_signal_ids_by_turbine = self._resolve_temperature_compensation_signal_ids_by_turbine(
        path_sensor_tc_map
    )
    return self.upload_turbines(
        projectsite=projectsite,
        signals_by_turbine=processor.signals_data,
        derived_signals_by_turbine=processor.signals_derived_data,
        assetlocations_by_turbine=assetlocations_by_turbine,
        permission_group_ids=permission_group_ids,
        sensor_serial_numbers_by_turbine=sensor_serial_numbers_by_turbine,
        temperature_compensation_signal_ids_by_turbine=temperature_compensation_signal_ids_by_turbine,
    )

Functions

build_derived_signal_calibration_payloads

build_derived_signal_calibration_payloads(
    derived_signal_id, signal_data
)

Build derived-signal calibration payloads from archive-style data.

Source code in src/owi/metadatabase/shm/upload/payloads.py
def build_derived_signal_calibration_payloads(
    derived_signal_id: int,
    signal_data: Mapping[str, Any],
) -> list[dict[str, Any]]:
    """Build derived-signal calibration payloads from archive-style data."""
    calibrations = signal_data.get("calibration")
    if not isinstance(calibrations, Sequence) or isinstance(calibrations, (str, bytes)):
        return []

    payloads: list[dict[str, Any]] = []
    for calibration in calibrations:
        if not isinstance(calibration, Mapping):
            continue
        payloads.append(
            DerivedSignalCalibrationPayload.from_yaw_offset(
                derived_signal_id=derived_signal_id,
                calibration_date=calibration["time"],
                yaw_parameter=calibration["yaw_parameter"],
                yaw_offset=calibration["yaw_offset"],
                measurement_location=calibration.get("measurement_location"),
            ).to_payload()
        )
    return payloads
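The guard at the top of this builder is a recurring pattern in these payload helpers: accept only a non-string sequence of mapping rows and drop everything else without raising. A minimal sketch of the pattern in isolation:

```python
from collections.abc import Mapping, Sequence

def calibration_rows(signal_data):
    """Same guard as build_derived_signal_calibration_payloads: accept only a
    non-string sequence, then keep only the rows that are mappings."""
    calibrations = signal_data.get("calibration")
    if not isinstance(calibrations, Sequence) or isinstance(calibrations, (str, bytes)):
        return []
    return [row for row in calibrations if isinstance(row, Mapping)]

print(calibration_rows({"calibration": "oops"}))                      # []
print(calibration_rows({"calibration": [{"time": "2020"}, "junk"]}))  # [{'time': '2020'}]
```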

build_derived_signal_main_payload

build_derived_signal_main_payload(
    signal, signal_data, context
)

Build the main derived-signal payload from archive-style data.

Source code in src/owi/metadatabase/shm/upload/payloads.py
def build_derived_signal_main_payload(
    signal: LegacySignalIdentifier,
    signal_data: Mapping[str, Any],
    context: SignalUploadContext,
) -> dict[str, Any] | None:
    """Build the main derived-signal payload from archive-style data."""
    if len(signal_data) <= 1:
        return None

    sub_assembly = context.subassembly_id_for(signal.subassembly)
    if sub_assembly is None:
        raise KeyError(f"Missing sub-assembly id for {signal.subassembly!r}")

    return DerivedSignalPayload(
        site=context.site_id,
        model_definition=context.model_definition_id,
        asset_location=context.asset_location_id,
        sub_assembly=sub_assembly,
        signal_type=signal.signal_type,
        derived_signal_id=signal.raw,
        heading=signal_data.get("heading"),
        level=signal_data.get("level"),
        orientation=signal_data.get("orientation"),
        stats=signal_data.get("stats"),
        data_additional=signal_data.get("data"),
        visibility_groups=context.permission_group_ids,
    ).to_payload()

build_derived_signal_parent_patch

build_derived_signal_parent_patch(parent_signal_ids)

Build the parent-signals patch payload for derived signal status rows.

Source code in src/owi/metadatabase/shm/upload/payloads.py
def build_derived_signal_parent_patch(parent_signal_ids: Sequence[int]) -> dict[str, list[int]]:
    """Build the parent-signals patch payload for derived signal status rows."""
    return DerivedSignalHistoryParentSignalsPatch(parent_signal_ids).to_payload()

build_derived_signal_status_payload

build_derived_signal_status_payload(
    derived_signal_id, signal_data
)

Build the derived-signal status payload used before parent patching.

Source code in src/owi/metadatabase/shm/upload/payloads.py
def build_derived_signal_status_payload(
    derived_signal_id: int,
    signal_data: Mapping[str, Any],
) -> dict[str, Any]:
    """Build the derived-signal status payload used before parent patching."""
    calibrations = signal_data.get("calibration")
    if not isinstance(calibrations, Sequence) or isinstance(calibrations, (str, bytes)) or not calibrations:
        raise ValueError("Derived signal calibration rows are required to build a status payload.")

    first = calibrations[0]
    if not isinstance(first, Mapping):
        raise ValueError("Derived signal calibration rows must be mappings.")

    return DerivedSignalHistoryPayload(
        derived_signal_id=derived_signal_id,
        activity_start_timestamp=first["time"],
        is_latest_status=True,
        status="ok",
    ).to_payload()

build_sensor_calibration_payloads

build_sensor_calibration_payloads(
    signal_sensor_map,
    signal_calibration_map,
    path_to_datasheets,
)

Build sensor calibration payload models for one turbine.

Source code in src/owi/metadatabase/shm/upload/payloads.py
def build_sensor_calibration_payloads(
    signal_sensor_map: Mapping[str, int],
    signal_calibration_map: Mapping[str, Mapping[str, str]],
    path_to_datasheets: str | Path,
) -> list[SensorCalibrationPayload]:
    """Build sensor calibration payload models for one turbine."""
    payloads: list[SensorCalibrationPayload] = []
    for signal_name, calibration in signal_calibration_map.items():
        sensor_id = signal_sensor_map.get(signal_name)
        if sensor_id is None:
            continue
        payloads.append(
            SensorCalibrationPayload(
                sensor_serial_number=sensor_id,
                calibration_date=calibration["date"],
                file=Path(path_to_datasheets) / calibration["filename"],
            )
        )
    return payloads
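Two details of this builder are worth illustrating: signals without a resolved sensor id are skipped rather than failing, and the PDF path is joined under the datasheet directory. A self-contained sketch returning plain tuples instead of `SensorCalibrationPayload` models:

```python
from pathlib import Path

def pair_calibrations(signal_sensor_map, signal_calibration_map, path_to_datasheets):
    """Mirror build_sensor_calibration_payloads: skip signals without a
    resolved sensor id and join the PDF path under the datasheet directory."""
    out = []
    for signal_name, calibration in signal_calibration_map.items():
        sensor_id = signal_sensor_map.get(signal_name)
        if sensor_id is None:
            continue  # no sensor resolved for this signal -> no payload
        out.append((sensor_id, calibration["date"],
                    Path(path_to_datasheets) / calibration["filename"]))
    return out

rows = pair_calibrations(
    {"sigA": 11},
    {"sigA": {"date": "2021-01-01", "filename": "a.pdf"},
     "sigB": {"date": "2021-01-02", "filename": "b.pdf"}},
    "datasheets",
)
print(rows)  # only sigA is paired; sigB has no sensor id
```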

build_sensor_payloads

build_sensor_payloads(
    sensor_type_id,
    serial_numbers,
    cabinets,
    visibility_groups,
    visibility="usergroup",
)

Build sensor payload models from parallel columns.

Source code in src/owi/metadatabase/shm/upload/payloads.py
def build_sensor_payloads(
    sensor_type_id: int,
    serial_numbers: Sequence[str | None],
    cabinets: Sequence[str | int | None],
    visibility_groups: Sequence[int] | None,
    visibility: str = "usergroup",
) -> list[SensorPayload]:
    """Build sensor payload models from parallel columns."""
    rows = _expand_columns({"serial_number": serial_numbers, "cabinet": cabinets})
    return [
        SensorPayload(
            sensor_type_id=sensor_type_id,
            serial_number=row["serial_number"],
            cabinet=row["cabinet"],
            visibility=visibility,
            visibility_groups=visibility_groups,
        )
        for row in rows
    ]
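The private `_expand_columns` helper is not shown in this reference; a plausible reading, given the "parallel columns" phrasing, is that it zips equal-length columns into one row dict per sensor. The sketch below is a hypothetical stand-in, not the actual helper:

```python
def expand_columns(columns):
    # Hypothetical stand-in for the private _expand_columns helper (source not
    # shown above): zip equal-length parallel columns into row dicts.
    keys = list(columns)
    return [dict(zip(keys, values)) for values in zip(*columns.values())]

rows = expand_columns({"serial_number": ["SN1", "SN2"], "cabinet": ["C1", None]})
print(rows)
# [{'serial_number': 'SN1', 'cabinet': 'C1'}, {'serial_number': 'SN2', 'cabinet': None}]
```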

build_sensor_type_payloads

build_sensor_type_payloads(
    sensor_types_data,
    visibility_groups,
    path_to_images=None,
    visibility="usergroup",
)

Build sensor type payload models from raw records.

Source code in src/owi/metadatabase/shm/upload/payloads.py
def build_sensor_type_payloads(
    sensor_types_data: Sequence[Mapping[str, Any]],
    visibility_groups: Sequence[int] | None,
    path_to_images: str | Path | None = None,
    visibility: str = "usergroup",
) -> list[SensorTypePayload]:
    """Build sensor type payload models from raw records."""
    payloads: list[SensorTypePayload] = []
    for entry in sensor_types_data:
        file_path: Path | None = None
        filename = entry.get("file")
        if filename is not None and path_to_images is not None:
            file_path = Path(path_to_images) / str(filename)
        payloads.append(
            SensorTypePayload(
                name=str(entry["name"]),
                type=str(entry["type"]),
                type_extended=str(entry["type_extended"]),
                hardware_supplier=str(entry["hardware_supplier"]),
                file=file_path,
                visibility=visibility,
                visibility_groups=visibility_groups,
            )
        )
    return payloads

build_signal_calibration_payloads

build_signal_calibration_payloads(
    signal_id, signal_data, tempcomp_signal_ids=None
)

Build signal calibration payloads from archive-style offset and CWL data.

Source code in src/owi/metadatabase/shm/upload/payloads.py
def build_signal_calibration_payloads(
    signal_id: int,
    signal_data: Mapping[str, Any],
    tempcomp_signal_ids: Mapping[str, int] | None = None,
) -> list[dict[str, Any]]:
    """Build signal calibration payloads from archive-style offset and CWL data."""
    payloads: list[dict[str, Any]] = []

    offsets = signal_data.get("offset")
    if isinstance(offsets, Sequence) and not isinstance(offsets, (str, bytes)):
        for offset in offsets:
            if not isinstance(offset, Mapping):
                continue
            lead_correction = offset.get("lead_correction")
            tc_sensor = offset.get("TCSensor")
            payloads.append(
                SignalCalibrationPayload.from_offset(
                    signal_id=signal_id,
                    calibration_date=offset["time"],
                    offset=offset["offset"],
                    tempcomp_signal_id=(
                        tempcomp_signal_ids.get(tc_sensor)
                        if tempcomp_signal_ids is not None and isinstance(tc_sensor, str)
                        else None
                    ),
                    coefficients=offset.get("Coefficients"),
                    t_ref=offset.get("t_ref"),
                    gauge_correction=offset.get("gauge_correction"),
                    lead_correction=(
                        LeadCorrectionPayload(
                            t_ref=lead_correction["t_ref"],
                            coef=lead_correction["coef"],
                        )
                        if isinstance(lead_correction, Mapping)
                        else None
                    ),
                ).to_payload()
            )

    cwl_rows = signal_data.get("cwl")
    if isinstance(cwl_rows, Sequence) and not isinstance(cwl_rows, (str, bytes)):
        for cwl in cwl_rows:
            if not isinstance(cwl, Mapping):
                continue
            payloads.append(
                SignalCalibrationPayload.from_cwl(
                    signal_id=signal_id,
                    calibration_date=cwl["time"],
                    cwl=cwl["cwl"],
                ).to_payload()
            )

    return payloads
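The temperature-compensation lookup inside the offset loop is deliberately defensive: a backend signal id is resolved only when a tempcomp map was supplied and the row's `TCSensor` token is a string, and unknown tokens resolve to `None`. In isolation:

```python
def resolve_tempcomp(offset_row, tempcomp_signal_ids=None):
    """Mirror the tempcomp resolution in build_signal_calibration_payloads:
    resolve only when a map is supplied and the TCSensor token is a string."""
    tc_sensor = offset_row.get("TCSensor")
    if tempcomp_signal_ids is not None and isinstance(tc_sensor, str):
        return tempcomp_signal_ids.get(tc_sensor)
    return None

print(resolve_tempcomp({"TCSensor": "TC1"}, {"TC1": 7}))  # 7
print(resolve_tempcomp({"TCSensor": "TC1"}))              # None (no map)
print(resolve_tempcomp({}, {"TC1": 7}))                   # None (no token)
```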

build_signal_main_payload

build_signal_main_payload(signal, signal_data, context)

Build the main signal payload from archive-style signal data.

Source code in src/owi/metadatabase/shm/upload/payloads.py
def build_signal_main_payload(
    signal: LegacySignalIdentifier,
    signal_data: Mapping[str, Any],
    context: SignalUploadContext,
) -> dict[str, Any] | None:
    """Build the main signal payload from archive-style signal data."""
    if len(signal_data) <= 1:
        return None

    payload = SignalPayload(
        site=context.site_id,
        model_definition=context.model_definition_id,
        asset_location=context.asset_location_id,
        signal_type=signal.signal_type,
        signal_id=signal.raw,
        sub_assembly=(
            context.subassembly_id_for(signal.subassembly) if signal.subassembly in {"TP", "TW", "MP"} else None
        ),
        heading=signal_data.get("heading"),
        level=signal_data.get("level"),
        orientation=signal_data.get("orientation"),
        stats=signal_data.get("stats"),
        data_additional=_legacy_signal_misc_data(signal_data),
        visibility_groups=context.permission_group_ids,
    )
    return payload.to_payload()

build_signal_status_payloads

build_signal_status_payloads(
    signal_id, signal_data, sensor_serial_number=None
)

Build signal status payloads from archive-style status rows.

Source code in src/owi/metadatabase/shm/upload/payloads.py
def build_signal_status_payloads(
    signal_id: int,
    signal_data: Mapping[str, Any],
    sensor_serial_number: int | None = None,
) -> list[dict[str, Any]]:
    """Build signal status payloads from archive-style status rows."""
    statuses = signal_data.get("status")
    if not isinstance(statuses, Sequence) or isinstance(statuses, (str, bytes)):
        return []

    payloads: list[dict[str, Any]] = []
    for index, status in enumerate(statuses):
        if not isinstance(status, Mapping):
            continue
        status_row = cast(Mapping[str, Any], status)
        payloads.append(
            SignalHistoryPayload(
                signal_id=signal_id,
                activity_start_timestamp=cast(TimestampValue, status_row["time"]),
                is_latest_status=index == len(statuses) - 1,
                status=cast(str, status_row["status"]),
                sensor_serial_number=sensor_serial_number,
                legacy_signal_id=cast(Optional[str], status_row.get("name")),
            ).to_payload()
        )
    return payloads
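Note how `is_latest_status` is derived: only the final status row in the archive list is flagged as the latest. A minimal sketch of just that flagging logic:

```python
def latest_flags(statuses):
    # Mirror build_signal_status_payloads: only the final status row in the
    # archive-style list is marked is_latest_status=True.
    return [index == len(statuses) - 1 for index, _ in enumerate(statuses)]

print(latest_flags([{"status": "ok"}, {"status": "faulty"}, {"status": "ok"}]))
# [False, False, True]
```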

json_utils

JSON loading helpers shared across SHM modules.

Functions

load_json_data

load_json_data(path_to_data)

Load JSON data from disk using pathlib.

Parameters:

Name Type Description Default
path_to_data str | Path | None

Path to the JSON document, or None.

required

Returns:

Type Description
Any | None

Parsed JSON document, or None when no path is provided.

Examples:

>>> load_json_data(None) is None
True
Source code in src/owi/metadatabase/shm/json_utils.py
def load_json_data(path_to_data: str | Path | None) -> Any | None:
    """Load JSON data from disk using ``pathlib``.

    Parameters
    ----------
    path_to_data
        Path to the JSON document, or ``None``.

    Returns
    -------
    Any | None
        Parsed JSON document, or ``None`` when no path is provided.

    Examples
    --------
    >>> load_json_data(None) is None
    True
    """
    if path_to_data is None:
        return None

    path = Path(path_to_data)
    return json.loads(path.read_text(encoding="utf-8"))
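A runnable end-to-end example of the helper above, rewritten self-contained here with its module-level imports and exercised against a temporary file:

```python
import json
from pathlib import Path
from tempfile import TemporaryDirectory

def load_json_data(path_to_data):
    """Same behaviour as the helper above: None passes through, anything else
    is read as UTF-8 text via pathlib and parsed as JSON."""
    if path_to_data is None:
        return None
    return json.loads(Path(path_to_data).read_text(encoding="utf-8"))

with TemporaryDirectory() as tmp:
    path = Path(tmp) / "signal_sensor_map.json"
    path.write_text(json.dumps({"T01": {"sigA": 11}}), encoding="utf-8")
    print(load_json_data(path))  # {'T01': {'sigA': 11}}
print(load_json_data(None))      # None
```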

signal_ids

Typed parsing for SHM signal identifiers.

Classes

LegacySignalIdentifier dataclass

LegacySignalIdentifier(
    raw,
    parts,
    subassembly,
    signal_type,
    lateral_position,
    angular_position,
    orientation,
)

Parsed representation of an SHM signal identifier.

Functions
to_legacy_dict
to_legacy_dict()

Return the historical dict shape used by archive payload code.

Source code in src/owi/metadatabase/shm/signal_ids.py
def to_legacy_dict(self) -> dict[str, str | int | None]:
    """Return the historical dict shape used by archive payload code."""
    data: dict[str, str | int | None] = {
        "sa": self.subassembly,
        "type": self.signal_type,
        "lat": self.lateral_position,
        "deg": self.angular_position,
    }
    if len(self.parts) > 4:
        data["orientation"] = self.orientation
    return data

Functions

parse_legacy_signal_id

parse_legacy_signal_id(signal_id)

Parse an SHM signal identifier into a typed model.

Source code in src/owi/metadatabase/shm/signal_ids.py
def parse_legacy_signal_id(signal_id: str) -> LegacySignalIdentifier | None:
    """Parse an SHM signal identifier into a typed model."""
    parts = tuple(signal_id.split("_"))
    if len(parts) < 4:
        return None

    return LegacySignalIdentifier(
        raw=signal_id,
        parts=parts,
        subassembly=parts[2],
        signal_type=parts[3],
        lateral_position=_parse_position(parts, 4, "LAT"),
        angular_position=_parse_position(parts, 5, "DEG"),
        orientation=_parse_orientation(parts),
    )

upload_context

Shared context models for SHM upload workflows.

Classes

SignalUploadContext dataclass

SignalUploadContext(
    site_id,
    asset_location_id,
    model_definition_id,
    permission_group_ids,
    subassembly_ids_by_type,
)

Resolved ids shared by signal upload payload builders.

Functions
subassembly_id_for
subassembly_id_for(subassembly_type)

Return the configured subassembly id for a subassembly token.

Source code in src/owi/metadatabase/shm/upload_context.py
def subassembly_id_for(self, subassembly_type: str) -> int | None:
    """Return the configured subassembly id for a subassembly token."""
    return self.subassembly_ids_by_type.get(subassembly_type)

payloads

Non-legacy payload helpers for SHM upload workflows.

Classes

SignalPayload dataclass

SignalPayload(
    site,
    model_definition,
    asset_location,
    signal_type,
    signal_id,
    visibility_groups,
    sub_assembly=None,
    heading=None,
    level=None,
    orientation=None,
    stats=None,
    data_additional=None,
    visibility="usergroup",
)

Payload model for signal records.

SignalHistoryPayload dataclass

SignalHistoryPayload(
    signal_id,
    activity_start_timestamp,
    is_latest_status,
    status,
    sensor_serial_number=None,
    status_approval="yes",
    legacy_signal_id=None,
)

Payload model for signal history records.

LeadCorrectionPayload dataclass

LeadCorrectionPayload(t_ref, coef)

Nested payload model for signal lead correction data.

SignalCalibrationData dataclass

SignalCalibrationData(
    offset=None,
    cwl=None,
    coefficients=None,
    t_ref=None,
    gauge_correction=None,
    lead_correction=None,
)

Nested payload model for signal calibration data.

SignalCalibrationPayload dataclass

SignalCalibrationPayload(
    signal_id,
    calibration_date,
    data,
    tempcomp_signal_id=None,
    status_approval="yes",
)

Payload model for signal calibration records.

DerivedSignalPayload dataclass

DerivedSignalPayload(
    site,
    model_definition,
    asset_location,
    sub_assembly,
    signal_type,
    derived_signal_id,
    visibility_groups,
    heading=None,
    level=None,
    orientation=None,
    stats=None,
    data_additional=None,
    visibility="usergroup",
)

Payload model for derived signal records.

DerivedSignalHistoryPayload dataclass

DerivedSignalHistoryPayload(
    derived_signal_id,
    activity_start_timestamp,
    is_latest_status,
    status,
    status_approval="yes",
)

Payload model for derived signal history records.

DerivedSignalHistoryParentSignalsPatch dataclass

DerivedSignalHistoryParentSignalsPatch(parent_signals)

Patch payload for linking parent signals to a derived signal history.

DerivedSignalCalibrationData dataclass

DerivedSignalCalibrationData(
    yaw_parameter, yaw_offset, measurement_location=None
)

Nested payload model for derived signal calibration data.

DerivedSignalCalibrationPayload dataclass

DerivedSignalCalibrationPayload(
    derived_signal_id,
    calibration_date,
    data,
    status_approval="yes",
)

Payload model for derived signal calibration records.

SensorTypePayload dataclass

SensorTypePayload(
    name,
    type,
    type_extended,
    hardware_supplier,
    file=None,
    visibility="usergroup",
    visibility_groups=None,
)

Payload model for sensor type records.

SensorPayload dataclass

SensorPayload(
    sensor_type_id,
    serial_number,
    cabinet,
    visibility="usergroup",
    visibility_groups=None,
)

Payload model for sensor records.

SensorCalibrationPayload dataclass

SensorCalibrationPayload(
    sensor_serial_number, calibration_date, file
)

Payload model for sensor calibration records.

Functions

build_sensor_payloads

build_sensor_payloads(
    sensor_type_id,
    serial_numbers,
    cabinets,
    visibility_groups,
    visibility="usergroup",
)

Build sensor payload models from parallel columns.

Source code in src/owi/metadatabase/shm/upload/payloads.py
def build_sensor_payloads(
    sensor_type_id: int,
    serial_numbers: Sequence[str | None],
    cabinets: Sequence[str | int | None],
    visibility_groups: Sequence[int] | None,
    visibility: str = "usergroup",
) -> list[SensorPayload]:
    """Build sensor payload models from parallel columns."""
    rows = _expand_columns({"serial_number": serial_numbers, "cabinet": cabinets})
    return [
        SensorPayload(
            sensor_type_id=sensor_type_id,
            serial_number=row["serial_number"],
            cabinet=row["cabinet"],
            visibility=visibility,
            visibility_groups=visibility_groups,
        )
        for row in rows
    ]
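The row expansion is delegated to the private `_expand_columns`, whose source is not shown above. The sketch below implements one plausible contract for it, assuming it simply zips equal-length parallel columns into per-row dicts:

```python
from typing import Any, Mapping, Sequence


def _expand_columns(columns: Mapping[str, Sequence[Any]]) -> list[dict[str, Any]]:
    # Assumed behaviour of the private helper: turn parallel, equal-length
    # columns into one dict per row.
    names = list(columns)
    lengths = {len(columns[name]) for name in names}
    if len(lengths) > 1:
        raise ValueError("Parallel columns must have equal length.")
    return [{name: columns[name][i] for name in names} for i in range(lengths.pop())]


rows = _expand_columns(
    {"serial_number": ["SN-001", "SN-002"], "cabinet": ["C1", "C2"]}
)
print(rows)
```

Each resulting row then maps one-to-one onto a `SensorPayload`, with `sensor_type_id`, `visibility`, and `visibility_groups` shared across all rows.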

build_sensor_type_payloads

build_sensor_type_payloads(
    sensor_types_data,
    visibility_groups,
    path_to_images=None,
    visibility="usergroup",
)

Build sensor type payload models from raw records.

Source code in src/owi/metadatabase/shm/upload/payloads.py
def build_sensor_type_payloads(
    sensor_types_data: Sequence[Mapping[str, Any]],
    visibility_groups: Sequence[int] | None,
    path_to_images: str | Path | None = None,
    visibility: str = "usergroup",
) -> list[SensorTypePayload]:
    """Build sensor type payload models from raw records."""
    payloads: list[SensorTypePayload] = []
    for entry in sensor_types_data:
        file_path: Path | None = None
        filename = entry.get("file")
        if filename is not None and path_to_images is not None:
            file_path = Path(path_to_images) / str(filename)
        payloads.append(
            SensorTypePayload(
                name=str(entry["name"]),
                type=str(entry["type"]),
                type_extended=str(entry["type_extended"]),
                hardware_supplier=str(entry["hardware_supplier"]),
                file=file_path,
                visibility=visibility,
                visibility_groups=visibility_groups,
            )
        )
    return payloads

build_sensor_calibration_payloads

build_sensor_calibration_payloads(
    signal_sensor_map,
    signal_calibration_map,
    path_to_datasheets,
)

Build sensor calibration payload models for one turbine.

Source code in src/owi/metadatabase/shm/upload/payloads.py
def build_sensor_calibration_payloads(
    signal_sensor_map: Mapping[str, int],
    signal_calibration_map: Mapping[str, Mapping[str, str]],
    path_to_datasheets: str | Path,
) -> list[SensorCalibrationPayload]:
    """Build sensor calibration payload models for one turbine."""
    payloads: list[SensorCalibrationPayload] = []
    for signal_name, calibration in signal_calibration_map.items():
        sensor_id = signal_sensor_map.get(signal_name)
        if sensor_id is None:
            continue
        payloads.append(
            SensorCalibrationPayload(
                sensor_serial_number=sensor_id,
                calibration_date=calibration["date"],
                file=Path(path_to_datasheets) / calibration["filename"],
            )
        )
    return payloads

build_signal_main_payload

build_signal_main_payload(signal, signal_data, context)

Build the main signal payload from archive-style signal data.

Source code in src/owi/metadatabase/shm/upload/payloads.py
def build_signal_main_payload(
    signal: LegacySignalIdentifier,
    signal_data: Mapping[str, Any],
    context: SignalUploadContext,
) -> dict[str, Any] | None:
    """Build the main signal payload from archive-style signal data."""
    if len(signal_data) <= 1:
        return None

    payload = SignalPayload(
        site=context.site_id,
        model_definition=context.model_definition_id,
        asset_location=context.asset_location_id,
        signal_type=signal.signal_type,
        signal_id=signal.raw,
        sub_assembly=(
            context.subassembly_id_for(signal.subassembly) if signal.subassembly in {"TP", "TW", "MP"} else None
        ),
        heading=signal_data.get("heading"),
        level=signal_data.get("level"),
        orientation=signal_data.get("orientation"),
        stats=signal_data.get("stats"),
        data_additional=_legacy_signal_misc_data(signal_data),
        visibility_groups=context.permission_group_ids,
    )
    return payload.to_payload()

build_signal_status_payloads

build_signal_status_payloads(
    signal_id, signal_data, sensor_serial_number=None
)

Build signal status payloads from archive-style status rows.

Source code in src/owi/metadatabase/shm/upload/payloads.py
def build_signal_status_payloads(
    signal_id: int,
    signal_data: Mapping[str, Any],
    sensor_serial_number: int | None = None,
) -> list[dict[str, Any]]:
    """Build signal status payloads from archive-style status rows."""
    statuses = signal_data.get("status")
    if not isinstance(statuses, Sequence) or isinstance(statuses, (str, bytes)):
        return []

    payloads: list[dict[str, Any]] = []
    for index, status in enumerate(statuses):
        if not isinstance(status, Mapping):
            continue
        status_row = cast(Mapping[str, Any], status)
        payloads.append(
            SignalHistoryPayload(
                signal_id=signal_id,
                activity_start_timestamp=cast(TimestampValue, status_row["time"]),
                is_latest_status=index == len(statuses) - 1,
                status=cast(str, status_row["status"]),
                sensor_serial_number=sensor_serial_number,
                legacy_signal_id=cast(Optional[str], status_row.get("name")),
            ).to_payload()
        )
    return payloads
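The key detail above is the `is_latest_status` flag: only the final status row is marked latest. The sketch below keeps the same iteration and guards but emits plain dicts instead of `SignalHistoryPayload` models; the status rows are illustrative.

```python
from typing import Any, Mapping, Sequence


def status_rows(signal_id: int, signal_data: Mapping[str, Any]) -> list[dict]:
    # Same iteration as the builder above: non-sequence or string-like "status"
    # values yield no rows, and only the last row gets is_latest_status=True.
    statuses = signal_data.get("status")
    if not isinstance(statuses, Sequence) or isinstance(statuses, (str, bytes)):
        return []
    rows: list[dict] = []
    for index, status in enumerate(statuses):
        if not isinstance(status, Mapping):
            continue
        rows.append(
            {
                "signal_id": signal_id,
                "activity_start_timestamp": status["time"],
                "is_latest_status": index == len(statuses) - 1,
                "status": status["status"],
            }
        )
    return rows


rows = status_rows(
    42,
    {
        "status": [
            {"time": "2020-01-01", "status": "ok"},
            {"time": "2021-01-01", "status": "faulty"},
        ]
    },
)
print([row["is_latest_status"] for row in rows])  # -> [False, True]
```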

build_signal_calibration_payloads

build_signal_calibration_payloads(
    signal_id, signal_data, tempcomp_signal_ids=None
)

Build signal calibration payloads from archive-style offset and CWL data.

Source code in src/owi/metadatabase/shm/upload/payloads.py
def build_signal_calibration_payloads(
    signal_id: int,
    signal_data: Mapping[str, Any],
    tempcomp_signal_ids: Mapping[str, int] | None = None,
) -> list[dict[str, Any]]:
    """Build signal calibration payloads from archive-style offset and CWL data."""
    payloads: list[dict[str, Any]] = []

    offsets = signal_data.get("offset")
    if isinstance(offsets, Sequence) and not isinstance(offsets, (str, bytes)):
        for offset in offsets:
            if not isinstance(offset, Mapping):
                continue
            lead_correction = offset.get("lead_correction")
            tc_sensor = offset.get("TCSensor")
            payloads.append(
                SignalCalibrationPayload.from_offset(
                    signal_id=signal_id,
                    calibration_date=offset["time"],
                    offset=offset["offset"],
                    tempcomp_signal_id=(
                        tempcomp_signal_ids.get(tc_sensor)
                        if tempcomp_signal_ids is not None and isinstance(tc_sensor, str)
                        else None
                    ),
                    coefficients=offset.get("Coefficients"),
                    t_ref=offset.get("t_ref"),
                    gauge_correction=offset.get("gauge_correction"),
                    lead_correction=(
                        LeadCorrectionPayload(
                            t_ref=lead_correction["t_ref"],
                            coef=lead_correction["coef"],
                        )
                        if isinstance(lead_correction, Mapping)
                        else None
                    ),
                ).to_payload()
            )

    cwl_rows = signal_data.get("cwl")
    if isinstance(cwl_rows, Sequence) and not isinstance(cwl_rows, (str, bytes)):
        for cwl in cwl_rows:
            if not isinstance(cwl, Mapping):
                continue
            payloads.append(
                SignalCalibrationPayload.from_cwl(
                    signal_id=signal_id,
                    calibration_date=cwl["time"],
                    cwl=cwl["cwl"],
                ).to_payload()
            )

    return payloads
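The builder above makes two passes: one over `"offset"` rows and one over `"cwl"` rows, each guarded against string-like or non-sequence values. The sketch below keeps that two-pass structure but emits plain dicts; the real builder routes each row through `SignalCalibrationPayload.from_offset` / `.from_cwl`, whose serialization is not shown, and the sample rows are illustrative.

```python
from typing import Any, Mapping, Sequence


def calibration_rows(signal_id: int, signal_data: Mapping[str, Any]) -> list[dict]:
    # Same two-pass structure as the builder above: "offset" rows first,
    # then "cwl" rows, with the same sequence/mapping guards.
    rows: list[dict] = []

    offsets = signal_data.get("offset")
    if isinstance(offsets, Sequence) and not isinstance(offsets, (str, bytes)):
        for offset in offsets:
            if isinstance(offset, Mapping):
                rows.append(
                    {
                        "signal_id": signal_id,
                        "calibration_date": offset["time"],
                        "offset": offset["offset"],
                    }
                )

    cwl_rows = signal_data.get("cwl")
    if isinstance(cwl_rows, Sequence) and not isinstance(cwl_rows, (str, bytes)):
        for cwl in cwl_rows:
            if isinstance(cwl, Mapping):
                rows.append(
                    {
                        "signal_id": signal_id,
                        "calibration_date": cwl["time"],
                        "cwl": cwl["cwl"],
                    }
                )
    return rows


rows = calibration_rows(
    7,
    {
        "offset": [{"time": "2019-06-01", "offset": 0.12}],
        "cwl": [{"time": "2019-07-01", "cwl": 3.4}],
    },
)
print(len(rows))  # -> 2
```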

build_derived_signal_main_payload

build_derived_signal_main_payload(
    signal, signal_data, context
)

Build the main derived-signal payload from archive-style data.

Source code in src/owi/metadatabase/shm/upload/payloads.py
def build_derived_signal_main_payload(
    signal: LegacySignalIdentifier,
    signal_data: Mapping[str, Any],
    context: SignalUploadContext,
) -> dict[str, Any] | None:
    """Build the main derived-signal payload from archive-style data."""
    if len(signal_data) <= 1:
        return None

    sub_assembly = context.subassembly_id_for(signal.subassembly)
    if sub_assembly is None:
        raise KeyError(f"Missing sub-assembly id for {signal.subassembly!r}")

    return DerivedSignalPayload(
        site=context.site_id,
        model_definition=context.model_definition_id,
        asset_location=context.asset_location_id,
        sub_assembly=sub_assembly,
        signal_type=signal.signal_type,
        derived_signal_id=signal.raw,
        heading=signal_data.get("heading"),
        level=signal_data.get("level"),
        orientation=signal_data.get("orientation"),
        stats=signal_data.get("stats"),
        data_additional=signal_data.get("data"),
        visibility_groups=context.permission_group_ids,
    ).to_payload()

build_derived_signal_status_payload

build_derived_signal_status_payload(
    derived_signal_id, signal_data
)

Build the derived-signal status payload used before parent patching.

Source code in src/owi/metadatabase/shm/upload/payloads.py
def build_derived_signal_status_payload(
    derived_signal_id: int,
    signal_data: Mapping[str, Any],
) -> dict[str, Any]:
    """Build the derived-signal status payload used before parent patching."""
    calibrations = signal_data.get("calibration")
    if not isinstance(calibrations, Sequence) or isinstance(calibrations, (str, bytes)) or not calibrations:
        raise ValueError("Derived signal calibration rows are required to build a status payload.")

    first = calibrations[0]
    if not isinstance(first, Mapping):
        raise ValueError("Derived signal calibration rows must be mappings.")

    return DerivedSignalHistoryPayload(
        derived_signal_id=derived_signal_id,
        activity_start_timestamp=first["time"],
        is_latest_status=True,
        status="ok",
    ).to_payload()
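The validation above anchors the status row on the first calibration entry and hard-codes `status="ok"` with `is_latest_status=True`. The sketch below reproduces that logic with a plain dict result; the calibration timestamp is illustrative.

```python
from typing import Any, Mapping, Sequence


def derived_status_row(derived_signal_id: int, signal_data: Mapping[str, Any]) -> dict:
    # Same validation as the builder above: the first calibration row
    # supplies the activity start timestamp.
    calibrations = signal_data.get("calibration")
    if (
        not isinstance(calibrations, Sequence)
        or isinstance(calibrations, (str, bytes))
        or not calibrations
    ):
        raise ValueError("Derived signal calibration rows are required to build a status payload.")
    first = calibrations[0]
    if not isinstance(first, Mapping):
        raise ValueError("Derived signal calibration rows must be mappings.")
    return {
        "derived_signal_id": derived_signal_id,
        "activity_start_timestamp": first["time"],
        "is_latest_status": True,
        "status": "ok",
    }


row = derived_status_row(5, {"calibration": [{"time": "2022-03-01"}]})
print(row["activity_start_timestamp"])  # -> 2022-03-01
```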

build_derived_signal_parent_patch

build_derived_signal_parent_patch(parent_signal_ids)

Build the parent-signals patch payload for derived signal status rows.

Source code in src/owi/metadatabase/shm/upload/payloads.py
def build_derived_signal_parent_patch(parent_signal_ids: Sequence[int]) -> dict[str, list[int]]:
    """Build the parent-signals patch payload for derived signal status rows."""
    return DerivedSignalHistoryParentSignalsPatch(parent_signal_ids).to_payload()
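Assuming `to_payload` serializes the dataclass field under its own name (the serialization is not shown above), the resulting patch body is simply the parent ids under a single `parent_signals` key:

```python
def parent_patch(parent_signal_ids: list[int]) -> dict[str, list[int]]:
    # Assumed payload shape: the list of parent ids under the single key
    # matching the dataclass field name listed above.
    return {"parent_signals": list(parent_signal_ids)}


print(parent_patch([101, 102]))  # -> {'parent_signals': [101, 102]}
```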

build_derived_signal_calibration_payloads

build_derived_signal_calibration_payloads(
    derived_signal_id, signal_data
)

Build derived-signal calibration payloads from archive-style data.

Source code in src/owi/metadatabase/shm/upload/payloads.py
def build_derived_signal_calibration_payloads(
    derived_signal_id: int,
    signal_data: Mapping[str, Any],
) -> list[dict[str, Any]]:
    """Build derived-signal calibration payloads from archive-style data."""
    calibrations = signal_data.get("calibration")
    if not isinstance(calibrations, Sequence) or isinstance(calibrations, (str, bytes)):
        return []

    payloads: list[dict[str, Any]] = []
    for calibration in calibrations:
        if not isinstance(calibration, Mapping):
            continue
        payloads.append(
            DerivedSignalCalibrationPayload.from_yaw_offset(
                derived_signal_id=derived_signal_id,
                calibration_date=calibration["time"],
                yaw_parameter=calibration["yaw_parameter"],
                yaw_offset=calibration["yaw_offset"],
                measurement_location=calibration.get("measurement_location"),
            ).to_payload()
        )
    return payloads