CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-flyteidl

IDL for Flyte Platform containing protobuf specifications, gRPC API definitions, and generated clients for multiple languages

Pending
Overview
Eval results
Files

docs/data-management.md

Data Management

Data catalog and caching services providing versioned artifact storage, metadata management, tagging systems, and concurrent access coordination. These services enable efficient data sharing, lineage tracking, and performance optimization across workflow executions in the Flyte platform.

Capabilities

Data Catalog Service

Comprehensive data catalog providing versioned artifact storage with metadata, partitioning, and lineage tracking capabilities.

class DataCatalogService:
    """Data catalog service for artifact and dataset management."""
    
    def CreateDataset(request: CreateDatasetRequest) -> CreateDatasetResponse:
        """
        Create a new dataset definition.
        
        Args:
            request: CreateDatasetRequest with dataset specification
            
        Returns:
            CreateDatasetResponse with creation status
            
        Raises:
            ALREADY_EXISTS: Dataset with the same ID already exists
        """
    
    def GetDataset(request: GetDatasetRequest) -> GetDatasetResponse:
        """
        Retrieve dataset information by ID.
        
        Args:
            request: GetDatasetRequest with dataset identifier
            
        Returns:
            GetDatasetResponse with dataset details
            
        Raises:
            NOT_FOUND: Dataset with specified ID does not exist
        """
    
    def ListDatasets(request: ListDatasetsRequest) -> ListDatasetsResponse:
        """
        List datasets with filtering and pagination.
        
        Args:
            request: ListDatasetsRequest with filters and pagination
            
        Returns:
            ListDatasetsResponse with matching datasets
        """

Artifact Management

Manage versioned artifacts with comprehensive metadata, partitioning, and tagging support.

def CreateArtifact(request: CreateArtifactRequest) -> CreateArtifactResponse:
    """
    Create a new artifact with metadata and data references.
    
    Args:
        request: CreateArtifactRequest with artifact specification
        
    Returns:
        CreateArtifactResponse with creation status and artifact ID
        
    Raises:
        ALREADY_EXISTS: Artifact with the same ID and partition already exists
        INVALID_ARGUMENT: Invalid artifact specification
    """

def GetArtifact(request: GetArtifactRequest) -> GetArtifactResponse:
    """
    Retrieve artifact by ID with optional partition filtering.
    
    Args:
        request: GetArtifactRequest with artifact identifier and filters
        
    Returns:
        GetArtifactResponse with artifact details and data references
        
    Raises:
        NOT_FOUND: Artifact with specified ID does not exist
    """

def ListArtifacts(request: ListArtifactsRequest) -> ListArtifactsResponse:
    """
    List artifacts with comprehensive filtering, sorting, and pagination.
    
    Args:
        request: ListArtifactsRequest with filters and pagination options
        
    Returns:
        ListArtifactsResponse with matching artifacts and metadata
    """

def UpdateArtifact(request: UpdateArtifactRequest) -> UpdateArtifactResponse:
    """
    Update artifact metadata and data references.
    
    Args:
        request: UpdateArtifactRequest with artifact ID and updates
        
    Returns:
        UpdateArtifactResponse with update status
        
    Raises:
        NOT_FOUND: Artifact with specified ID does not exist
    """

Tagging System

Flexible tagging system for artifact organization, discovery, and metadata management.

def AddTag(request: AddTagRequest) -> AddTagResponse:
    """
    Add a tag to an artifact for organization and discovery.
    
    Args:
        request: AddTagRequest with artifact ID and tag information
        
    Returns:
        AddTagResponse with tagging status
        
    Raises:
        NOT_FOUND: Artifact with specified ID does not exist
        ALREADY_EXISTS: Tag already exists on the artifact
    """

Reservation System

Concurrent access coordination through reservation system preventing race conditions in data operations.

