Skip to content

Signal Processing

processing

Signal processing subpackage.

Re-exports all public symbols so `from owi.metadatabase.shm.processing import X` continues to work unchanged.

Classes

ConfigDiscovery

Bases: ABC

Discover farm configuration files from a filesystem path.

Functions
discover abstractmethod
discover(path_configs, turbines=None)

Return a turbine-to-config-path mapping.

Parameters:

Name Type Description Default
path_configs str | Path

Filesystem path to a directory of configuration files or a single configuration file.

required
turbines Sequence[str] | None

Optional subset of turbine identifiers to retain from the discovered files.

None

Returns:

Type Description
dict[str, Path]

Mapping from turbine identifier to configuration file path.

Source code in src/owi/metadatabase/shm/processing/discovery.py
@abstractmethod
def discover(
    self,
    path_configs: str | Path,
    turbines: Sequence[str] | None = None,
) -> dict[str, Path]:
    """Map each turbine identifier to its configuration file path.

    Parameters
    ----------
    path_configs
        Directory holding configuration files, or the path of a single
        configuration file.
    turbines
        Optional subset of turbine identifiers to keep from the files
        that are discovered; ``None`` keeps everything.

    Returns
    -------
    dict[str, Path]
        Turbine identifier mapped to the corresponding config file path.
    """

JsonStemConfigDiscovery dataclass

JsonStemConfigDiscovery(suffix='.json')

Bases: ConfigDiscovery

Discover JSON configuration files by stem name.

Parameters:

Name Type Description Default
suffix str

File suffix treated as a valid configuration file.

'.json'

Examples:

>>> JsonStemConfigDiscovery().suffix
'.json'
Functions
discover
discover(path_configs, turbines=None)

Return available JSON config files keyed by turbine name.

Parameters:

Name Type Description Default
path_configs str | Path

Directory containing configuration files or a single JSON config path.

required
turbines Sequence[str] | None

Optional subset of turbine stems to retain from the discovered files.

None

Returns:

Type Description
dict[str, Path]

Mapping from turbine stem to configuration path.

Raises:

Type Description
ValueError

If the path does not resolve to usable JSON files or the requested turbine subset is empty.

Source code in src/owi/metadatabase/shm/processing/discovery.py
def discover(
    self,
    path_configs: str | Path,
    turbines: Sequence[str] | None = None,
) -> dict[str, Path]:
    """Collect JSON config files, keyed by turbine stem.

    Parameters
    ----------
    path_configs
        Directory containing configuration files or a single JSON config
        path.
    turbines
        Optional subset of turbine stems to retain from the discovered
        files.

    Returns
    -------
    dict[str, Path]
        Mapping from turbine stem to configuration path.

    Raises
    ------
    ValueError
        If the path does not resolve to usable JSON files or the requested
        turbine subset is empty.
    """
    root = Path(path_configs)
    if root.is_dir():
        # Sort directory entries so the mapping order is deterministic.
        available = {
            entry.stem: entry
            for entry in sorted(root.iterdir())
            if entry.is_file() and entry.suffix == self.suffix
        }
    elif root.is_file() and root.suffix == self.suffix:
        available = {root.stem: root}
    else:
        raise ValueError(f"Could not discover configuration files from {root}.")

    if turbines is None:
        return available

    selected: dict[str, Path] = {}
    missing: list[str] = []
    for name in turbines:
        if name in available:
            selected[name] = available[name]
        else:
            missing.append(name)
    if missing:
        # Best-effort: warn about unknown turbines and continue with the rest.
        warnings.warn(
            "Some turbines from the provided list are not found in the "
            f"configurations directory. Using available turbines: {list(selected)}",
            stacklevel=2,
        )
    if not selected:
        raise ValueError("No valid turbines found in the provided list.")
    return selected

DelimitedSignalKeyParser dataclass

DelimitedSignalKeyParser(signal_prefixes, separator='/')

Parse delimited signal-property keys.

Parameters:

Name Type Description Default
signal_prefixes tuple[str, ...]

Raw key prefixes that belong to direct signal properties.

required
separator str

Separator between the signal identifier and the property name.

'/'

Examples:

>>> parser = DelimitedSignalKeyParser(signal_prefixes=("WF", "X/", "Y/", "Z/"))
>>> parser.parse("WF_WTG_TP_STRAIN/status")
SignalEventKey(signal_name='WF_WTG_TP_STRAIN', property_name='status')
>>> parser.parse("acceleration/yaw_transformation") is None
True
Functions
matches
matches(raw_key)

Return True when the raw key belongs to a direct signal.

Parameters:

Name Type Description Default
raw_key str

Raw configuration key to test.

required

Returns:

Type Description
bool

Whether the key starts with one of the configured signal prefixes.

Source code in src/owi/metadatabase/shm/processing/parsing.py
def matches(self, raw_key: str) -> bool:
    """Report whether ``raw_key`` denotes a direct signal.

    Parameters
    ----------
    raw_key
        Raw configuration key to test.

    Returns
    -------
    bool
        ``True`` when the key starts with any configured signal prefix.
    """
    return any(raw_key.startswith(prefix) for prefix in self.signal_prefixes)
parse
parse(raw_key)

Parse a raw key into a signal/property pair.

Parameters:

Name Type Description Default
raw_key str

Raw configuration key containing a signal name and property name separated by the configured `separator` attribute.

required

Returns:

Type Description
SignalEventKey or None

Parsed key, or None when the key does not match or lacks a separator.

Source code in src/owi/metadatabase/shm/processing/parsing.py
def parse(self, raw_key: str) -> SignalEventKey | None:
    """Split a raw key into its signal and property components.

    Parameters
    ----------
    raw_key
        Raw configuration key holding a signal name and a property name
        joined by the configured separator.

    Returns
    -------
    SignalEventKey or None
        Parsed key, or ``None`` when the key is not a direct-signal key,
        lacks the separator, or has an empty component.
    """
    if self.separator not in raw_key or not self.matches(raw_key):
        return None

    head, _, tail = raw_key.partition(self.separator)
    if head and tail:
        return SignalEventKey(signal_name=head, property_name=tail)
    return None

SignalEventKey dataclass

SignalEventKey(signal_name, property_name)

Parsed signal-property key.

Parameters:

Name Type Description Default
signal_name str

Canonical signal identifier.

required
property_name str

Property name carried by the raw configuration key.

required

