Source code for timedatamodel.timeseries

"""
TimeSeries — a Polars-backed container for time series data.

Uses a ``polars.DataFrame`` as the internal storage backend.

Data shapes
-----------
Four temporal shapes are supported (see :class:`~timedatamodel.datashape.DataShape`):

* **SIMPLE**:    ``valid_time`` + ``value``
* **VERSIONED**: ``knowledge_time`` + ``valid_time`` + ``value``
* **CORRECTED**: ``valid_time`` + ``change_time`` + ``value``
* **AUDIT**:     ``knowledge_time`` + ``change_time`` + ``valid_time`` + ``value``

Timestamp representation
------------------------
All timestamp columns are stored internally as ``pl.Datetime("us", time_zone="UTC")``.
The ``timezone`` metadata field is a display/context hint (IANA zone string).

Example usage
-------------
>>> import pandas as pd
>>> from timedatamodel import TimeSeries, DataType
>>>
>>> df = pd.DataFrame({
...     "valid_time": pd.date_range("2024-01-01", periods=4, freq="1h", tz="UTC"),
...     "value": [1.0, 2.0, 3.0, 4.0],
... })
>>> ts = TimeSeries.from_pandas(df, name="wind_power", unit="MW")
>>> ts.name
'wind_power'
>>> ts.unit
'MW'

Printing a ``TimeSeries`` renders a formatted summary box with metadata
and a head/tail preview of the data.
"""

from __future__ import annotations

import warnings

import pandas as pd
import polars as pl

from ._repr import _TimeSeriesReprMixin
from .datashape import _REQUIRED_COLUMNS, _TIME_COLS, DataShape  # noqa: F401
from .enums import DataType, Frequency, TimeSeriesType
from .units import _get_registry as _get_pint_registry

_TS_DTYPE = pl.Datetime("us", time_zone="UTC")


def _normalize_time_cols(df: pl.DataFrame) -> pl.DataFrame:
    """Return *df* with every recognised timestamp column as ``_TS_DTYPE``.

    Only columns named in ``_TIME_COLS`` that are present in *df* are
    considered. Columns that already have the target dtype are left alone;
    when nothing needs converting the input frame itself is returned.
    """
    conversions = []
    for name in _TIME_COLS:
        if name not in df.columns:
            continue
        current = df[name].dtype
        if current == _TS_DTYPE:
            continue  # nothing to do for this column

        column = pl.col(name)
        if isinstance(current, pl.Datetime) and current.time_zone is None:
            # Naive datetimes (e.g. from numpy datetime64) are taken to be
            # UTC wall-clock values, so attach the zone before casting.
            column = column.dt.replace_time_zone("UTC")
        conversions.append(column.cast(_TS_DTYPE))

    if not conversions:
        return df
    return df.with_columns(conversions)


# ---------------------------------------------------------------------------
# TimeSeries
# ---------------------------------------------------------------------------


