Hierarchical Methods Time series forecasting
—
Utilities for creating hierarchical data structures from bottom-level time series data. These functions handle aggregation across multiple dimensions, create summing matrices, and prepare data in the format required by hierarchical reconciliation methods.
Main function for creating hierarchical structures from bottom-level time series by aggregating across categorical dimensions.
def aggregate(
df: Frame,
spec: list[list[str]],
exog_vars: Optional[dict[str, Union[str, list[str]]]] = None,
sparse_s: bool = False,
id_col: str = 'unique_id',
time_col: str = 'ds',
id_time_col: Optional[str] = None,
target_cols: Sequence[str] = ('y',)
) -> tuple[FrameT, FrameT, dict]:
"""
Create hierarchical structure from bottom-level time series.
Parameters:
- df: DataFrame with bottom-level time series data
Must contain id_col, time_col, and target_cols
- spec: list of aggregation specifications
Each inner list defines groupings for that level
Example: [['A', 'B'], ['C', 'D']] creates two aggregation levels
- exog_vars: dict mapping exogenous variable names to aggregation functions
Example: {'price': 'mean', 'volume': 'sum'}
- sparse_s: bool, whether to return sparse summing matrix for memory efficiency
- id_col: str, name of unique identifier column
- time_col: str, name of time column
- id_time_col: str, temporal hierarchy identifier (for temporal aggregation)
- target_cols: tuple of target variable column names
Returns:
- Y_df: DataFrame with hierarchically structured series
- S_df: DataFrame representation of summing matrix (or sparse matrix if sparse_s=True)
- tags: dict mapping hierarchy level names to series indices
"""

Function for creating temporal hierarchies by aggregating time series at different frequencies.
def aggregate_temporal(
df: Frame,
spec: dict[str, int],
exog_vars: Optional[dict[str, Union[str, list[str]]]] = None,
sparse_s: bool = False,
id_col: str = 'unique_id',
time_col: str = 'ds',
id_time_col: str = 'temporal_id',
target_cols: Sequence[str] = ('y',),
aggregation_type: str = 'local'
) -> tuple[FrameT, FrameT, dict]:
"""
Create temporal hierarchy from time series data.
Parameters:
- df: DataFrame with time series data at base frequency
- spec: dict mapping temporal level names to the number of base-frequency periods aggregated into each level
Example (monthly base data): {'Monthly': 1, 'Quarterly': 3, 'Annual': 12}
- exog_vars: dict of exogenous variables and their aggregation functions
- sparse_s: bool, return sparse summing matrix
- id_col: str, unique identifier column name
- time_col: str, time column name
- id_time_col: str, temporal hierarchy identifier column name
- target_cols: tuple of target variable names
- aggregation_type: str, type of temporal aggregation ('local' or 'global')
Returns:
- Y_df: DataFrame with temporal hierarchy
- S_df: Temporal summing matrix
- tags: dict mapping temporal levels to indices
"""

Utility for creating future timestamp dataframes for forecasting.
def make_future_dataframe(
df: Frame,
freq: Union[str, int],
h: int,
id_col: str = 'unique_id',
time_col: str = 'ds'
) -> FrameT:
"""
Create dataframe with future timestamps for forecasting.
Parameters:
- df: DataFrame with historical time series data
- freq: str or int, frequency of the series: a frequency string (e.g., 'D', 'M', 'Q', 'Y') or an integer
- h: int, forecast horizon (number of periods ahead)
- id_col: str, unique identifier column name
- time_col: str, time column name
Returns:
DataFrame with future timestamps for each series
"""

Function for generating tags that combine cross-sectional and temporal hierarchies.
def get_cross_temporal_tags(
df: pd.DataFrame,
tags_cs: dict,
tags_te: dict,
sep: str = '//',
id_col: str = 'unique_id',
id_time_col: str = 'temporal_id',
cross_temporal_id_col: str = 'cross_temporal_id'
) -> tuple[pd.DataFrame, dict]:
"""
Generate cross-temporal hierarchy tags.
Parameters:
- df: DataFrame with cross-temporal data
- tags_cs: dict with cross-sectional hierarchy tags
- tags_te: dict with temporal hierarchy tags
- sep: str, separator for combining cross-sectional and temporal identifiers
- id_col: str, cross-sectional identifier column
- id_time_col: str, temporal identifier column
- cross_temporal_id_col: str, combined identifier column name
Returns:
- Updated DataFrame with cross-temporal identifiers
- Combined tags dictionary for cross-temporal hierarchy
"""

Utility function to check if a hierarchy structure is strictly hierarchical.
def is_strictly_hierarchical(S: pd.DataFrame, tags: dict) -> bool:
"""
Check if hierarchy structure is strictly hierarchical.
Parameters:
- S: summing matrix DataFrame
- tags: hierarchy tags dictionary
Returns:
bool indicating whether structure is strictly hierarchical
"""

import pandas as pd
from hierarchicalforecast.utils import aggregate
# Bottom-level data
df = pd.DataFrame({
'unique_id': ['A', 'A', 'B', 'B', 'C', 'C', 'D', 'D'],
'ds': pd.date_range('2020-01-01', periods=2, freq='D').tolist() * 4,
'y': [100, 110, 200, 220, 150, 160, 180, 190],
'category': ['X', 'X', 'X', 'X', 'Y', 'Y', 'Y', 'Y'],
'region': ['North', 'North', 'North', 'North', 'South', 'South', 'South', 'South']
})
# Define hierarchy specification
spec = [
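['A', 'B', 'C', 'D'], # Bottom level (no aggregation) -- NOTE(review): 'A'..'D' are unique_id values in df, not column names like 'category'/'region' used by the other levels; confirm this level is valid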
['category'], # Aggregate by category
['region'], # Aggregate by region
]
# Create hierarchical structure
Y_df, S_df, tags = aggregate(df, spec)
print("Hierarchical series:")
print(Y_df.head())
print("\nHierarchy tags:")
print(tags)

import numpy as np
from hierarchicalforecast.utils import aggregate_temporal
# Daily data to be aggregated temporally
daily_df = pd.DataFrame({
'unique_id': ['series1'] * 365,
'ds': pd.date_range('2020-01-01', periods=365, freq='D'),
'y': np.random.randn(365).cumsum() + 100
})
# Define temporal aggregation specification
temporal_spec = {
'Daily': 1, # Base frequency
'Weekly': 7, # Aggregate every 7 days
'Monthly': 30, # Aggregate every 30 days
'Quarterly': 90 # Aggregate every 90 days
}
# Create temporal hierarchy
Y_temporal, S_temporal, tags_temporal = aggregate_temporal(
daily_df,
temporal_spec
)

# Data with exogenous variables
df_with_exog = pd.DataFrame({
'unique_id': ['A', 'A', 'B', 'B'],
'ds': pd.date_range('2020-01-01', periods=2, freq='D').tolist() * 2,
'y': [100, 110, 200, 220],
'price': [10.5, 10.8, 12.0, 12.3],
'volume': [1000, 1100, 2000, 2200]
})
# Specify how to aggregate exogenous variables
exog_aggregation = {
'price': 'mean', # Average price across aggregated series
'volume': 'sum' # Sum volume across aggregated series
}
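spec = [['A', 'B']] # Simple aggregation -- NOTE(review): df_with_exog has no 'A' or 'B' column (these are unique_id values); confirm the spec should not list column names instead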
Y_df, S_df, tags = aggregate(
df_with_exog,
spec,
exog_vars=exog_aggregation
)

# For very large hierarchies, use sparse matrices
Y_df_sparse, S_sparse, tags_sparse = aggregate(
large_dataset,
complex_spec,
sparse_s=True # Returns scipy.sparse matrix for S
)
# S_sparse will be a scipy sparse matrix instead of DataFrame
print(f"Sparse matrix shape: {S_sparse.shape}")
print(f"Non-zero elements: {S_sparse.nnz}")

from hierarchicalforecast.utils import make_future_dataframe
# Create future timestamps for forecasting
future_df = make_future_dataframe(
df=historical_data,
freq='D', # Daily frequency
h=30, # 30 days ahead
id_col='unique_id',
time_col='ds'
)
print("Future timestamps:")
print(future_df.head())

from hierarchicalforecast.utils import get_cross_temporal_tags
# First create cross-sectional hierarchy
Y_cs, S_cs, tags_cs = aggregate(df, cross_sectional_spec)
# Then create temporal hierarchy
Y_te, S_te, tags_te = aggregate_temporal(Y_cs, temporal_spec)
# Combine them
Y_cross_temp, tags_cross_temp = get_cross_temporal_tags(
df=Y_te,
tags_cs=tags_cs,
tags_te=tags_te,
sep='//'
)

from hierarchicalforecast.utils import is_strictly_hierarchical
# Check if hierarchy is strictly hierarchical
is_strict = is_strictly_hierarchical(S_df, tags)
print(f"Strictly hierarchical: {is_strict}")

Utility functions for converting prediction intervals and samples to different output formats.
def level_to_outputs(level: list[int]) -> list[str]:
"""
Convert confidence levels to output column names.
Parameters:
- level: list of confidence levels (e.g., [80, 95])
Returns:
List of column name strings for low and high bounds
"""
def quantiles_to_outputs(quantiles: list[float]) -> list[str]:
"""
Convert quantiles to output column names.
Parameters:
- quantiles: list of quantile levels (e.g., [0.1, 0.5, 0.9])
Returns:
List of quantile column name strings
"""
def samples_to_quantiles_df(
samples: np.ndarray,
unique_ids: list,
dates: list,
quantiles: list[float],
id_col: str = 'unique_id',
time_col: str = 'ds'
) -> pd.DataFrame:
"""
Transform samples array to quantile DataFrame.
Parameters:
- samples: array of forecast samples
- unique_ids: list of series identifiers
- dates: list of forecast dates
- quantiles: list of quantile levels to compute
- id_col: identifier column name
- time_col: time column name
Returns:
DataFrame with quantile columns
"""

Install with Tessl CLI
npx tessl i tessl/pypi-hierarchicalforecast