def GetOrExtendReservation(request: GetOrExtendReservationRequest) -> GetOrExtendReservationResponse:
    """
    Get or extend a reservation for exclusive access to an artifact.
    
    Args:
        request: GetOrExtendReservationRequest with artifact ID and reservation details
        
    Returns:
        GetOrExtendReservationResponse with reservation token and status
        
    Raises:
        RESOURCE_EXHAUSTED: Unable to acquire reservation due to conflicts
    """

def ReleaseReservation(request: ReleaseReservationRequest) -> ReleaseReservationResponse:
    """
    Release a previously acquired reservation.
    
    Args:
        request: ReleaseReservationRequest with reservation token
        
    Returns:
        ReleaseReservationResponse with release status
        
    Raises:
        NOT_FOUND: Reservation token not found or already released
    """

Cache Service

High-performance caching service with concurrent access coordination and metadata management.

class CacheService:
    """Cache service for storing and retrieving task outputs."""
    
    def Get(request: GetCacheRequest) -> GetCacheResponse:
        """
        Retrieve cached data by key.
        
        Args:
            request: GetCacheRequest with cache key and metadata
            
        Returns:
            GetCacheResponse with cached data or cache miss indication
        """
    
    def Put(request: PutCacheRequest) -> PutCacheResponse:
        """
        Store data in cache with metadata and expiration.
        
        Args:
            request: PutCacheRequest with key, data, and metadata
            
        Returns:
            PutCacheResponse with storage status
            
        Raises:
            RESOURCE_EXHAUSTED: Cache storage limit exceeded
        """
    
    def Delete(request: DeleteCacheRequest) -> DeleteCacheResponse:
        """
        Delete cached data by key.
        
        Args:
            request: DeleteCacheRequest with cache key
            
        Returns:
            DeleteCacheResponse with deletion status
        """

Cache Reservations

Reservation system for cache operations ensuring consistent concurrent access patterns.

def GetOrExtendReservation(request: GetOrExtendReservationRequest) -> GetOrExtendReservationResponse:
    """
    Get or extend a cache reservation for exclusive write access.
    
    Args:
        request: GetOrExtendReservationRequest with cache key and reservation details
        
    Returns:
        GetOrExtendReservationResponse with reservation token
    """

def ReleaseReservation(request: ReleaseReservationRequest) -> ReleaseReservationResponse:
    """
    Release a cache reservation.
    
    Args:
        request: ReleaseReservationRequest with reservation token
        
    Returns:
        ReleaseReservationResponse with release status
    """

Data Proxy Service

Service for creating secure upload/download locations and managing data access patterns.

class DataProxyService:
    """Data proxy service for secure data access and transfer."""
    
    def CreateUploadLocation(request: CreateUploadLocationRequest) -> CreateUploadLocationResponse:
        """
        Create a secure upload location for data artifacts.
        
        Args:
            request: CreateUploadLocationRequest with content type and expiration
            
        Returns:
            CreateUploadLocationResponse with signed upload URL and headers
        """
    
    def CreateDownloadLink(request: CreateDownloadLinkRequest) -> CreateDownloadLinkResponse:
        """
        Create a secure download link for data artifacts.
        
        Args:
            request: CreateDownloadLinkRequest with artifact location and expiration
            
        Returns:
            CreateDownloadLinkResponse with signed download URL
        """
    
    def GetData(request: GetDataRequest) -> GetDataResponse:
        """
        Retrieve data directly through the proxy service.
        
        Args:
            request: GetDataRequest with data location and access parameters
            
        Returns:
            GetDataResponse with data payload or redirect information
        """

Types

Dataset Types

class Dataset:
    """Dataset definition with metadata and partitioning information."""
    id: DatasetID
    metadata: Metadata
    partition_keys: list[str]

class DatasetID:
    """Unique identifier for datasets."""
    project: str
    name: str
    domain: str
    version: str
    uuid: str

class Metadata:
    """Flexible metadata container for datasets and artifacts."""
    key_map: dict[str, str]

Artifact Types

class Artifact:
    """Versioned artifact with data references and metadata."""
    id: ArtifactID
    dataset: DatasetID
    data: list[ArtifactData]
    metadata: Metadata
    partitions: list[Partition]
    tags: list[Tag]
    source: ArtifactSource