class TimeSeries(_TimeSeriesReprMixin):
    """Polars-backed container for time series data with rich metadata.

    The underlying ``df`` is optional. Construct with ``df=None`` to declare a
    series structure (name, unit, data type, …) before any data exists — useful
    for registering series in a catalog. Methods that need data (converters,
    ``head``/``tail``, ``convert_unit``, …) raise :class:`ValueError` when no
    df is attached. Use :attr:`has_df` to check.

    Parameters
    ----------
    df:
        A ``polars.DataFrame`` whose columns conform to one of the recognised
        :class:`~timedatamodel.datashape.DataShape` patterns, or ``None`` for a
        metadata-only instance. All timestamp columns must use
        ``pl.Datetime("us", time_zone="UTC")``.
    name:
        Series name (e.g. ``"wind_power"``, ``"electricity.supply"``).
    description:
        Human-readable description.
    unit:
        Canonical physical unit string (e.g. ``"MW"``, ``"dimensionless"``).
    timezone:
        IANA timezone string for display purposes. Internal data is always
        UTC; this is a metadata hint only.
    frequency:
        Pandas offset alias describing the expected data cadence.
    data_type:
        Semantic nature of the observations
        (:class:`~timedatamodel.enums.DataType`).
    timeseries_type:
        Storage/versioning model
        (:class:`~timedatamodel.enums.TimeSeriesType`).

    Raises
    ------
    TypeError
        If *df* is neither ``None`` nor a ``polars.DataFrame``, or if a
        timestamp column fails validation.
    ValueError
        If *df* lacks the columns required by its inferred shape.
    """

    def __init__(
        self,
        df: pl.DataFrame | None = None,
        *,
        name: str,
        description: str | None = None,
        unit: str = "dimensionless",
        timezone: str = "UTC",
        frequency: Frequency | None = None,
        data_type: DataType | None = None,
        timeseries_type: TimeSeriesType = TimeSeriesType.FLAT,
    ) -> None:
        if df is None:
            # Metadata-only instance: no frame, hence no shape either.
            self._df: pl.DataFrame | None = None
            self._shape: DataShape | None = None
        else:
            if not isinstance(df, pl.DataFrame):
                raise TypeError(f"df must be a polars.DataFrame or None, got {type(df)!r}")
            shape = _infer_shape(df)
            _validate_table(df, shape)
            self._df = df
            self._shape = shape

        self.name: str = name
        self.description: str | None = description
        self.unit: str = unit
        self.timezone: str = timezone
        self.frequency: Frequency | None = frequency
        self.data_type: DataType | None = data_type
        self.timeseries_type: TimeSeriesType = timeseries_type

    # ------------------------------------------------------------------
    # Properties
    # ------------------------------------------------------------------

    @property
    def shape(self) -> DataShape | None:
        """Which temporal columns are present (inferred from the DataFrame).

        ``None`` for metadata-only instances.
        """
        return self._shape

    @property
    def num_rows(self) -> int:
        """Number of data rows. ``0`` for metadata-only instances."""
        return self._df.height if self._df is not None else 0

    @property
    def columns(self) -> list[str]:
        """Column names present in the underlying Polars DataFrame.

        Empty list for metadata-only instances.
        """
        return self._df.columns if self._df is not None else []

    @property
    def df(self) -> pl.DataFrame | None:
        """The underlying ``polars.DataFrame`` (read-only by convention).

        ``None`` for metadata-only instances.
        """
        return self._df

    @property
    def has_df(self) -> bool:
        """True when a DataFrame is attached."""
        return self._df is not None

    @property
    def has_missing(self) -> bool:
        """True if the ``value`` column contains any null values.

        ``False`` for metadata-only instances.
        """
        if self._df is None:
            return False
        return self._df["value"].is_null().any()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _require_df(self) -> pl.DataFrame:
        """Return the attached DataFrame or raise if none.

        Used by methods that cannot operate on a metadata-only instance.

        Raises
        ------
        ValueError
            If no DataFrame is attached.
        """
        if self._df is None:
            raise ValueError(f"TimeSeries {self.name!r} has no data attached (df=None)")
        return self._df

    # ------------------------------------------------------------------
    # Constructors
    # ------------------------------------------------------------------

    @classmethod
    def from_polars(
        cls,
        df: pl.DataFrame,
        *,
        name: str,
        description: str | None = None,
        unit: str = "dimensionless",
        timezone: str = "UTC",
        frequency: Frequency | None = None,
        data_type: DataType | None = None,
        timeseries_type: TimeSeriesType = TimeSeriesType.FLAT,
    ) -> TimeSeries:
        """Create a :class:`TimeSeries` directly from a ``polars.DataFrame``.

        The frame is passed to the constructor unchanged, so all timestamp
        columns must already use ``pl.Datetime("us", time_zone="UTC")``.
        """
        return cls(
            df,
            name=name,
            description=description,
            unit=unit,
            timezone=timezone,
            frequency=frequency,
            data_type=data_type,
            timeseries_type=timeseries_type,
        )

    @classmethod
    def from_list(
        cls,
        data: dict[str, list],
        *,
        name: str,
        description: str | None = None,
        unit: str = "dimensionless",
        timezone: str = "UTC",
        frequency: Frequency | None = None,
        data_type: DataType | None = None,
        timeseries_type: TimeSeriesType = TimeSeriesType.FLAT,
    ) -> TimeSeries:
        """Create a :class:`TimeSeries` from a column-oriented dict of lists.

        Accepts the format returned by :meth:`to_list`. Timestamp columns are
        normalised to UTC automatically.
        """
        return cls(
            _normalize_time_cols(pl.DataFrame(data)),
            name=name,
            description=description,
            unit=unit,
            timezone=timezone,
            frequency=frequency,
            data_type=data_type,
            timeseries_type=timeseries_type,
        )

    @classmethod
    def from_numpy(
        cls,
        data: dict[str, np.ndarray],
        *,
        name: str,
        description: str | None = None,
        unit: str = "dimensionless",
        timezone: str = "UTC",
        frequency: Frequency | None = None,
        data_type: DataType | None = None,
        timeseries_type: TimeSeriesType = TimeSeriesType.FLAT,
    ) -> TimeSeries:
        """Create a :class:`TimeSeries` from a column-oriented dict of NumPy arrays.

        Accepts the format returned by :meth:`to_numpy`. Timestamp columns
        (``numpy.datetime64``, always timezone-naive) are localised to UTC.

        Requires ``numpy``.

        Raises
        ------
        ImportError
            If numpy is not installed.
        """
        try:
            import numpy as np  # noqa: F401
        except ImportError as e:
            raise ImportError("numpy is required for from_numpy(). Install with: pip install numpy") from e
        return cls(
            _normalize_time_cols(pl.DataFrame(data)),
            name=name,
            description=description,
            unit=unit,
            timezone=timezone,
            frequency=frequency,
            data_type=data_type,
            timeseries_type=timeseries_type,
        )

    @classmethod
    def from_pyarrow(
        cls,
        table: pa.Table,
        *,
        name: str,
        description: str | None = None,
        unit: str = "dimensionless",
        timezone: str = "UTC",
        frequency: Frequency | None = None,
        data_type: DataType | None = None,
        timeseries_type: TimeSeriesType = TimeSeriesType.FLAT,
    ) -> TimeSeries:
        """Create a :class:`TimeSeries` from a PyArrow Table.

        Accepts the format returned by :meth:`to_pyarrow`. Arrow
        ``timestamp[us, UTC]`` columns are converted automatically. No further
        timestamp normalisation is applied here, so the converted frame must
        pass the constructor's validation as-is.

        Requires ``pyarrow``.

        Raises
        ------
        ImportError
            If pyarrow is not installed.
        """
        try:
            import pyarrow as pa  # noqa: F401
        except ImportError as e:
            raise ImportError("pyarrow is required for from_pyarrow(). Install with: pip install pyarrow") from e
        return cls(
            pl.from_arrow(table),
            name=name,
            description=description,
            unit=unit,
            timezone=timezone,
            frequency=frequency,
            data_type=data_type,
            timeseries_type=timeseries_type,
        )

    @classmethod
    def from_pandas(
        cls,
        df: pd.DataFrame,
        *,
        name: str,
        description: str | None = None,
        unit: str = "dimensionless",
        timezone: str = "UTC",
        frequency: Frequency | None = None,
        data_type: DataType | None = None,
        timeseries_type: TimeSeriesType = TimeSeriesType.FLAT,
    ) -> TimeSeries:
        """Create a :class:`TimeSeries` from a ``pandas.DataFrame``.

        Only ``SIMPLE`` and ``VERSIONED`` shapes can be constructed via
        ``from_pandas``. ``AUDIT`` and ``CORRECTED`` shapes (which require a
        ``change_time`` column) are read-only results from the database layer.

        The data shape is inferred from the column names (and MultiIndex
        levels if the DataFrame uses an index).

        Raises
        ------
        ValueError
            If the DataFrame contains a ``change_time`` column.
        """
        polars_df = _ingest_pandas_to_polars(df)
        shape = _infer_shape(polars_df)
        if shape in (DataShape.AUDIT, DataShape.CORRECTED):
            raise ValueError(
                f"from_pandas produced shape {shape.value} because 'change_time' is present. "
                f"Only SIMPLE and VERSIONED shapes can be created via from_pandas. "
                f"Use from_polars() to wrap an existing read result with change_time."
            )
        return cls(
            polars_df,
            name=name,
            description=description,
            unit=unit,
            timezone=timezone,
            frequency=frequency,
            data_type=data_type,
            timeseries_type=timeseries_type,
        )

    # ------------------------------------------------------------------
    # Conversion
    # ------------------------------------------------------------------

    def validate_for_insert(self) -> tuple[pl.DataFrame, DataShape]:
        """Validate that this TimeSeries can be inserted and return the
        underlying DataFrame with its shape.

        Only :attr:`DataShape.SIMPLE` and :attr:`DataShape.VERSIONED` are
        supported for insert.

        Returns
        -------
        Tuple[pl.DataFrame, DataShape]

        Raises
        ------
        ValueError
            If no DataFrame is attached, or if :attr:`shape` is
            :attr:`DataShape.AUDIT` or :attr:`DataShape.CORRECTED`.
        """
        df = self._require_df()
        if self._shape in (DataShape.AUDIT, DataShape.CORRECTED):
            raise ValueError(
                f"TimeSeries with shape {self._shape.value} cannot be inserted. "
                f"Only SIMPLE and VERSIONED shapes are supported for insert."
            )
        assert self._shape is not None  # df present implies shape present
        return df, self._shape

    def to_pandas(self) -> pd.DataFrame:
        """Convert to a ``pandas.DataFrame``.

        Restores the conventional index:

        * ``SIMPLE`` — ``valid_time`` as index.
        * ``VERSIONED`` — ``(knowledge_time, valid_time)`` MultiIndex.
        * ``AUDIT`` — ``(knowledge_time, change_time, valid_time)`` MultiIndex.
        * ``CORRECTED`` — ``(valid_time, change_time)`` MultiIndex.

        Raises
        ------
        ValueError
            If no DataFrame is attached.
        """
        df = self._require_df().to_pandas()
        # Polars converts Datetime("us", tz="UTC") to pandas datetime64[us, UTC]
        # which is exactly what we want.
        if self._shape == DataShape.SIMPLE:
            return df.set_index("valid_time")
        elif self._shape == DataShape.VERSIONED:
            return df.set_index(["knowledge_time", "valid_time"])
        elif self._shape == DataShape.AUDIT:
            return df.set_index(["knowledge_time", "change_time", "valid_time"])
        elif self._shape == DataShape.CORRECTED:
            return df.set_index(["valid_time", "change_time"])
        return df  # unreachable, safe fallback

    def to_polars(self) -> pl.DataFrame:
        """Return the underlying ``polars.DataFrame``.

        Raises
        ------
        ValueError
            If no DataFrame is attached.
        """
        return self._require_df()

    def to_list(self) -> dict:
        """Return the series as a column-oriented dict of lists.

        Each key is a column name; each value is a Python list of that
        column's values. Timestamps are Python ``datetime`` objects; null
        values are ``None``.

        Example::

            {"valid_time": [datetime(...), ...], "value": [1.0, None, 3.0, ...]}

        Raises
        ------
        ValueError
            If no DataFrame is attached.
        """
        return self._require_df().to_dict(as_series=False)

    def to_numpy(self) -> dict[str, np.ndarray]:
        """Return the series as a dictionary of NumPy arrays.

        Each column maps to a 1-D ``numpy.ndarray``. Timestamp columns become
        ``numpy.datetime64[us]`` values; null values become ``NaN`` or ``NaT``.

        Requires ``numpy``. Install with: ``pip install numpy``.

        Raises
        ------
        ImportError
            If numpy is not installed.
        ValueError
            If no DataFrame is attached.
        """
        try:
            import numpy as np  # noqa: F401
        except ImportError as e:
            raise ImportError("numpy is required for to_numpy(). Install it with: pip install numpy") from e
        df = self._require_df()
        return {col: df[col].to_numpy(allow_copy=True) for col in df.columns}

    def to_pyarrow(self) -> pa.Table:
        """Return the series as a ``pyarrow.Table``.

        All timestamp columns are Arrow ``timestamp[us, UTC]``.

        Requires ``pyarrow``. Install with: ``pip install pyarrow``.

        Raises
        ------
        ImportError
            If pyarrow is not installed.
        ValueError
            If no DataFrame is attached.
        """
        try:
            import pyarrow  # noqa: F401
        except ImportError as e:
            raise ImportError("pyarrow is required for to_pyarrow(). Install it with: pip install pyarrow") from e
        return self._require_df().to_arrow()

    def coverage_bar(self) -> CoverageBar:
        """Return a :class:`~timedatamodel.CoverageBar` showing value coverage.

        ``True`` = value present, ``False`` = null/missing. In Jupyter the
        coverage bar renders as an SVG. The bar's begin/end are the first and
        last ``valid_time`` entries in row order (``None`` for an empty frame).

        Raises
        ------
        ValueError
            If no DataFrame is attached.
        """
        from ._repr import CoverageBar

        df = self._require_df()
        mask = df["value"].is_not_null().to_list()
        if self.num_rows > 0:
            begin = df["valid_time"][0]
            end = df["valid_time"][-1]
        else:
            begin = end = None
        return CoverageBar([(self.name, mask)], begin, end)

    # ------------------------------------------------------------------
    # Data access helpers
    # ------------------------------------------------------------------

    def head(self, n: int = 5) -> TimeSeries:
        """Return the first *n* rows as a new :class:`TimeSeries`."""
        return self._clone(self._require_df().head(n))

    def tail(self, n: int = 5) -> TimeSeries:
        """Return the last *n* rows as a new :class:`TimeSeries`."""
        return self._clone(self._require_df().tail(n))

    # ------------------------------------------------------------------
    # Unit conversion
    # ------------------------------------------------------------------

    def convert_unit(self, target_unit: str) -> TimeSeries:
        """Return a new :class:`TimeSeries` with values converted to *target_unit*.

        Uses the pint library for unit conversion. The ``unit`` metadata field
        is updated to *target_unit*.

        The conversion is applied as ``value * scale + offset``, so both purely
        multiplicative units (``"MW"`` → ``"kW"``) and offset units
        (``"degC"`` → ``"kelvin"``) are handled. Conversions that are not
        affine (e.g. logarithmic units) are not supported.

        Parameters
        ----------
        target_unit:
            Target unit string understood by pint (e.g. ``"km/h"``, ``"kW"``).

        Raises
        ------
        ImportError
            If pint is not installed.
        pint.DimensionalityError
            If the current unit and *target_unit* are dimensionally incompatible.
        ValueError
            If no DataFrame is attached.
        """
        try:
            ureg = _get_pint_registry()
        except ImportError as exc:
            raise ImportError(
                "Unit conversion requires the optional 'pint' dependency. "
                "Install it with: pip install timedatamodel[pint]"
            ) from exc

        # Probe the conversion at two points to recover scale and offset.
        # A single probe at 1.0 is only valid for multiplicative units and
        # gives badly wrong results for offset units such as temperatures.
        # Probing at 1 and 2 (rather than 0 and 1) keeps scale == f(1) and
        # offset == 0.0 exactly for multiplicative units.
        at_one = float(ureg.Quantity(1.0, self.unit).to(target_unit).magnitude)
        at_two = float(ureg.Quantity(2.0, self.unit).to(target_unit).magnitude)
        scale = at_two - at_one
        offset = at_one - scale

        converted = pl.col("value") * scale
        if offset != 0.0:
            converted = converted + offset
        new_df = self._require_df().with_columns(converted.alias("value"))
        return self._clone(new_df, unit=target_unit)

    # ------------------------------------------------------------------
    # Internal clone helper
    # ------------------------------------------------------------------

    def _clone(self, new_df: pl.DataFrame | None = None, **overrides) -> TimeSeries:
        """Create a new :class:`TimeSeries` with *new_df* and the same metadata.

        Any keyword in *overrides* replaces the corresponding metadata field;
        keywords that do not name a metadata field are ignored. Pass
        ``new_df=None`` to clone as a metadata-only instance.
        """
        return TimeSeries(
            new_df,
            name=overrides.get("name", self.name),
            description=overrides.get("description", self.description),
            unit=overrides.get("unit", self.unit),
            timezone=overrides.get("timezone", self.timezone),
            frequency=overrides.get("frequency", self.frequency),
            data_type=overrides.get("data_type", self.data_type),
            timeseries_type=overrides.get("timeseries_type", self.timeseries_type),
        )

    # ------------------------------------------------------------------
    # Metadata helpers
    # ------------------------------------------------------------------

    def metadata_dict(self) -> dict:
        """Return all metadata fields as a plain dict.

        ``data_type``, ``timeseries_type`` and ``shape`` are reported via their
        ``.value``; ``frequency`` is returned as stored.
        """
        return {
            "name": self.name,
            "description": self.description,
            "unit": self.unit,
            "timezone": self.timezone,
            "frequency": self.frequency,
            "data_type": self.data_type.value if self.data_type else None,
            "timeseries_type": self.timeseries_type.value,
            "shape": self._shape.value if self._shape is not None else None,
            "num_rows": self.num_rows,
        }

    # ------------------------------------------------------------------
    # Dunder
    # ------------------------------------------------------------------

    def __len__(self) -> int:
        """Return the number of data rows (``0`` when no DataFrame is attached)."""
        return self._df.height if self._df is not None else 0
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------


