"""
TimeSeries — a Polars-backed container for time series data.
Uses a ``polars.DataFrame`` as the internal storage backend.
Data shapes
-----------
Four temporal shapes are supported (see :class:`~timedatamodel.datashape.DataShape`):
* **SIMPLE**: ``valid_time`` + ``value``
* **VERSIONED**: ``knowledge_time`` + ``valid_time`` + ``value``
* **CORRECTED**: ``valid_time`` + ``change_time`` + ``value``
* **AUDIT**: ``knowledge_time`` + ``change_time`` + ``valid_time`` + ``value``
Timestamp representation
------------------------
All timestamp columns are stored internally as ``pl.Datetime("us", time_zone="UTC")``.
The ``timezone`` metadata field is a display/context hint (IANA zone string).
Example usage
-------------
>>> import pandas as pd
>>> from timedatamodel import TimeSeries, DataType
>>>
>>> df = pd.DataFrame({
... "valid_time": pd.date_range("2024-01-01", periods=4, freq="1h", tz="UTC"),
... "value": [1.0, 2.0, 3.0, 4.0],
... })
>>> ts = TimeSeries.from_pandas(df, name="wind_power", unit="MW")
>>> ts.name
'wind_power'
>>> ts.unit
'MW'
Printing a ``TimeSeries`` renders a formatted summary box with metadata
and a head/tail preview of the data.
"""
from __future__ import annotations
import warnings
import pandas as pd
import polars as pl
from ._repr import _TimeSeriesReprMixin
from .datashape import _REQUIRED_COLUMNS, _TIME_COLS, DataShape # noqa: F401
from .enums import DataType, Frequency, TimeSeriesType
from .units import _get_registry as _get_pint_registry
# Canonical dtype for timestamp columns: microsecond precision, UTC zone.
# The normalisation helpers in this module cast time columns to this dtype.
_TS_DTYPE = pl.Datetime("us", time_zone="UTC")
def _normalize_time_cols(df: pl.DataFrame) -> pl.DataFrame:
    """Return *df* with every recognised time column cast to ``_TS_DTYPE``.

    Only columns named in ``_TIME_COLS`` that are actually present are
    considered. Columns that already have the target dtype are left alone;
    timezone-naive datetimes are first labelled as UTC, everything else is
    cast directly. When nothing needs changing the input frame itself is
    returned.
    """
    present = df.columns
    casts = []
    for name in _TIME_COLS:
        if name not in present:
            continue
        current = df[name].dtype
        if current == _TS_DTYPE:
            # Nothing to do for this column.
            continue
        expr = pl.col(name)
        if isinstance(current, pl.Datetime) and current.time_zone is None:
            # Naive datetimes (e.g. from numpy datetime64) are taken to be UTC.
            expr = expr.dt.replace_time_zone("UTC")
        casts.append(expr.cast(_TS_DTYPE))
    if not casts:
        return df
    return df.with_columns(casts)
# ---------------------------------------------------------------------------
# TimeSeries
# ---------------------------------------------------------------------------
[docs]
class TimeSeries(_TimeSeriesReprMixin):
    """Polars-backed container for time series data with rich metadata.

    The underlying ``df`` is optional. Construct with ``df=None`` to declare
    a series structure (name, unit, data type, …) before any data exists —
    useful for registering series in a catalog. Methods that need data
    (converters, ``head``/``tail``, ``convert_unit``, …) raise
    :class:`ValueError` when no df is attached. Use :attr:`has_df` to check.

    The DataFrame and its inferred shape are kept in the private ``_df`` and
    ``_shape`` attributes; all metadata fields are plain public attributes.

    Parameters
    ----------
    df:
        A ``polars.DataFrame`` whose columns conform to one of the recognised
        :class:`~timedatamodel.datashape.DataShape` patterns, or ``None`` for
        a metadata-only instance. All timestamp columns must use
        ``pl.Datetime("us", time_zone="UTC")``.
    name:
        Series name (e.g. ``"wind_power"``, ``"electricity.supply"``).
        Keyword-only and required.
    description:
        Human-readable description.
    unit:
        Canonical physical unit string (e.g. ``"MW"``, ``"dimensionless"``).
    timezone:
        IANA timezone string for display purposes. Internal data is always
        UTC; this is a metadata hint only.
    frequency:
        Pandas offset alias describing the expected data cadence.
    data_type:
        Semantic nature of the observations (:class:`~timedatamodel.enums.DataType`).
    timeseries_type:
        Storage/versioning model (:class:`~timedatamodel.enums.TimeSeriesType`).

    Raises
    ------
    TypeError
        If *df* is neither ``None`` nor a ``polars.DataFrame``, or if table
        validation rejects a timestamp column's dtype.
    ValueError
        If table validation finds required columns missing.
    """