ConfiguredSignalConfigProcessor

ConfiguredSignalConfigProcessor(
    path_configs, processor_spec, turbines=None
)

Bases: SignalConfigProcessor

Signal processor backed by an explicit farm spec.

Parameters:

Name Type Description Default
path_configs str | Path

Directory or JSON file containing farm signal configuration events.

required
processor_spec SignalProcessorSpec

Explicit processor specification that defines parsing, derivation, and discovery behavior.

required
turbines Sequence[str] | None

Optional subset of turbine stems to process during discovery.

None

Examples:

>>> from owi.metadatabase.shm.processing import DelimitedSignalKeyParser, SignalProcessorSpec
>>> spec = SignalProcessorSpec(
...     farm_name="Demo",
...     signal_key_parser=DelimitedSignalKeyParser(signal_prefixes=("WF_",)),
...     derived_signal_strategies={},
... )
>>> processor = ConfiguredSignalConfigProcessor(path_configs='.', processor_spec=spec)
>>> result = processor.process_events([{"WF_SIG/status": "ok"}])
>>> result.to_legacy_data()[0]["WF_SIG"]["status"][0]["status"]
'ok'
Source code in src/owi/metadatabase/shm/processing/processor.py
def __init__(
    self,
    path_configs: str | Path,
    processor_spec: SignalProcessorSpec,
    turbines: Sequence[str] | None = None,
) -> None:
    """Initialize the processor with an explicit spec.

    Parameters
    ----------
    path_configs
        Directory or JSON file containing farm signal configuration events.
    processor_spec
        Explicit processor specification used by the processing pipeline.
    turbines
        Optional subset of turbine stems to process during discovery.
    """
    # Store the spec BEFORE delegating: the base-class __init__ calls
    # build_processor_spec(), which returns self._processor_spec.
    self._processor_spec = processor_spec
    super().__init__(path_configs=path_configs, turbines=turbines)
Functions
process_events
process_events(events)

Transform raw configuration events into typed signal records.

Parameters:

Name Type Description Default
events Sequence[Mapping[str, Any]]

Ordered raw configuration events loaded from one farm config.

required

Returns:

Type Description
SignalProcessingResult

Typed signal and derived-signal records that can be converted to the archive-compatible uploader payload shape.

Examples:

>>> from owi.metadatabase.shm.processing import (
...     ConfiguredSignalConfigProcessor,
...     DelimitedSignalKeyParser,
...     SignalProcessorSpec,
... )
>>> spec = SignalProcessorSpec(
...     farm_name="Demo",
...     signal_key_parser=DelimitedSignalKeyParser(signal_prefixes=("WF_",)),
...     derived_signal_strategies={},
... )
>>> processor = ConfiguredSignalConfigProcessor(path_configs='.', processor_spec=spec)
>>> result = processor.process_events([{"WF_SIG/status": "ok"}])
>>> result.to_legacy_data()[0]["WF_SIG"]["status"][0]["status"]
'ok'
Source code in src/owi/metadatabase/shm/processing/processor.py
def process_events(self, events: Sequence[Mapping[str, Any]]) -> SignalProcessingResult:
    """Turn raw configuration events into typed signal records.

    Parameters
    ----------
    events
        Ordered raw configuration events loaded from one farm config.

    Returns
    -------
    SignalProcessingResult
        Typed signal and derived-signal records that can be converted to
        the archive-compatible uploader payload shape.
    """
    spec = self.processor_spec
    signal_records: dict[str, ProcessedSignalRecord] = {}
    derived_records: dict[str, ProcessedDerivedSignalRecord] = {}
    event_time = spec.default_initial_time

    for position, event in enumerate(events):
        # Update the running timestamp for this event.
        event_time = self._resolve_event_time(
            event,
            index=position,
            current_time=event_time,
        )
        for key, value in event.items():
            parsed_key = spec.signal_key_parser.parse(key)
            if parsed_key is not None:
                # Direct signal property.
                self._apply_signal_property(
                    signals=signal_records,
                    signal_key=parsed_key,
                    value=value,
                    timestamp=event_time,
                )
            else:
                strategy = spec.derived_signal_strategies.get(key)
                if strategy is not None:
                    # Derived signal handled by a registered strategy;
                    # unrecognized keys are skipped silently.
                    payload = _coerce_mapping(value, context=key)
                    self._apply_derived_updates(
                        derived_signals=derived_records,
                        event_key=key,
                        updates=strategy.emit_updates(key, payload),
                        timestamp=event_time,
                    )

    self._postprocess_signals(signal_records)
    return SignalProcessingResult(signals=signal_records, derived_signals=derived_records)
signal_preprocess_data
signal_preprocess_data(path_config)

Process one configuration file into archive-compatible mappings.

Parameters:

Name Type Description Default
path_config str | Path

JSON configuration file to load and process.

required

Returns:

Type Description
tuple[LegacySignalMap, LegacySignalMap]

Main-signal and derived-signal mappings ready for uploader seams.

Source code in src/owi/metadatabase/shm/processing/processor.py
def signal_preprocess_data(
    self,
    path_config: str | Path,
) -> tuple[LegacySignalMap, LegacySignalMap]:
    """Convert a single configuration file into the legacy mapping pair.

    Parameters
    ----------
    path_config
        JSON configuration file to load and process.

    Returns
    -------
    tuple[LegacySignalMap, LegacySignalMap]
        Main-signal and derived-signal mappings ready for uploader seams.
    """
    raw_events = self._load_events(path_config)
    processed = self.process_events(raw_events)
    return processed.to_legacy_data()
signals_process_data
signals_process_data()

Process all discovered configuration files under path_configs.

The processed results are stored on `signals_data` and `signals_derived_data`, keyed by turbine stem.

Source code in src/owi/metadatabase/shm/processing/processor.py
def signals_process_data(self) -> None:
    """Process every discovered configuration file under ``path_configs``.

    Results are stored per turbine stem on :attr:`signals_data` and
    :attr:`signals_derived_data`; :attr:`turbines` is replaced with the
    turbines actually discovered.
    """
    discovered = self.processor_spec.config_discovery.discover(
        self.path_configs,
        turbines=self.turbines,
    )
    self.turbines = list(discovered)
    for turbine, config_path in discovered.items():
        main_data, derived_data = self.signal_preprocess_data(config_path)
        self.signals_data[turbine] = main_data
        self.signals_derived_data[turbine] = derived_data
