Ctrl+K
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-feast

Python SDK for Feast - an open source feature store for machine learning that manages features for both training and serving environments.

Pending
Overview
Eval results
Files

docs/feature-store.md

Feature Store Management

The FeatureStore class serves as the primary interface for all feature store operations. It orchestrates feature definitions, data materialization, and feature retrieval for both training and serving scenarios.

Capabilities

Feature Store Initialization

Initialize a feature store instance from a repository configuration or directory path. The feature store manages all metadata, data sources, and serving infrastructure.

class FeatureStore:
    def __init__(self, repo_path: Optional[str] = None, config: Optional[RepoConfig] = None, fs_yaml_file: Optional[Path] = None):
        """
        Initialize a FeatureStore instance.
        
        Parameters:
        - repo_path: Path to feature repository directory containing feature_store.yaml
        - config: RepoConfig object for programmatic configuration
        - fs_yaml_file: Path to the feature_store.yaml file used to configure the feature store
        
        At most one of fs_yaml_file and config can be set.
        """

Feature Definition Management

Apply feature definitions (entities, feature views, feature services) to the feature store registry. This registers metadata and prepares the infrastructure for feature materialization.

def apply(self, objects: List[Union[Entity, FeatureView, FeatureService, DataSource]]):
    """
    Register feature definitions with the feature store.
    
    Parameters:
    - objects: List of feature objects to register (entities, feature views, etc.)
    
    This method validates definitions, updates the registry, and provisions necessary infrastructure.
    """

Historical Feature Retrieval

Retrieve historical features for model training with point-in-time correctness. This ensures no data leakage by only using features available at each entity's timestamp.

def get_historical_features(
    self,
    entity_df: Optional[Union[pd.DataFrame, str]] = None,
    features: Union[List[str], FeatureService] = [],
    full_feature_names: bool = False,
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None
) -> RetrievalJob:
    """
    Retrieve historical features for training.
    
    Parameters:
    - entity_df: DataFrame with entity keys and timestamps, or path to file
    - features: List of feature references or FeatureService object
    - full_feature_names: Whether to prefix feature names with feature view names
    - start_date: Start date for feature retrieval (when entity_df is None)
    - end_date: End date for feature retrieval (when entity_df is None)
    
    Returns:
    RetrievalJob that can be converted to DataFrame or Arrow table
    """

Online Feature Retrieval

Retrieve features for real-time model inference with low latency. Features are served from the online store for immediate prediction needs.

def get_online_features(
    self,
    features: Union[List[str], FeatureService],
    entity_rows: Union[List[Dict[str, Any]], Mapping[str, Union[Sequence[Any], Sequence[Value], RepeatedValue]]],
    full_feature_names: bool = False
) -> OnlineResponse:
    """
    Retrieve features for online serving.
    
    Parameters:
    - features: List of feature references or FeatureService object
    - entity_rows: List of entity key-value dictionaries or mapping of entity keys to value sequences
    - full_feature_names: Whether to prefix feature names with feature view names
    
    Returns:
    OnlineResponse containing feature values
    """

Feature Materialization

Materialize batch features from offline store to online store for serving. This process computes and stores the latest feature values for fast online access.

def materialize(
    self,
    start_date: datetime,
    end_date: datetime,
    feature_views: Optional[List[str]] = None
):
    """
    Materialize features to online store.
    
    Parameters:
    - start_date: Start of materialization time range
    - end_date: End of materialization time range  
    - feature_views: Specific feature views to materialize (None for all)
    """

def materialize_incremental(
    self,
    end_date: datetime,
    feature_views: Optional[List[str]] = None
):
    """
    Incrementally materialize features since last materialization.
    
    Parameters:
    - end_date: End timestamp for incremental materialization
    - feature_views: Specific feature views to materialize (None for all)
    """

Feature Store Metadata

Query and inspect feature store metadata including registered objects and their configurations.

def list_entities(self) -> List[Entity]:
    """List all registered entities."""

def list_feature_views(self) -> List[FeatureView]:
    """List all registered feature views."""

def list_feature_services(self) -> List[FeatureService]:
    """List all registered feature services."""

def get_entity(self, name: str) -> Entity:
    """Get entity by name."""

def get_feature_view(self, name: str) -> FeatureView:
    """Get feature view by name."""

def get_feature_service(self, name: str) -> FeatureService:
    """Get feature service by name."""

Feature Server Operations

Start and manage the feature server for HTTP/gRPC-based feature serving in production environments.

def serve(
    self,
    host: str = "localhost",
    port: int = 6566,
    type_: str = "http",
    no_access_log: bool = False
):
    """
    Start the feature server.
    
    Parameters:
    - host: Server host address
    - port: Server port number
    - type_: Server type ("http" or "grpc")
    - no_access_log: Disable access logging
    """

def serve_ui(
    self,
    host: str = "localhost", 
    port: int = 8888,
    get_registry_dump: bool = False
):
    """
    Start the Feast Web UI server.
    
    Parameters:
    - host: Server host address
    - port: Server port number  
    - get_registry_dump: Include registry dump in UI
    """

def serve_registry(
    self,
    host: str = "localhost",
    port: int = 6570
):
    """
    Start the registry server for remote registry access.
    
    Parameters:
    - host: Server host address
    - port: Server port number
    """