class ArtifactID:
    """Unique identifier for artifacts."""
    artifact_key: ArtifactKey
    version: str
    partitions: TimePartition

class ArtifactKey:
    """Key identifying an artifact family."""
    project: str
    domain: str
    name: str

class ArtifactData:
    """Data reference within an artifact."""
    name: str
    value: Literal

class Partition:
    """Partition specification for data organization."""
    key: str
    value: str

class Tag:
    """Tag for artifact organization and discovery."""
    name: str
    artifact_id: str
    dataset: DatasetID

Cache Types

class GetCacheRequest:
    """Request to retrieve cached data."""
    task_execution_id: TaskExecutionIdentifier
    input_reader: Metadata
    cache_version: str

class GetCacheResponse:
    """Response with cached data or cache miss indication."""
    entry: CacheEntry

class PutCacheRequest:
    """Request to store data in cache."""
    task_execution_id: TaskExecutionIdentifier
    input_reader: Metadata  
    output_reader: Metadata
    cache_version: str

class PutCacheResponse:
    """Response indicating cache storage status."""
    pass

class CacheEntry:
    """Cache entry with data and metadata."""
    outputs: Metadata
    source: TaskExecutionIdentifier

class Metadata:
    """Cache metadata container."""
    pass

Reservation Types

class ReservationID:
    """Unique identifier for reservations."""
    dataset_id: DatasetID
    tag_name: str

class GetOrExtendReservationRequest:
    """Request to acquire or extend a reservation."""
    reservation_id: ReservationID
    owner_id: str
    heartbeat_interval: timedelta

class GetOrExtendReservationResponse:
    """Response with reservation details."""
    reservation: Reservation

class Reservation:
    """Active reservation with ownership information."""
    reservation_id: ReservationID
    owner_id: str
    heartbeat_interval: timedelta
    expires_at: datetime

class ReleaseReservationRequest:
    """Request to release a reservation."""
    reservation_id: ReservationID
    owner_id: str

class ReleaseReservationResponse:
    """Response indicating reservation release status."""
    pass

Data Proxy Types

class CreateUploadLocationRequest:
    """Request to create secure upload location."""
    project: str
    domain: str
    filename: str
    expires_in: timedelta
    content_md5: bytes
    upload_mode: UploadMode

class CreateUploadLocationResponse:
    """Response with upload location and credentials."""
    signed_url: str
    native_url: str
    headers: dict[str, str]

class CreateDownloadLinkRequest:
    """Request to create secure download link."""
    artifact_type: ArtifactType
    expires_in: timedelta
    source: DataSource

class CreateDownloadLinkResponse:
    """Response with download link."""
    signed_url: str
    expires_at: datetime

class GetDataRequest:
    """Request to retrieve data through proxy."""
    flyte_url: str

class GetDataResponse:
    """Response with data or redirect information."""
    data: bytes
    metadata: dict[str, str]

Usage Examples

Creating and Managing Datasets

from flyteidl.datacatalog import datacatalog_pb2

# Create dataset identifier
dataset_id = datacatalog_pb2.DatasetID(
    project="ml-project",
    domain="production",
    name="training-data",
    version="v2.0.0"
)

# Create dataset
create_request = datacatalog_pb2.CreateDatasetRequest(
    dataset=datacatalog_pb2.Dataset(
        id=dataset_id,
        metadata=datacatalog_pb2.Metadata(
            key_map={
                "source": "feature-store",
                "format": "parquet",
                "schema_version": "1.2.0"
            }
        ),
        partition_keys=["date", "region"]
    )
)

# Use with datacatalog client
response = datacatalog_client.CreateDataset(create_request)

Artifact Management with Partitioning

# Create artifact with partitions
artifact_id = datacatalog_pb2.ArtifactID(
    artifact_key=datacatalog_pb2.ArtifactKey(
        project="ml-project",
        domain="production", 
        name="model-predictions"
    ),
    version="2023-12-01",
    partitions=datacatalog_pb2.TimePartition(
        value=datacatalog_pb2.LabelValue(
            static_value="2023-12-01"
        )
    )
)