def _ingest_pandas_to_polars(df: pd.DataFrame) -> pl.DataFrame:
    """Turn a pandas DataFrame into a ``pl.DataFrame`` with UTC timestamps.

    Steps:

    1. Move any temporal index levels into ordinary columns.
    2. Make every timestamp column a UTC-aware pandas Series, warning when a
       column has to be parsed or is assumed to be UTC.
    3. Convert to Polars and cast timestamp columns to
       ``pl.Datetime("us", time_zone="UTC")``.

    The caller's DataFrame is not reassigned in place: work happens on the
    result of ``reset_index`` or on a shallow copy.
    """
    # Step 1: flatten temporal index levels.
    index = df.index
    if isinstance(index, pd.MultiIndex):
        temporal_levels = [level for level in index.names if level in _TIME_COLS]
    elif index.name in _TIME_COLS:
        temporal_levels = [index.name]
    else:
        temporal_levels = []

    if temporal_levels:
        df = df.reset_index(level=temporal_levels)
    else:
        df = df.copy(deep=False)

    # Step 2: ensure each timestamp column is UTC-aware.
    # The warnings are raised from this frame directly so that stacklevel=3
    # keeps pointing at the same caller.
    for name in _TIME_COLS:
        if name not in df.columns:
            continue
        series = df[name]

        if not pd.api.types.is_datetime64_any_dtype(series):
            warnings.warn(
                f"Column '{name}' is not a datetime type; parsing and converting to UTC.",
                UserWarning,
                stacklevel=3,
            )
            df[name] = pd.to_datetime(series, utc=True)
            continue

        zone = series.dt.tz
        if zone is None:
            warnings.warn(
                f"Column '{name}' has no timezone; assuming UTC.",
                UserWarning,
                stacklevel=3,
            )
            df[name] = series.dt.tz_localize("UTC")
        elif str(zone) != "UTC":
            df[name] = series.dt.tz_convert("UTC")
        # Otherwise the column is already UTC and is left untouched.

    # Step 3: hand over to Polars and pin the timestamp dtype.
    frame = pl.from_pandas(df)
    time_casts = [pl.col(name).cast(_TS_DTYPE) for name in _TIME_COLS if name in frame.columns]
    if not time_casts:
        return frame
    return frame.with_columns(time_casts)