def __init__(
    self,
    df: pl.DataFrame | None = None,
    *,
    name: str,
    description: str | None = None,
    unit: str = "dimensionless",
    timezone: str = "UTC",
    frequency: Frequency | None = None,
    data_type: DataType | None = None,
    timeseries_type: TimeSeriesType = TimeSeriesType.FLAT,
) -> None:
    """Store the metadata and, when given, the validated DataFrame.

    With ``df=None`` the instance is metadata-only: both the frame and the
    shape are ``None``. Otherwise the shape is inferred from the column
    names and the frame is validated against it before being stored.
    """
    inferred: DataShape | None = None
    if df is not None:
        if not isinstance(df, pl.DataFrame):
            raise TypeError(f"df must be a polars.DataFrame or None, got {type(df)!r}")
        inferred = _infer_shape(df)
        _validate_table(df, inferred)

    # Data (only assigned once validation has succeeded).
    self._df: pl.DataFrame | None = df
    self._shape: DataShape | None = inferred

    # Metadata.
    self.name: str = name
    self.description: str | None = description
    self.unit: str = unit
    self.timezone: str = timezone
    self.frequency: Frequency | None = frequency
    self.data_type: DataType | None = data_type
    self.timeseries_type: TimeSeriesType = timeseries_type
# ------------------------------------------------------------------
# Properties
# ------------------------------------------------------------------
@property
def shape(self) -> DataShape | None:
    """Temporal shape of the attached data.

    This is the value inferred from the DataFrame's column names at
    construction time. ``None`` for metadata-only instances.
    """
    return self._shape
@property
def num_rows(self) -> int:
    """Row count of the attached DataFrame, or ``0`` when there is none."""
    if self._df is None:
        return 0
    return self._df.height
@property
def columns(self) -> list[str]:
    """Names of the columns in the attached Polars DataFrame.

    A metadata-only instance yields an empty list.
    """
    if self._df is None:
        return []
    return self._df.columns
@property
def df(self) -> pl.DataFrame | None:
    """The underlying ``polars.DataFrame`` (read-only by convention).

    The stored frame object itself is returned, not a copy.
    ``None`` for metadata-only instances.
    """
    return self._df
@property
def has_df(self) -> bool:
    """Whether this instance carries data (``False`` when metadata-only)."""
    attached = self._df is not None
    return attached
@property
def has_missing(self) -> bool:
    """Whether any entry of the ``value`` column is null.

    Metadata-only instances have no values and therefore report ``False``.
    """
    frame = self._df
    if frame is not None:
        null_mask = frame["value"].is_null()
        return null_mask.any()
    return False
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _require_df(self) -> pl.DataFrame:
    """Hand back the attached DataFrame, insisting that one exists.

    Raises
    ------
    ValueError
        If this is a metadata-only instance (no DataFrame attached).
    """
    frame = self._df
    if frame is not None:
        return frame
    raise ValueError(f"TimeSeries {self.name!r} has no data attached (df=None)")
