# Python SDK for Feast — an open-source feature store for machine learning that
# manages features for both training and serving environments.
#
# Data sources in Feast define how to connect to and read data from various
# storage systems and streaming platforms. Each data source type provides
# optimized access patterns and configuration options for different data
# infrastructure scenarios.
# File-based data sources support local and remote file systems with various
# formats including Parquet, CSV, and Delta tables.
class FileSource:
    """File-based data source for local or remote files."""

    def __init__(
        self,
        path: str,
        timestamp_field: Optional[str] = None,
        created_timestamp_column: Optional[str] = None,
        field_mapping: Optional[Dict[str, str]] = None,
        date_partition_column: Optional[str] = None,
        description: str = "",
        tags: Optional[Dict[str, str]] = None,
        owner: str = ""
    ):
        """
        File-based data source for local or remote files.

        Parameters:
        - path: File path (local, S3, GCS, etc.)
        - timestamp_field: Event timestamp column name
        - created_timestamp_column: Created timestamp column name
        - field_mapping: Map source column names to feature names
        - date_partition_column: Column for date-based partitioning
        - description: Data source description
        - tags: Metadata tags
        - owner: Data source owner
        """
        # The original stub discarded every argument; keep the configuration
        # on the instance so callers can read it back.
        self.path = path
        self.timestamp_field = timestamp_field
        self.created_timestamp_column = created_timestamp_column
        self.field_mapping = field_mapping
        self.date_partition_column = date_partition_column
        self.description = description
        self.tags = tags
        self.owner = owner


# Google BigQuery data sources provide scalable analytics and feature
# computation on Google Cloud Platform.
class BigQuerySource:
    """Google BigQuery data source."""

    def __init__(
        self,
        table: str,
        timestamp_field: Optional[str] = None,
        created_timestamp_column: Optional[str] = None,
        field_mapping: Optional[Dict[str, str]] = None,
        date_partition_column: Optional[str] = None,
        query: Optional[str] = None,
        description: str = "",
        tags: Optional[Dict[str, str]] = None,
        owner: str = ""
    ):
        """
        Google BigQuery data source.

        Parameters:
        - table: BigQuery table reference (project.dataset.table)
        - timestamp_field: Event timestamp column name
        - created_timestamp_column: Created timestamp column name
        - field_mapping: Column name mappings
        - date_partition_column: Date partition column
        - query: Custom SQL query (alternative to table)
        - description: Data source description
        - tags: Metadata tags
        - owner: Data source owner
        """
        # Persist configuration on the instance (original stub stored nothing).
        self.table = table
        self.timestamp_field = timestamp_field
        self.created_timestamp_column = created_timestamp_column
        self.field_mapping = field_mapping
        self.date_partition_column = date_partition_column
        self.query = query
        self.description = description
        self.tags = tags
        self.owner = owner


# Amazon Redshift data sources enable feature computation on AWS data
# warehouse infrastructure.
class RedshiftSource:
    """Amazon Redshift data source."""

    def __init__(
        self,
        table: str,
        timestamp_field: Optional[str] = None,
        created_timestamp_column: Optional[str] = None,
        field_mapping: Optional[Dict[str, str]] = None,
        date_partition_column: Optional[str] = None,
        query: Optional[str] = None,
        description: str = "",
        tags: Optional[Dict[str, str]] = None,
        owner: str = ""
    ):
        """
        Amazon Redshift data source.

        Parameters:
        - table: Redshift table reference (schema.table)
        - timestamp_field: Event timestamp column name
        - created_timestamp_column: Created timestamp column name
        - field_mapping: Column name mappings
        - date_partition_column: Date partition column
        - query: Custom SQL query (alternative to table)
        - description: Data source description
        - tags: Metadata tags
        - owner: Data source owner
        """
        # Persist configuration on the instance (original stub stored nothing).
        self.table = table
        self.timestamp_field = timestamp_field
        self.created_timestamp_column = created_timestamp_column
        self.field_mapping = field_mapping
        self.date_partition_column = date_partition_column
        self.query = query
        self.description = description
        self.tags = tags
        self.owner = owner


# Snowflake data sources provide cloud data warehouse connectivity with
# advanced analytics capabilities.
class SnowflakeSource:
    """Snowflake data warehouse source."""

    def __init__(
        self,
        table: str,
        timestamp_field: Optional[str] = None,
        created_timestamp_column: Optional[str] = None,
        field_mapping: Optional[Dict[str, str]] = None,
        date_partition_column: Optional[str] = None,
        query: Optional[str] = None,
        description: str = "",
        tags: Optional[Dict[str, str]] = None,
        owner: str = ""
    ):
        """
        Snowflake data warehouse source.

        Parameters:
        - table: Snowflake table reference (database.schema.table)
        - timestamp_field: Event timestamp column name
        - created_timestamp_column: Created timestamp column name
        - field_mapping: Column name mappings
        - date_partition_column: Date partition column
        - query: Custom SQL query (alternative to table)
        - description: Data source description
        - tags: Metadata tags
        - owner: Data source owner
        """
        # Persist configuration on the instance (original stub stored nothing).
        self.table = table
        self.timestamp_field = timestamp_field
        self.created_timestamp_column = created_timestamp_column
        self.field_mapping = field_mapping
        self.date_partition_column = date_partition_column
        self.query = query
        self.description = description
        self.tags = tags
        self.owner = owner