build_processor_spec
build_processor_spec()

Return the explicit processor spec passed to the constructor.

Returns:

Type Description
SignalProcessorSpec

The specification supplied at construction time.

Source code in src/owi/metadatabase/shm/processing/processor.py
def build_processor_spec(self) -> SignalProcessorSpec:
    """Hand back the processor spec captured at construction time.

    Returns
    -------
    SignalProcessorSpec
        The specification supplied to ``__init__``.
    """
    return self._processor_spec
from_yaml_spec classmethod
from_yaml_spec(
    *, path_configs, processor_spec_path, turbines=None
)

Construct a configured processor from a YAML-backed processor spec.

Parameters:

Name Type Description Default
path_configs str | Path

Directory or JSON file containing farm configuration events.

required
processor_spec_path str | Path

Path to a YAML processor specification file.

required
turbines Sequence[str] | None

Optional subset of turbine stems to process during discovery.

None

Returns:

Type Description
ConfiguredSignalConfigProcessor

Processor loaded with the given YAML spec.

Source code in src/owi/metadatabase/shm/processing/processor.py
@classmethod
def from_yaml_spec(
    cls,
    *,
    path_configs: str | Path,
    processor_spec_path: str | Path,
    turbines: Sequence[str] | None = None,
) -> ConfiguredSignalConfigProcessor:
    """Build a processor whose spec is loaded from a YAML file.

    Parameters
    ----------
    path_configs
        Directory or JSON file containing farm configuration events.
    processor_spec_path
        Path to a YAML processor specification file.
    turbines
        Optional subset of turbine stems to process during discovery.

    Returns
    -------
    ConfiguredSignalConfigProcessor
        Processor configured with the spec read from the YAML file.
    """
    spec = load_signal_processor_spec(processor_spec_path)
    return cls(path_configs=path_configs, processor_spec=spec, turbines=turbines)

DefaultSignalConfigProcessor

DefaultSignalConfigProcessor(
    path_configs, turbines=None, processor_spec=None
)

Bases: ConfiguredSignalConfigProcessor

Built-in specialization of the generic signal processor.

Parameters:

Name Type Description Default
path_configs str | Path

Directory or JSON file containing default wind-farm configuration events.

required
turbines Sequence[str] | None

Optional subset of turbine stems to process during discovery.

None
processor_spec SignalProcessorSpec | None

Optional override for the built-in default processor specification.

None
Source code in src/owi/metadatabase/shm/processing/processor.py
def __init__(
    self,
    path_configs: str | Path,
    turbines: Sequence[str] | None = None,
    processor_spec: SignalProcessorSpec | None = None,
) -> None:
    """Initialize with the built-in default spec unless one is supplied.

    Parameters
    ----------
    path_configs
        Directory or JSON file containing default wind-farm configuration
        events.
    turbines
        Optional subset of turbine stems to process during discovery.
    processor_spec
        Optional override for the built-in default processor specification.
    """
    # `or` means a falsy spec also falls back to the built-in default.
    spec = processor_spec or load_default_signal_processor_spec()
    super().__init__(
        path_configs=path_configs,
        turbines=turbines,
        processor_spec=spec,
    )
Functions
build_processor_spec
build_processor_spec()

Return the explicit processor spec passed to the constructor.

Returns:

Type Description
SignalProcessorSpec

The specification supplied at construction time.

Source code in src/owi/metadatabase/shm/processing/processor.py
def build_processor_spec(self) -> SignalProcessorSpec:
    """Hand back the processor spec captured at construction time.

    Returns
    -------
    SignalProcessorSpec
        The specification supplied to ``__init__``.
    """
    return self._processor_spec
process_events
process_events(events)

Transform raw configuration events into typed signal records.

Parameters:

Name Type Description Default
events Sequence[Mapping[str, Any]]

Ordered raw configuration events loaded from one farm config.

required

Returns:

Type Description
SignalProcessingResult

Typed signal and derived-signal records that can be converted to the archive-compatible uploader payload shape.

Examples:

>>> from owi.metadatabase.shm.processing import (
...     ConfiguredSignalConfigProcessor,
...     DelimitedSignalKeyParser,
...     SignalProcessorSpec,
... )
>>> spec = SignalProcessorSpec(
...     farm_name="Demo",
...     signal_key_parser=DelimitedSignalKeyParser(signal_prefixes=("WF_",)),
...     derived_signal_strategies={},
... )
>>> processor = ConfiguredSignalConfigProcessor(path_configs='.', processor_spec=spec)
>>> result = processor.process_events([{"WF_SIG/status": "ok"}])
>>> result.to_legacy_data()[0]["WF_SIG"]["status"][0]["status"]
'ok'
Source code in src/owi/metadatabase/shm/processing/processor.py
def process_events(self, events: Sequence[Mapping[str, Any]]) -> SignalProcessingResult:
    """Turn raw configuration events into typed signal records.

    Parameters
    ----------
    events
        Ordered raw configuration events loaded from one farm config.

    Returns
    -------
    SignalProcessingResult
        Typed signal and derived-signal records that can be converted to
        the archive-compatible uploader payload shape.
    """
    spec = self.processor_spec
    signal_records: dict[str, ProcessedSignalRecord] = {}
    derived_records: dict[str, ProcessedDerivedSignalRecord] = {}
    event_time = spec.default_initial_time

    for position, event in enumerate(events):
        # Update the running timestamp for this event.
        event_time = self._resolve_event_time(
            event,
            index=position,
            current_time=event_time,
        )
        for key, value in event.items():
            parsed_key = spec.signal_key_parser.parse(key)
            if parsed_key is not None:
                # Direct signal property.
                self._apply_signal_property(
                    signals=signal_records,
                    signal_key=parsed_key,
                    value=value,
                    timestamp=event_time,
                )
            else:
                strategy = spec.derived_signal_strategies.get(key)
                if strategy is not None:
                    # Derived signal handled by a registered strategy;
                    # unrecognized keys are skipped silently.
                    payload = _coerce_mapping(value, context=key)
                    self._apply_derived_updates(
                        derived_signals=derived_records,
                        event_key=key,
                        updates=strategy.emit_updates(key, payload),
                        timestamp=event_time,
                    )

    self._postprocess_signals(signal_records)
    return SignalProcessingResult(signals=signal_records, derived_signals=derived_records)
