Skip to content

Analyses

analyses

Built-in analyses for the results extension.

Classes

BaseAnalysis

Convenience mixin for concrete protocol-conforming analyses.

Functions
validate_inputs
validate_inputs(payload)

Return validated analysis inputs.

Source code in src/owi/metadatabase/results/analyses/base.py
def validate_inputs(self, payload: Any) -> Any:
    """Return *payload* unchanged; subclasses override to add validation."""
    return payload
compute
compute(payload)

Normalize validated analysis inputs.

Source code in src/owi/metadatabase/results/analyses/base.py
def compute(self, payload: Any) -> pd.DataFrame:
    """Normalize validated analysis inputs.

    Abstract hook: the base implementation always raises
    ``NotImplementedError`` and concrete analyses must override it.
    """
    raise NotImplementedError
to_results
to_results(payload)

Convert payloads to persisted results.

Source code in src/owi/metadatabase/results/analyses/base.py
def to_results(self, payload: Any) -> list[ResultSeries]:
    """Convert payloads to persisted results.

    Abstract hook: the base implementation always raises
    ``NotImplementedError`` and concrete analyses must override it.
    """
    raise NotImplementedError
from_results
from_results(results)

Convert stored results back to normalized data.

Source code in src/owi/metadatabase/results/analyses/base.py
def from_results(self, results: Sequence[ResultSeries]) -> pd.DataFrame:
    """Convert stored results back to normalized data.

    Abstract hook: the base implementation always raises
    ``NotImplementedError`` and concrete analyses must override it.
    """
    raise NotImplementedError
plot
plot(results, request=None, plot_strategy=None)

Render results using the default plot strategy.

Source code in src/owi/metadatabase/results/analyses/base.py
def plot(
    self,
    results: Sequence[ResultSeries],
    request: PlotRequest | None = None,
    plot_strategy: PlotStrategyProtocol | None = None,
) -> PlotResponse:
    """Render *results*, falling back to analysis defaults.

    When no request is given a bare ``PlotRequest`` for this analysis is
    built; when no strategy is given one is resolved from the requested
    (or default) plot type.
    """
    effective_request = request or PlotRequest(analysis_name=self.analysis_name)
    effective_strategy = plot_strategy or get_plot_strategy(
        effective_request.plot_type or self.default_plot_type
    )
    frame = self.from_results(results)
    return effective_strategy.render(frame, effective_request)

CeitCorrosionMonitoring

Bases: BaseAnalysis

Time-series analysis for CEIT corrosion monitoring measurements.

Functions
validate_inputs
validate_inputs(payload)

Validate corrosion monitoring request data.

Source code in src/owi/metadatabase/results/analyses/ceit.py
def validate_inputs(self, payload: Any) -> CorrosionMonitoringInput:
    """Coerce *payload* into a ``CorrosionMonitoringInput`` model."""
    if not isinstance(payload, CorrosionMonitoringInput):
        payload = CorrosionMonitoringInput.model_validate(payload)
    return payload
compute
compute(payload)

Normalize validated CEIT measurements into a long table.

Source code in src/owi/metadatabase/results/analyses/ceit.py
def compute(self, payload: Any) -> pd.DataFrame:
    """Validate *payload* and flatten its CEIT measurements into a long table."""
    return ceit_frame_from_measurements(self.validate_inputs(payload).rows)
to_results
to_results(payload)

Convert corrosion monitoring rows to persisted result series.

