# Data Sources
Data sources in Feast define how to connect to and read feature data from storage systems and streaming platforms. Each data source type exposes configuration options tailored to its backend, from batch files and cloud data warehouses to message brokers and push-based ingestion.
## Capabilities
### File-Based Data Sources
File-based data sources support local and remote file systems with various formats including Parquet, CSV, and Delta tables.
```python { .api }
class FileSource:
    def __init__(
        self,
        path: str,
        timestamp_field: Optional[str] = None,
        created_timestamp_column: Optional[str] = None,
        field_mapping: Optional[Dict[str, str]] = None,
        date_partition_column: Optional[str] = None,
        description: str = "",
        tags: Optional[Dict[str, str]] = None,
        owner: str = ""
    ):
        """
        File-based data source for local or remote files.

        Parameters:
        - path: File path (local, S3, GCS, etc.)
        - timestamp_field: Event timestamp column name
        - created_timestamp_column: Created timestamp column name
        - field_mapping: Map source column names to feature names
        - date_partition_column: Column for date-based partitioning
        - description: Data source description
        - tags: Metadata tags
        - owner: Data source owner
        """
```
### BigQuery Data Sources
Google BigQuery data sources provide scalable analytics and feature computation on Google Cloud Platform.
```python { .api }
class BigQuerySource:
    def __init__(
        self,
        table: str,
        timestamp_field: Optional[str] = None,
        created_timestamp_column: Optional[str] = None,
        field_mapping: Optional[Dict[str, str]] = None,
        date_partition_column: Optional[str] = None,
        query: Optional[str] = None,
        description: str = "",
        tags: Optional[Dict[str, str]] = None,
        owner: str = ""
    ):
        """
        Google BigQuery data source.

        Parameters:
        - table: BigQuery table reference (project.dataset.table)
        - timestamp_field: Event timestamp column name
        - created_timestamp_column: Created timestamp column name
        - field_mapping: Column name mappings
        - date_partition_column: Date partition column
        - query: Custom SQL query (alternative to table)
        - description: Data source description
        - tags: Metadata tags
        - owner: Data source owner
        """
```
### Redshift Data Sources
Amazon Redshift data sources enable feature computation on AWS data warehouse infrastructure.
```python { .api }
class RedshiftSource:
    def __init__(
        self,
        table: str,
        timestamp_field: Optional[str] = None,
        created_timestamp_column: Optional[str] = None,
        field_mapping: Optional[Dict[str, str]] = None,
        date_partition_column: Optional[str] = None,
        query: Optional[str] = None,
        description: str = "",
        tags: Optional[Dict[str, str]] = None,
        owner: str = ""
    ):
        """
        Amazon Redshift data source.

        Parameters:
        - table: Redshift table reference (schema.table)
        - timestamp_field: Event timestamp column name
        - created_timestamp_column: Created timestamp column name
        - field_mapping: Column name mappings
        - date_partition_column: Date partition column
        - query: Custom SQL query (alternative to table)
        - description: Data source description
        - tags: Metadata tags
        - owner: Data source owner
        """
```
### Snowflake Data Sources
Snowflake data sources provide cloud data warehouse connectivity with advanced analytics capabilities.
```python { .api }
class SnowflakeSource:
    def __init__(
        self,
        table: str,
        timestamp_field: Optional[str] = None,
        created_timestamp_column: Optional[str] = None,
        field_mapping: Optional[Dict[str, str]] = None,
        date_partition_column: Optional[str] = None,
        query: Optional[str] = None,
        description: str = "",
        tags: Optional[Dict[str, str]] = None,
        owner: str = ""
    ):
        """
        Snowflake data warehouse source.

        Parameters:
        - table: Snowflake table reference (database.schema.table)
        - timestamp_field: Event timestamp column name
        - created_timestamp_column: Created timestamp column name
        - field_mapping: Column name mappings
        - date_partition_column: Date partition column
        - query: Custom SQL query (alternative to table)
        - description: Data source description
        - tags: Metadata tags
        - owner: Data source owner
        """
```
### Streaming Data Sources
Streaming data sources enable real-time feature updates from message brokers and streaming platforms.
```python { .api }
class KafkaSource:
    def __init__(
        self,
        kafka_bootstrap_servers: str,
        message_format: StreamFormat,
        topic: str,
        timestamp_field: Optional[str] = None,
        created_timestamp_column: Optional[str] = None,
        field_mapping: Optional[Dict[str, str]] = None,
        batch_source: Optional[DataSource] = None,
        watermark_delay_threshold: Optional[timedelta] = None,
        description: str = "",
        tags: Optional[Dict[str, str]] = None,
        owner: str = ""
    ):
        """
        Apache Kafka streaming data source.

        Parameters:
        - kafka_bootstrap_servers: Kafka broker connection string
        - message_format: Message serialization format
        - topic: Kafka topic name
        - timestamp_field: Event timestamp field in messages
        - created_timestamp_column: Created timestamp field
        - field_mapping: Field name mappings
        - batch_source: Associated batch source for historical data
        - watermark_delay_threshold: Late data tolerance
        - description: Data source description
        - tags: Metadata tags
        - owner: Data source owner
        """

class KinesisSource:
    def __init__(
        self,
        table: str,
        region: str,
        timestamp_field: Optional[str] = None,
        created_timestamp_column: Optional[str] = None,
        field_mapping: Optional[Dict[str, str]] = None,
        batch_source: Optional[DataSource] = None,
        description: str = "",
        tags: Optional[Dict[str, str]] = None,
        owner: str = ""
    ):
        """
        Amazon Kinesis streaming data source.

        Parameters:
        - table: Kinesis stream name
        - region: AWS region
        - timestamp_field: Event timestamp field
        - created_timestamp_column: Created timestamp field
        - field_mapping: Field name mappings
        - batch_source: Associated batch source
        - description: Data source description
        - tags: Metadata tags
        - owner: Data source owner
        """
```
### Push and Request Sources
Push and request sources cover two special cases: pushing feature values directly into the store in real time, and incorporating data that is only available at request time into on-demand feature computation.
```python { .api }
class PushSource:
    def __init__(
        self,
        name: str,
        batch_source: Optional[DataSource] = None,
        description: str = "",
        tags: Optional[Dict[str, str]] = None,
        owner: str = ""
    ):
        """
        Push-based data source for real-time feature ingestion.

        Parameters:
        - name: Push source name
        - batch_source: Associated batch source for historical data
        - description: Data source description
        - tags: Metadata tags
        - owner: Data source owner
        """

class RequestSource:
    def __init__(
        self,
        name: str,
        schema: List[Field],
        description: str = "",
        tags: Optional[Dict[str, str]] = None,
        owner: str = ""
    ):
        """
        Request-time data source for on-demand features.

        Parameters:
        - name: Request source name
        - schema: Schema of request-time fields
        - description: Data source description
        - tags: Metadata tags
        - owner: Data source owner
        """
```
## Stream Formats
Stream formats tell streaming data sources how to deserialize incoming messages.
```python { .api }
class StreamFormat:
    """Abstract base class for stream message formats."""

class AvroFormat(StreamFormat):
    def __init__(self, schema_json: str):
        """
        Avro message format.

        Parameters:
        - schema_json: Avro schema as JSON string
        """

class JsonFormat(StreamFormat):
    def __init__(self, schema_json: str = ""):
        """
        JSON message format.

        Parameters:
        - schema_json: Optional JSON schema for validation
        """

class ProtoFormat(StreamFormat):
    def __init__(self, class_path: str):
        """
        Protocol Buffers message format.

        Parameters:
        - class_path: Protobuf class path
        """
```
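A format object is constructed once and passed as a streaming source's `message_format`. The sketch below is illustrative only: the schema string and protobuf class path are placeholders, not real schemas.

```python
from feast.data_format import JsonFormat, ProtoFormat

# JSON messages, optionally constrained by a schema string (placeholder schema)
json_format = JsonFormat(
    schema_json="user_id integer, event_type string, event_time timestamp"
)

# Protobuf messages, identified by the compiled message class path (placeholder path)
proto_format = ProtoFormat(class_path="com.example.events.UserEvent")
```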
## Usage Examples
### File Data Sources
```python
from feast import FileSource

# Parquet file source
driver_source = FileSource(
    path="s3://feast-bucket/driver_features.parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created_timestamp",
    description="Driver performance metrics"
)

# CSV file source with field mapping
customer_source = FileSource(
    path="/data/customer_data.csv",
    timestamp_field="ts",
    field_mapping={
        "customer_id": "customer",
        "signup_ts": "created_timestamp"
    },
    description="Customer profile data"
)

# Delta table source with partitioning
transaction_source = FileSource(
    path="s3://data-lake/transactions/",
    timestamp_field="transaction_time",
    date_partition_column="date",
    description="Transaction history with date partitioning"
)
```
### Cloud Data Warehouse Sources
```python
from feast import BigQuerySource, RedshiftSource, SnowflakeSource

# BigQuery source with table reference
bq_source = BigQuerySource(
    table="project.dataset.user_features",
    timestamp_field="event_timestamp",
    description="User behavioral features from BigQuery"
)

# BigQuery source with custom query
bq_query_source = BigQuerySource(
    query="""
        SELECT user_id, feature_1, feature_2, event_timestamp
        FROM `project.dataset.raw_events`
        WHERE event_type = 'conversion'
    """,
    timestamp_field="event_timestamp",
    description="Conversion features computed via SQL"
)

# Redshift source
redshift_source = RedshiftSource(
    table="analytics.user_metrics",
    timestamp_field="created_at",
    description="User metrics from Redshift warehouse"
)

# Snowflake source
snowflake_source = SnowflakeSource(
    table="PROD.ANALYTICS.CUSTOMER_FEATURES",
    timestamp_field="EVENT_TIMESTAMP",
    description="Customer features from Snowflake"
)
```
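The `query` alternative shown for BigQuery works the same way for the other warehouse sources, per the constructors above. A brief sketch, with made-up table and column names:

```python
# Query-based Snowflake source (illustrative table and column names)
snowflake_query_source = SnowflakeSource(
    query="""
        SELECT CUSTOMER_ID, LIFETIME_VALUE, EVENT_TIMESTAMP
        FROM PROD.ANALYTICS.CUSTOMER_EVENTS
        WHERE EVENT_TYPE = 'PURCHASE'
    """,
    timestamp_field="EVENT_TIMESTAMP",
    description="Purchase-derived customer features"
)
```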
### Streaming Data Sources
```python
from feast import KafkaSource, KinesisSource
from feast.data_format import JsonFormat, AvroFormat

# Kafka source with JSON format
kafka_source = KafkaSource(
    kafka_bootstrap_servers="localhost:9092",
    message_format=JsonFormat(),
    topic="user_events",
    timestamp_field="event_time",
    description="Real-time user events from Kafka"
)

# Kafka source with Avro format
avro_schema = """
{
    "type": "record",
    "name": "UserEvent",
    "fields": [
        {"name": "user_id", "type": "long"},
        {"name": "event_type", "type": "string"},
        {"name": "timestamp", "type": "long"}
    ]
}
"""

kafka_avro_source = KafkaSource(
    kafka_bootstrap_servers="kafka-cluster:9092",
    message_format=AvroFormat(schema_json=avro_schema),
    topic="user_events_avro",
    timestamp_field="timestamp",
    description="User events in Avro format"
)

# Kinesis source
kinesis_source = KinesisSource(
    table="user-activity-stream",
    region="us-east-1",
    timestamp_field="event_timestamp",
    description="User activity from Kinesis stream"
)
```
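A streaming source is usually paired with a batch source that holds the same events for historical retrieval, and `watermark_delay_threshold` bounds how late events may arrive. The sketch below follows the `KafkaSource` constructor shown earlier; the broker address, topic, and file path are illustrative.

```python
from datetime import timedelta

from feast import FileSource, KafkaSource
from feast.data_format import JsonFormat

# Batch counterpart holding the same events for historical retrieval (illustrative path)
events_batch = FileSource(
    path="s3://feast-bucket/user_events/",
    timestamp_field="event_time",
)

# Streaming source that tolerates events arriving up to five minutes late
kafka_with_batch = KafkaSource(
    kafka_bootstrap_servers="kafka-cluster:9092",
    message_format=JsonFormat(),
    topic="user_events",
    timestamp_field="event_time",
    batch_source=events_batch,
    watermark_delay_threshold=timedelta(minutes=5),
    description="User events with a batch counterpart for historical features",
)
```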
### Push and Request Sources
```python
from feast import PushSource, RequestSource, Field
from feast.types import Float64, UnixTimestamp

# Push source for real-time feature updates
push_source = PushSource(
    name="driver_location_push",
    description="Real-time driver location updates"
)

# Request source for on-demand features
request_source = RequestSource(
    name="ride_request_data",
    schema=[
        Field(name="pickup_lat", dtype=Float64),
        Field(name="pickup_lon", dtype=Float64),
        Field(name="dropoff_lat", dtype=Float64),
        Field(name="dropoff_lon", dtype=Float64),
        Field(name="requested_at", dtype=UnixTimestamp)
    ],
    description="Request-time ride booking data"
)
```
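Assuming the push source has been attached to a feature view and applied to the registry, new rows are typically delivered through the feature store's push API. A minimal sketch; the repository path and column names are illustrative and must match the push source's schema in practice.

```python
from datetime import datetime

import pandas as pd
from feast import FeatureStore

# Assumes a configured feature repository in the current directory
store = FeatureStore(repo_path=".")

# Rows keyed by the entity and event timestamp expected by the push source
event_df = pd.DataFrame(
    {
        "driver_id": [1001],
        "lat": [37.7749],
        "lon": [-122.4194],
        "event_timestamp": [datetime.utcnow()],
    }
)

# Deliver the rows to the push source defined above
store.push("driver_location_push", event_df)
```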
### Data Source Configuration Patterns
```python
from feast import BigQuerySource, FileSource

# Source with comprehensive metadata
production_source = BigQuerySource(
    table="production.ml_features.customer_metrics",
    timestamp_field="feature_timestamp",
    created_timestamp_column="created_timestamp",
    field_mapping={
        "cust_id": "customer_id",
        "signup_date": "created_date"
    },
    date_partition_column="feature_date",
    description="Production customer metrics with full lineage",
    tags={
        "environment": "production",
        "data_classification": "internal",
        "update_frequency": "hourly",
        "retention_days": "365"
    },
    owner="data-platform@company.com"
)

# Development source with different configuration
dev_source = FileSource(
    path="./test_data/customer_features_sample.parquet",
    timestamp_field="feature_timestamp",
    description="Development sample data for testing",
    tags={
        "environment": "development",
        "data_size": "1000_rows"
    },
    owner="ml-engineer@company.com"
)
```