# ------------------------------------------------------------------
# Constructors
# ------------------------------------------------------------------
[docs]
@classmethod
def from_polars(
    cls,
    df: pl.DataFrame,
    *,
    name: str,
    description: str | None = None,
    unit: str = "dimensionless",
    timezone: str = "UTC",
    frequency: Frequency | None = None,
    data_type: DataType | None = None,
    timeseries_type: TimeSeriesType = TimeSeriesType.FLAT,
) -> TimeSeries:
    """Wrap an existing ``polars.DataFrame`` in a :class:`TimeSeries`.

    The frame is handed to the constructor unchanged — no timestamp
    normalisation happens here, so its time columns must already satisfy
    the constructor's validation (UTC-aware Polars datetimes).
    """
    metadata = {
        "name": name,
        "description": description,
        "unit": unit,
        "timezone": timezone,
        "frequency": frequency,
        "data_type": data_type,
        "timeseries_type": timeseries_type,
    }
    return cls(df, **metadata)
[docs]
@classmethod
def from_list(
    cls,
    data: dict[str, list],
    *,
    name: str,
    description: str | None = None,
    unit: str = "dimensionless",
    timezone: str = "UTC",
    frequency: Frequency | None = None,
    data_type: DataType | None = None,
    timeseries_type: TimeSeriesType = TimeSeriesType.FLAT,
) -> TimeSeries:
    """Build a :class:`TimeSeries` from a mapping of column name to list.

    This is the inverse of :meth:`to_list`. The mapping is turned into a
    Polars DataFrame and its timestamp columns are normalised to UTC
    before the instance is constructed.
    """
    frame = pl.DataFrame(data)
    frame = _normalize_time_cols(frame)
    return cls(
        frame,
        name=name,
        description=description,
        unit=unit,
        timezone=timezone,
        frequency=frequency,
        data_type=data_type,
        timeseries_type=timeseries_type,
    )
[docs]
@classmethod
def from_numpy(
    cls,
    data: dict[str, np.ndarray],
    *,
    name: str,
    description: str | None = None,
    unit: str = "dimensionless",
    timezone: str = "UTC",
    frequency: Frequency | None = None,
    data_type: DataType | None = None,
    timeseries_type: TimeSeriesType = TimeSeriesType.FLAT,
) -> TimeSeries:
    """Build a :class:`TimeSeries` from a mapping of column name to NumPy array.

    This is the inverse of :meth:`to_numpy`. ``numpy.datetime64`` carries
    no timezone, so timestamp columns are interpreted as UTC during
    normalisation.

    Raises
    ------
    ImportError
        If ``numpy`` is not installed.
    """
    # Import only to verify availability; the module itself is not used here.
    try:
        import numpy as np  # noqa: F401
    except ImportError as exc:
        raise ImportError("numpy is required for from_numpy(). Install with: pip install numpy") from exc

    frame = _normalize_time_cols(pl.DataFrame(data))
    return cls(
        frame,
        name=name,
        description=description,
        unit=unit,
        timezone=timezone,
        frequency=frequency,
        data_type=data_type,
        timeseries_type=timeseries_type,
    )