Source code in src/owi/metadatabase/results/analyses/ceit.py
def to_results(self, payload: Any) -> list[ResultSeries]:
    """Convert corrosion monitoring rows to persisted result series.

    Rows are grouped per (sensor, site, location, related object); each
    group is ordered by timestamp and emits one ``ResultSeries`` per CEIT
    metric, all sharing an epoch-seconds timestamp vector.
    """
    validated = self.validate_inputs(payload)
    grouped: dict[
        tuple[str, int | None, int | None, str | None, int | None],
        list[CorrosionMonitoringRow],
    ] = {}
    for row in validated.rows:
        related_object = row.related_object
        # Rows may carry the related object as a raw dict; normalize it first.
        if isinstance(related_object, dict):
            related_object = RelatedObject(**related_object)
        related_object_type = related_object.type if related_object is not None else None
        related_object_id = related_object.id if related_object is not None else None
        key = (
            row.sensor_identifier,
            row.site_id,
            row.location_id,
            related_object_type,
            related_object_id,
        )
        grouped.setdefault(key, []).append(row)

    def _orderable_key(item: tuple[tuple, list]) -> tuple:
        # Bug fix: key tuples can hold None next to ints/strs in the same
        # position, and Python 3 raises TypeError when ordering None against
        # other types. Sort None entries first; fully non-None keys keep the
        # original sorted order.
        return tuple((part is not None, part) for part in item[0])

    results: list[ResultSeries] = []
    for (sensor_identifier, site_id, location_id, _, _), sensor_rows in sorted(
        grouped.items(), key=_orderable_key
    ):
        ordered_rows = sorted(sensor_rows, key=lambda item: item.timestamp)
        timestamps = [item.timestamp.timestamp() for item in ordered_rows]
        related_object = ordered_rows[0].related_object
        if isinstance(related_object, dict):
            related_object = RelatedObject(**related_object)
        for metric, (source_field, unit) in CEIT_METRICS.items():
            values = [item.metric_values()[metric] for item in ordered_rows]
            results.append(
                ResultSeries(
                    analysis_name=self.analysis_name,
                    analysis_kind=AnalysisKind.TIME_SERIES,
                    # Location-scoped when a location is known, else site-scoped.
                    result_scope=ResultScope.LOCATION if location_id is not None else ResultScope.SITE,
                    short_description=f"{sensor_identifier}:{metric}",
                    description=f"CEIT {metric} time series for sensor {sensor_identifier}.",
                    site_id=site_id,
                    location_id=location_id,
                    related_object=related_object,
                    data_additional={
                        "sensor_identifier": sensor_identifier,
                        "metric": metric,
                        "series_key": f"{sensor_identifier}:{metric}",
                        "source_field": source_field,
                        "related_object_type": related_object.type if related_object is not None else None,
                        "related_object_id": related_object.id if related_object is not None else None,
                    },
                    vectors=[
                        ResultVector(name="timestamp", unit="s", values=timestamps),
                        ResultVector(name=metric, unit=unit, values=values),
                    ],
                )
            )
    return results
from_results
from_results(results)

Reconstruct normalized CEIT rows from stored results.

Source code in src/owi/metadatabase/results/analyses/ceit.py
def from_results(self, results: Sequence[ResultSeries]) -> pd.DataFrame:
    """Rebuild the normalized CEIT measurement table from stored series."""
    records: list[dict[str, Any]] = []
    for result in results:
        extras = result.data_additional
        sensor = str(extras.get("sensor_identifier", "unknown"))
        metric_name = str(extras.get("metric", result.vectors[1].name))
        fallback_type = result.related_object.type if result.related_object is not None else None
        fallback_id = result.related_object.id if result.related_object is not None else None
        object_type = extras.get("related_object_type", fallback_type)
        object_id = extras.get("related_object_id", fallback_id)
        # zip truncates to the shorter vector, matching the stored pairing.
        for epoch, measurement in zip(result.vectors[0].values, result.vectors[1].values):
            records.append(
                {
                    "analysis_name": result.analysis_name,
                    "sensor_identifier": sensor,
                    "metric": metric_name,
                    "timestamp": datetime.fromtimestamp(epoch, tz=timezone.utc).isoformat(),
                    "value": float(measurement),
                    "site_id": result.site_id,
                    "location_id": result.location_id,
                    "related_object_type": object_type,
                    "related_object_id": object_id,
                }
            )
    return pd.DataFrame(records)
plot
plot(results, request=None, plot_strategy=None)

Render corrosion monitoring results using the dedicated plot.

Source code in src/owi/metadatabase/results/analyses/ceit.py
def plot(
    self,
    results: Sequence[ResultSeries],
    request: PlotRequest | None = None,
    plot_strategy: Any | None = None,
) -> PlotResponse:
    """Render corrosion monitoring results using the dedicated plot."""
    del request, plot_strategy
    return plot_ceit_analyses(self.from_results(results))

LifetimeDesignFrequencies

Bases: BaseAnalysis

Comparison analysis for design frequencies by turbine and reference.

Functions
validate_inputs
validate_inputs(payload)

Validate frequency comparison request data.

Source code in src/owi/metadatabase/results/analyses/lifetime_design_frequencies.py
def validate_inputs(self, payload: Any) -> LifetimeDesignFrequenciesInput:
    """Coerce *payload* into a ``LifetimeDesignFrequenciesInput`` model."""
    if not isinstance(payload, LifetimeDesignFrequenciesInput):
        payload = LifetimeDesignFrequenciesInput.model_validate(payload)
    return payload
compute
compute(payload)

Normalize comparison input rows into a long table.