signal_preprocess_data
signal_preprocess_data(path_config)

Process one configuration file into archive-compatible mappings.

Parameters:

Name Type Description Default
path_config str | Path

JSON configuration file to load and process.

required

Returns:

Type Description
tuple[LegacySignalMap, LegacySignalMap]

Main-signal and derived-signal mappings ready for uploader seams.

Source code in src/owi/metadatabase/shm/processing/processor.py
def signal_preprocess_data(
    self,
    path_config: str | Path,
) -> tuple[LegacySignalMap, LegacySignalMap]:
    """Convert a single configuration file into the legacy mapping pair.

    Parameters
    ----------
    path_config
        JSON configuration file to load and process.

    Returns
    -------
    tuple[LegacySignalMap, LegacySignalMap]
        Main-signal and derived-signal mappings ready for uploader seams.
    """
    raw_events = self._load_events(path_config)
    processed = self.process_events(raw_events)
    return processed.to_legacy_data()
signals_process_data
signals_process_data()

Process all discovered configuration files under path_configs.

The processed results are stored on `signals_data` and `signals_derived_data`, keyed by turbine stem.

Source code in src/owi/metadatabase/shm/processing/processor.py
def signals_process_data(self) -> None:
    """Process every discovered configuration file under ``path_configs``.

    Results are stored per turbine stem on :attr:`signals_data` and
    :attr:`signals_derived_data`; :attr:`turbines` is replaced with the
    turbines actually discovered.
    """
    discovered = self.processor_spec.config_discovery.discover(
        self.path_configs,
        turbines=self.turbines,
    )
    self.turbines = list(discovered)
    for turbine, config_path in discovered.items():
        main_data, derived_data = self.signal_preprocess_data(config_path)
        self.signals_data[turbine] = main_data
        self.signals_derived_data[turbine] = derived_data
from_yaml_spec classmethod
from_yaml_spec(
    *, path_configs, processor_spec_path, turbines=None
)

Construct a configured processor from a YAML-backed processor spec.

Parameters:

Name Type Description Default
path_configs str | Path

Directory or JSON file containing farm configuration events.

required
processor_spec_path str | Path

Path to a YAML processor specification file.

required
turbines Sequence[str] | None

Optional subset of turbine stems to process during discovery.

None

Returns:

Type Description
ConfiguredSignalConfigProcessor

Processor loaded with the given YAML spec.

Source code in src/owi/metadatabase/shm/processing/processor.py
@classmethod
def from_yaml_spec(
    cls,
    *,
    path_configs: str | Path,
    processor_spec_path: str | Path,
    turbines: Sequence[str] | None = None,
) -> ConfiguredSignalConfigProcessor:
    """Build a processor whose spec is loaded from a YAML file.

    Parameters
    ----------
    path_configs
        Directory or JSON file containing farm configuration events.
    processor_spec_path
        Path to a YAML processor specification file.
    turbines
        Optional subset of turbine stems to process during discovery.

    Returns
    -------
    ConfiguredSignalConfigProcessor
        Processor configured with the spec read from the YAML file.
    """
    spec = load_signal_processor_spec(processor_spec_path)
    return cls(path_configs=path_configs, processor_spec=spec, turbines=turbines)

SignalConfigProcessor

SignalConfigProcessor(path_configs, turbines=None)

Bases: ABC

ABC-backed base class for wind-farm signal config processors.

Parameters:

Name Type Description Default
path_configs str | Path

Directory or JSON file containing farm signal configuration events.

required
turbines Sequence[str] | None

Optional subset of turbine stems to process during discovery.

None
Notes

Subclasses provide the farm-specific `SignalProcessorSpec` used by the generic processing pipeline.

Source code in src/owi/metadatabase/shm/processing/processor.py
def __init__(
    self,
    path_configs: str | Path,
    turbines: Sequence[str] | None = None,
) -> None:
    """Set up processor state and build the farm-specific spec.

    Parameters
    ----------
    path_configs
        Directory or JSON file containing farm signal configuration events.
    turbines
        Optional subset of turbine stems to process during discovery.
    """
    self.path_configs = Path(path_configs)
    self.turbines = None if turbines is None else list(turbines)
    self.signals_data: dict[str, LegacySignalMap] = {}
    self.signals_derived_data: dict[str, LegacySignalMap] = {}
    # Subclass hook runs last so every attribute above is already set.
    self.processor_spec = self.build_processor_spec()
Functions
build_processor_spec abstractmethod
build_processor_spec()

Return the farm-specific processor specification.

Returns:

Type Description
SignalProcessorSpec

Specification controlling signal key parsing, derived-signal strategies, and postprocessors.

Source code in src/owi/metadatabase/shm/processing/processor.py
@abstractmethod
def build_processor_spec(self) -> SignalProcessorSpec:
    """Produce the farm-specific processor specification.

    Returns
    -------
    SignalProcessorSpec
        Specification that drives signal key parsing, derived-signal
        strategies, and postprocessing.
    """
process_events
process_events(events)

Transform raw configuration events into typed signal records.

Parameters:

Name Type Description Default
events Sequence[Mapping[str, Any]]

Ordered raw configuration events loaded from one farm config.

required

Returns:

Type Description
SignalProcessingResult

Typed signal and derived-signal records that can be converted to the archive-compatible uploader payload shape.

Examples:

