
# Data Management

Data catalog and caching services providing versioned artifact storage, metadata management, tagging systems, and concurrent access coordination. These services enable efficient data sharing, lineage tracking, and performance optimization across workflow executions in the Flyte platform.

## Capabilities

### Data Catalog Service

Comprehensive data catalog providing versioned artifact storage with metadata, partitioning, and lineage tracking capabilities.

```python { .api }
class DataCatalogService:
    """Data catalog service for artifact and dataset management."""

    def CreateDataset(request: CreateDatasetRequest) -> CreateDatasetResponse:
        """
        Create a new dataset definition.

        Args:
            request: CreateDatasetRequest with dataset specification

        Returns:
            CreateDatasetResponse with creation status

        Raises:
            ALREADY_EXISTS: Dataset with the same ID already exists
        """

    def GetDataset(request: GetDatasetRequest) -> GetDatasetResponse:
        """
        Retrieve dataset information by ID.

        Args:
            request: GetDatasetRequest with dataset identifier

        Returns:
            GetDatasetResponse with dataset details

        Raises:
            NOT_FOUND: Dataset with specified ID does not exist
        """

    def ListDatasets(request: ListDatasetsRequest) -> ListDatasetsResponse:
        """
        List datasets with filtering and pagination.

        Args:
            request: ListDatasetsRequest with filters and pagination

        Returns:
            ListDatasetsResponse with matching datasets
        """
```

### Artifact Management

Manage versioned artifacts with comprehensive metadata, partitioning, and tagging support.

```python { .api }
def CreateArtifact(request: CreateArtifactRequest) -> CreateArtifactResponse:
    """
    Create a new artifact with metadata and data references.

    Args:
        request: CreateArtifactRequest with artifact specification

    Returns:
        CreateArtifactResponse with creation status and artifact ID

    Raises:
        ALREADY_EXISTS: Artifact with the same ID and partition already exists
        INVALID_ARGUMENT: Invalid artifact specification
    """

def GetArtifact(request: GetArtifactRequest) -> GetArtifactResponse:
    """
    Retrieve artifact by ID with optional partition filtering.

    Args:
        request: GetArtifactRequest with artifact identifier and filters

    Returns:
        GetArtifactResponse with artifact details and data references

    Raises:
        NOT_FOUND: Artifact with specified ID does not exist
    """

def ListArtifacts(request: ListArtifactsRequest) -> ListArtifactsResponse:
    """
    List artifacts with comprehensive filtering, sorting, and pagination.

    Args:
        request: ListArtifactsRequest with filters and pagination options

    Returns:
        ListArtifactsResponse with matching artifacts and metadata
    """

def UpdateArtifact(request: UpdateArtifactRequest) -> UpdateArtifactResponse:
    """
    Update artifact metadata and data references.

    Args:
        request: UpdateArtifactRequest with artifact ID and updates

    Returns:
        UpdateArtifactResponse with update status

    Raises:
        NOT_FOUND: Artifact with specified ID does not exist
    """
```

### Tagging System

Flexible tagging system for artifact organization, discovery, and metadata management.

```python { .api }
def AddTag(request: AddTagRequest) -> AddTagResponse:
    """
    Add a tag to an artifact for organization and discovery.

    Args:
        request: AddTagRequest with artifact ID and tag information

    Returns:
        AddTagResponse with tagging status

    Raises:
        NOT_FOUND: Artifact with specified ID does not exist
        ALREADY_EXISTS: Tag already exists on the artifact
    """
```
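
As a rough illustration of these semantics, the toy in-memory model below (hypothetical, not the real service) shows how a tag attaches a name to an artifact, how re-adding the same tag surfaces `ALREADY_EXISTS`, and how tags support discovery:

```python
class ArtifactTags:
    """Toy in-memory stand-in for AddTag semantics, for illustration only:
    a given tag name can be added to an artifact at most once."""

    def __init__(self):
        self._tags = {}  # artifact_id -> set of tag names

    def add_tag(self, artifact_id: str, name: str) -> None:
        tags = self._tags.setdefault(artifact_id, set())
        if name in tags:
            # Mirrors the ALREADY_EXISTS error raised by AddTag
            raise ValueError("ALREADY_EXISTS: tag already on artifact")
        tags.add(name)

    def find(self, name: str) -> list[str]:
        # Discovery: list artifacts carrying a given tag
        return sorted(a for a, tags in self._tags.items() if name in tags)
```

In the real service the same flow goes through `AddTag(AddTagRequest(...))` on the gRPC client, with the error reported as a gRPC status code rather than a Python exception.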

### Reservation System

Concurrent access coordination through a reservation system that prevents race conditions in data operations.

```python { .api }
def GetOrExtendReservation(request: GetOrExtendReservationRequest) -> GetOrExtendReservationResponse:
    """
    Get or extend a reservation for exclusive access to an artifact.

    Args:
        request: GetOrExtendReservationRequest with artifact ID and reservation details

    Returns:
        GetOrExtendReservationResponse with reservation token and status

    Raises:
        RESOURCE_EXHAUSTED: Unable to acquire reservation due to conflicts
    """

def ReleaseReservation(request: ReleaseReservationRequest) -> ReleaseReservationResponse:
    """
    Release a previously acquired reservation.

    Args:
        request: ReleaseReservationRequest with reservation token

    Returns:
        ReleaseReservationResponse with release status

    Raises:
        NOT_FOUND: Reservation token not found or already released
    """
```

### Cache Service

High-performance caching service with concurrent access coordination and metadata management.

```python { .api }
class CacheService:
    """Cache service for storing and retrieving task outputs."""

    def Get(request: GetCacheRequest) -> GetCacheResponse:
        """
        Retrieve cached data by key.

        Args:
            request: GetCacheRequest with cache key and metadata

        Returns:
            GetCacheResponse with cached data or cache miss indication
        """

    def Put(request: PutCacheRequest) -> PutCacheResponse:
        """
        Store data in cache with metadata and expiration.

        Args:
            request: PutCacheRequest with key, data, and metadata

        Returns:
            PutCacheResponse with storage status

        Raises:
            RESOURCE_EXHAUSTED: Cache storage limit exceeded
        """

    def Delete(request: DeleteCacheRequest) -> DeleteCacheResponse:
        """
        Delete cached data by key.

        Args:
            request: DeleteCacheRequest with cache key

        Returns:
            DeleteCacheResponse with deletion status
        """
```

### Cache Reservations

Reservation system for cache operations ensuring consistent concurrent access patterns.

```python { .api }
def GetOrExtendReservation(request: GetOrExtendReservationRequest) -> GetOrExtendReservationResponse:
    """
    Get or extend a cache reservation for exclusive write access.

    Args:
        request: GetOrExtendReservationRequest with cache key and reservation details

    Returns:
        GetOrExtendReservationResponse with reservation token
    """

def ReleaseReservation(request: ReleaseReservationRequest) -> ReleaseReservationResponse:
    """
    Release a cache reservation.

    Args:
        request: ReleaseReservationRequest with reservation token

    Returns:
        ReleaseReservationResponse with release status
    """
```
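
A common pattern around these two calls is a guard that guarantees release even when the cached write fails. Below is a minimal sketch of that pattern, assuming a `client` object exposing the two RPCs above; the dict-shaped requests are simplified placeholders, not the real protobuf types:

```python
from contextlib import contextmanager

@contextmanager
def cache_write_lock(client, key: str, owner_id: str):
    """Hold a cache reservation for the duration of a write, releasing it
    even if the write raises. `client` is any object exposing
    GetOrExtendReservation / ReleaseReservation methods."""
    reservation = client.GetOrExtendReservation({"key": key, "owner_id": owner_id})
    try:
        # The caller performs its Put inside this scope
        yield reservation
    finally:
        # Runs on success and on error, so the reservation never leaks
        client.ReleaseReservation({"key": key, "owner_id": owner_id})
```

Usage would look like `with cache_write_lock(client, key, owner): client.Put(...)`, mirroring the try/finally pattern in the Reservation Management example below.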

### Data Proxy Service

Service for creating secure upload/download locations and managing data access patterns.

```python { .api }
class DataProxyService:
    """Data proxy service for secure data access and transfer."""

    def CreateUploadLocation(request: CreateUploadLocationRequest) -> CreateUploadLocationResponse:
        """
        Create a secure upload location for data artifacts.

        Args:
            request: CreateUploadLocationRequest with content type and expiration

        Returns:
            CreateUploadLocationResponse with signed upload URL and headers
        """

    def CreateDownloadLink(request: CreateDownloadLinkRequest) -> CreateDownloadLinkResponse:
        """
        Create a secure download link for data artifacts.

        Args:
            request: CreateDownloadLinkRequest with artifact location and expiration

        Returns:
            CreateDownloadLinkResponse with signed download URL
        """

    def GetData(request: GetDataRequest) -> GetDataResponse:
        """
        Retrieve data directly through the proxy service.

        Args:
            request: GetDataRequest with data location and access parameters

        Returns:
            GetDataResponse with data payload or redirect information
        """
```

## Types

### Dataset Types

```python { .api }
class Dataset:
    """Dataset definition with metadata and partitioning information."""
    id: DatasetID
    metadata: Metadata
    partition_keys: list[str]

class DatasetID:
    """Unique identifier for datasets."""
    project: str
    name: str
    domain: str
    version: str
    uuid: str

class Metadata:
    """Flexible metadata container for datasets and artifacts."""
    key_map: dict[str, str]
```

### Artifact Types

```python { .api }
class Artifact:
    """Versioned artifact with data references and metadata."""
    id: ArtifactID
    dataset: DatasetID
    data: list[ArtifactData]
    metadata: Metadata
    partitions: list[Partition]
    tags: list[Tag]
    source: ArtifactSource

class ArtifactID:
    """Unique identifier for artifacts."""
    artifact_key: ArtifactKey
    version: str
    partitions: TimePartition

class ArtifactKey:
    """Key identifying an artifact family."""
    project: str
    domain: str
    name: str

class ArtifactData:
    """Data reference within an artifact."""
    name: str
    value: Literal

class Partition:
    """Partition specification for data organization."""
    key: str
    value: str

class Tag:
    """Tag for artifact organization and discovery."""
    name: str
    artifact_id: str
    dataset: DatasetID
```

### Cache Types

```python { .api }
class GetCacheRequest:
    """Request to retrieve cached data."""
    task_execution_id: TaskExecutionIdentifier
    input_reader: Metadata
    cache_version: str

class GetCacheResponse:
    """Response with cached data or cache miss indication."""
    entry: CacheEntry

class PutCacheRequest:
    """Request to store data in cache."""
    task_execution_id: TaskExecutionIdentifier
    input_reader: Metadata
    output_reader: Metadata
    cache_version: str

class PutCacheResponse:
    """Response indicating cache storage status."""

class CacheEntry:
    """Cache entry with data and metadata."""
    outputs: Metadata
    source: TaskExecutionIdentifier

class Metadata:
    """Cache metadata container."""
```

### Reservation Types

```python { .api }
class ReservationID:
    """Unique identifier for reservations."""
    dataset_id: DatasetID
    tag_name: str

class GetOrExtendReservationRequest:
    """Request to acquire or extend a reservation."""
    reservation_id: ReservationID
    owner_id: str
    heartbeat_interval: timedelta

class GetOrExtendReservationResponse:
    """Response with reservation details."""
    reservation: Reservation

class Reservation:
    """Active reservation with ownership information."""
    reservation_id: ReservationID
    owner_id: str
    heartbeat_interval: timedelta
    expires_at: datetime

class ReleaseReservationRequest:
    """Request to release a reservation."""
    reservation_id: ReservationID
    owner_id: str

class ReleaseReservationResponse:
    """Response indicating reservation release status."""
```
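
The `heartbeat_interval` / `expires_at` pair implies a lease-style protocol: the holder must call `GetOrExtendReservation` again before `expires_at` passes or the reservation lapses. A small sketch of that arithmetic, where the grace multiplier is an assumption for illustration and not part of the type definitions above:

```python
from datetime import datetime, timedelta, timezone

def reservation_expiry(granted_at: datetime,
                       heartbeat_interval: timedelta,
                       grace_beats: int = 2) -> datetime:
    # A lease typically outlives a single heartbeat so that one missed
    # beat does not drop the reservation; grace_beats is a hypothetical
    # policy knob, not a field of the real service.
    return granted_at + grace_beats * heartbeat_interval

granted = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
expires = reservation_expiry(granted, timedelta(minutes=5))
# The holder should re-extend well before `expires` is reached
```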

### Data Proxy Types

```python { .api }
class CreateUploadLocationRequest:
    """Request to create secure upload location."""
    project: str
    domain: str
    filename: str
    expires_in: timedelta
    content_md5: bytes
    upload_mode: UploadMode

class CreateUploadLocationResponse:
    """Response with upload location and credentials."""
    signed_url: str
    native_url: str
    headers: dict[str, str]

class CreateDownloadLinkRequest:
    """Request to create secure download link."""
    artifact_type: ArtifactType
    expires_in: timedelta
    source: DataSource

class CreateDownloadLinkResponse:
    """Response with download link."""
    signed_url: str
    expires_at: datetime

class GetDataRequest:
    """Request to retrieve data through proxy."""
    flyte_url: str

class GetDataResponse:
    """Response with data or redirect information."""
    data: bytes
    metadata: dict[str, str]
```

## Usage Examples

### Creating and Managing Datasets

```python
from flyteidl.datacatalog import datacatalog_pb2

# Create dataset identifier
dataset_id = datacatalog_pb2.DatasetID(
    project="ml-project",
    domain="production",
    name="training-data",
    version="v2.0.0"
)

# Create dataset
create_request = datacatalog_pb2.CreateDatasetRequest(
    dataset=datacatalog_pb2.Dataset(
        id=dataset_id,
        metadata=datacatalog_pb2.Metadata(
            key_map={
                "source": "feature-store",
                "format": "parquet",
                "schema_version": "1.2.0"
            }
        ),
        partition_keys=["date", "region"]
    )
)

# Use with datacatalog client
response = datacatalog_client.CreateDataset(create_request)
```

### Artifact Management with Partitioning

```python
from flyteidl.core import literals_pb2 as literal_pb2

# Create artifact with partitions (reuses dataset_id from the previous example)
artifact_id = datacatalog_pb2.ArtifactID(
    artifact_key=datacatalog_pb2.ArtifactKey(
        project="ml-project",
        domain="production",
        name="model-predictions"
    ),
    version="2023-12-01",
    partitions=datacatalog_pb2.TimePartition(
        value=datacatalog_pb2.LabelValue(
            static_value="2023-12-01"
        )
    )
)

# Create artifact with data references
create_artifact_request = datacatalog_pb2.CreateArtifactRequest(
    artifact=datacatalog_pb2.Artifact(
        id=artifact_id,
        dataset=dataset_id,
        data=[
            datacatalog_pb2.ArtifactData(
                name="predictions",
                value=literal_pb2.Literal(
                    scalar=literal_pb2.Scalar(
                        blob=literal_pb2.Blob(
                            uri="s3://ml-bucket/predictions/2023-12-01.parquet",
                            metadata=literal_pb2.BlobMetadata(
                                type=literal_pb2.BlobType(format="parquet")
                            )
                        )
                    )
                )
            )
        ],
        partitions=[
            datacatalog_pb2.Partition(key="date", value="2023-12-01"),
            datacatalog_pb2.Partition(key="model_version", value="v1.5.2")
        ],
        tags=[
            datacatalog_pb2.Tag(name="latest"),
            datacatalog_pb2.Tag(name="production")
        ]
    )
)
```

### Cache Operations

```python
from flyteidl.cacheservice import cacheservice_pb2

# Create cache key from task execution
cache_request = cacheservice_pb2.GetCacheRequest(
    task_execution_id=task_execution_id,
    input_reader=cacheservice_pb2.Metadata(),
    cache_version="v1.0.0"
)

# Try to get from cache
cache_response = cache_client.Get(cache_request)

# Protobuf message fields are always truthy, so check field presence instead
if not cache_response.HasField("entry"):
    # Cache miss - execute task and store result
    # ... execute task logic ...

    # Store result in cache
    put_request = cacheservice_pb2.PutCacheRequest(
        task_execution_id=task_execution_id,
        input_reader=cacheservice_pb2.Metadata(),
        output_reader=cacheservice_pb2.Metadata(),
        cache_version="v1.0.0"
    )
    cache_client.Put(put_request)
```

### Reservation Management

```python
from datetime import timedelta

# Acquire reservation for exclusive access
reservation_id = datacatalog_pb2.ReservationID(
    dataset_id=dataset_id,
    tag_name="latest"
)

get_reservation_request = datacatalog_pb2.GetOrExtendReservationRequest(
    reservation_id=reservation_id,
    owner_id="workflow-execution-123",
    heartbeat_interval=timedelta(minutes=5)
)

reservation_response = datacatalog_client.GetOrExtendReservation(get_reservation_request)

try:
    # Perform exclusive operations
    # ... update dataset ...
    ...
finally:
    # Always release reservation
    release_request = datacatalog_pb2.ReleaseReservationRequest(
        reservation_id=reservation_id,
        owner_id="workflow-execution-123"
    )
    datacatalog_client.ReleaseReservation(release_request)
```

### Data Proxy Usage

```python

604

from flyteidl.service import dataproxy_pb2

605

606

# Create upload location

607

upload_request = dataproxy_pb2.CreateUploadLocationRequest(

608

project="ml-project",

609

domain="development",

610

filename="training-data.parquet",

611

expires_in=timedelta(hours=1),

612

content_md5=b"...", # MD5 hash of content

613

upload_mode=dataproxy_pb2.UploadMode.STREAMING

614

)

615

616

upload_response = dataproxy_client.CreateUploadLocation(upload_request)

617

618

# Use signed URL to upload data

619

# requests.put(upload_response.signed_url, data=file_content, headers=upload_response.headers)

620

621

# Create download link

622

download_request = dataproxy_pb2.CreateDownloadLinkRequest(

623

artifact_type=dataproxy_pb2.ArtifactType.INPUTS,

624

expires_in=timedelta(minutes=30),

625

source=dataproxy_pb2.DataSource(

626

execution_id=execution_id,

627

node_id="data-processing-node"

628

)

629

)

630

631

download_response = dataproxy_client.CreateDownloadLink(download_request)

632

# Use download_response.signed_url to retrieve data

633

```