Source code in src/owi/metadatabase/results/analyses/lifetime_design_frequencies.py
def compute(self, payload: Any) -> pd.DataFrame:
    """Flatten validated comparison rows into one record per metric value."""
    validated = self.validate_inputs(payload)
    records = [
        {
            "x": row.reference,
            "y": metric_value,
            "series_name": f"{row.turbine} - {metric_name}",
            "turbine": row.turbine,
            "metric": metric_name,
            "reference": row.reference,
            "location_id": row.location_id,
            "site_id": row.site_id,
        }
        for row in validated.rows
        for metric_name, metric_value in row.metric_values().items()
    ]
    return pd.DataFrame(records)
to_results
to_results(payload)

Convert design frequency rows to persisted result series.

Source code in src/owi/metadatabase/results/analyses/lifetime_design_frequencies.py
def to_results(self, payload: Any) -> list[ResultSeries]:
    """Persist each (turbine, metric) pair as one comparison result series."""
    validated = self.validate_inputs(payload)
    grouped: dict[tuple[str, str, int | None, int | None], list[tuple[str, float]]] = {}
    for row in validated.rows:
        for metric, value in row.metric_values().items():
            group_key = (row.turbine, metric, row.site_id, row.location_id)
            grouped.setdefault(group_key, []).append((str(row.reference), value))

    series_list: list[ResultSeries] = []
    for (turbine, metric, site_id, location_id), pairs in grouped.items():
        labels = [label for label, _ in pairs]
        # Categorical x-axis: references are stored as indices plus a label list.
        indices = [float(position) for position in range(len(pairs))]
        measurements = [measurement for _, measurement in pairs]
        scope = ResultScope.LOCATION if location_id is not None else ResultScope.SITE
        series_list.append(
            ResultSeries(
                analysis_name=self.analysis_name,
                analysis_kind=AnalysisKind.COMPARISON,
                result_scope=scope,
                short_description=f"{turbine} - {metric}",
                site_id=site_id,
                location_id=location_id,
                data_additional={self.reference_labels_key: labels},
                vectors=[
                    ResultVector(name="reference_index", unit="index", values=indices),
                    ResultVector(name=metric, unit="Hz", values=measurements),
                ],
            )
        )
    return series_list
from_results
from_results(results)

Reconstruct normalized comparison rows from stored results.

Source code in src/owi/metadatabase/results/analyses/lifetime_design_frequencies.py
def from_results(self, results: Sequence[ResultSeries]) -> pd.DataFrame:
    """Rebuild the normalized comparison table from stored series."""
    records: list[dict[str, Any]] = []
    for result in results:
        labels = self._reference_labels_from_result(result)
        turbine, metric = self._split_series_description(result.short_description)
        # zip truncates to the shorter vector, matching the stored pairing.
        paired = zip(result.vectors[0].values, result.vectors[1].values)
        for position, (raw_index, measurement) in enumerate(paired):
            # Fall back to the stringified index when no label was stored.
            label = labels[position] if position < len(labels) else str(raw_index)
            records.append(
                {
                    "x": label,
                    "y": measurement,
                    "series_name": result.short_description,
                    "turbine": turbine,
                    "metric": metric or result.vectors[1].name,
                    "reference": label,
                    "location_id": result.location_id,
                    "site_id": result.site_id,
                }
            )
    return pd.DataFrame(records)
plot
plot(results, request=None, plot_strategy=None)

Render frequencies using the requested plot type.

Source code in src/owi/metadatabase/results/analyses/lifetime_design_frequencies.py
def plot(
    self,
    results: Sequence[ResultSeries],
    request: PlotRequest | None = None,
    plot_strategy: Any | None = None,
) -> PlotResponse:
    """Render frequencies using the requested plot type."""
    del plot_strategy  # this analysis always uses its dedicated plot helpers
    plot_request = request or PlotRequest(analysis_name=self.analysis_name)
    plot_type = plot_request.plot_type or "location"
    location_frame = plot_request.context.get("location_frame")
    data = self.from_results(results)

    simple_renderers = {
        "comparison": plot_lifetime_design_frequencies_comparison,
        "location": plot_lifetime_design_frequencies_by_location,
    }
    renderer = simple_renderers.get(plot_type)
    if renderer is not None:
        return renderer(data, location_frame=location_frame)
    if plot_type in ("geo", "map"):
        has_coordinates = isinstance(location_frame, pd.DataFrame) and not location_frame.empty
        if not has_coordinates:
            raise ValueError("Location coordinates are required for the geo lifetime design frequency plot.")
        return plot_lifetime_design_frequencies_geo(data, location_frame=location_frame)
    raise ValueError(f"Unsupported plot_type for {self.analysis_name}: {plot_type}")

LifetimeDesignVerification

Bases: BaseAnalysis