# Streaming data sources enable real-time feature updates from message
# brokers and streaming platforms.
class KafkaSource:
    """Apache Kafka streaming data source."""

    def __init__(
        self,
        kafka_bootstrap_servers: str,
        message_format: StreamFormat,
        topic: str,
        timestamp_field: Optional[str] = None,
        created_timestamp_column: Optional[str] = None,
        field_mapping: Optional[Dict[str, str]] = None,
        batch_source: Optional[DataSource] = None,
        watermark_delay_threshold: Optional[timedelta] = None,
        description: str = "",
        tags: Optional[Dict[str, str]] = None,
        owner: str = ""
    ):
        """
        Apache Kafka streaming data source.

        Parameters:
        - kafka_bootstrap_servers: Kafka broker connection string
        - message_format: Message serialization format
        - topic: Kafka topic name
        - timestamp_field: Event timestamp field in messages
        - created_timestamp_column: Created timestamp field
        - field_mapping: Field name mappings
        - batch_source: Associated batch source for historical data
        - watermark_delay_threshold: Late data tolerance
        - description: Data source description
        - tags: Metadata tags
        - owner: Data source owner
        """
        # Persist configuration on the instance (original stub stored nothing).
        self.kafka_bootstrap_servers = kafka_bootstrap_servers
        self.message_format = message_format
        self.topic = topic
        self.timestamp_field = timestamp_field
        self.created_timestamp_column = created_timestamp_column
        self.field_mapping = field_mapping
        self.batch_source = batch_source
        self.watermark_delay_threshold = watermark_delay_threshold
        self.description = description
        self.tags = tags
        self.owner = owner
class KinesisSource:
    """Amazon Kinesis streaming data source."""

    def __init__(
        self,
        table: str,
        region: str,
        timestamp_field: Optional[str] = None,
        created_timestamp_column: Optional[str] = None,
        field_mapping: Optional[Dict[str, str]] = None,
        batch_source: Optional[DataSource] = None,
        description: str = "",
        tags: Optional[Dict[str, str]] = None,
        owner: str = ""
    ):
        """
        Amazon Kinesis streaming data source.

        Parameters:
        - table: Kinesis stream name
        - region: AWS region
        - timestamp_field: Event timestamp field
        - created_timestamp_column: Created timestamp field
        - field_mapping: Field name mappings
        - batch_source: Associated batch source
        - description: Data source description
        - tags: Metadata tags
        - owner: Data source owner
        """
        # Persist configuration on the instance (original stub stored nothing).
        self.table = table
        self.region = region
        self.timestamp_field = timestamp_field
        self.created_timestamp_column = created_timestamp_column
        self.field_mapping = field_mapping
        self.batch_source = batch_source
        self.description = description
        self.tags = tags
        self.owner = owner


# Special data sources for real-time feature ingestion and request-time
# data incorporation.
class PushSource:
    """Push-based data source for real-time feature ingestion."""

    def __init__(
        self,
        name: str,
        batch_source: Optional[DataSource] = None,
        description: str = "",
        tags: Optional[Dict[str, str]] = None,
        owner: str = ""
    ):
        """
        Push-based data source for real-time feature ingestion.

        Parameters:
        - name: Push source name
        - batch_source: Associated batch source for historical data
        - description: Data source description
        - tags: Metadata tags
        - owner: Data source owner
        """
        # Persist configuration on the instance (original stub stored nothing).
        self.name = name
        self.batch_source = batch_source
        self.description = description
        self.tags = tags
        self.owner = owner
class RequestSource:
    """Request-time data source for on-demand features."""

    def __init__(
        self,
        name: str,
        schema: List[Field],
        description: str = "",
        tags: Optional[Dict[str, str]] = None,
        owner: str = ""
    ):
        """
        Request-time data source for on-demand features.

        Parameters:
        - name: Request source name
        - schema: Schema of request-time fields
        - description: Data source description
        - tags: Metadata tags
        - owner: Data source owner
        """
        # Persist configuration on the instance (original stub stored nothing).
        self.name = name
        self.schema = schema
        self.description = description
        self.tags = tags
        self.owner = owner


# Message formats for streaming data sources define how to deserialize
# streaming data.
class StreamFormat:
    """Abstract base class for stream message formats."""
class AvroFormat(StreamFormat):
    """Avro message format."""

    def __init__(self, schema_json: str):
        """
        Avro message format.

        Parameters:
        - schema_json: Avro schema as JSON string
        """
        # Keep the schema on the instance (original stub discarded it).
        self.schema_json = schema_json
class JsonFormat(StreamFormat):
    """JSON message format."""

    def __init__(self, schema_json: str = ""):
        """
        JSON message format.

        Parameters:
        - schema_json: Optional JSON schema for validation
        """
        # Keep the schema on the instance (original stub discarded it).
        self.schema_json = schema_json