>>> from owi.metadatabase.shm.processing import (
...     ConfiguredSignalConfigProcessor,
...     DelimitedSignalKeyParser,
...     SignalProcessorSpec,
... )
>>> spec = SignalProcessorSpec(
...     farm_name="Demo",
...     signal_key_parser=DelimitedSignalKeyParser(signal_prefixes=("WF_",)),
...     derived_signal_strategies={},
... )
>>> processor = ConfiguredSignalConfigProcessor(path_configs='.', processor_spec=spec)
>>> result = processor.process_events([{"WF_SIG/status": "ok"}])
>>> result.to_legacy_data()[0]["WF_SIG"]["status"][0]["status"]
'ok'
Source code in src/owi/metadatabase/shm/processing/processor.py
def process_events(self, events: Sequence[Mapping[str, Any]]) -> SignalProcessingResult:
    """Turn raw configuration events into typed signal records.

    Parameters
    ----------
    events
        Ordered raw configuration events loaded from one farm config.

    Returns
    -------
    SignalProcessingResult
        Typed signal and derived-signal records that can be converted to
        the archive-compatible uploader payload shape.
    """
    spec = self.processor_spec
    signal_records: dict[str, ProcessedSignalRecord] = {}
    derived_records: dict[str, ProcessedDerivedSignalRecord] = {}
    event_time = spec.default_initial_time

    for position, event in enumerate(events):
        # Update the running timestamp for this event.
        event_time = self._resolve_event_time(
            event,
            index=position,
            current_time=event_time,
        )
        for key, value in event.items():
            parsed_key = spec.signal_key_parser.parse(key)
            if parsed_key is not None:
                # Direct signal property.
                self._apply_signal_property(
                    signals=signal_records,
                    signal_key=parsed_key,
                    value=value,
                    timestamp=event_time,
                )
            else:
                strategy = spec.derived_signal_strategies.get(key)
                if strategy is not None:
                    # Derived signal handled by a registered strategy;
                    # unrecognized keys are skipped silently.
                    payload = _coerce_mapping(value, context=key)
                    self._apply_derived_updates(
                        derived_signals=derived_records,
                        event_key=key,
                        updates=strategy.emit_updates(key, payload),
                        timestamp=event_time,
                    )

    self._postprocess_signals(signal_records)
    return SignalProcessingResult(signals=signal_records, derived_signals=derived_records)
signal_preprocess_data
signal_preprocess_data(path_config)

Process one configuration file into archive-compatible mappings.

Parameters:

Name Type Description Default
path_config str | Path

JSON configuration file to load and process.

required

Returns:

Type Description
tuple[LegacySignalMap, LegacySignalMap]

Main-signal and derived-signal mappings ready for uploader seams.

Source code in src/owi/metadatabase/shm/processing/processor.py
def signal_preprocess_data(
    self,
    path_config: str | Path,
) -> tuple[LegacySignalMap, LegacySignalMap]:
    """Convert a single configuration file into the legacy mapping pair.

    Parameters
    ----------
    path_config
        JSON configuration file to load and process.

    Returns
    -------
    tuple[LegacySignalMap, LegacySignalMap]
        Main-signal and derived-signal mappings ready for uploader seams.
    """
    raw_events = self._load_events(path_config)
    processed = self.process_events(raw_events)
    return processed.to_legacy_data()
signals_process_data
signals_process_data()

Process all discovered configuration files under path_configs.

The processed results are stored on `signals_data` and `signals_derived_data`, keyed by turbine stem.

Source code in src/owi/metadatabase/shm/processing/processor.py
def signals_process_data(self) -> None:
    """Process every discovered configuration file under ``path_configs``.

    Results are stored per turbine stem on :attr:`signals_data` and
    :attr:`signals_derived_data`; :attr:`turbines` is replaced with the
    turbines actually discovered.
    """
    discovered = self.processor_spec.config_discovery.discover(
        self.path_configs,
        turbines=self.turbines,
    )
    self.turbines = list(discovered)
    for turbine, config_path in discovered.items():
        main_data, derived_data = self.signal_preprocess_data(config_path)
        self.signals_data[turbine] = main_data
        self.signals_derived_data[turbine] = derived_data

ProcessedDerivedSignalRecord dataclass

ProcessedDerivedSignalRecord(
    data_fields=dict(),
    calibration_rows=list(),
    parent_signals=(),
)

Typed in-memory representation of one processed derived signal.

Examples:

>>> record = ProcessedDerivedSignalRecord()
>>> record.ensure_source_name("strain/bending_moment", {"suffix": "N"})
>>> record.set_parent_signals(["SIG_A", "SIG_B"])
>>> record.add_calibration("01/01/1972 00:00", {"yaw_offset": 2.0})
>>> sorted(record.to_legacy_dict())
['calibration', 'data', 'parent_signals']
Functions
ensure_source_name
ensure_source_name(source_name, extra_fields=None)

Initialize immutable source metadata for the derived signal.

Parameters:

Name Type Description Default
source_name str

Event key that produced the derived signal.

required
extra_fields Mapping[str, Any] | None

Optional metadata merged into the legacy data mapping the first time the source name is set.

None
Source code in src/owi/metadatabase/shm/processing/records.py
def ensure_source_name(
    self,
    source_name: str,
    extra_fields: Mapping[str, Any] | None = None,
) -> None:
    """Initialize immutable source metadata for the derived signal.

    The metadata is only written the first time this method is called;
    later calls are no-ops.

    Parameters
    ----------
    source_name
        Event key that produced the derived signal.
    extra_fields
        Optional metadata merged into the legacy ``data`` mapping the
        first time the source name is set.
    """
    if self.data_fields:
        # Source metadata is immutable once set.
        return
    fields: dict[str, Any] = {"name": source_name}
    if extra_fields:
        fields.update(extra_fields)
    self.data_fields = fields
set_parent_signals
set_parent_signals(parent_signals)

Set parent signals when they are first known.

Parameters:

Name Type Description Default
parent_signals Sequence[str]

Ordered parent signal identifiers for the derived signal.

required
Source code in src/owi/metadatabase/shm/processing/records.py
def set_parent_signals(self, parent_signals: Sequence[str]) -> None:
    """Record parent signals the first time they become known.

    Subsequent calls leave the previously stored parents untouched.

    Parameters
    ----------
    parent_signals
        Ordered parent signal identifiers for the derived signal.
    """
    if self.parent_signals:
        # Parents are write-once; keep the first value seen.
        return
    self.parent_signals = tuple(parent_signals)
add_calibration
add_calibration(timestamp, calibration_fields)

Append a derived-signal calibration row.

Parameters:

Name Type Description Default
timestamp str

Event timestamp associated with the calibration.

required
calibration_fields Mapping[str, Any]

Calibration fields emitted by the derived-signal strategy.

required
Source code in src/owi/metadatabase/shm/processing/records.py
def add_calibration(self, timestamp: str, calibration_fields: Mapping[str, Any]) -> None:
    """Append a derived-signal calibration row.

    Parameters
    ----------
    timestamp
        Event timestamp associated with the calibration.
    calibration_fields
        Calibration fields emitted by the derived-signal strategy; a
        ``"time"`` key in the fields overrides ``timestamp``.
    """
    self.calibration_rows.append({"time": timestamp, **calibration_fields})
to_legacy_dict
to_legacy_dict()

Return the uploader-facing legacy mapping.

Returns:

Type Description
LegacyRecord