Lifetime design verification time-series analysis.

Functions
validate_inputs
validate_inputs(payload)

Validate verification request data.

Source code in src/owi/metadatabase/results/analyses/lifetime_design_verification.py
def validate_inputs(self, payload: Any) -> LifetimeDesignVerificationInput:
    """Coerce *payload* into a ``LifetimeDesignVerificationInput`` model."""
    if not isinstance(payload, LifetimeDesignVerificationInput):
        payload = LifetimeDesignVerificationInput.model_validate(payload)
    return payload
compute
compute(payload)

Normalize verification input rows into a long table.

Source code in src/owi/metadatabase/results/analyses/lifetime_design_verification.py
def compute(self, payload: Any) -> pd.DataFrame:
    """Flatten verification rows into one record per non-missing metric."""
    validated = self.validate_inputs(payload)
    records: list[dict[str, Any]] = []
    for row in validated.rows:
        for metric in self.metric_columns:
            measurement = getattr(row, metric)
            if measurement is None:
                continue  # skip metrics that were not recorded for this row
            label = metric.upper()
            records.append(
                {
                    "x": row.timestamp.isoformat(),
                    "y": measurement,
                    "series_name": f"{row.turbine} - {label}",
                    "turbine": row.turbine,
                    "metric": label,
                }
            )
    return pd.DataFrame(records)
to_results
to_results(payload)

Convert verification rows to persisted result series.

Source code in src/owi/metadatabase/results/analyses/lifetime_design_verification.py
def to_results(self, payload: Any) -> list[ResultSeries]:
    """Persist each (turbine, metric) pair as one verification time series."""
    validated = self.validate_inputs(payload)
    grouped: dict[tuple[str, str, int | None, int | None], list[VerificationRow]] = {}
    for row in validated.rows:
        for metric in self.metric_columns:
            if getattr(row, metric) is None:
                continue  # only rows that actually carry this metric join the group
            group_key = (row.turbine, metric.upper(), row.site_id, row.location_id)
            grouped.setdefault(group_key, []).append(row)

    series_list: list[ResultSeries] = []
    for (turbine, metric, site_id, location_id), group_rows in grouped.items():
        chronological = sorted(group_rows, key=lambda entry: entry.timestamp)
        epochs = [entry.timestamp.astimezone(timezone.utc).timestamp() for entry in chronological]
        readings = [float(getattr(entry, metric.lower())) for entry in chronological]
        series_list.append(
            ResultSeries(
                analysis_name=self.analysis_name,
                analysis_kind=AnalysisKind.TIME_SERIES,
                result_scope=ResultScope.LOCATION if location_id is not None else ResultScope.SITE,
                short_description=f"{turbine} - {metric}",
                site_id=site_id,
                location_id=location_id,
                data_additional={"turbine": turbine, "metric": metric, "series_key": f"{turbine}:{metric}"},
                vectors=[
                    ResultVector(name="timestamp", unit="s", values=epochs),
                    ResultVector(name=metric.lower(), unit="Hz", values=readings),
                ],
            )
        )
    return series_list
from_results
from_results(results)

Reconstruct normalized time-series rows from stored results.

Source code in src/owi/metadatabase/results/analyses/lifetime_design_verification.py
def from_results(self, results: Sequence[ResultSeries]) -> pd.DataFrame:
    """Rebuild the normalized verification time-series table from stored series."""
    records: list[dict[str, Any]] = []
    for result in results:
        extras = result.data_additional
        turbine = extras.get("turbine", result.short_description)
        metric = extras.get("metric", result.vectors[1].name.upper())
        # zip truncates to the shorter vector, matching the stored pairing.
        for epoch, reading in zip(result.vectors[0].values, result.vectors[1].values):
            records.append(
                {
                    "x": datetime.fromtimestamp(epoch, tz=timezone.utc).isoformat(),
                    "y": reading,
                    "series_name": result.short_description,
                    "turbine": turbine,
                    "metric": metric,
                }
            )
    return pd.DataFrame(records)
plot
plot(results, request=None, plot_strategy=None)

Render verification results using the requested plot type.

Source code in src/owi/metadatabase/results/analyses/lifetime_design_verification.py
def plot(
    self,
    results: Sequence[ResultSeries],
    request: PlotRequest | None = None,
    plot_strategy: Any | None = None,
) -> PlotResponse:
    """Render verification results using the requested plot type."""
    del plot_strategy  # dedicated helpers are used instead of generic strategies
    plot_request = request or PlotRequest(analysis_name=self.analysis_name)
    plot_type = plot_request.plot_type or self.default_plot_type
    frame = self.from_results(results)

    renderers = {
        "time_series": plot_verification_time_series,
        "comparison": plot_verification_comparison,
    }
    renderer = renderers.get(plot_type)
    if renderer is None:
        raise ValueError(f"Unsupported plot_type for {self.analysis_name}: {plot_type}")
    return renderer(frame)

