IDL for the Flyte Platform, containing protobuf specifications, gRPC API definitions, and generated clients for multiple languages
—
Data catalog and caching services providing versioned artifact storage, metadata management, tagging systems, and concurrent access coordination. These services enable efficient data sharing, lineage tracking, and performance optimization across workflow executions in the Flyte platform.
Comprehensive data catalog providing versioned artifact storage with metadata, partitioning, and lineage tracking capabilities.
class DataCatalogService:
"""Data catalog service for artifact and dataset management."""
def CreateDataset(request: CreateDatasetRequest) -> CreateDatasetResponse:
"""
Create a new dataset definition.
Args:
request: CreateDatasetRequest with dataset specification
Returns:
CreateDatasetResponse with creation status
Raises:
ALREADY_EXISTS: Dataset with the same ID already exists
"""
def GetDataset(request: GetDatasetRequest) -> GetDatasetResponse:
"""
Retrieve dataset information by ID.
Args:
request: GetDatasetRequest with dataset identifier
Returns:
GetDatasetResponse with dataset details
Raises:
NOT_FOUND: Dataset with specified ID does not exist
"""
def ListDatasets(request: ListDatasetsRequest) -> ListDatasetsResponse:
"""
List datasets with filtering and pagination.
Args:
request: ListDatasetsRequest with filters and pagination
Returns:
ListDatasetsResponse with matching datasets
"""Manage versioned artifacts with comprehensive metadata, partitioning, and tagging support.
def CreateArtifact(request: CreateArtifactRequest) -> CreateArtifactResponse:
"""
Create a new artifact with metadata and data references.
Args:
request: CreateArtifactRequest with artifact specification
Returns:
CreateArtifactResponse with creation status and artifact ID
Raises:
ALREADY_EXISTS: Artifact with the same ID and partition already exists
INVALID_ARGUMENT: Invalid artifact specification
"""
def GetArtifact(request: GetArtifactRequest) -> GetArtifactResponse:
"""
Retrieve artifact by ID with optional partition filtering.
Args:
request: GetArtifactRequest with artifact identifier and filters
Returns:
GetArtifactResponse with artifact details and data references
Raises:
NOT_FOUND: Artifact with specified ID does not exist
"""
def ListArtifacts(request: ListArtifactsRequest) -> ListArtifactsResponse:
"""
List artifacts with comprehensive filtering, sorting, and pagination.
Args:
request: ListArtifactsRequest with filters and pagination options
Returns:
ListArtifactsResponse with matching artifacts and metadata
"""
def UpdateArtifact(request: UpdateArtifactRequest) -> UpdateArtifactResponse:
"""
Update artifact metadata and data references.
Args:
request: UpdateArtifactRequest with artifact ID and updates
Returns:
UpdateArtifactResponse with update status
Raises:
NOT_FOUND: Artifact with specified ID does not exist
"""Flexible tagging system for artifact organization, discovery, and metadata management.
def AddTag(request: AddTagRequest) -> AddTagResponse:
"""
Add a tag to an artifact for organization and discovery.
Args:
request: AddTagRequest with artifact ID and tag information
Returns:
AddTagResponse with tagging status
Raises:
NOT_FOUND: Artifact with specified ID does not exist
ALREADY_EXISTS: Tag already exists on the artifact
"""Concurrent access coordination through reservation system preventing race conditions in data operations.
def GetOrExtendReservation(request: GetOrExtendReservationRequest) -> GetOrExtendReservationResponse:
"""
Get or extend a reservation for exclusive access to an artifact.
Args:
request: GetOrExtendReservationRequest with artifact ID and reservation details
Returns:
GetOrExtendReservationResponse with reservation token and status
Raises:
RESOURCE_EXHAUSTED: Unable to acquire reservation due to conflicts
"""
def ReleaseReservation(request: ReleaseReservationRequest) -> ReleaseReservationResponse:
"""
Release a previously acquired reservation.
Args:
request: ReleaseReservationRequest with reservation token
Returns:
ReleaseReservationResponse with release status
Raises:
NOT_FOUND: Reservation token not found or already released
"""High-performance caching service with concurrent access coordination and metadata management.
class CacheService:
"""Cache service for storing and retrieving task outputs."""
def Get(request: GetCacheRequest) -> GetCacheResponse:
"""
Retrieve cached data by key.
Args:
request: GetCacheRequest with cache key and metadata
Returns:
GetCacheResponse with cached data or cache miss indication
"""
def Put(request: PutCacheRequest) -> PutCacheResponse:
"""
Store data in cache with metadata and expiration.
Args:
request: PutCacheRequest with key, data, and metadata
Returns:
PutCacheResponse with storage status
Raises:
RESOURCE_EXHAUSTED: Cache storage limit exceeded
"""
def Delete(request: DeleteCacheRequest) -> DeleteCacheResponse:
"""
Delete cached data by key.
Args:
request: DeleteCacheRequest with cache key
Returns:
DeleteCacheResponse with deletion status
"""Reservation system for cache operations ensuring consistent concurrent access patterns.
def GetOrExtendReservation(request: GetOrExtendReservationRequest) -> GetOrExtendReservationResponse:
"""
Get or extend a cache reservation for exclusive write access.
Args:
request: GetOrExtendReservationRequest with cache key and reservation details
Returns:
GetOrExtendReservationResponse with reservation token
"""
def ReleaseReservation(request: ReleaseReservationRequest) -> ReleaseReservationResponse:
"""
Release a cache reservation.
Args:
request: ReleaseReservationRequest with reservation token
Returns:
ReleaseReservationResponse with release status
"""Service for creating secure upload/download locations and managing data access patterns.
class DataProxyService:
"""Data proxy service for secure data access and transfer."""
def CreateUploadLocation(request: CreateUploadLocationRequest) -> CreateUploadLocationResponse:
"""
Create a secure upload location for data artifacts.
Args:
request: CreateUploadLocationRequest with content type and expiration
Returns:
CreateUploadLocationResponse with signed upload URL and headers
"""
def CreateDownloadLink(request: CreateDownloadLinkRequest) -> CreateDownloadLinkResponse:
"""
Create a secure download link for data artifacts.
Args:
request: CreateDownloadLinkRequest with artifact location and expiration
Returns:
CreateDownloadLinkResponse with signed download URL
"""
def GetData(request: GetDataRequest) -> GetDataResponse:
"""
Retrieve data directly through the proxy service.
Args:
request: GetDataRequest with data location and access parameters
Returns:
GetDataResponse with data payload or redirect information
"""class Dataset:
"""Dataset definition with metadata and partitioning information."""
id: DatasetID
metadata: Metadata
partition_keys: list[str]
class DatasetID:
"""Unique identifier for datasets."""
project: str
name: str
domain: str
version: str
uuid: str
class Metadata:
"""Flexible metadata container for datasets and artifacts."""
key_map: dict[str, str]

class Artifact:
"""Versioned artifact with data references and metadata."""
id: ArtifactID
dataset: DatasetID
data: list[ArtifactData]
metadata: Metadata
partitions: list[Partition]
tags: list[Tag]
source: ArtifactSource
class ArtifactID:
"""Unique identifier for artifacts."""
artifact_key: ArtifactKey
version: str
partitions: TimePartition
class ArtifactKey:
"""Key identifying an artifact family."""
project: str
domain: str
name: str
class ArtifactData:
"""Data reference within an artifact."""
name: str
value: Literal
class Partition:
"""Partition specification for data organization."""
key: str
value: str
class Tag:
"""Tag for artifact organization and discovery."""
name: str
artifact_id: str
dataset: DatasetID

class GetCacheRequest:
"""Request to retrieve cached data."""
task_execution_id: TaskExecutionIdentifier
input_reader: Metadata
cache_version: str
class GetCacheResponse:
"""Response with cached data or cache miss indication."""
entry: CacheEntry
class PutCacheRequest:
"""Request to store data in cache."""
task_execution_id: TaskExecutionIdentifier
input_reader: Metadata
output_reader: Metadata
cache_version: str
class PutCacheResponse:
"""Response indicating cache storage status."""
pass
class CacheEntry:
"""Cache entry with data and metadata."""
outputs: Metadata
source: TaskExecutionIdentifier
class Metadata:
"""Cache metadata container."""
pass

class ReservationID:
"""Unique identifier for reservations."""
dataset_id: DatasetID
tag_name: str
class GetOrExtendReservationRequest:
"""Request to acquire or extend a reservation."""
reservation_id: ReservationID
owner_id: str
heartbeat_interval: timedelta
class GetOrExtendReservationResponse:
"""Response with reservation details."""
reservation: Reservation
class Reservation:
"""Active reservation with ownership information."""
reservation_id: ReservationID
owner_id: str
heartbeat_interval: timedelta
expires_at: datetime
class ReleaseReservationRequest:
"""Request to release a reservation."""
reservation_id: ReservationID
owner_id: str
class ReleaseReservationResponse:
"""Response indicating reservation release status."""
pass

class CreateUploadLocationRequest:
"""Request to create secure upload location."""
project: str
domain: str
filename: str
expires_in: timedelta
content_md5: bytes
upload_mode: UploadMode
class CreateUploadLocationResponse:
"""Response with upload location and credentials."""
signed_url: str
native_url: str
headers: dict[str, str]
class CreateDownloadLinkRequest:
"""Request to create secure download link."""
artifact_type: ArtifactType
expires_in: timedelta
source: DataSource
class CreateDownloadLinkResponse:
"""Response with download link."""
signed_url: str
expires_at: datetime
class GetDataRequest:
"""Request to retrieve data through proxy."""
flyte_url: str
class GetDataResponse:
"""Response with data or redirect information."""
data: bytes
metadata: dict[str, str]
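
The usage examples below assume gRPC client stubs for each service; a minimal setup sketch follows (the channel target and the generated *_pb2_grpc stub names are assumptions, not part of this specification):
import grpc
from flyteidl.datacatalog import datacatalog_pb2_grpc
from flyteidl.cacheservice import cacheservice_pb2_grpc
from flyteidl.service import dataproxy_pb2_grpc

# Illustrative endpoint; each service may be exposed on a different address in a real deployment
channel = grpc.insecure_channel("localhost:8089")
datacatalog_client = datacatalog_pb2_grpc.DataCatalogStub(channel)
cache_client = cacheservice_pb2_grpc.CacheServiceStub(channel)
dataproxy_client = dataproxy_pb2_grpc.DataProxyServiceStub(channel)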

from flyteidl.core import literals_pb2 as literal_pb2
from flyteidl.core import types_pb2
from flyteidl.datacatalog import datacatalog_pb2

# Create dataset identifier
dataset_id = datacatalog_pb2.DatasetID(
project="ml-project",
domain="production",
name="training-data",
version="v2.0.0"
)
# Create dataset
create_request = datacatalog_pb2.CreateDatasetRequest(
dataset=datacatalog_pb2.Dataset(
id=dataset_id,
metadata=datacatalog_pb2.Metadata(
key_map={
"source": "feature-store",
"format": "parquet",
"schema_version": "1.2.0"
}
),
partition_keys=["date", "region"]
)
)
# Use with datacatalog client
response = datacatalog_client.CreateDataset(create_request)
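
Once created, the dataset definition can be read back by ID; a minimal sketch, assuming GetDatasetRequest carries the same DatasetID message:
# Retrieve the dataset definition by its identifier
get_request = datacatalog_pb2.GetDatasetRequest(dataset=dataset_id)
get_response = datacatalog_client.GetDataset(get_request)
# get_response.dataset carries the stored metadata and partition keys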

# Create artifact with partitions
artifact_id = datacatalog_pb2.ArtifactID(
artifact_key=datacatalog_pb2.ArtifactKey(
project="ml-project",
domain="production",
name="model-predictions"
),
version="2023-12-01",
partitions=datacatalog_pb2.TimePartition(
value=datacatalog_pb2.LabelValue(
static_value="2023-12-01"
)
)
)
# Create artifact with data references
create_artifact_request = datacatalog_pb2.CreateArtifactRequest(
artifact=datacatalog_pb2.Artifact(
id=artifact_id,
dataset=dataset_id,
data=[
datacatalog_pb2.ArtifactData(
name="predictions",
value=literal_pb2.Literal(
scalar=literal_pb2.Scalar(
blob=literal_pb2.Blob(
uri="s3://ml-bucket/predictions/2023-12-01.parquet",
metadata=literal_pb2.BlobMetadata(
type=types_pb2.BlobType(format="parquet")
)
)
)
)
)
],
partitions=[
datacatalog_pb2.Partition(key="date", value="2023-12-01"),
datacatalog_pb2.Partition(key="model_version", value="v1.5.2")
],
tags=[
datacatalog_pb2.Tag(name="latest"),
datacatalog_pb2.Tag(name="production")
]
)
)
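
Tags can also be attached after creation for discovery; a sketch using the Tag message from the data model above (the artifact id string is illustrative):
# Tag the artifact so it can be discovered later by name
add_tag_request = datacatalog_pb2.AddTagRequest(
    tag=datacatalog_pb2.Tag(
        name="validated",
        artifact_id="model-predictions-2023-12-01",  # illustrative artifact id
        dataset=dataset_id
    )
)
datacatalog_client.AddTag(add_tag_request)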

from flyteidl.cacheservice import cacheservice_pb2

# task_execution_id is assumed to be the core TaskExecutionIdentifier of the running task
# Create cache key from task execution
cache_request = cacheservice_pb2.GetCacheRequest(
task_execution_id=task_execution_id,
input_reader=cacheservice_pb2.Metadata(),
cache_version="v1.0.0"
)
# Try to get from cache
cache_response = cache_client.Get(cache_request)
if not cache_response.entry:
    # Cache miss - execute task and store result
    # ... execute task logic ...

    # Store result in cache
    put_request = cacheservice_pb2.PutCacheRequest(
        task_execution_id=task_execution_id,
        input_reader=cacheservice_pb2.Metadata(),
        output_reader=cacheservice_pb2.Metadata(),
        cache_version="v1.0.0"
    )
    cache_client.Put(put_request)

from datetime import timedelta

# Acquire reservation for exclusive access
reservation_id = datacatalog_pb2.ReservationID(
dataset_id=dataset_id,
tag_name="latest"
)
get_reservation_request = datacatalog_pb2.GetOrExtendReservationRequest(
reservation_id=reservation_id,
owner_id="workflow-execution-123",
heartbeat_interval=timedelta(minutes=5)
)
reservation_response = datacatalog_client.GetOrExtendReservation(get_reservation_request)
try:
    # Perform exclusive operations
    # ... update dataset ...
    pass
finally:
    # Always release reservation
    release_request = datacatalog_pb2.ReleaseReservationRequest(
        reservation_id=reservation_id,
        owner_id="workflow-execution-123"
    )
    datacatalog_client.ReleaseReservation(release_request)

from datetime import timedelta
from flyteidl.service import dataproxy_pb2

# Create upload location
upload_request = dataproxy_pb2.CreateUploadLocationRequest(
project="ml-project",
domain="development",
filename="training-data.parquet",
expires_in=timedelta(hours=1),
content_md5=b"...", # MD5 hash of content
upload_mode=dataproxy_pb2.UploadMode.STREAMING
)
upload_response = dataproxy_client.CreateUploadLocation(upload_request)
# Use signed URL to upload data
# requests.put(upload_response.signed_url, data=file_content, headers=upload_response.headers)
# Create download link
download_request = dataproxy_pb2.CreateDownloadLinkRequest(
artifact_type=dataproxy_pb2.ArtifactType.INPUTS,
expires_in=timedelta(minutes=30),
source=dataproxy_pb2.DataSource(
execution_id=execution_id,
node_id="data-processing-node"
)
)
download_response = dataproxy_client.CreateDownloadLink(download_request)
# Use download_response.signed_url to retrieve data
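
Small artifacts can also be fetched directly through the proxy with a flyte:// URL instead of a signed link; the URL below is an illustrative placeholder:
# Retrieve data through the proxy; the flyte:// URL identifies an execution's inputs or outputs
get_data_request = dataproxy_pb2.GetDataRequest(
    flyte_url="flyte://v1/ml-project/development/execution-id/n0/o"
)
get_data_response = dataproxy_client.GetData(get_data_request)
# get_data_response contains the data payload or redirect metadata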

Install with Tessl CLI
npx tessl i tessl/pypi-flyteidl