Google Cloud Platform integration components for the Dagster data orchestration framework.
---
Comprehensive Google Cloud Storage integration for Dagster providing file storage and management capabilities, I/O managers for pickled objects, file managers for direct GCS operations, compute log management, and sensor utilities for GCS-based data processing workflows.
Configurable resource for GCS client management.
class GCSResource(ConfigurableResource):
    """Resource for GCS client management."""

    project: Optional[str]  # GCP project name

    def get_client(self) -> storage.Client:
        """Create authenticated GCS client."""
@resource(
    config_schema=GCSResource.to_config_schema(),
    description="This resource provides a GCS client"
)
def gcs_resource(init_context) -> storage.Client:
    """Legacy GCS resource factory that returns a GCS client."""

Resource providing GCS file manager functionality for direct file operations.
class GCSFileManagerResource(ConfigurableResource):
    """Resource providing GCS file manager functionality."""

    project: Optional[str]  # GCP project name
    gcs_bucket: str  # GCS bucket name
    gcs_prefix: str = "dagster"  # Path prefix

    def get_client(self) -> GCSFileManager:
        """Create GCS file manager."""
def gcs_file_manager(config_schema=None) -> ResourceDefinition:
    """Legacy GCS file manager factory."""

I/O managers for storing and retrieving objects in GCS.
class GCSPickleIOManager(ConfigurableIOManager):
    """I/O manager for storing pickled objects in GCS."""

    gcs: ResourceDependency[GCSResource]  # GCS resource dependency
    gcs_bucket: str  # GCS bucket name
    gcs_prefix: str = "dagster"  # Path prefix

    def load_input(self, context) -> Any:
        """Load input from GCS."""

    def handle_output(self, context, obj) -> None:
        """Store output to GCS."""
class PickledObjectGCSIOManager(UPathIOManager):
    """Lower-level I/O manager implementation."""

    def load_from_path(self, context, path) -> Any:
        """Load object from GCS path."""

    def dump_to_path(self, context, obj, path) -> None:
        """Store object to GCS path."""

    def path_exists(self, path) -> bool:
        """Check if path exists."""

    def unlink(self, path) -> None:
        """Delete object."""
# Legacy aliases
ConfigurablePickledObjectGCSIOManager = GCSPickleIOManager  # backwards-compatible name for GCSPickleIOManager
def gcs_pickle_io_manager(config_schema=None) -> IOManagerDefinition:
    """Legacy GCS pickle I/O manager factory."""

Direct file operations and handles for GCS objects.
class GCSFileHandle(FileHandle):
    """Reference to a file stored in GCS."""

    @property
    def gcs_bucket(self) -> str: ...  # GCS bucket name

    @property
    def gcs_key(self) -> str: ...  # Object key in bucket

    @property
    def gcs_path(self) -> str: ...  # Full gs:// path

    @property
    def path_desc(self) -> str: ...  # Path description
class GCSFileManager(FileManager):
    """File manager implementation for GCS operations."""

    def read(self, file_handle, mode) -> Any:
        """Read file content."""

    def write(self, file_obj, mode, ext, key) -> GCSFileHandle:
        """Write file to GCS."""

    def write_data(self, data, ext, key) -> GCSFileHandle:
        """Write bytes data to GCS."""

    def copy_handle_to_local_temp(self, file_handle) -> str:
        """Copy GCS file to local temp."""

    def delete_local_temp(self) -> None:
        """Clean up local temp files."""

Storage and management of compute logs in GCS.
class GCSComputeLogManager(ConfigurableClass, TruncatingCloudStorageComputeLogManager):
    """Manages compute logs storage in GCS."""

    bucket: str  # GCS bucket name
    local_dir: Optional[str]  # Local staging directory
    prefix: Optional[str] = "dagster"  # Key prefix
    json_credentials_envvar: Optional[str]  # Environment variable with credentials
    upload_interval: Optional[int]  # Upload interval in seconds
    show_url_only: Optional[bool] = False  # Only show URLs instead of content

    def capture_logs(self, log_key):
        """Context manager for log capture."""

    def delete_logs(self, log_key, prefix):
        """Delete logs from GCS."""

    def download_url_for_type(self, log_key, io_type):
        """Get signed download URL."""

    def display_path_for_type(self, log_key, io_type):
        """Get display path."""

