
# Data Sources

Data sources in Feast define how to connect to and read data from various storage systems and streaming platforms. Each data source type provides optimized access patterns and configuration options for different data infrastructure scenarios.

## Capabilities

### File-Based Data Sources

File-based data sources support local and remote file systems with various formats, including Parquet, CSV, and Delta tables.

```python { .api }
class FileSource:
    def __init__(
        self,
        path: str,
        timestamp_field: Optional[str] = None,
        created_timestamp_column: Optional[str] = None,
        field_mapping: Optional[Dict[str, str]] = None,
        date_partition_column: Optional[str] = None,
        description: str = "",
        tags: Optional[Dict[str, str]] = None,
        owner: str = ""
    ):
        """
        File-based data source for local or remote files.

        Parameters:
        - path: File path (local, S3, GCS, etc.)
        - timestamp_field: Event timestamp column name
        - created_timestamp_column: Created timestamp column name
        - field_mapping: Map source column names to feature names
        - date_partition_column: Column for date-based partitioning
        - description: Data source description
        - tags: Metadata tags
        - owner: Data source owner
        """
```

### BigQuery Data Sources

Google BigQuery data sources provide scalable analytics and feature computation on Google Cloud Platform.

```python { .api }
class BigQuerySource:
    def __init__(
        self,
        table: str,
        timestamp_field: Optional[str] = None,
        created_timestamp_column: Optional[str] = None,
        field_mapping: Optional[Dict[str, str]] = None,
        date_partition_column: Optional[str] = None,
        query: Optional[str] = None,
        description: str = "",
        tags: Optional[Dict[str, str]] = None,
        owner: str = ""
    ):
        """
        Google BigQuery data source.

        Parameters:
        - table: BigQuery table reference (project.dataset.table)
        - timestamp_field: Event timestamp column name
        - created_timestamp_column: Created timestamp column name
        - field_mapping: Column name mappings
        - date_partition_column: Date partition column
        - query: Custom SQL query (alternative to table)
        - description: Data source description
        - tags: Metadata tags
        - owner: Data source owner
        """
```

### Redshift Data Sources

Amazon Redshift data sources enable feature computation on AWS data warehouse infrastructure.

```python { .api }
class RedshiftSource:
    def __init__(
        self,
        table: str,
        timestamp_field: Optional[str] = None,
        created_timestamp_column: Optional[str] = None,
        field_mapping: Optional[Dict[str, str]] = None,
        date_partition_column: Optional[str] = None,
        query: Optional[str] = None,
        description: str = "",
        tags: Optional[Dict[str, str]] = None,
        owner: str = ""
    ):
        """
        Amazon Redshift data source.

        Parameters:
        - table: Redshift table reference (schema.table)
        - timestamp_field: Event timestamp column name
        - created_timestamp_column: Created timestamp column name
        - field_mapping: Column name mappings
        - date_partition_column: Date partition column
        - query: Custom SQL query (alternative to table)
        - description: Data source description
        - tags: Metadata tags
        - owner: Data source owner
        """
```

### Snowflake Data Sources

Snowflake data sources provide cloud data warehouse connectivity with advanced analytics capabilities.

```python { .api }
class SnowflakeSource:
    def __init__(
        self,
        table: str,
        timestamp_field: Optional[str] = None,
        created_timestamp_column: Optional[str] = None,
        field_mapping: Optional[Dict[str, str]] = None,
        date_partition_column: Optional[str] = None,
        query: Optional[str] = None,
        description: str = "",
        tags: Optional[Dict[str, str]] = None,
        owner: str = ""
    ):
        """
        Snowflake data warehouse source.

        Parameters:
        - table: Snowflake table reference (database.schema.table)
        - timestamp_field: Event timestamp column name
        - created_timestamp_column: Created timestamp column name
        - field_mapping: Column name mappings
        - date_partition_column: Date partition column
        - query: Custom SQL query (alternative to table)
        - description: Data source description
        - tags: Metadata tags
        - owner: Data source owner
        """
```

### Streaming Data Sources

Streaming data sources enable real-time feature updates from message brokers and streaming platforms.

```python { .api }
class KafkaSource:
    def __init__(
        self,
        kafka_bootstrap_servers: str,
        message_format: StreamFormat,
        topic: str,
        timestamp_field: Optional[str] = None,
        created_timestamp_column: Optional[str] = None,
        field_mapping: Optional[Dict[str, str]] = None,
        batch_source: Optional[DataSource] = None,
        watermark_delay_threshold: Optional[timedelta] = None,
        description: str = "",
        tags: Optional[Dict[str, str]] = None,
        owner: str = ""
    ):
        """
        Apache Kafka streaming data source.

        Parameters:
        - kafka_bootstrap_servers: Kafka broker connection string
        - message_format: Message serialization format
        - topic: Kafka topic name
        - timestamp_field: Event timestamp field in messages
        - created_timestamp_column: Created timestamp field
        - field_mapping: Field name mappings
        - batch_source: Associated batch source for historical data
        - watermark_delay_threshold: Late data tolerance
        - description: Data source description
        - tags: Metadata tags
        - owner: Data source owner
        """

class KinesisSource:
    def __init__(
        self,
        table: str,
        region: str,
        timestamp_field: Optional[str] = None,
        created_timestamp_column: Optional[str] = None,
        field_mapping: Optional[Dict[str, str]] = None,
        batch_source: Optional[DataSource] = None,
        description: str = "",
        tags: Optional[Dict[str, str]] = None,
        owner: str = ""
    ):
        """
        Amazon Kinesis streaming data source.

        Parameters:
        - table: Kinesis stream name
        - region: AWS region
        - timestamp_field: Event timestamp field
        - created_timestamp_column: Created timestamp field
        - field_mapping: Field name mappings
        - batch_source: Associated batch source
        - description: Data source description
        - tags: Metadata tags
        - owner: Data source owner
        """
```

### Push and Request Sources

Push and request sources are special data sources for real-time feature ingestion and for incorporating data that is only available at request time.

```python { .api }
class PushSource:
    def __init__(
        self,
        name: str,
        batch_source: Optional[DataSource] = None,
        description: str = "",
        tags: Optional[Dict[str, str]] = None,
        owner: str = ""
    ):
        """
        Push-based data source for real-time feature ingestion.

        Parameters:
        - name: Push source name
        - batch_source: Associated batch source for historical data
        - description: Data source description
        - tags: Metadata tags
        - owner: Data source owner
        """

class RequestSource:
    def __init__(
        self,
        name: str,
        schema: List[Field],
        description: str = "",
        tags: Optional[Dict[str, str]] = None,
        owner: str = ""
    ):
        """
        Request-time data source for on-demand features.

        Parameters:
        - name: Request source name
        - schema: Schema of request-time fields
        - description: Data source description
        - tags: Metadata tags
        - owner: Data source owner
        """
```
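
Request sources are typically consumed by on-demand feature views, which transform request-time inputs into features at retrieval time. The following is a minimal sketch of that pattern; it assumes a recent Feast release where the `on_demand_feature_view` decorator accepts `sources` and `schema` and where `Field` types come from `feast.types`, and all names (`trip_request_data`, `lat_delta`, and so on) are purely illustrative.

```python
import pandas as pd

from feast import Field, RequestSource, on_demand_feature_view
from feast.types import Float64

# Request-time inputs supplied by the caller at retrieval time (hypothetical schema).
trip_request = RequestSource(
    name="trip_request_data",
    schema=[
        Field(name="pickup_lat", dtype=Float64),
        Field(name="dropoff_lat", dtype=Float64),
    ],
)

@on_demand_feature_view(
    sources=[trip_request],
    schema=[Field(name="lat_delta", dtype=Float64)],
)
def trip_lat_delta(inputs: pd.DataFrame) -> pd.DataFrame:
    # Derive a feature from request-time values only.
    out = pd.DataFrame()
    out["lat_delta"] = inputs["dropoff_lat"] - inputs["pickup_lat"]
    return out
```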

## Stream Formats

Message formats for streaming data sources define how to deserialize streaming data.

```python { .api }
class StreamFormat:
    """Abstract base class for stream message formats."""

class AvroFormat(StreamFormat):
    def __init__(self, schema_json: str):
        """
        Avro message format.

        Parameters:
        - schema_json: Avro schema as JSON string
        """

class JsonFormat(StreamFormat):
    def __init__(self, schema_json: str = ""):
        """
        JSON message format.

        Parameters:
        - schema_json: Optional JSON schema for validation
        """

class ProtoFormat(StreamFormat):
    def __init__(self, class_path: str):
        """
        Protocol Buffers message format.

        Parameters:
        - class_path: Protobuf class path
        """
```
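
For a quick side-by-side of how these formats are constructed (the Kafka examples below show `JsonFormat` and `AvroFormat` in context, while `ProtoFormat` is not used elsewhere on this page), here is a brief sketch. The schema strings and the protobuf class path are placeholders, not working schemas.

```python
from feast.data_format import AvroFormat, JsonFormat, ProtoFormat

# JSON messages; the schema string is optional and used for validation (placeholder shown).
json_format = JsonFormat(schema_json='{"user_id": "int64", "event_type": "string"}')

# Avro messages described by an Avro schema JSON string (placeholder shown).
avro_format = AvroFormat(schema_json='{"type": "record", "name": "UserEvent", "fields": []}')

# Protobuf messages identified by the generated message class path (placeholder shown).
proto_format = ProtoFormat(class_path="com.example.events.UserEvent")
```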

## Usage Examples

### File Data Sources

```python
from feast import FileSource

# Parquet file source
driver_source = FileSource(
    path="s3://feast-bucket/driver_features.parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created_timestamp",
    description="Driver performance metrics"
)

# CSV file source with field mapping
customer_source = FileSource(
    path="/data/customer_data.csv",
    timestamp_field="ts",
    field_mapping={
        "customer_id": "customer",
        "signup_ts": "created_timestamp"
    },
    description="Customer profile data"
)

# Delta table source with partitioning
transaction_source = FileSource(
    path="s3://data-lake/transactions/",
    timestamp_field="transaction_time",
    date_partition_column="date",
    description="Transaction history with date partitioning"
)
```

### Cloud Data Warehouse Sources

```python
from feast import BigQuerySource, RedshiftSource, SnowflakeSource

# BigQuery source with table reference
bq_source = BigQuerySource(
    table="project.dataset.user_features",
    timestamp_field="event_timestamp",
    description="User behavioral features from BigQuery"
)

# BigQuery source with custom query
bq_query_source = BigQuerySource(
    query="""
        SELECT user_id, feature_1, feature_2, event_timestamp
        FROM `project.dataset.raw_events`
        WHERE event_type = 'conversion'
    """,
    timestamp_field="event_timestamp",
    description="Conversion features computed via SQL"
)

# Redshift source
redshift_source = RedshiftSource(
    table="analytics.user_metrics",
    timestamp_field="created_at",
    description="User metrics from Redshift warehouse"
)

# Snowflake source
snowflake_source = SnowflakeSource(
    table="PROD.ANALYTICS.CUSTOMER_FEATURES",
    timestamp_field="EVENT_TIMESTAMP",
    description="Customer features from Snowflake"
)
```

### Streaming Data Sources

```python
from feast import KafkaSource, KinesisSource
from feast.data_format import JsonFormat, AvroFormat

# Kafka source with JSON format
kafka_source = KafkaSource(
    kafka_bootstrap_servers="localhost:9092",
    message_format=JsonFormat(),
    topic="user_events",
    timestamp_field="event_time",
    description="Real-time user events from Kafka"
)

# Kafka source with Avro format
avro_schema = """
{
    "type": "record",
    "name": "UserEvent",
    "fields": [
        {"name": "user_id", "type": "long"},
        {"name": "event_type", "type": "string"},
        {"name": "timestamp", "type": "long"}
    ]
}
"""

kafka_avro_source = KafkaSource(
    kafka_bootstrap_servers="kafka-cluster:9092",
    message_format=AvroFormat(schema_json=avro_schema),
    topic="user_events_avro",
    timestamp_field="timestamp",
    description="User events in Avro format"
)

# Kinesis source
kinesis_source = KinesisSource(
    table="user-activity-stream",
    region="us-east-1",
    timestamp_field="event_timestamp",
    description="User activity from Kinesis stream"
)
```
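
Streaming sources are usually paired with a batch source so the same features can be backfilled from historical data. The sketch below shows that pairing with a late-data watermark, using only parameters from the API section above; the paths, topic, and variable names are illustrative.

```python
from datetime import timedelta

from feast import FileSource, KafkaSource
from feast.data_format import JsonFormat

# Historical copy of the same events, used for backfills and training data.
events_batch_source = FileSource(
    path="s3://feast-bucket/user_events/",
    timestamp_field="event_time",
)

# Streaming source that shares the batch source and tolerates 5 minutes of late data.
events_stream_source = KafkaSource(
    kafka_bootstrap_servers="kafka-cluster:9092",
    message_format=JsonFormat(),
    topic="user_events",
    timestamp_field="event_time",
    batch_source=events_batch_source,
    watermark_delay_threshold=timedelta(minutes=5),
    description="User events stream backed by a Parquet batch source",
)
```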

### Push and Request Sources

```python
from feast import PushSource, RequestSource, Field
from feast.types import Float64, UnixTimestamp

# Push source for real-time feature updates
push_source = PushSource(
    name="driver_location_push",
    description="Real-time driver location updates"
)

# Request source for on-demand features
request_source = RequestSource(
    name="ride_request_data",
    schema=[
        Field(name="pickup_lat", dtype=Float64),
        Field(name="pickup_lon", dtype=Float64),
        Field(name="dropoff_lat", dtype=Float64),
        Field(name="dropoff_lon", dtype=Float64),
        Field(name="requested_at", dtype=UnixTimestamp)
    ],
    description="Request-time ride booking data"
)
```
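
Once a push source is registered, rows are typically sent to it through the feature store client. The following is a minimal sketch assuming `FeatureStore.push` and an illustrative DataFrame layout; the column names are hypothetical and would need to match the feature view fed by the push source.

```python
from datetime import datetime

import pandas as pd

from feast import FeatureStore

store = FeatureStore(repo_path=".")

# Illustrative payload for the push source defined above (hypothetical columns).
event_df = pd.DataFrame(
    {
        "driver_id": [1001],
        "lat": [37.7749],
        "lon": [-122.4194],
        "event_timestamp": [datetime.utcnow()],
    }
)

# Push the rows by push source name so they become available for online retrieval.
store.push("driver_location_push", event_df)
```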

430

431

### Data Source Configuration Patterns

432

433

```python
# Source with comprehensive metadata
production_source = BigQuerySource(
    table="production.ml_features.customer_metrics",
    timestamp_field="feature_timestamp",
    created_timestamp_column="created_timestamp",
    field_mapping={
        "cust_id": "customer_id",
        "signup_date": "created_date"
    },
    date_partition_column="feature_date",
    description="Production customer metrics with full lineage",
    tags={
        "environment": "production",
        "data_classification": "internal",
        "update_frequency": "hourly",
        "retention_days": "365"
    },
    owner="data-platform@company.com"
)

# Development source with different configuration
dev_source = FileSource(
    path="./test_data/customer_features_sample.parquet",
    timestamp_field="feature_timestamp",
    description="Development sample data for testing",
    tags={
        "environment": "development",
        "data_size": "1000_rows"
    },
    owner="ml-engineer@company.com"
)
```
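
A configured data source only takes effect once a feature view reads from it. A brief sketch of that wiring is shown below; it assumes Feast's `FeatureView`, `Entity`, and `feast.types` APIs, and the entity, feature names, and path are illustrative.

```python
from datetime import timedelta

from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float32

# Entity whose join key identifies rows in the source data.
driver = Entity(name="driver", join_keys=["driver_id"])

driver_stats_source = FileSource(
    path="s3://feast-bucket/driver_features.parquet",
    timestamp_field="event_timestamp",
)

# The feature view reads its columns from the data source defined above.
driver_stats_view = FeatureView(
    name="driver_stats",
    entities=[driver],
    ttl=timedelta(days=1),
    schema=[
        Field(name="avg_rating", dtype=Float32),
        Field(name="trips_today", dtype=Float32),
    ],
    source=driver_stats_source,
)
```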