[docs]
@classmethod
def from_pyarrow(
    cls,
    table: pa.Table,
    *,
    name: str,
    description: str | None = None,
    unit: str = "dimensionless",
    timezone: str = "UTC",
    frequency: Frequency | None = None,
    data_type: DataType | None = None,
    timeseries_type: TimeSeriesType = TimeSeriesType.FLAT,
) -> TimeSeries:
    """Create a :class:`TimeSeries` from a PyArrow Table.

    Accepts the format returned by :meth:`to_pyarrow`. After conversion to
    Polars, timestamp columns are normalised to
    ``pl.Datetime("us", time_zone="UTC")`` with the same helper used by
    :meth:`from_list` and :meth:`from_numpy`: timezone-naive timestamps
    are interpreted as UTC and other units/zones are cast.

    Requires ``pyarrow``.

    Raises
    ------
    ImportError
        If ``pyarrow`` is not installed.
    TypeError
        If *table* does not convert to a ``polars.DataFrame``.
    """
    try:
        import pyarrow as pa  # noqa: F401
    except ImportError as e:
        raise ImportError("pyarrow is required for from_pyarrow(). Install with: pip install pyarrow") from e

    frame = pl.from_arrow(table)
    # pl.from_arrow can also yield a Series for non-table input; leave that
    # case to the constructor so it raises its usual TypeError.
    if isinstance(frame, pl.DataFrame):
        frame = _normalize_time_cols(frame)
    return cls(
        frame,
        name=name,
        description=description,
        unit=unit,
        timezone=timezone,
        frequency=frequency,
        data_type=data_type,
        timeseries_type=timeseries_type,
    )
[docs]
@classmethod
def from_pandas(
    cls,
    df: pd.DataFrame,
    *,
    name: str,
    description: str | None = None,
    unit: str = "dimensionless",
    timezone: str = "UTC",
    frequency: Frequency | None = None,
    data_type: DataType | None = None,
    timeseries_type: TimeSeriesType = TimeSeriesType.FLAT,
) -> TimeSeries:
    """Build a :class:`TimeSeries` from a ``pandas.DataFrame``.

    The frame is ingested into Polars (temporal index levels become
    columns, timestamps become UTC) and the data shape is inferred from
    the resulting column names.

    Only ``SIMPLE`` and ``VERSIONED`` shapes are accepted here; a frame
    that yields ``AUDIT`` or ``CORRECTED`` (i.e. one carrying
    ``change_time``) is rejected.

    Raises
    ------
    ValueError
        If the DataFrame contains a ``change_time`` column.
    """
    polars_df = _ingest_pandas_to_polars(df)
    shape = _infer_shape(polars_df)
    rejected_shapes = (DataShape.AUDIT, DataShape.CORRECTED)
    if shape in rejected_shapes:
        message = (
            f"from_pandas produced shape {shape.value} because 'change_time' is present. "
            f"Only SIMPLE and VERSIONED shapes can be created via from_pandas. "
            f"Use from_polars() to wrap an existing read result with change_time."
        )
        raise ValueError(message)

    metadata = dict(
        name=name,
        description=description,
        unit=unit,
        timezone=timezone,
        frequency=frequency,
        data_type=data_type,
        timeseries_type=timeseries_type,
    )
    return cls(polars_df, **metadata)
# ------------------------------------------------------------------
# Conversion
# ------------------------------------------------------------------
[docs]
def validate_for_insert(self) -> tuple[pl.DataFrame, DataShape]:
    """Check that this series is insertable and return ``(df, shape)``.

    Insert is limited to :attr:`DataShape.SIMPLE` and
    :attr:`DataShape.VERSIONED`.

    Returns
    -------
    Tuple[pl.DataFrame, DataShape]
        The attached DataFrame and its shape.

    Raises
    ------
    ValueError
        If no DataFrame is attached, or if :attr:`shape` is
        :attr:`DataShape.AUDIT` or :attr:`DataShape.CORRECTED`.
    """
    frame = self._require_df()
    shape = self._shape
    if shape in (DataShape.AUDIT, DataShape.CORRECTED):
        raise ValueError(
            f"TimeSeries with shape {shape.value} cannot be inserted. "
            f"Only SIMPLE and VERSIONED shapes are supported for insert."
        )
    # A frame being attached means the constructor also set a shape.
    assert shape is not None
    return frame, shape