Utilities for GCS-based sensors and data monitoring.
def get_gcs_keys(
    bucket: str,
    prefix: Optional[str] = None,
    since_key: Optional[str] = None,
    gcs_session: Optional[Client] = None
) -> List[str]:
    """
    Utility function for GCS-based sensors.

    Parameters:
    - bucket: GCS bucket name
    - prefix: Key prefix filter
    - since_key: Starting key for incremental processing
    - gcs_session: GCS client session

    Returns:
    List of updated keys
    """

Mock GCS classes for testing without actual GCS connectivity.
class FakeGCSBlob:
    """Mock GCS blob for testing."""

class FakeGCSBucket:
    """Mock GCS bucket for testing."""

class FakeGCSClient:
    """Mock GCS client for testing."""

class FakeConfigurableGCSClient:
    """Mock configurable client for testing."""

from dagster import asset, Definitions
from dagster_gcp import GCSResource

@asset
def process_gcs_file(gcs: GCSResource):
    client = gcs.get_client()
    bucket = client.bucket("my-data-bucket")
    blob = bucket.blob("data/input.csv")
    # Download and process file
    content = blob.download_as_text()
    processed_data = content.upper()  # Simple processing
    # Upload processed result
    output_blob = bucket.blob("data/output.csv")
    output_blob.upload_from_string(processed_data)
    return f"Processed {len(content)} characters"

defs = Definitions(
    assets=[process_gcs_file],
    resources={
        "gcs": GCSResource(project="my-gcp-project")
    }
)

from dagster import asset, Definitions
from dagster_gcp import GCSPickleIOManager, GCSResource
import pandas as pd

@asset
def sales_data():
    # This DataFrame will be pickled and stored in GCS
    return pd.DataFrame({
        'product': ['A', 'B', 'C'],
        'sales': [100, 200, 150],
        'date': ['2024-01-01', '2024-01-02', '2024-01-03']
    })

@asset
def sales_summary(sales_data):
    # sales_data will be loaded from GCS automatically
    return {
        'total_sales': sales_data['sales'].sum(),
        'avg_sales': sales_data['sales'].mean(),
        'product_count': len(sales_data)
    }

defs = Definitions(
    assets=[sales_data, sales_summary],
    resources={
        "io_manager": GCSPickleIOManager(
            gcs_bucket="my-data-bucket",
            gcs_prefix="dagster/storage",
            gcs=GCSResource(project="my-gcp-project")
        )
    }
)

from dagster import op, job, Definitions
from dagster_gcp import GCSFileManagerResource

@op
def create_report(gcs_file_manager: GCSFileManagerResource):
    file_manager = gcs_file_manager.get_client()
    # Create a report
    report_content = "Sales Report\n============\nTotal: $10,000"
    # Write to GCS
    file_handle = file_manager.write_data(
        data=report_content.encode(),
        ext=".txt",
        key="reports/daily_sales"
    )
    return file_handle.gcs_path

@job
def generate_report():
    create_report()

defs = Definitions(
    jobs=[generate_report],
    resources={
        "gcs_file_manager": GCSFileManagerResource(
            project="my-gcp-project",
            gcs_bucket="my-reports-bucket",
            gcs_prefix="reports"
        )
    }
)

from dagster import sensor, RunRequest, Definitions
from dagster_gcp import get_gcs_keys, GCSResource

# NOTE(review): process_new_files is not defined in this snippet — assumed to be
# a job defined elsewhere; confirm before running this example.
@sensor(jobs=[process_new_files])
def gcs_sensor(gcs: GCSResource):
    client = gcs.get_client()
    # Get new files since last run
    new_keys = get_gcs_keys(
        bucket="my-input-bucket",
        prefix="incoming/",
        gcs_session=client
    )
    for key in new_keys:
        yield RunRequest(
            run_key=key,
            run_config={
                "ops": {
                    "process_file": {
                        "config": {"file_path": f"gs://my-input-bucket/{key}"}
                    }
                }
            }
        )

defs = Definitions(
    sensors=[gcs_sensor],
    resources={
        "gcs": GCSResource(project="my-gcp-project")
    }
)

Install with Tessl CLI
npx tessl i tessl/pypi-dagster-gcp