# Feature Store Management

The FeatureStore class serves as the primary interface for all feature store operations. It orchestrates feature definitions, data materialization, and feature retrieval for both training and serving scenarios.

## Capabilities
### Feature Store Initialization

Initialize a feature store instance from a repository configuration or directory path. The feature store manages all metadata, data sources, and serving infrastructure.

```python { .api }
class FeatureStore:
    def __init__(self, repo_path: Optional[str] = None, config: Optional[RepoConfig] = None, fs_yaml_file: Optional[Path] = None):
        """
        Initialize a FeatureStore instance.

        Parameters:
        - repo_path: Path to a feature repository directory containing feature_store.yaml
        - config: RepoConfig object for programmatic configuration
        - fs_yaml_file: Path to the feature_store.yaml file used to configure the feature store

        At most one of fs_yaml_file and config can be set.
        """
```
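When `repo_path` is used, configuration is read from `feature_store.yaml` in that directory. For reference, a minimal local-provider configuration might look like the following (illustrative values; the available keys depend on your Feast version, provider, and online store):

```yaml
project: my_project
registry: data/registry.db
provider: local
online_store:
  type: sqlite
  path: data/online_store.db
```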
### Feature Definition Management

Apply feature definitions (entities, feature views, feature services) to the feature store registry. This registers metadata and prepares the infrastructure for feature materialization.

```python { .api }
def apply(self, objects: List[Union[Entity, FeatureView, FeatureService, DataSource]]):
    """
    Register feature definitions with the feature store.

    Parameters:
    - objects: List of feature objects to register (entities, feature views, etc.)

    This method validates definitions, updates the registry, and provisions necessary infrastructure.
    """
```
### Historical Feature Retrieval

Retrieve historical features for model training with point-in-time correctness. This ensures no data leakage by only using feature values that were available at each entity's event timestamp.

```python { .api }
def get_historical_features(
    self,
    entity_df: Optional[Union[pd.DataFrame, str]] = None,
    features: Union[List[str], FeatureService] = [],
    full_feature_names: bool = False,
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None
) -> RetrievalJob:
    """
    Retrieve historical features for training.

    Parameters:
    - entity_df: DataFrame with entity keys and timestamps, or a path to a file
    - features: List of feature references or a FeatureService object
    - full_feature_names: Whether to prefix feature names with feature view names
    - start_date: Start date for feature retrieval (when entity_df is None)
    - end_date: End date for feature retrieval (when entity_df is None)

    Returns:
    RetrievalJob that can be converted to a DataFrame or Arrow table
    """
```
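Point-in-time correctness means that, for each row in `entity_df`, only the most recent feature values at or before that row's timestamp are joined. Outside of Feast, the semantics can be illustrated with a pandas as-of join (a simplified sketch with hypothetical data; the real retrieval also applies each feature view's TTL and handles composite entities):

```python
import pandas as pd

# Feature values recorded over time (playing the role of the offline store).
feature_df = pd.DataFrame({
    "driver": [1001, 1001, 1002],
    "event_timestamp": pd.to_datetime(["2023-01-10", "2023-01-14", "2023-01-12"]),
    "conv_rate": [0.1, 0.2, 0.5],
})

# Entity rows with the timestamps we want features "as of".
entity_df = pd.DataFrame({
    "driver": [1001, 1002],
    "event_timestamp": pd.to_datetime(["2023-01-15", "2023-01-13"]),
})

# merge_asof picks, per entity row, the most recent feature value
# at or before the entity timestamp -- no future data can leak in.
training_df = pd.merge_asof(
    entity_df.sort_values("event_timestamp"),
    feature_df.sort_values("event_timestamp"),
    on="event_timestamp",
    by="driver",
)
print(training_df.sort_values("driver")["conv_rate"].tolist())  # [0.2, 0.5]
```

Driver 1001's row at 2023-01-15 picks up the 2023-01-14 value (0.2), not the earlier 0.1, and never anything recorded after its own timestamp.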
### Online Feature Retrieval

Retrieve features for real-time model inference with low latency. Features are served from the online store for immediate prediction needs.

```python { .api }
def get_online_features(
    self,
    features: Union[List[str], FeatureService],
    entity_rows: Union[List[Dict[str, Any]], Mapping[str, Union[Sequence[Any], Sequence[Value], RepeatedValue]]],
    full_feature_names: bool = False
) -> OnlineResponse:
    """
    Retrieve features for online serving.

    Parameters:
    - features: List of feature references or a FeatureService object
    - entity_rows: List of entity key-value dictionaries, or a mapping of entity keys to value sequences
    - full_feature_names: Whether to prefix feature names with feature view names

    Returns:
    OnlineResponse containing feature values
    """
```
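The dictionary produced by `OnlineResponse.to_dict()` is columnar: each entity key and feature name maps to one list of values, aligned with the order of `entity_rows`. Transposing it into per-row records takes only plain Python. A sketch, assuming a response dict shaped like that output (the values here are made up):

```python
# Columnar output in the shape returned by OnlineResponse.to_dict():
# entity keys and feature names each map to one value per entity row.
response = {
    "driver": [1001, 1002],
    "conv_rate": [0.25, 0.41],
    "acc_rate": [0.92, 0.88],
}

# Transpose into one dict per entity row, which is often the shape
# a model-serving layer expects.
rows = [
    {name: values[i] for name, values in response.items()}
    for i in range(len(response["driver"]))
]
print(rows[0])  # {'driver': 1001, 'conv_rate': 0.25, 'acc_rate': 0.92}
```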
### Feature Materialization

Materialize batch features from the offline store to the online store for serving. This process computes and stores the latest feature values for fast online access.

```python { .api }
def materialize(
    self,
    start_date: datetime,
    end_date: datetime,
    feature_views: Optional[List[str]] = None
):
    """
    Materialize features to the online store.

    Parameters:
    - start_date: Start of the materialization time range
    - end_date: End of the materialization time range
    - feature_views: Specific feature views to materialize (None for all)
    """

def materialize_incremental(
    self,
    end_date: datetime,
    feature_views: Optional[List[str]] = None
):
    """
    Incrementally materialize features since the last materialization.

    Parameters:
    - end_date: End timestamp for incremental materialization
    - feature_views: Specific feature views to materialize (None for all)
    """
```
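The difference between the two calls is how the start of the window is chosen: `materialize` takes it explicitly, while `materialize_incremental` derives it from the previous run's end time (or, on a first run, by looking back over the feature view's TTL). A simplified illustration of that windowing logic (not Feast's actual implementation; the helper name is hypothetical):

```python
from datetime import datetime, timedelta
from typing import Optional, Tuple

def next_materialization_window(
    last_end: Optional[datetime],
    end_date: datetime,
    ttl: timedelta,
) -> Tuple[datetime, datetime]:
    """Compute the (start, end) range for an incremental materialization run."""
    if last_end is None:
        # First run: look back over the feature view's TTL.
        return end_date - ttl, end_date
    # Subsequent runs: pick up exactly where the last run stopped,
    # so no interval is skipped or processed twice.
    return last_end, end_date

# First run over a 1-day TTL, then an incremental follow-up.
first = next_materialization_window(None, datetime(2023, 1, 2), timedelta(days=1))
second = next_materialization_window(first[1], datetime(2023, 1, 3), timedelta(days=1))
```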
### Feature Store Metadata

Query and inspect feature store metadata, including registered objects and their configurations.

```python { .api }
def list_entities(self) -> List[Entity]:
    """List all registered entities."""

def list_feature_views(self) -> List[FeatureView]:
    """List all registered feature views."""

def list_feature_services(self) -> List[FeatureService]:
    """List all registered feature services."""

def get_entity(self, name: str) -> Entity:
    """Get an entity by name."""

def get_feature_view(self, name: str) -> FeatureView:
    """Get a feature view by name."""

def get_feature_service(self, name: str) -> FeatureService:
    """Get a feature service by name."""
```
### Feature Server Operations

Start and manage the feature server for HTTP/gRPC-based feature serving in production environments.

```python { .api }
def serve(
    self,
    host: str = "localhost",
    port: int = 6566,
    type_: str = "http",
    no_access_log: bool = False
):
    """
    Start the feature server.

    Parameters:
    - host: Server host address
    - port: Server port number
    - type_: Server type ("http" or "grpc")
    - no_access_log: Disable access logging
    """

def serve_ui(
    self,
    host: str = "localhost",
    port: int = 8888,
    get_registry_dump: bool = False
):
    """
    Start the Feast Web UI server.

    Parameters:
    - host: Server host address
    - port: Server port number
    - get_registry_dump: Include a registry dump in the UI
    """

def serve_registry(
    self,
    host: str = "localhost",
    port: int = 6570
):
    """
    Start the registry server for remote registry access.

    Parameters:
    - host: Server host address
    - port: Server port number
    """
```
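Once the HTTP feature server is running, clients retrieve features by POSTing JSON to its `/get-online-features` endpoint, with entity values passed columnwise (one list per entity key). A sketch of building such a request body; the endpoint path and payload shape match recent Feast documentation, but verify them against your deployment before relying on this:

```python
import json

# Request payload for the HTTP feature server's /get-online-features
# endpoint: feature references plus columnwise entity values.
payload = {
    "features": [
        "driver_hourly_stats:conv_rate",
        "driver_hourly_stats:acc_rate",
    ],
    "entities": {"driver": [1001, 1002]},
}

body = json.dumps(payload)
# e.g. requests.post("http://localhost:6566/get-online-features", data=body)
print(json.loads(body)["entities"]["driver"])  # [1001, 1002]
```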
### Permission Management

Manage access control permissions for feature store operations and resources.

```python { .api }
def list_permissions(self) -> List[Permission]:
    """List all registered permissions."""

def get_permission(self, name: str) -> Permission:
    """Get a permission by name."""
```
### Project Management

Manage projects and multi-tenancy within the feature store.

```python { .api }
def list_projects(self) -> List[Project]:
    """List all available projects."""

def get_project(self, name: Optional[str]) -> Project:
    """Get a project by name, or the current project if name is None."""
```
### Data Validation and Logging

Validate and manage logged feature data for monitoring and debugging.

```python { .api }
def write_logged_features(
    self,
    logs: Union[pa.Table, pd.DataFrame],
    source: LoggingSource,
    config: LoggingConfig
):
    """Write logged features to the configured logging destination."""

def validate_logged_features(
    self,
    source: LoggingSource,
    config: LoggingConfig,
    reference: ValidationReference
) -> ValidationResult:
    """Validate logged features against a reference dataset."""

def get_validation_reference(self, name: str) -> ValidationReference:
    """Get a validation reference by name."""

def list_validation_references(self) -> List[ValidationReference]:
    """List all validation references."""
```
### Saved Datasets

Manage saved datasets for feature store operations.

```python { .api }
def list_saved_datasets(self, allow_cache: bool = True) -> List[SavedDataset]:
    """List all saved datasets."""

def create_saved_dataset(
    self,
    from_: Union[RetrievalJob, pd.DataFrame],
    name: str,
    storage: SavedDatasetStorage,
    tags: Optional[Dict[str, str]] = None
) -> SavedDataset:
    """Create and register a new saved dataset."""

def get_saved_dataset(self, name: str) -> SavedDataset:
    """Get a saved dataset by name."""
```
### Advanced Operations

Additional utility operations for feature store management.

```python { .api }
def plan(self, objects: List[Union[Entity, FeatureView, FeatureService]]) -> None:
    """Preview the changes that would be applied to the feature store."""

def teardown(self):
    """Tear down feature store infrastructure."""

def push(
    self,
    push_source_name: str,
    df: pd.DataFrame,
    allow_registry_cache: bool = True
):
    """Push data to a PushSource."""

def write_to_online_store(
    self,
    feature_view_name: str,
    df: Union[pd.DataFrame, pa.Table]
):
    """Write feature data directly to the online store."""

def write_to_offline_store(
    self,
    feature_view_name: str,
    df: Union[pd.DataFrame, pa.Table]
):
    """Write feature data directly to the offline store."""
```
## Usage Examples

### Complete Feature Store Workflow

```python
from datetime import datetime, timedelta

import pandas as pd

from feast import Entity, FeatureStore, FeatureView, Field, FileSource, ValueType
from feast.types import Float32, Int64

# Initialize feature store
fs = FeatureStore(repo_path="./feature_repo")

# Define entities
driver = Entity(
    name="driver",
    value_type=ValueType.INT64,
    description="Driver identifier"
)

# Define data source
driver_source = FileSource(
    path="data/driver_stats.parquet",
    timestamp_field="event_timestamp"
)

# Define feature view (Field dtypes come from feast.types,
# not the ValueType enum used for entities)
driver_hourly_stats = FeatureView(
    name="driver_hourly_stats",
    entities=[driver],
    ttl=timedelta(hours=1),
    schema=[
        Field(name="conv_rate", dtype=Float32),
        Field(name="acc_rate", dtype=Float32),
        Field(name="avg_daily_trips", dtype=Int64)
    ],
    source=driver_source
)

# Apply to feature store
fs.apply([driver, driver_hourly_stats])

# Materialize features
fs.materialize(
    start_date=datetime(2023, 1, 1),
    end_date=datetime(2023, 1, 31)
)

# Get training data
entity_df = pd.DataFrame({
    "driver": [1001, 1002, 1003],
    "event_timestamp": [
        datetime(2023, 1, 15, 10, 0),
        datetime(2023, 1, 15, 11, 0),
        datetime(2023, 1, 15, 12, 0)
    ]
})

training_data = fs.get_historical_features(
    entity_df=entity_df,
    features=[
        "driver_hourly_stats:conv_rate",
        "driver_hourly_stats:acc_rate",
        "driver_hourly_stats:avg_daily_trips"
    ]
).to_df()

# Get online features for serving
online_features = fs.get_online_features(
    features=[
        "driver_hourly_stats:conv_rate",
        "driver_hourly_stats:acc_rate"
    ],
    entity_rows=[
        {"driver": 1001},
        {"driver": 1002}
    ]
)

feature_dict = online_features.to_dict()
```
## Types

```python { .api }
class RetrievalJob:
    def to_df(self) -> pd.DataFrame:
        """Convert the retrieval job result to a pandas DataFrame."""

    def to_arrow(self) -> pa.Table:
        """Convert the retrieval job result to an Apache Arrow table."""

class OnlineResponse:
    def to_dict(self) -> Dict[str, List[Any]]:
        """Convert the online response to dictionary format."""

    def to_df(self) -> pd.DataFrame:
        """Convert the online response to a pandas DataFrame."""
```