# Data Management

Data catalog and caching services that provide versioned artifact storage, metadata management, tagging, and concurrent-access coordination. These services enable efficient data sharing, lineage tracking, and performance optimization across workflow executions in the Flyte platform.

## Capabilities

### Data Catalog Service

A comprehensive data catalog providing versioned artifact storage with metadata, partitioning, and lineage tracking.

```python { .api }
class DataCatalogService:
    """Data catalog service for artifact and dataset management."""

    def CreateDataset(request: CreateDatasetRequest) -> CreateDatasetResponse:
        """
        Create a new dataset definition.

        Args:
            request: CreateDatasetRequest with dataset specification

        Returns:
            CreateDatasetResponse with creation status

        Raises:
            ALREADY_EXISTS: Dataset with the same ID already exists
        """

    def GetDataset(request: GetDatasetRequest) -> GetDatasetResponse:
        """
        Retrieve dataset information by ID.

        Args:
            request: GetDatasetRequest with dataset identifier

        Returns:
            GetDatasetResponse with dataset details

        Raises:
            NOT_FOUND: Dataset with specified ID does not exist
        """

    def ListDatasets(request: ListDatasetsRequest) -> ListDatasetsResponse:
        """
        List datasets with filtering and pagination.

        Args:
            request: ListDatasetsRequest with filters and pagination

        Returns:
            ListDatasetsResponse with matching datasets
        """
```
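A minimal listing sketch follows. It assumes a `datacatalog_client` gRPC stub is already connected, and the `PaginationOptions` field names (`limit`, `token`) and the `datasets` response field are assumptions based on the request and response messages described above.

```python
from flyteidl.datacatalog import datacatalog_pb2

# Page through datasets 50 at a time; an empty token requests the first page.
# `datacatalog_client` is assumed to be an already-connected gRPC stub.
list_request = datacatalog_pb2.ListDatasetsRequest(
    pagination=datacatalog_pb2.PaginationOptions(limit=50, token="")
)
list_response = datacatalog_client.ListDatasets(list_request)
for dataset in list_response.datasets:
    print(dataset.id.project, dataset.id.name, dataset.id.version)
```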
### Artifact Management

Manage versioned artifacts with comprehensive metadata, partitioning, and tagging support.

```python { .api }
def CreateArtifact(request: CreateArtifactRequest) -> CreateArtifactResponse:
    """
    Create a new artifact with metadata and data references.

    Args:
        request: CreateArtifactRequest with artifact specification

    Returns:
        CreateArtifactResponse with creation status and artifact ID

    Raises:
        ALREADY_EXISTS: Artifact with the same ID and partition already exists
        INVALID_ARGUMENT: Invalid artifact specification
    """

def GetArtifact(request: GetArtifactRequest) -> GetArtifactResponse:
    """
    Retrieve artifact by ID with optional partition filtering.

    Args:
        request: GetArtifactRequest with artifact identifier and filters

    Returns:
        GetArtifactResponse with artifact details and data references

    Raises:
        NOT_FOUND: Artifact with specified ID does not exist
    """

def ListArtifacts(request: ListArtifactsRequest) -> ListArtifactsResponse:
    """
    List artifacts with comprehensive filtering, sorting, and pagination.

    Args:
        request: ListArtifactsRequest with filters and pagination options

    Returns:
        ListArtifactsResponse with matching artifacts and metadata
    """

def UpdateArtifact(request: UpdateArtifactRequest) -> UpdateArtifactResponse:
    """
    Update artifact metadata and data references.

    Args:
        request: UpdateArtifactRequest with artifact ID and updates

    Returns:
        UpdateArtifactResponse with update status

    Raises:
        NOT_FOUND: Artifact with specified ID does not exist
    """
```
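A hedged listing sketch is shown below. The filter message names (`FilterExpression`, `SinglePropertyFilter`, `PartitionPropertyFilter`, `KeyValuePair`) are assumptions about the datacatalog protos backing the request above; `dataset_id` is reused from the dataset examples later in this document.

```python
from flyteidl.datacatalog import datacatalog_pb2

# List artifacts in a dataset matching one partition value; the filter
# message shapes here are assumptions, not confirmed proto definitions.
list_artifacts_request = datacatalog_pb2.ListArtifactsRequest(
    dataset=dataset_id,
    filter=datacatalog_pb2.FilterExpression(
        filters=[
            datacatalog_pb2.SinglePropertyFilter(
                partition_filter=datacatalog_pb2.PartitionPropertyFilter(
                    key_val=datacatalog_pb2.KeyValuePair(key="region", value="us-east-1")
                )
            )
        ]
    ),
    pagination=datacatalog_pb2.PaginationOptions(limit=20),
)
for artifact in datacatalog_client.ListArtifacts(list_artifacts_request).artifacts:
    print(artifact.id, [p.value for p in artifact.partitions])
```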
### Tagging System

Flexible tagging system for artifact organization, discovery, and metadata management.

```python { .api }
def AddTag(request: AddTagRequest) -> AddTagResponse:
    """
    Add a tag to an artifact for organization and discovery.

    Args:
        request: AddTagRequest with artifact ID and tag information

    Returns:
        AddTagResponse with tagging status

    Raises:
        NOT_FOUND: Artifact with specified ID does not exist
        ALREADY_EXISTS: Tag already exists on the artifact
    """
```
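A tag points a human-readable name at a specific artifact version, which is how consumers fetch an artifact without knowing its version. A minimal sketch, assuming `AddTagRequest` wraps the `Tag` message shown under Types below:

```python
from flyteidl.datacatalog import datacatalog_pb2

# Tag an existing artifact version; assumes AddTagRequest carries a single
# `tag` field. `dataset_id` is reused from the dataset examples below.
add_tag_request = datacatalog_pb2.AddTagRequest(
    tag=datacatalog_pb2.Tag(
        name="validated",          # human-readable alias
        artifact_id="2023-12-01",  # version of the artifact being tagged
        dataset=dataset_id,
    )
)
datacatalog_client.AddTag(add_tag_request)
```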
### Reservation System

A reservation system that coordinates concurrent access and prevents race conditions when multiple executions operate on the same data (see the Reservation Management example under Usage Examples below).

```python { .api }
def GetOrExtendReservation(request: GetOrExtendReservationRequest) -> GetOrExtendReservationResponse:
    """
    Get or extend a reservation for exclusive access to an artifact.

    Args:
        request: GetOrExtendReservationRequest with artifact ID and reservation details

    Returns:
        GetOrExtendReservationResponse with reservation token and status

    Raises:
        RESOURCE_EXHAUSTED: Unable to acquire reservation due to conflicts
    """

def ReleaseReservation(request: ReleaseReservationRequest) -> ReleaseReservationResponse:
    """
    Release a previously acquired reservation.

    Args:
        request: ReleaseReservationRequest with reservation token

    Returns:
        ReleaseReservationResponse with release status

    Raises:
        NOT_FOUND: Reservation token not found or already released
    """
```
### Cache Service

High-performance caching service with concurrent access coordination and metadata management.

```python { .api }
class CacheService:
    """Cache service for storing and retrieving task outputs."""

    def Get(request: GetCacheRequest) -> GetCacheResponse:
        """
        Retrieve cached data by key.

        Args:
            request: GetCacheRequest with cache key and metadata

        Returns:
            GetCacheResponse with cached data or cache miss indication
        """

    def Put(request: PutCacheRequest) -> PutCacheResponse:
        """
        Store data in cache with metadata and expiration.

        Args:
            request: PutCacheRequest with key, data, and metadata

        Returns:
            PutCacheResponse with storage status

        Raises:
            RESOURCE_EXHAUSTED: Cache storage limit exceeded
        """

    def Delete(request: DeleteCacheRequest) -> DeleteCacheResponse:
        """
        Delete cached data by key.

        Args:
            request: DeleteCacheRequest with cache key

        Returns:
            DeleteCacheResponse with deletion status
        """
```
### Cache Reservations

A reservation system for cache operations that grants exclusive write access, ensuring only one writer populates a given cache entry at a time.

```python { .api }
def GetOrExtendReservation(request: GetOrExtendReservationRequest) -> GetOrExtendReservationResponse:
    """
    Get or extend a cache reservation for exclusive write access.

    Args:
        request: GetOrExtendReservationRequest with cache key and reservation details

    Returns:
        GetOrExtendReservationResponse with reservation token
    """

def ReleaseReservation(request: ReleaseReservationRequest) -> ReleaseReservationResponse:
    """
    Release a cache reservation.

    Args:
        request: ReleaseReservationRequest with reservation token

    Returns:
        ReleaseReservationResponse with release status
    """
```
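The intended write-side protocol is reserve, compute, `Put`, release, so concurrent executions of the same task do not duplicate work. A hedged sketch follows; the request fields (`task_execution_id`, `owner_id`, `heartbeat_interval`) are assumptions that mirror `GetCacheRequest` and the datacatalog reservation messages below, and `cache_client`, `task_execution_id`, and `heartbeat_interval` are assumed to be defined by the caller.

```python
from flyteidl.cacheservice import cacheservice_pb2

# Hedged sketch of reserve -> compute -> Put -> release; the request field
# names are assumptions mirroring the datacatalog reservation messages.
reservation = cache_client.GetOrExtendReservation(
    cacheservice_pb2.GetOrExtendReservationRequest(
        task_execution_id=task_execution_id,    # cache key, as in GetCacheRequest
        owner_id="executor-42",
        heartbeat_interval=heartbeat_interval,  # assumed Duration-like field
    )
)
try:
    ...  # compute outputs and Put them while holding the reservation
finally:
    cache_client.ReleaseReservation(
        cacheservice_pb2.ReleaseReservationRequest(
            task_execution_id=task_execution_id,
            owner_id="executor-42",
        )
    )
```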
### Data Proxy Service

Service for creating secure upload/download locations and managing data access patterns.

```python { .api }
class DataProxyService:
    """Data proxy service for secure data access and transfer."""

    def CreateUploadLocation(request: CreateUploadLocationRequest) -> CreateUploadLocationResponse:
        """
        Create a secure upload location for data artifacts.

        Args:
            request: CreateUploadLocationRequest with content type and expiration

        Returns:
            CreateUploadLocationResponse with signed upload URL and headers
        """

    def CreateDownloadLink(request: CreateDownloadLinkRequest) -> CreateDownloadLinkResponse:
        """
        Create a secure download link for data artifacts.

        Args:
            request: CreateDownloadLinkRequest with artifact location and expiration

        Returns:
            CreateDownloadLinkResponse with signed download URL
        """

    def GetData(request: GetDataRequest) -> GetDataResponse:
        """
        Retrieve data directly through the proxy service.

        Args:
            request: GetDataRequest with data location and access parameters

        Returns:
            GetDataResponse with data payload or redirect information
        """
```
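For small payloads, `GetData` can return bytes directly instead of minting a signed link first. A minimal sketch; the `flyte://` URL below is a made-up illustration, not a real location, and `dataproxy_client` is assumed to be a connected stub.

```python
from flyteidl.service import dataproxy_pb2

# Direct fetch through the proxy; the flyte:// URL here is illustrative only.
get_data_response = dataproxy_client.GetData(
    dataproxy_pb2.GetDataRequest(flyte_url="flyte://v1/ml-project/development/abc123/n0/o")
)
payload: bytes = get_data_response.data
```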
## Types

### Dataset Types

```python { .api }
class Dataset:
    """Dataset definition with metadata and partitioning information."""
    id: DatasetID
    metadata: Metadata
    partition_keys: list[str]

class DatasetID:
    """Unique identifier for datasets."""
    project: str
    name: str
    domain: str
    version: str
    uuid: str

class Metadata:
    """Flexible metadata container for datasets and artifacts."""
    key_map: dict[str, str]
```
### Artifact Types

```python { .api }
class Artifact:
    """Versioned artifact with data references and metadata."""
    id: ArtifactID
    dataset: DatasetID
    data: list[ArtifactData]
    metadata: Metadata
    partitions: list[Partition]
    tags: list[Tag]
    source: ArtifactSource

class ArtifactID:
    """Unique identifier for artifacts."""
    artifact_key: ArtifactKey
    version: str
    partitions: TimePartition

class ArtifactKey:
    """Key identifying an artifact family."""
    project: str
    domain: str
    name: str

class ArtifactData:
    """Data reference within an artifact."""
    name: str
    value: Literal

class Partition:
    """Partition specification for data organization."""
    key: str
    value: str

class Tag:
    """Tag for artifact organization and discovery."""
    name: str
    artifact_id: str
    dataset: DatasetID
```
### Cache Types

```python { .api }
class GetCacheRequest:
    """Request to retrieve cached data."""
    task_execution_id: TaskExecutionIdentifier
    input_reader: Metadata
    cache_version: str

class GetCacheResponse:
    """Response with cached data or cache miss indication."""
    entry: CacheEntry

class PutCacheRequest:
    """Request to store data in cache."""
    task_execution_id: TaskExecutionIdentifier
    input_reader: Metadata
    output_reader: Metadata
    cache_version: str

class PutCacheResponse:
    """Response indicating cache storage status."""

class CacheEntry:
    """Cache entry with data and metadata."""
    outputs: Metadata
    source: TaskExecutionIdentifier

class Metadata:
    """Cache metadata container (the cache service's own Metadata message,
    distinct from the datacatalog Metadata under Dataset Types above)."""
```
### Reservation Types

```python { .api }
class ReservationID:
    """Unique identifier for reservations."""
    dataset_id: DatasetID
    tag_name: str

class GetOrExtendReservationRequest:
    """Request to acquire or extend a reservation."""
    reservation_id: ReservationID
    owner_id: str
    heartbeat_interval: timedelta

class GetOrExtendReservationResponse:
    """Response with reservation details."""
    reservation: Reservation

class Reservation:
    """Active reservation with ownership information."""
    reservation_id: ReservationID
    owner_id: str
    heartbeat_interval: timedelta
    expires_at: datetime

class ReleaseReservationRequest:
    """Request to release a reservation."""
    reservation_id: ReservationID
    owner_id: str

class ReleaseReservationResponse:
    """Response indicating reservation release status."""
```
### Data Proxy Types

```python { .api }
class CreateUploadLocationRequest:
    """Request to create secure upload location."""
    project: str
    domain: str
    filename: str
    expires_in: timedelta
    content_md5: bytes
    upload_mode: UploadMode

class CreateUploadLocationResponse:
    """Response with upload location and credentials."""
    signed_url: str
    native_url: str
    headers: dict[str, str]

class CreateDownloadLinkRequest:
    """Request to create secure download link."""
    artifact_type: ArtifactType
    expires_in: timedelta
    source: DataSource

class CreateDownloadLinkResponse:
    """Response with download link."""
    signed_url: str
    expires_at: datetime

class GetDataRequest:
    """Request to retrieve data through proxy."""
    flyte_url: str

class GetDataResponse:
    """Response with data or redirect information."""
    data: bytes
    metadata: dict[str, str]
```
## Usage Examples

### Creating and Managing Datasets

```python
from flyteidl.datacatalog import datacatalog_pb2

# Create dataset identifier
dataset_id = datacatalog_pb2.DatasetID(
    project="ml-project",
    domain="production",
    name="training-data",
    version="v2.0.0"
)

# Create dataset
create_request = datacatalog_pb2.CreateDatasetRequest(
    dataset=datacatalog_pb2.Dataset(
        id=dataset_id,
        metadata=datacatalog_pb2.Metadata(
            key_map={
                "source": "feature-store",
                "format": "parquet",
                "schema_version": "1.2.0"
            }
        ),
        partition_keys=["date", "region"]
    )
)

# Use with datacatalog client
response = datacatalog_client.CreateDataset(create_request)
```
### Artifact Management with Partitioning

```python
from flyteidl.core import literals_pb2 as literal_pb2

# Create artifact with partitions
artifact_id = datacatalog_pb2.ArtifactID(
    artifact_key=datacatalog_pb2.ArtifactKey(
        project="ml-project",
        domain="production",
        name="model-predictions"
    ),
    version="2023-12-01",
    partitions=datacatalog_pb2.TimePartition(
        value=datacatalog_pb2.LabelValue(
            static_value="2023-12-01"
        )
    )
)

# Create artifact with data references
create_artifact_request = datacatalog_pb2.CreateArtifactRequest(
    artifact=datacatalog_pb2.Artifact(
        id=artifact_id,
        dataset=dataset_id,
        data=[
            datacatalog_pb2.ArtifactData(
                name="predictions",
                value=literal_pb2.Literal(
                    scalar=literal_pb2.Scalar(
                        blob=literal_pb2.Blob(
                            uri="s3://ml-bucket/predictions/2023-12-01.parquet",
                            metadata=literal_pb2.BlobMetadata(
                                type=literal_pb2.BlobType(format="parquet")
                            )
                        )
                    )
                )
            )
        ],
        partitions=[
            datacatalog_pb2.Partition(key="date", value="2023-12-01"),
            datacatalog_pb2.Partition(key="model_version", value="v1.5.2")
        ],
        tags=[
            datacatalog_pb2.Tag(name="latest"),
            datacatalog_pb2.Tag(name="production")
        ]
    )
)
```
### Cache Operations

```python
from flyteidl.cacheservice import cacheservice_pb2

# Create cache key from task execution
cache_request = cacheservice_pb2.GetCacheRequest(
    task_execution_id=task_execution_id,
    input_reader=cacheservice_pb2.Metadata(),
    cache_version="v1.0.0"
)

# Try to get from cache
cache_response = cache_client.Get(cache_request)

# Proto messages are always truthy, so check field presence explicitly
if not cache_response.HasField("entry"):
    # Cache miss - execute task and store result
    # ... execute task logic ...

    # Store result in cache
    put_request = cacheservice_pb2.PutCacheRequest(
        task_execution_id=task_execution_id,
        input_reader=cacheservice_pb2.Metadata(),
        output_reader=cacheservice_pb2.Metadata(),
        cache_version="v1.0.0"
    )
    cache_client.Put(put_request)
```
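Eviction is explicit. Continuing the example above, a short sketch of `Delete`; since `DeleteCacheRequest`'s exact fields are not detailed in this document, the key fields below are assumptions mirroring `GetCacheRequest`:

```python
# Evict a stale entry; the request fields are assumptions mirroring
# GetCacheRequest, since DeleteCacheRequest's shape isn't detailed above.
delete_request = cacheservice_pb2.DeleteCacheRequest(
    task_execution_id=task_execution_id,
    cache_version="v1.0.0",
)
cache_client.Delete(delete_request)
```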
### Reservation Management

```python
from datetime import timedelta

# Acquire reservation for exclusive access
reservation_id = datacatalog_pb2.ReservationID(
    dataset_id=dataset_id,
    tag_name="latest"
)

get_reservation_request = datacatalog_pb2.GetOrExtendReservationRequest(
    reservation_id=reservation_id,
    owner_id="workflow-execution-123",
    heartbeat_interval=timedelta(minutes=5)
)

reservation_response = datacatalog_client.GetOrExtendReservation(get_reservation_request)

try:
    # Perform exclusive operations
    ...  # update dataset
finally:
    # Always release reservation
    release_request = datacatalog_pb2.ReleaseReservationRequest(
        reservation_id=reservation_id,
        owner_id="workflow-execution-123"
    )
    datacatalog_client.ReleaseReservation(release_request)
```
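Because a reservation expires if it is not renewed within `heartbeat_interval`, a holder that works longer than one interval must re-issue the same `GetOrExtendReservation` call. One way to do that, sketched below with a hypothetical background thread that renews at half the 5-minute interval, reusing `get_reservation_request` from the example above:

```python
import threading

def _renew_reservation(stop: threading.Event) -> None:
    # Re-issuing GetOrExtendReservation with the same owner_id extends the
    # reservation; renewing at half the interval leaves headroom.
    while not stop.wait(timeout=150):
        datacatalog_client.GetOrExtendReservation(get_reservation_request)

stop_renewal = threading.Event()
threading.Thread(target=_renew_reservation, args=(stop_renewal,), daemon=True).start()
# ... long-running exclusive work ...
stop_renewal.set()  # stop renewing before releasing the reservation
```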
### Data Proxy Usage

```python
from datetime import timedelta

from flyteidl.service import dataproxy_pb2

# Create upload location
upload_request = dataproxy_pb2.CreateUploadLocationRequest(
    project="ml-project",
    domain="development",
    filename="training-data.parquet",
    expires_in=timedelta(hours=1),
    content_md5=b"...",  # MD5 hash of content
    upload_mode=dataproxy_pb2.UploadMode.STREAMING
)

upload_response = dataproxy_client.CreateUploadLocation(upload_request)

# Use signed URL to upload data
# requests.put(upload_response.signed_url, data=file_content, headers=upload_response.headers)

# Create download link
download_request = dataproxy_pb2.CreateDownloadLinkRequest(
    artifact_type=dataproxy_pb2.ArtifactType.INPUTS,
    expires_in=timedelta(minutes=30),
    source=dataproxy_pb2.DataSource(
        execution_id=execution_id,
        node_id="data-processing-node"
    )
)

download_response = dataproxy_client.CreateDownloadLink(download_request)
# Use download_response.signed_url to retrieve data
```