CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-dagster-gcp

Google Cloud Platform integration components for the Dagster data orchestration framework.

Pending
Overview
Eval results
Files

docs/gcs.md

Google Cloud Storage (GCS)

Comprehensive Google Cloud Storage integration for Dagster providing file storage and management capabilities, I/O managers for pickled objects, file managers for direct GCS operations, compute log management, and sensor utilities for GCS-based data processing workflows.

Capabilities

GCS Resource

Configurable resource for GCS client management.

class GCSResource(ConfigurableResource):
    """Resource for GCS client management.

    Configure once in Definitions and inject into assets/ops that need a
    google.cloud.storage client (see the usage examples below).
    """
    project: Optional[str]  # GCP project name; presumably falls back to ambient credentials when None — TODO confirm
    
    def get_client(self) -> storage.Client:
        """Create authenticated GCS client for the configured project."""

@resource(
    config_schema=GCSResource.to_config_schema(),
    description="This resource provides a GCS client"
)
def gcs_resource(init_context) -> storage.Client:
    """Legacy GCS resource factory that returns a GCS client.

    Function-style counterpart of GCSResource (shares its config schema);
    retained for code written against the older @resource API.
    """

File Manager Resource

Resource providing GCS file manager functionality for direct file operations.

class GCSFileManagerResource(ConfigurableResource):
    """Resource providing GCS file manager functionality.

    Use for direct file reads/writes (see GCSFileManager) rather than
    pickled op/asset outputs.
    """
    project: Optional[str]  # GCP project name
    gcs_bucket: str  # GCS bucket name (required — no default)
    gcs_prefix: str = "dagster"  # Path prefix under which objects are stored
    
    def get_client(self) -> GCSFileManager:
        """Create GCS file manager bound to the configured bucket/prefix."""

def gcs_file_manager(config_schema=None) -> ResourceDefinition:
    """Legacy GCS file manager factory.

    Returns a ResourceDefinition; prefer GCSFileManagerResource in new code.
    """

I/O Managers

I/O managers for storing and retrieving objects in GCS.

class GCSPickleIOManager(ConfigurableIOManager):
    """I/O manager for storing pickled objects in GCS.

    Persists op/asset outputs as pickles under gcs_prefix in gcs_bucket and
    reloads them for downstream inputs.
    """
    gcs: ResourceDependency[GCSResource]  # GCS resource dependency (supplies the client)
    gcs_bucket: str  # GCS bucket name
    gcs_prefix: str = "dagster"  # Path prefix
    
    def load_input(self, context) -> Any:
        """Load input from GCS."""
    
    def handle_output(self, context, obj) -> None:
        """Store output to GCS."""

class PickledObjectGCSIOManager(UPathIOManager):
    """Lower-level I/O manager implementation.

    Implements the UPathIOManager path hooks; GCSPickleIOManager presumably
    delegates to this class — TODO confirm.
    """
    
    def load_from_path(self, context, path) -> Any:
        """Load object from GCS path."""
    
    def dump_to_path(self, context, obj, path) -> None:
        """Store object to GCS path."""
    
    def path_exists(self, path) -> bool:
        """Check if path exists."""
    
    def unlink(self, path) -> None:
        """Delete object."""

# Legacy aliases
# Backward-compatible alias for code written against the old class name.
ConfigurablePickledObjectGCSIOManager = GCSPickleIOManager

def gcs_pickle_io_manager(config_schema=None) -> IOManagerDefinition:
    """Legacy GCS pickle I/O manager factory.

    Function-style counterpart of GCSPickleIOManager for the older
    definition-based API.
    """

File Management

Direct file operations and handles for GCS objects.

class GCSFileHandle(FileHandle):
    """Reference to a file stored in GCS, identified by bucket and key."""
    
    @property
    def gcs_bucket(self) -> str: ...  # GCS bucket name
    
    @property
    def gcs_key(self) -> str: ...  # Object key in bucket
    
    @property
    def gcs_path(self) -> str: ...  # Full gs:// path (bucket + key)
    
    @property
    def path_desc(self) -> str: ...  # Path description

class GCSFileManager(FileManager):
    """File manager implementation for GCS operations.

    Obtained via GCSFileManagerResource.get_client(); writes return
    GCSFileHandle references.
    """
    
    def read(self, file_handle, mode) -> Any:
        """Read file content.  `mode` is presumably a file mode like 'rb' — TODO confirm."""
    
    def write(self, file_obj, mode, ext, key) -> GCSFileHandle:
        """Write file to GCS."""
    
    def write_data(self, data, ext, key) -> GCSFileHandle:
        """Write bytes data to GCS."""
    
    def copy_handle_to_local_temp(self, file_handle) -> str:
        """Copy GCS file to local temp; returns the local path."""
    
    def delete_local_temp(self) -> None:
        """Clean up local temp files."""

Compute Log Management

Storage and management of compute logs in GCS.

class GCSComputeLogManager(ConfigurableClass, TruncatingCloudStorageComputeLogManager):
    """Manages compute logs storage in GCS.

    A truncating cloud-storage log manager (see base class): logs are staged
    in local_dir and uploaded to the configured bucket.
    """
    bucket: str  # GCS bucket name
    local_dir: Optional[str]  # Local staging directory
    prefix: Optional[str] = "dagster"  # Key prefix
    json_credentials_envvar: Optional[str]  # Environment variable holding credentials (presumably a JSON service-account key — TODO confirm)
    upload_interval: Optional[int]  # Upload interval in seconds
    show_url_only: Optional[bool] = False  # Only show URLs instead of content
    
    def capture_logs(self, log_key):
        """Context manager for log capture."""
    
    def delete_logs(self, log_key, prefix):
        """Delete logs from GCS."""
    
    def download_url_for_type(self, log_key, io_type):
        """Get signed download URL."""
    
    def display_path_for_type(self, log_key, io_type):
        """Get display path."""