Archive-compatible mapping consumed by uploader payload builders.

Source code in src/owi/metadatabase/shm/processing/records.py
def to_legacy_dict(self) -> LegacyRecord:
    """Return the uploader-facing legacy mapping.

    Empty sections are omitted entirely from the result.

    Returns
    -------
    LegacyRecord
        Archive-compatible mapping consumed by uploader payload builders.
    """
    legacy: LegacyRecord = {}
    if self.data_fields:
        legacy["data"] = dict(self.data_fields)
    if self.calibration_rows:
        # Copy each row so callers cannot mutate record state.
        legacy["calibration"] = [dict(entry) for entry in self.calibration_rows]
    if self.parent_signals:
        legacy["parent_signals"] = list(self.parent_signals)
    return legacy

ProcessedSignalRecord dataclass

ProcessedSignalRecord(
    scalar_fields=dict(),
    status_rows=list(),
    offset_rows=list(),
    cwl_rows=list(),
)

Typed in-memory representation of one processed signal.

Parameters:

Name Type Description Default
scalar_fields dict[str, Any]

Arbitrary scalar properties stored on the signal.

dict()
status_rows list[dict[str, Any]]

Collected status event rows.

list()
offset_rows list[dict[str, Any]]

Collected offset event rows.

list()
cwl_rows list[dict[str, Any]]

Collected CWL event rows.

list()

Examples:

>>> record = ProcessedSignalRecord()
>>> record.add_status("01/01/1972 00:00", "ok")
>>> record.to_legacy_dict()["status"][0]["status"]
'ok'
Functions
set_scalar
set_scalar(property_name, value)

Store a scalar property on the signal.

Parameters:

Name Type Description Default
property_name str

Scalar field name from the raw configuration event.

required
value Any

Value to persist on the signal record.

required
Source code in src/owi/metadatabase/shm/processing/records.py
def set_scalar(self, property_name: str, value: Any) -> None:
    """Persist one scalar property on the signal record.

    An existing value under the same property name is overwritten.

    Parameters
    ----------
    property_name
        Scalar field name from the raw configuration event.
    value
        Value to persist on the signal record.
    """
    self.scalar_fields[property_name] = value
add_status
add_status(timestamp, status)

Append a status row.

Parameters:

Name Type Description Default
timestamp str

Event timestamp associated with the status.

required
status Any

Status value to store.

required
Source code in src/owi/metadatabase/shm/processing/records.py
def add_status(self, timestamp: str, status: Any) -> None:
    """Record a status value observed at ``timestamp``.

    Parameters
    ----------
    timestamp
        Event timestamp associated with the status.
    status
        Status value to store.
    """
    row = {"time": timestamp, "status": status}
    self.status_rows.append(row)
add_status_alias
add_status_alias(timestamp, alias_name)

Append a status row that carries a legacy alias name.

Parameters:

Name Type Description Default
timestamp str

Event timestamp associated with the alias.

required
alias_name str

Legacy signal name that points at this record.

required
Source code in src/owi/metadatabase/shm/processing/records.py
def add_status_alias(self, timestamp: str, alias_name: str) -> None:
    """Record a legacy alias name as a status row.

    Unlike :meth:`add_status`, the row carries a ``"name"`` key rather
    than a ``"status"`` key.

    Parameters
    ----------
    timestamp
        Event timestamp associated with the alias.
    alias_name
        Legacy signal name that points at this record.
    """
    row = {"time": timestamp, "name": alias_name}
    self.status_rows.append(row)
add_offset
add_offset(timestamp, offset)

Append an offset row.

Parameters:

Name Type Description Default
timestamp str

Event timestamp associated with the offset.

required
offset Any

Offset value to store.

required
Source code in src/owi/metadatabase/shm/processing/records.py
def add_offset(self, timestamp: str, offset: Any) -> None:
    """Record an offset value observed at ``timestamp``.

    Parameters
    ----------
    timestamp
        Event timestamp associated with the offset.
    offset
        Offset value to store.
    """
    row = {"time": timestamp, "offset": offset}
    self.offset_rows.append(row)
add_cwl
add_cwl(timestamp, cwl)

Append a CWL row.

Parameters:

Name Type Description Default
timestamp str

Event timestamp associated with the CWL value.

required
cwl Any

CWL value to store.

required
Source code in src/owi/metadatabase/shm/processing/records.py
def add_cwl(self, timestamp: str, cwl: Any) -> None:
    """Record a CWL value observed at ``timestamp``.

    Parameters
    ----------
    timestamp
        Event timestamp associated with the CWL value.
    cwl
        CWL value to store.
    """
    row = {"time": timestamp, "cwl": cwl}
    self.cwl_rows.append(row)
to_legacy_dict
to_legacy_dict()

Return the uploader-facing legacy mapping.

Returns:

Type Description
LegacyRecord

Archive-compatible mapping consumed by uploader payload builders.

Source code in src/owi/metadatabase/shm/processing/records.py
def to_legacy_dict(self) -> LegacyRecord:
    """Return the uploader-facing legacy mapping.

    Scalar fields form the base mapping; non-empty event-row sections
    are appended under their legacy keys, copying each row.

    Returns
    -------
    LegacyRecord
        Archive-compatible mapping consumed by uploader payload builders.
    """
    legacy: LegacyRecord = dict(self.scalar_fields)
    sections = (
        ("status", self.status_rows),
        ("offset", self.offset_rows),
        ("cwl", self.cwl_rows),
    )
    for key, rows in sections:
        if rows:
            legacy[key] = [dict(entry) for entry in rows]
    return legacy

SignalProcessingResult dataclass

SignalProcessingResult(signals, derived_signals)

Processed signal and derived-signal records.

Examples:

>>> signal = ProcessedSignalRecord()
>>> signal.add_status("01/01/1972 00:00", "ok")
>>> result = SignalProcessingResult(signals={"SIG": signal}, derived_signals={})
>>> result.to_legacy_data()[0]["SIG"]["status"][0]["status"]
'ok'
Functions
to_legacy_data
to_legacy_data()

Return archive-compatible dicts for uploader seams.

Returns:

Type Description
tuple[LegacySignalMap, LegacySignalMap]

Main-signal and derived-signal mappings in the uploader-facing archive shape.