[docs]
def to_pandas(self) -> pd.DataFrame:
    """Export the data as a ``pandas.DataFrame`` with a temporal index.

    The index depends on the shape:

    * ``SIMPLE`` — ``valid_time``.
    * ``VERSIONED`` — ``(knowledge_time, valid_time)`` MultiIndex.
    * ``AUDIT`` — ``(knowledge_time, change_time, valid_time)`` MultiIndex.
    * ``CORRECTED`` — ``(valid_time, change_time)`` MultiIndex.

    Raises
    ------
    ValueError
        If no DataFrame is attached.
    """
    frame = self._require_df().to_pandas()
    index_keys = {
        DataShape.SIMPLE: "valid_time",
        DataShape.VERSIONED: ["knowledge_time", "valid_time"],
        DataShape.AUDIT: ["knowledge_time", "change_time", "valid_time"],
        DataShape.CORRECTED: ["valid_time", "change_time"],
    }
    keys = index_keys.get(self._shape)
    if keys is None:
        # Not expected for a validated instance; return the frame unindexed.
        return frame
    return frame.set_index(keys)
[docs]
def to_polars(self) -> pl.DataFrame:
    """Return the attached ``polars.DataFrame`` itself (not a copy).

    Raises
    ------
    ValueError
        If no DataFrame is attached.
    """
    frame = self._require_df()
    return frame
[docs]
def to_list(self) -> dict:
    """Export the data as a dict mapping column name to a Python list.

    Timestamps come out as Python ``datetime`` objects and nulls as
    ``None``, for example::

        {"valid_time": [datetime(...), ...], "value": [1.0, None, 3.0, ...]}

    Raises
    ------
    ValueError
        If no DataFrame is attached.
    """
    frame = self._require_df()
    return frame.to_dict(as_series=False)
[docs]
def to_numpy(self) -> dict[str, np.ndarray]:
    """Export the data as a dict mapping column name to a 1-D NumPy array.

    Timestamp columns become ``numpy.datetime64[us]`` values; null values
    become ``NaN`` or ``NaT``.

    Requires ``numpy``. Install with: ``pip install numpy``.

    Raises
    ------
    ImportError
        If ``numpy`` is not installed.
    ValueError
        If no DataFrame is attached.
    """
    # Import only to verify availability before touching the data.
    try:
        import numpy as np  # noqa: F401
    except ImportError as exc:
        raise ImportError("numpy is required for to_numpy(). Install it with: pip install numpy") from exc

    frame = self._require_df()
    arrays = {}
    for column in frame.columns:
        arrays[column] = frame[column].to_numpy(allow_copy=True)
    return arrays
[docs]
def to_pyarrow(self) -> pa.Table:
    """Export the data as a ``pyarrow.Table``.

    All timestamp columns are Arrow ``timestamp[us, UTC]``.

    Requires ``pyarrow``. Install with: ``pip install pyarrow``.

    Raises
    ------
    ImportError
        If ``pyarrow`` is not installed.
    ValueError
        If no DataFrame is attached.
    """
    # Import only to verify availability before touching the data.
    try:
        import pyarrow  # noqa: F401
    except ImportError as exc:
        raise ImportError("pyarrow is required for to_pyarrow(). Install it with: pip install pyarrow") from exc

    frame = self._require_df()
    return frame.to_arrow()
[docs]
def coverage_bar(self) -> CoverageBar:
    """Build a :class:`~timedatamodel.CoverageBar` of value coverage.

    Each row contributes ``True`` when its value is present and ``False``
    when it is null. The bar's begin/end are the ``valid_time`` of the
    first and last rows, or ``None`` for an empty frame.

    In Jupyter the coverage bar renders as an SVG.

    Raises
    ------
    ValueError
        If no DataFrame is attached.
    """
    from ._repr import CoverageBar

    frame = self._require_df()
    present = frame["value"].is_not_null().to_list()
    if self.num_rows > 0:
        valid_times = frame["valid_time"]
        begin, end = valid_times[0], valid_times[-1]
    else:
        begin = end = None
    return CoverageBar([(self.name, present)], begin, end)