# Create artifact with data references
create_artifact_request = datacatalog_pb2.CreateArtifactRequest(
    artifact=datacatalog_pb2.Artifact(
        id=artifact_id,
        dataset=dataset_id,
        data=[
            datacatalog_pb2.ArtifactData(
                name="predictions",
                value=literal_pb2.Literal(
                    scalar=literal_pb2.Scalar(
                        blob=literal_pb2.Blob(
                            uri="s3://ml-bucket/predictions/2023-12-01.parquet",
                            metadata=literal_pb2.BlobMetadata(
                                type=literal_pb2.BlobType(format="parquet")
                            )
                        )
                    )
                )
            )
        ],
        partitions=[
            datacatalog_pb2.Partition(key="date", value="2023-12-01"),
            datacatalog_pb2.Partition(key="model_version", value="v1.5.2")
        ],
        tags=[
            datacatalog_pb2.Tag(name="latest"),
            datacatalog_pb2.Tag(name="production")
        ]
    )
)

Cache Operations

from flyteidl.cacheservice import cacheservice_pb2

# Create cache key from task execution
cache_request = cacheservice_pb2.GetCacheRequest(
    task_execution_id=task_execution_id,
    input_reader=cacheservice_pb2.Metadata(),
    cache_version="v1.0.0"
)

# Try to get from cache
cache_response = cache_client.Get(cache_request)

if not cache_response.entry:
    # Cache miss - execute task and store result
    # ... execute task logic ...
    
    # Store result in cache
    put_request = cacheservice_pb2.PutCacheRequest(
        task_execution_id=task_execution_id,
        input_reader=cacheservice_pb2.Metadata(),
        output_reader=cacheservice_pb2.Metadata(),
        cache_version="v1.0.0"
    )
    cache_client.Put(put_request)

Reservation Management

# Acquire reservation for exclusive access
reservation_id = datacatalog_pb2.ReservationID(
    dataset_id=dataset_id,
    tag_name="latest"
)

get_reservation_request = datacatalog_pb2.GetOrExtendReservationRequest(
    reservation_id=reservation_id,
    owner_id="workflow-execution-123", 
    heartbeat_interval=timedelta(minutes=5)
)

reservation_response = datacatalog_client.GetOrExtendReservation(get_reservation_request)

try:
    # Perform exclusive operations
    # ... update dataset ...
    
finally:
    # Always release reservation
    release_request = datacatalog_pb2.ReleaseReservationRequest(
        reservation_id=reservation_id,
        owner_id="workflow-execution-123"
    )
    datacatalog_client.ReleaseReservation(release_request)

Data Proxy Usage

from flyteidl.service import dataproxy_pb2

# Create upload location
upload_request = dataproxy_pb2.CreateUploadLocationRequest(
    project="ml-project",
    domain="development",
    filename="training-data.parquet",
    expires_in=timedelta(hours=1),
    content_md5=b"...",  # MD5 hash of content
    upload_mode=dataproxy_pb2.UploadMode.STREAMING
)

upload_response = dataproxy_client.CreateUploadLocation(upload_request)

# Use signed URL to upload data
# requests.put(upload_response.signed_url, data=file_content, headers=upload_response.headers)

# Create download link
download_request = dataproxy_pb2.CreateDownloadLinkRequest(
    artifact_type=dataproxy_pb2.ArtifactType.INPUTS,
    expires_in=timedelta(minutes=30),
    source=dataproxy_pb2.DataSource(
        execution_id=execution_id,
        node_id="data-processing-node"
    )
)

download_response = dataproxy_client.CreateDownloadLink(download_request)
# Use download_response.signed_url to retrieve data

Install with Tessl CLI

npx tessl i tessl/pypi-flyteidl

docs

admin-api.md

core-types.md

data-management.md

index.md

multi-language.md

plugins.md

tasks-workflows.md

tile.json