Sensor Utilities

Utilities for GCS-based sensors and data monitoring.

def get_gcs_keys(
    bucket: str,
    prefix: Optional[str] = None,
    since_key: Optional[str] = None,
    gcs_session: Optional[Client] = None
) -> List[str]:
    """
    Utility function for GCS-based sensors.
    
    Parameters:
    - bucket: GCS bucket name
    - prefix: Key prefix filter
    - since_key: Starting key for incremental processing; pass the sensor
      cursor so only keys after it are returned (presumably exclusive of
      since_key itself — TODO confirm)
    - gcs_session: GCS client session (a default client is presumably
      created when omitted — TODO confirm)
    
    Returns:
    List of updated keys
    """

Testing Utilities

Mock GCS classes for testing without actual GCS connectivity.

class FakeGCSBlob:
    """Mock GCS blob for testing (no real GCS connectivity needed)."""

class FakeGCSBucket:
    """Mock GCS bucket for testing (no real GCS connectivity needed)."""

class FakeGCSClient:
    """Mock GCS client for testing (no real GCS connectivity needed)."""

class FakeConfigurableGCSClient:
    """Mock configurable client for testing (no real GCS connectivity needed)."""

Usage Examples

Basic Resource Usage

from dagster import asset, Definitions
from dagster_gcp import GCSResource

@asset
def process_gcs_file(gcs: GCSResource):
    """Read a CSV from GCS, transform it, and write the result back."""
    storage_client = gcs.get_client()
    data_bucket = storage_client.bucket("my-data-bucket")

    # Pull the input object down as text and apply a trivial transformation.
    source_blob = data_bucket.blob("data/input.csv")
    raw_text = source_blob.download_as_text()
    transformed = raw_text.upper()

    # Push the transformed text to the output object.
    data_bucket.blob("data/output.csv").upload_from_string(transformed)

    return f"Processed {len(raw_text)} characters"

defs = Definitions(
    assets=[process_gcs_file],
    resources={
        "gcs": GCSResource(project="my-gcp-project")
    }
)

I/O Manager Usage

from dagster import asset, Definitions
from dagster_gcp import GCSPickleIOManager, GCSResource
import pandas as pd

@asset
def sales_data():
    """Produce the raw sales table; the I/O manager pickles it into GCS."""
    records = {
        'product': ['A', 'B', 'C'],
        'sales': [100, 200, 150],
        'date': ['2024-01-01', '2024-01-02', '2024-01-03']
    }
    return pd.DataFrame(records)

@asset
def sales_summary(sales_data):
    """Aggregate the upstream table (loaded from GCS automatically)."""
    sales_column = sales_data['sales']
    return {
        'total_sales': sales_column.sum(),
        'avg_sales': sales_column.mean(),
        'product_count': len(sales_data)
    }

defs = Definitions(
    assets=[sales_data, sales_summary],
    resources={
        "io_manager": GCSPickleIOManager(
            gcs=GCSResource(project="my-gcp-project"),
            gcs_bucket="my-data-bucket",
            gcs_prefix="dagster/storage"
        )
    }
)

File Manager Usage

from dagster import op, job, Definitions
from dagster_gcp import GCSFileManagerResource

@op
def create_report(gcs_file_manager: GCSFileManagerResource):
    """Build a small text report and persist it to GCS via the file manager."""
    manager = gcs_file_manager.get_client()

    report_body = "Sales Report\n============\nTotal: $10,000"

    # write_data takes raw bytes and returns a handle to the stored object.
    handle = manager.write_data(
        data=report_body.encode(),
        ext=".txt",
        key="reports/daily_sales"
    )

    # Return the gs:// location of the uploaded report.
    return handle.gcs_path

@job
def generate_report():
    create_report()

defs = Definitions(
    jobs=[generate_report],
    resources={
        "gcs_file_manager": GCSFileManagerResource(
            project="my-gcp-project",
            gcs_bucket="my-reports-bucket",
            gcs_prefix="reports"
        )
    }
)

Sensor with GCS Keys

from dagster import sensor, RunRequest, Definitions
from dagster_gcp import get_gcs_keys, GCSResource

# NOTE: `process_new_files` must be a job defined/imported elsewhere — the
# original snippet referenced it without defining it.
@sensor(jobs=[process_new_files])
def gcs_sensor(context, gcs: GCSResource):
    """Yield one run per new object under incoming/ since the last evaluation."""
    client = gcs.get_client()

    # Pass the sensor cursor as since_key so each evaluation only sees keys
    # added since the previous one; without it every tick re-yields all keys.
    new_keys = get_gcs_keys(
        bucket="my-input-bucket",
        prefix="incoming/",
        since_key=context.cursor,
        gcs_session=client
    )

    for key in new_keys:
        yield RunRequest(
            run_key=key,
            run_config={
                "ops": {
                    "process_file": {
                        "config": {"file_path": f"gs://my-input-bucket/{key}"}
                    }
                }
            }
        )

    # Advance the cursor so already-processed keys are not returned again.
    if new_keys:
        context.update_cursor(new_keys[-1])

defs = Definitions(
    sensors=[gcs_sensor],
    resources={
        "gcs": GCSResource(project="my-gcp-project")
    }
)

Install with Tessl CLI

npx tessl i tessl/pypi-dagster-gcp

docs

bigquery.md

dataproc.md

gcs.md

index.md

pipes.md

tile.json