Source code in src/owi/metadatabase/shm/processing/records.py
def to_legacy_data(self) -> tuple[LegacySignalMap, LegacySignalMap]:
    """Return archive-compatible dicts for uploader seams.

    Returns
    -------
    tuple[LegacySignalMap, LegacySignalMap]
        Main-signal and derived-signal mappings in the uploader-facing
        archive shape.
    """
    main_signals = {name: rec.to_legacy_dict() for name, rec in self.signals.items()}
    derived = {name: rec.to_legacy_dict() for name, rec in self.derived_signals.items()}
    return main_signals, derived

SignalProcessorSpec dataclass

SignalProcessorSpec(
    farm_name,
    signal_key_parser,
    derived_signal_strategies,
    config_discovery=JsonStemConfigDiscovery(),
    postprocessors=(),
    time_field="time",
    default_initial_time="01/01/1972 00:00",
)

Farm-specific configuration for signal processing.

The YAML structure expected by this spec is designed to be flexible enough to cover a wide range of use cases while remaining human-friendly and avoiding excessive nesting. The top-level keys are:

  • farm_name: A human-readable farm identifier used by the caller.
  • signal_key_parser: A configuration for the signal key parser, which recognizes direct signal-property keys in the input data.
  • derived_signal_strategies: A mapping from raw event keys to derived-signal strategies, which define how to generate derived signals based on specific events in the input data.
  • config_discovery: A strategy used to discover configuration files on disk, allowing the processor to locate and load necessary configurations for processing signals.
  • postprocessors: A list of pure normalization hooks applied after all events are processed, enabling additional transformations or clean-up steps on the processed signals.
  • time_field: The event field used to update the active timestamp during processing.
  • default_initial_time: The timestamp assigned to the first event when the payload omits one, ensuring that all events have a valid timestamp for processing.

Parameters:

Name Type Description Default
farm_name str

Human-readable farm identifier used by the caller.

required
signal_key_parser DelimitedSignalKeyParser

Parser that recognizes direct signal-property keys.

required
derived_signal_strategies Mapping[str, DerivedSignalStrategy]

Mapping from raw event keys to derived-signal strategies.

required
config_discovery ConfigDiscovery

Strategy used to discover configuration files on disk.

JsonStemConfigDiscovery()
postprocessors tuple[SignalPostprocessor, ...]

Pure normalization hooks applied after all events are processed.

()
time_field str

Event field used to update the active timestamp.

'time'
default_initial_time str

Timestamp assigned to the first event when the payload omits one.

'01/01/1972 00:00'

Examples:

>>> spec = SignalProcessorSpec(
...     farm_name="Demo Farm",
...     signal_key_parser=DelimitedSignalKeyParser(signal_prefixes=("WF_",)),
...     derived_signal_strategies={},
... )
>>> spec.default_initial_time
'01/01/1972 00:00'

YAML example
~~~~~~~~~~~~

.. code-block:: yaml

    farm_name: Demo Farm
    signal_key_parser:
      kind: delimited
      signal_prefixes: ["WF_"]
    derived_signal_strategies: {}
    config_discovery:
      kind: json_stem
    postprocessors: []
    time_field: time
    default_initial_time: '01/01/1972 00:00'

DerivedSignalStrategy

Bases: ABC

Strategy for translating one event into derived-signal updates.

Implementations keep farm-specific derived-signal semantics outside the generic processor loop.

Functions
emit_updates abstractmethod
emit_updates(event_key, payload)

Build derived-signal updates for one event payload.

Parameters:

Name Type Description Default
event_key str

Raw event key that selected the strategy.

required
payload Mapping[str, Any]

Mapping stored under the raw event key.

required

Returns:

Type Description
list[DerivedSignalUpdate]

Derived-signal mutations emitted for the event.

Source code in src/owi/metadatabase/shm/processing/strategies.py
@abstractmethod
def emit_updates(
    self,
    event_key: str,
    payload: Mapping[str, Any],
) -> list[DerivedSignalUpdate]:
    """Build derived-signal updates for one event payload.

    Implementations keep farm-specific derived-signal semantics outside
    the generic processor loop.

    Parameters
    ----------
    event_key
        Raw event key that selected the strategy.
    payload
        Mapping stored under the raw event key.

    Returns
    -------
    list[DerivedSignalUpdate]
        Derived-signal mutations emitted for the event.
    """

LevelBasedDerivedSignalStrategy dataclass

LevelBasedDerivedSignalStrategy(
    suffixes,
    signal_name_builder=_default_level_signal_name,
    parent_signals_builder=_parent_signals_from_level,
    calibration_fields_builder=_yaw_calibration_fields,
    data_builder=None,
    levels_key="levels",
)

Bases: DerivedSignalStrategy

Expand a level-based event into derived signals.

Parameters:

Name Type Description Default
suffixes tuple[str, ...]

Suffixes appended to each level identifier.

required
signal_name_builder SignalNameBuilder

Callback used to derive the final signal name for a level/suffix pair.

_default_level_signal_name
parent_signals_builder ParentSignalsBuilder

Callback that returns parent signal identifiers for a level.

_parent_signals_from_level
calibration_fields_builder CalibrationFieldsBuilder

Callback that returns calibration data for a level.

_yaw_calibration_fields
data_builder DerivedDataBuilder | None

Optional callback for extra metadata stored under data.

None

Examples:

>>> strategy = LevelBasedDerivedSignalStrategy(
...     suffixes=("FA",),
...     parent_signals_builder=lambda payload, level: tuple(payload[level]),
...     calibration_fields_builder=lambda payload, level: {"yaw_offset": payload["yaw_offset"]},
... )
>>> updates = strategy.emit_updates(
...     "acceleration/yaw_transformation",
...     {"levels": ["SIG_A"], "yaw_offset": 2.0, "SIG_A": ["PARENT_1", "PARENT_2"]},
... )
>>> updates[0].signal_name
'SIG_A_FA'
>>> updates[0].parent_signals
('PARENT_1', 'PARENT_2')
Functions
emit_updates
emit_updates(event_key, payload)

Build derived-signal updates for a level-based payload.

Parameters:

Name Type Description Default
event_key str

Raw event key that triggered the strategy. The value is accepted for interface parity and is not used directly by the default implementation.

required
payload Mapping[str, Any]

Mapping that must contain the configured levels_key plus the fields required by the configured callbacks.

required

Returns:

Type Description
list[DerivedSignalUpdate]

One update per level and configured suffix.