# ------------------------------------------------------------------
# Data access helpers
# ------------------------------------------------------------------
[docs]
def head(self, n: int = 5) -> TimeSeries:
    """Return a new :class:`TimeSeries` holding the first *n* rows.

    Metadata is carried over unchanged. Raises :class:`ValueError` when
    no DataFrame is attached.
    """
    leading = self._require_df().head(n)
    return self._clone(leading)
[docs]
def tail(self, n: int = 5) -> TimeSeries:
    """Return a new :class:`TimeSeries` holding the last *n* rows.

    Metadata is carried over unchanged. Raises :class:`ValueError` when
    no DataFrame is attached.
    """
    trailing = self._require_df().tail(n)
    return self._clone(trailing)
# ------------------------------------------------------------------
# Unit conversion
# ------------------------------------------------------------------
[docs]
def convert_unit(self, target_unit: str) -> TimeSeries:
    """Return a new :class:`TimeSeries` with values converted to *target_unit*.

    Uses the pint library for unit conversion. The ``unit`` metadata field
    is updated to *target_unit*.

    The conversion is applied as ``value * scale + offset``, where both
    coefficients are obtained by converting 0 and 1 with pint. This is
    exact for ordinary multiplicative units (offset is 0) and also
    correct for offset units such as ``degC`` → ``degF``, which a single
    multiplicative factor cannot express. Null values stay null.
    Non-linear (e.g. logarithmic) units are not supported by this scheme.

    Parameters
    ----------
    target_unit:
        Target unit string understood by pint (e.g. ``"km/h"``, ``"kW"``).

    Raises
    ------
    ImportError
        If pint is not installed.
    pint.DimensionalityError
        If the current unit and *target_unit* are dimensionally incompatible.
    ValueError
        If no DataFrame is attached.
    """
    try:
        ureg = _get_pint_registry()
    except ImportError as exc:
        raise ImportError(
            "Unit conversion requires the optional 'pint' dependency. "
            "Install it with: pip install timedatamodel[pint]"
        ) from exc

    # Two probe points determine the affine map between the units.
    at_zero = float(ureg.Quantity(0.0, self.unit).to(target_unit).magnitude)
    at_one = float(ureg.Quantity(1.0, self.unit).to(target_unit).magnitude)
    scale = at_one - at_zero

    converted = pl.col("value") * scale
    if at_zero != 0.0:
        # Only offset units (e.g. temperatures) need the additive term;
        # skipping it otherwise keeps purely multiplicative results as-is.
        converted = converted + at_zero

    new_df = self._require_df().with_columns(converted.alias("value"))
    return self._clone(new_df, unit=target_unit)
# ------------------------------------------------------------------
# Internal clone helper
# ------------------------------------------------------------------
def _clone(self, new_df: pl.DataFrame | None = None, **overrides) -> TimeSeries:
    """Build a new :class:`TimeSeries` around *new_df*, copying this one's metadata.

    Metadata fields named in *overrides* take the supplied value instead
    of the current one; keywords that are not metadata fields are ignored.
    With ``new_df=None`` the clone is a metadata-only instance.
    """
    field_names = (
        "name",
        "description",
        "unit",
        "timezone",
        "frequency",
        "data_type",
        "timeseries_type",
    )
    metadata = {field: overrides.get(field, getattr(self, field)) for field in field_names}
    return TimeSeries(new_df, **metadata)