def _infer_shape(df: pl.DataFrame) -> DataShape:
    """Infer :class:`DataShape` from the column names present in *df*.

    The decision depends only on whether ``knowledge_time`` and
    ``change_time`` are among the columns.
    """
    columns = set(df.columns)
    if "change_time" in columns:
        return DataShape.AUDIT if "knowledge_time" in columns else DataShape.CORRECTED
    if "knowledge_time" in columns:
        return DataShape.VERSIONED
    return DataShape.SIMPLE


def _validate_table(df: pl.DataFrame, shape: DataShape) -> None:
    """Check that *df* satisfies the column requirements of *shape*.

    Raises
    ------
    ValueError
        If a column required by *shape* is absent.
    TypeError
        If a timestamp column is not a Polars ``Datetime`` or its time zone
        is not ``"UTC"``.
    """
    present = set(df.columns)
    absent = [name for name in _REQUIRED_COLUMNS[shape] if name not in present]
    if absent:
        raise ValueError(f"DataFrame is missing required columns for shape {shape.value}: {absent}")

    # Every timestamp column that exists must be a UTC-aware Datetime.
    for name in _TIME_COLS:
        if name not in present:
            continue
        dtype = df[name].dtype
        if not isinstance(dtype, pl.Datetime):
            raise TypeError(f"Column '{name}' must be a Polars Datetime type, got {dtype!r}")
        zone = dtype.time_zone
        if zone is None:
            raise TypeError(f"Column '{name}' must be timezone-aware with time_zone='UTC', got time_zone=None")
        if zone != "UTC":
            raise TypeError(f"Column '{name}' must have time_zone='UTC', got time_zone={zone!r}")