class ProtoFormat(StreamFormat):
    """Protocol Buffers message format."""

    def __init__(self, class_path: str):
        """
        Protocol Buffers message format.

        Parameters:
        - class_path: Protobuf class path
        """
        # Keep the class path on the instance (original stub discarded it).
        self.class_path = class_path


from feast import FileSource
# Parquet file source
driver_source = FileSource(
    path="s3://feast-bucket/driver_features.parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created_timestamp",
    description="Driver performance metrics"
)

# CSV file source with field mapping
customer_source = FileSource(
    path="/data/customer_data.csv",
    timestamp_field="ts",
    field_mapping={
        "customer_id": "customer",
        "signup_ts": "created_timestamp"
    },
    description="Customer profile data"
)

# Delta table source with partitioning
transaction_source = FileSource(
    path="s3://data-lake/transactions/",
    timestamp_field="transaction_time",
    date_partition_column="date",
    description="Transaction history with date partitioning"
)

from feast import BigQuerySource, RedshiftSource, SnowflakeSource
# BigQuery source with table reference
bq_source = BigQuerySource(
    table="project.dataset.user_features",
    timestamp_field="event_timestamp",
    description="User behavioral features from BigQuery"
)

# BigQuery source with custom query (SQL is whitespace-insensitive, so the
# re-indentation below does not change the query's meaning)
bq_query_source = BigQuerySource(
    query="""
        SELECT user_id, feature_1, feature_2, event_timestamp
        FROM `project.dataset.raw_events`
        WHERE event_type = 'conversion'
    """,
    timestamp_field="event_timestamp",
    description="Conversion features computed via SQL"
)

# Redshift source
redshift_source = RedshiftSource(
    table="analytics.user_metrics",
    timestamp_field="created_at",
    description="User metrics from Redshift warehouse"
)

# Snowflake source
snowflake_source = SnowflakeSource(
    table="PROD.ANALYTICS.CUSTOMER_FEATURES",
    timestamp_field="EVENT_TIMESTAMP",
    description="Customer features from Snowflake"
)

from feast import KafkaSource, KinesisSource
from feast.data_format import JsonFormat, AvroFormat

# Kafka source with JSON format
kafka_source = KafkaSource(
    kafka_bootstrap_servers="localhost:9092",
    message_format=JsonFormat(),
    topic="user_events",
    timestamp_field="event_time",
    description="Real-time user events from Kafka"
)

# Kafka source with Avro format (JSON whitespace is insignificant, so the
# re-indented schema string is semantically identical)
avro_schema = """
{
    "type": "record",
    "name": "UserEvent",
    "fields": [
        {"name": "user_id", "type": "long"},
        {"name": "event_type", "type": "string"},
        {"name": "timestamp", "type": "long"}
    ]
}
"""

kafka_avro_source = KafkaSource(
    kafka_bootstrap_servers="kafka-cluster:9092",
    message_format=AvroFormat(schema_json=avro_schema),
    topic="user_events_avro",
    timestamp_field="timestamp",
    description="User events in Avro format"
)

# Kinesis source
kinesis_source = KinesisSource(
    table="user-activity-stream",
    region="us-east-1",
    timestamp_field="event_timestamp",
    description="User activity from Kinesis stream"
)

from feast import PushSource, RequestSource, Field, ValueType
# Push source for real-time feature updates
push_source = PushSource(
    name="driver_location_push",
    description="Real-time driver location updates"
)

# Request source for on-demand features
request_source = RequestSource(
    name="ride_request_data",
    schema=[
        Field(name="pickup_lat", dtype=ValueType.DOUBLE),
        Field(name="pickup_lon", dtype=ValueType.DOUBLE),
        Field(name="dropoff_lat", dtype=ValueType.DOUBLE),
        Field(name="dropoff_lon", dtype=ValueType.DOUBLE),
        Field(name="requested_at", dtype=ValueType.UNIX_TIMESTAMP)
    ],
    description="Request-time ride booking data"
)

# Source with comprehensive metadata
production_source = BigQuerySource(
    table="production.ml_features.customer_metrics",
    timestamp_field="feature_timestamp",
    created_timestamp_column="created_timestamp",
    field_mapping={
        "cust_id": "customer_id",
        "signup_date": "created_date"
    },
    date_partition_column="feature_date",
    description="Production customer metrics with full lineage",
    tags={
        "environment": "production",
        "data_classification": "internal",
        "update_frequency": "hourly",
        "retention_days": "365"
    },
    owner="data-platform@company.com"
)

# Development source with different configuration
dev_source = FileSource(
    path="./test_data/customer_features_sample.parquet",
    timestamp_field="feature_timestamp",
    description="Development sample data for testing",
    tags={
        "environment": "development",
        "data_size": "1000_rows"
    },
    owner="ml-engineer@company.com"
)

# Install with Tessl CLI
#   npx tessl i tessl/pypi-feast