# ------------------------------------------------------------------
# Metadata helpers
# ------------------------------------------------------------------
# ------------------------------------------------------------------
# Dunder
# ------------------------------------------------------------------
def __len__(self) -> int:
    """Number of rows in the attached DataFrame; ``0`` when there is none."""
    if self._df is None:
        return 0
    return self._df.height
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _ingest_pandas_to_polars(df: pd.DataFrame) -> pl.DataFrame:
    """Turn a pandas DataFrame into a Polars one with UTC timestamp columns.

    Steps:

    1. Index levels named after a recognised time column are moved into
       regular columns; otherwise a shallow copy is taken so the caller's
       frame is not reassigned into.
    2. Each recognised time column is made UTC-aware on the pandas side:
       non-datetime data is parsed, naive datetimes are localised (both
       with a ``UserWarning``), other zones are converted.
    3. The frame is converted to Polars and the time columns are cast to
       ``pl.Datetime("us", time_zone="UTC")``.
    """
    # -- 1. Move temporal index levels into columns ---------------------------
    index = df.index
    if isinstance(index, pd.MultiIndex):
        temporal_levels = [level for level in index.names if level in _TIME_COLS]
    elif index.name in _TIME_COLS:
        temporal_levels = [index.name]
    else:
        temporal_levels = []

    df = df.reset_index(level=temporal_levels) if temporal_levels else df.copy(deep=False)

    # -- 2. Make every time column UTC-aware ----------------------------------
    for name in _TIME_COLS:
        if name not in df.columns:
            continue
        series = df[name]

        if pd.api.types.is_datetime64_any_dtype(series):
            zone = series.dt.tz
            if zone is None:
                warnings.warn(
                    f"Column '{name}' has no timezone; assuming UTC.",
                    UserWarning,
                    stacklevel=3,
                )
                df[name] = series.dt.tz_localize("UTC")
            elif str(zone) != "UTC":
                df[name] = series.dt.tz_convert("UTC")
            # Already UTC: leave the column untouched.
        else:
            warnings.warn(
                f"Column '{name}' is not a datetime type; parsing and converting to UTC.",
                UserWarning,
                stacklevel=3,
            )
            df[name] = pd.to_datetime(series, utc=True)

    # -- 3. Hand over to Polars and pin the timestamp dtype -------------------
    frame = pl.from_pandas(df)
    casts = [pl.col(name).cast(_TS_DTYPE) for name in _TIME_COLS if name in frame.columns]
    return frame.with_columns(casts) if casts else frame
def _infer_shape(df: pl.DataFrame) -> DataShape:
    """Derive the :class:`DataShape` of *df* from its column names.

    Only the presence of ``knowledge_time`` and ``change_time`` matters:
    both → ``AUDIT``, only ``change_time`` → ``CORRECTED``, only
    ``knowledge_time`` → ``VERSIONED``, neither → ``SIMPLE``.
    """
    columns = set(df.columns)
    # Keyed by (has knowledge_time, has change_time).
    by_presence = {
        (True, True): DataShape.AUDIT,
        (False, True): DataShape.CORRECTED,
        (True, False): DataShape.VERSIONED,
        (False, False): DataShape.SIMPLE,
    }
    return by_presence["knowledge_time" in columns, "change_time" in columns]
def _validate_table(df: pl.DataFrame, shape: DataShape) -> None:
    """Check that *df* has the columns and timestamp dtypes *shape* needs.

    Every recognised time column that is present must be a Polars
    ``Datetime`` whose time zone is exactly ``"UTC"``. The time unit is
    not inspected here.

    Raises
    ------
    ValueError
        If a column required for *shape* is absent.
    TypeError
        If a time column is not a ``Datetime``, is timezone-naive, or uses
        a zone other than UTC.
    """
    present = set(df.columns)

    absent = [name for name in _REQUIRED_COLUMNS[shape] if name not in present]
    if absent:
        raise ValueError(f"DataFrame is missing required columns for shape {shape.value}: {absent}")

    for name in _TIME_COLS:
        if name not in present:
            continue
        dtype = df[name].dtype
        if not isinstance(dtype, pl.Datetime):
            raise TypeError(f"Column '{name}' must be a Polars Datetime type, got {dtype!r}")
        zone = dtype.time_zone
        if zone == "UTC":
            continue
        if zone is None:
            raise TypeError(f"Column '{name}' must be timezone-aware with time_zone='UTC', got time_zone=None")
        raise TypeError(f"Column '{name}' must have time_zone='UTC', got time_zone={zone!r}")