WindSpeedHistogram

Bases: BaseAnalysis

Wind speed histogram analysis.

Functions
plot
plot(results, request=None, plot_strategy=None)

Render results using the default plot strategy.

Source code in src/owi/metadatabase/results/analyses/base.py
def plot(
    self,
    results: Sequence[ResultSeries],
    request: PlotRequest | None = None,
    plot_strategy: PlotStrategyProtocol | None = None,
) -> PlotResponse:
    """Render results, resolving request and strategy from defaults when omitted."""
    active_request = request or PlotRequest(analysis_name=self.analysis_name)
    active_strategy = plot_strategy or get_plot_strategy(
        active_request.plot_type or self.default_plot_type
    )
    return active_strategy.render(self.from_results(results), active_request)
validate_inputs
validate_inputs(payload)

Validate histogram request data.

Source code in src/owi/metadatabase/results/analyses/wind_speed_histogram.py
def validate_inputs(self, payload: Any) -> WindSpeedHistogramInput:
    """Coerce *payload* into a ``WindSpeedHistogramInput`` model."""
    if not isinstance(payload, WindSpeedHistogramInput):
        payload = WindSpeedHistogramInput.model_validate(payload)
    return payload
compute
compute(payload)

Normalize histogram data to a flat table.

Source code in src/owi/metadatabase/results/analyses/wind_speed_histogram.py
def compute(self, payload: Any) -> pd.DataFrame:
    """Flatten histogram series into one record per (series, bin)."""
    validated = self.validate_inputs(payload)
    records: list[dict[str, Any]] = []
    for series in validated.series:
        # zip truncates to the shorter of bins/values, matching the stored pairing.
        for (bin_left, bin_right), bin_value in zip(series.bins, series.values):
            records.append(
                {
                    "series_name": series.title,
                    "scope": series.scope_label,
                    "bin_left": bin_left,
                    "bin_right": bin_right,
                    "value": bin_value,
                }
            )
    return pd.DataFrame(records)
to_results
to_results(payload)

Convert histogram inputs to persisted result series.

Source code in src/owi/metadatabase/results/analyses/wind_speed_histogram.py
def to_results(self, payload: Any) -> list[ResultSeries]:
    """Persist each histogram series with bin edges and counts as vectors."""
    validated = self.validate_inputs(payload)
    series_list: list[ResultSeries] = []
    for series in validated.series:
        left_edges = [left for left, _ in series.bins]
        right_edges = [right for _, right in series.bins]
        extras = {
            "scope_label": series.scope_label,
            "series_key": series.title,
            **series.metadata,
        }
        scope = ResultScope.LOCATION if series.location_id is not None else ResultScope.SITE
        series_list.append(
            ResultSeries(
                analysis_name=self.analysis_name,
                analysis_kind=AnalysisKind.HISTOGRAM,
                result_scope=scope,
                short_description=series.title,
                description=series.description,
                site_id=series.site_id,
                location_id=series.location_id,
                data_additional=extras,
                vectors=[
                    ResultVector(name="bin_left", unit=validated.bin_unit, values=left_edges),
                    ResultVector(name="value", unit=validated.value_unit, values=series.values),
                    ResultVector(name="bin_right", unit=validated.bin_unit, values=right_edges),
                ],
            )
        )
    return series_list
from_results
from_results(results)

Reconstruct normalized histogram rows from stored results.

Source code in src/owi/metadatabase/results/analyses/wind_speed_histogram.py
def from_results(self, results: Sequence[ResultSeries]) -> pd.DataFrame:
    """Rebuild normalized histogram rows from stored series."""
    records: list[dict[str, Any]] = []
    for result in results:
        left_edges = result.vectors[0].values
        bin_values = result.vectors[1].values
        if len(result.vectors) > 2:
            right_edges = result.vectors[2].values
        else:
            # Results stored without right edges are padded so rows still align.
            right_edges = [None] * len(bin_values)
        scope = result.data_additional.get("scope_label", result.result_scope.value)
        # zip truncates all three vectors to the shortest, matching the pairing.
        for left, bin_value, right in zip(left_edges, bin_values, right_edges):
            records.append(
                {
                    "series_name": result.short_description,
                    "scope": scope,
                    "bin_left": left,
                    "bin_right": right,
                    "value": bin_value,
                }
            )
    return pd.DataFrame(records)