Source code in src/owi/metadatabase/shm/processing/strategies.py
def emit_updates(
    self,
    event_key: str,
    payload: Mapping[str, Any],
) -> list[DerivedSignalUpdate]:
    """Build derived-signal updates for a level-based payload.

    Parameters
    ----------
    event_key
        Raw event key that triggered the strategy.  Accepted for
        interface parity only; the default implementation ignores it.
    payload
        Mapping that must contain the configured ``levels_key`` plus the
        fields required by the configured callbacks.

    Returns
    -------
    list[DerivedSignalUpdate]
        One update per level and configured suffix.
    """
    del event_key  # interface parity only
    level_names = _coerce_string_sequence(
        payload.get(self.levels_key), context=self.levels_key
    )
    results: list[DerivedSignalUpdate] = []
    for level_name in level_names:
        for level_suffix in self.suffixes:
            # Call every builder per (level, suffix) pair, in the same
            # order, so each update owns fresh containers.
            name = self.signal_name_builder(level_name, level_suffix)
            parents = tuple(self.parent_signals_builder(payload, level_name))
            calibration = dict(self.calibration_fields_builder(payload, level_name))
            if self.data_builder is None:
                extra: dict[str, Any] = {}
            else:
                extra = dict(self.data_builder(payload, level_name))
            results.append(
                DerivedSignalUpdate(
                    signal_name=name,
                    parent_signals=parents,
                    calibration_fields=calibration,
                    data_fields=extra,
                )
            )
    return results

Functions

default_signal_processor_spec

default_signal_processor_spec()

Return the built-in default processor spec for wind-farm signal configs.

Returns:

Type Description
SignalProcessorSpec

Pre-loaded default specification.

Examples:

>>> spec = default_signal_processor_spec()
>>> tuple(spec.derived_signal_strategies)
('acceleration/yaw_transformation', 'strain/bending_moment')
Source code in src/owi/metadatabase/shm/processing/spec.py
def default_signal_processor_spec() -> SignalProcessorSpec:
    """Return the built-in default processor spec for wind-farm signal configs.

    This is a thin convenience wrapper around
    :func:`load_default_signal_processor_spec`.

    Returns
    -------
    SignalProcessorSpec
        Pre-loaded default specification.

    Examples
    --------
    >>> spec = default_signal_processor_spec()
    >>> tuple(spec.derived_signal_strategies)
    ('acceleration/yaw_transformation', 'strain/bending_moment')
    """
    return load_default_signal_processor_spec()

get_default_signal_processor_spec_path

get_default_signal_processor_spec_path()

Return the packaged YAML path for the built-in default processor spec.

Returns:

Type Description
Path

Absolute path to default_signal_processor.yaml shipped with the package.

Source code in src/owi/metadatabase/shm/processing/spec.py
def get_default_signal_processor_spec_path() -> Path:
    """Return the packaged YAML path for the built-in default processor spec.

    Returns
    -------
    Path
        Absolute path to ``default_signal_processor.yaml`` shipped with the
        package.
    """
    # The spec ships alongside the package in its ``config`` directory.
    config_dir = Path(__file__).parent.parent / "config"
    return config_dir / "default_signal_processor.yaml"

load_default_signal_processor_spec

load_default_signal_processor_spec()

Load the built-in default processor spec from its YAML document.

Returns:

Type Description
SignalProcessorSpec

Processor specification loaded from the packaged YAML file.

Source code in src/owi/metadatabase/shm/processing/spec.py
def load_default_signal_processor_spec() -> SignalProcessorSpec:
    """Load the built-in default processor spec from its YAML document.

    Returns
    -------
    SignalProcessorSpec
        Processor specification loaded from the packaged YAML file.
    """
    spec_path = get_default_signal_processor_spec_path()
    return load_signal_processor_spec(spec_path)

load_signal_processor_spec

load_signal_processor_spec(path)

Load a signal processor spec from a YAML document.

The YAML document must conform to the structure expected by SignalProcessorSpec.

Parameters:

Name Type Description Default
path str | Path

Path to the YAML document describing the processor spec.

required

Returns:

Type Description
SignalProcessorSpec

Parsed processor specification.

Source code in src/owi/metadatabase/shm/processing/spec.py
def load_signal_processor_spec(path: str | Path) -> SignalProcessorSpec:
    """Load a signal processor spec from a YAML document.

    The YAML document must conform to the structure expected by
    ``SignalProcessorSpec``.

    Parameters
    ----------
    path
        Path to the YAML document describing the processor spec.

    Returns
    -------
    SignalProcessorSpec
        Parsed processor specification.
    """
    document = yaml.safe_load(Path(path).read_text(encoding="utf-8"))
    config = _coerce_mapping(document, context=str(path))

    # Pull out and validate the raw sections before building components.
    parser_config = _coerce_mapping(
        config.get("signal_key_parser"),
        context="signal_key_parser",
    )
    strategies_config = _coerce_mapping(
        config.get("derived_signal_strategies", {}),
        context="derived_signal_strategies",
    )
    discovery_config = config.get("config_discovery", {"kind": "json_stem"})
    postprocessor_names = config.get("postprocessors", ())

    farm_name = _coerce_string(config.get("farm_name"), context="farm_name")
    signal_key_parser = _build_signal_key_parser_from_config(parser_config)

    strategies = {}
    for event_key, raw_strategy in strategies_config.items():
        strategy_config = _coerce_mapping(
            raw_strategy,
            context=f"derived_signal_strategies.{event_key}",
        )
        strategies[event_key] = _build_derived_signal_strategy_from_config(
            event_key,
            strategy_config,
        )

    config_discovery = _build_config_discovery_from_config(
        _coerce_mapping(discovery_config, context="config_discovery")
    )
    postprocessors = tuple(
        _resolve_registry_value(
            registry=_SIGNAL_POSTPROCESSORS,
            raw_name=name,
            context="postprocessors",
        )
        for name in _coerce_string_sequence(
            postprocessor_names,
            context="postprocessors",
        )
    )

    return SignalProcessorSpec(
        farm_name=farm_name,
        signal_key_parser=signal_key_parser,
        derived_signal_strategies=strategies,
        config_discovery=config_discovery,
        postprocessors=postprocessors,
        time_field=_coerce_string(config.get("time_field", "time"), context="time_field"),
        default_initial_time=_coerce_string(
            config.get("default_initial_time", "01/01/1972 00:00"),
            context="default_initial_time",
        ),
    )