Permission Management

Manage access control permissions for feature store operations and resources.

def list_permissions(self) -> List[Permission]:
    """List all registered permissions."""

def get_permission(self, name: str) -> Permission:
    """Get permission by name."""

Project Management

Manage projects and multi-tenancy within the feature store.

def list_projects(self) -> List[Project]:
    """List all available projects."""

def get_project(self, name: Optional[str]) -> Project:
    """Get project by name or current project if name is None."""

Data Validation and Logging

Validate and manage logged feature data for monitoring and debugging.

def write_logged_features(
    self,
    logs: Union[pa.Table, pd.DataFrame],
    source: LoggingSource,
    config: LoggingConfig
):
    """Write logged features to configured logging destination."""

def validate_logged_features(
    self,
    source: LoggingSource,
    config: LoggingConfig,
    reference: ValidationReference
) -> ValidationResult:
    """Validate logged features against reference dataset."""

def get_validation_reference(self, name: str) -> ValidationReference:
    """Get validation reference by name."""

def list_validation_references(self) -> List[ValidationReference]:
    """List all validation references."""

Saved Datasets

Manage saved datasets for feature store operations.

def list_saved_datasets(self, allow_cache: bool = True) -> List[SavedDataset]:
    """List all saved datasets."""

def create_saved_dataset(
    self,
    from_: Union[RetrievalJob, pd.DataFrame],
    name: str,
    storage: SavedDatasetStorage,
    tags: Optional[Dict[str, str]] = None
) -> SavedDataset:
    """Create and register a new saved dataset."""

def get_saved_dataset(self, name: str) -> SavedDataset:
    """Get saved dataset by name."""

Advanced Operations

Additional utility operations for feature store management.

def plan(self, objects: List[Union[Entity, FeatureView, FeatureService]]) -> None:
    """Preview changes that would be applied to the feature store."""

def teardown(self):
    """Tear down feature store infrastructure."""

def push(
    self,
    push_source_name: str,
    df: pd.DataFrame,
    allow_registry_cache: bool = True
):
    """Push data to a PushSource."""

def write_to_online_store(
    self,
    feature_view_name: str,
    df: Union[pd.DataFrame, pa.Table]
):
    """Write feature data directly to online store."""

def write_to_offline_store(
    self,
    feature_view_name: str,
    df: Union[pd.DataFrame, pa.Table]
):
    """Write feature data directly to offline store."""

Usage Examples

Complete Feature Store Workflow

from feast import FeatureStore, Entity, FeatureView, Field, FileSource, ValueType
from feast.types import Float32, Int64
from datetime import datetime, timedelta
import pandas as pd

# Initialize feature store
fs = FeatureStore(repo_path="./feature_repo")

# Define entities
driver = Entity(
    name="driver",
    value_type=ValueType.INT64,
    description="Driver identifier"
)

# Define data source
driver_source = FileSource(
    path="data/driver_stats.parquet",
    timestamp_field="event_timestamp"
)

# Define feature view (Field.dtype takes a feast.types type, not ValueType)
driver_hourly_stats = FeatureView(
    name="driver_hourly_stats",
    entities=[driver],
    ttl=timedelta(hours=1),
    schema=[
        Field(name="conv_rate", dtype=Float32),
        Field(name="acc_rate", dtype=Float32),
        Field(name="avg_daily_trips", dtype=Int64)
    ],
    source=driver_source
)

# Apply to feature store
fs.apply([driver, driver_hourly_stats])

# Materialize features
fs.materialize(
    start_date=datetime(2023, 1, 1),
    end_date=datetime(2023, 1, 31)
)

# Get training data
entity_df = pd.DataFrame({
    "driver": [1001, 1002, 1003],
    "event_timestamp": [
        datetime(2023, 1, 15, 10, 0),
        datetime(2023, 1, 15, 11, 0),
        datetime(2023, 1, 15, 12, 0)
    ]
})

training_data = fs.get_historical_features(
    entity_df=entity_df,
    features=[
        "driver_hourly_stats:conv_rate",
        "driver_hourly_stats:acc_rate", 
        "driver_hourly_stats:avg_daily_trips"
    ]
).to_df()

# Get online features for serving
online_features = fs.get_online_features(
    features=[
        "driver_hourly_stats:conv_rate",
        "driver_hourly_stats:acc_rate"
    ],
    entity_rows=[
        {"driver": 1001},
        {"driver": 1002}
    ]
)

feature_dict = online_features.to_dict()

Types

class RetrievalJob:
    def to_df(self) -> pd.DataFrame:
        """Convert retrieval job result to pandas DataFrame."""
        
    def to_arrow(self) -> pa.Table:
        """Convert retrieval job result to Apache Arrow table."""

class OnlineResponse:
    def to_dict(self) -> Dict[str, List[Any]]:
        """Convert online response to dictionary format."""
        
    def to_df(self) -> pd.DataFrame:
        """Convert online response to pandas DataFrame."""

Install with Tessl CLI

npx tessl i tessl/pypi-feast

docs

cli-operations.md

data-sources.md

entities.md

feature-store.md

feature-views.md

index.md

vector-store.md

tile.json