Package for AWS-specific Dagster framework solid and resource components.
—
Connect to and manage Amazon RDS instances for relational database operations within Dagster pipelines.
class RDSResource(ResourceWithBoto3Configuration):
    """
    Resource for interacting with Amazon RDS.

    API signature stub: only method signatures and their documented
    contracts are shown here; method bodies are omitted.
    """

    def get_connection(self, db_instance_identifier: str):
        """
        Get connection to RDS database instance.

        Parameters:
            db_instance_identifier: RDS instance identifier

        Returns:
            Connection: Database connection object
        """

    def describe_db_instances(self) -> List[Dict]:
        """
        List available RDS instances.

        Returns:
            List[Dict]: RDS instance descriptions
        """

    def create_db_snapshot(
        self,
        db_instance_identifier: str,
        snapshot_identifier: str
    ) -> Dict:
        """
        Create RDS database snapshot.

        Parameters:
            db_instance_identifier: Source RDS instance
            snapshot_identifier: Snapshot name

        Returns:
            Dict: Snapshot creation response
        """


from dagster import op, job, Definitions
from dagster_aws.rds import RDSResource
# Op that snapshots the "my-prod-database" RDS instance through the "rds"
# resource and returns the snapshot creation response.
@op(required_resource_keys={"rds"})
def backup_database(context):
    # Name the snapshot after the current run ID.
    snapshot_id = f"backup-{context.run_id}"

    snapshot_response = context.resources.rds.create_db_snapshot(
        db_instance_identifier="my-prod-database",
        snapshot_identifier=snapshot_id,
    )

    context.log.info(f"Created snapshot: {snapshot_id}")
    return snapshot_response
# Job that runs the backup op with an RDS resource bound to us-west-2
# under the "rds" resource key.
@job(resource_defs={"rds": RDSResource(region_name="us-west-2")})
def database_backup_job():
    # Single-step job: invoke the snapshot op.
    backup_database()
defs = Definitions(jobs=[database_backup_job])

Install with Tessl CLI
npx tessl i tessl/pypi-dagster-aws