CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-apache-airflow-providers-amazon

Apache Airflow provider package offering comprehensive AWS service integrations for orchestrating cloud workflows and data pipelines

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

glue-processing.mddocs/

Glue Data Processing

AWS Glue integration for serverless ETL (Extract, Transform, Load) operations and data catalog management. Provides job execution, crawler management, and data preparation capabilities for building scalable data processing workflows.

Capabilities

Glue Job Hook

Core Glue client providing job execution and monitoring functionality.

class GlueJobHook(AwsBaseHook):
    # Core Glue client wrapper: creates/looks up Glue jobs and runs and reports their state.
    # NOTE(review): stub signatures only — bodies live in the provider package.
    def __init__(self, job_name: str | None = None, desc: str | None = None, concurrent_run_limit: int = 1, script_location: str | None = None, retry_limit: int = 0, num_of_dpus: int | None = None, aws_conn_id: str = 'aws_default', region_name: str | None = None, s3_bucket: str | None = None, iam_role_name: str | None = None, create_job_kwargs: dict | None = None, **kwargs):
        """
        Initialize Glue Job Hook.

        Parameters:
        - job_name: Name of the Glue job
        - desc: Job description
        - concurrent_run_limit: Maximum concurrent runs
        - script_location: S3 location of job script
        - retry_limit: Number of retries on failure
        - num_of_dpus: Number of DPUs allocated to job
        - aws_conn_id: AWS connection ID
        - region_name: AWS region name
        - s3_bucket: S3 bucket for job artifacts
        - iam_role_name: IAM role for job execution
        - create_job_kwargs: Additional job creation parameters
        """

    def list_jobs(self) -> list:
        """
        List all Glue jobs.

        Returns:
        List of job names
        """

    def get_job_state(self, job_name: str, run_id: str) -> str:
        """
        Get state of specific Glue job run.

        Parameters:
        - job_name: Name of the Glue job
        - run_id: Job run ID

        Returns:
        Current job run state (e.g. 'RUNNING', 'SUCCEEDED' — see GlueJobState)
        """

    def initialize_job(self, job_name: str, arguments: dict | None = None) -> dict:
        """
        Initialize and start Glue job.

        Parameters:
        - job_name: Name of the Glue job
        - arguments: Job arguments dictionary

        Returns:
        Job run information
        """

    def get_or_create_glue_job(self) -> str:
        """
        Get existing Glue job or create new one.

        Returns:
        Job name
        """

    def get_job_run(self, run_id: str, job_name: str) -> dict:
        """
        Get details of specific job run.

        Parameters:
        - run_id: Job run ID
        - job_name: Name of the Glue job

        Returns:
        Job run details
        """

Glue Crawler Hook

Hook for managing Glue crawlers that discover and catalog data.

class GlueCrawlerHook(AwsBaseHook):
    # Wrapper around the Glue crawler API: fetch configuration, start/stop crawlers,
    # and read crawler metrics.
    def __init__(self, crawler_name: str, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Initialize Glue Crawler Hook.

        Parameters:
        - crawler_name: Name of the Glue crawler
        - aws_conn_id: AWS connection ID
        """

    def get_crawler(self, name: str) -> dict:
        """
        Get crawler configuration.

        Parameters:
        - name: Crawler name

        Returns:
        Crawler configuration and state
        """

    def start_crawler(self, name: str) -> dict:
        """
        Start Glue crawler.

        Parameters:
        - name: Crawler name

        Returns:
        Start crawler response
        """

    def stop_crawler(self, name: str) -> dict:
        """
        Stop running crawler.

        Parameters:
        - name: Crawler name

        Returns:
        Stop crawler response
        """

    def get_crawler_metrics(self, crawler_names: list | None = None) -> dict:
        """
        Get crawler metrics.

        Parameters:
        - crawler_names: List of crawler names (None presumably means all crawlers — TODO confirm)

        Returns:
        Crawler metrics
        """

Glue Operators

Task implementations for Glue job and crawler operations.

class GlueJobOperator(BaseOperator):
    # Airflow task that submits an AWS Glue job and (per provider convention)
    # waits for it to reach a terminal state.
    def __init__(self, job_name: str = 'aws_glue_default_job', job_desc: str = 'AWS Glue Job with Airflow', script_location: str | None = None, concurrent_run_limit: int = 1, script_args: dict | None = None, retry_limit: int = 0, num_of_dpus: int = 10, aws_conn_id: str = 'aws_default', region_name: str | None = None, s3_bucket: str | None = None, iam_role_name: str | None = None, create_job_kwargs: dict | None = None, **kwargs):
        """
        Execute AWS Glue job.

        Parameters:
        - job_name: Name of the Glue job
        - job_desc: Job description
        - script_location: S3 location of job script
        - concurrent_run_limit: Maximum concurrent runs
        - script_args: Arguments passed to the job script
        - retry_limit: Number of retries on failure
        - num_of_dpus: Number of DPUs allocated to job
        - aws_conn_id: AWS connection ID
        - region_name: AWS region name
        - s3_bucket: S3 bucket for job artifacts
        - iam_role_name: IAM role for job execution
        - create_job_kwargs: Additional job creation parameters
        """

class GlueCrawlerOperator(BaseOperator):
    # Airflow task that triggers an existing AWS Glue crawler, polling its state
    # every `poll_interval` seconds.
    def __init__(self, crawler_name: str, aws_conn_id: str = 'aws_default', poll_interval: int = 5, **kwargs):
        """
        Run AWS Glue crawler.

        Parameters:
        - crawler_name: Name of the Glue crawler
        - aws_conn_id: AWS connection ID
        - poll_interval: Polling interval in seconds
        """

Glue Sensors

Monitoring tasks for Glue job and crawler completion.

class GlueJobSensor(BaseSensorOperator):
    # Sensor that blocks downstream tasks until the given Glue job run finishes.
    def __init__(self, job_name: str, run_id: str, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Wait for Glue job completion.

        Parameters:
        - job_name: Name of the Glue job
        - run_id: Job run ID to monitor
        - aws_conn_id: AWS connection ID
        """

class GlueCrawlerSensor(BaseSensorOperator):
    # Sensor that blocks downstream tasks until the given Glue crawler finishes.
    def __init__(self, crawler_name: str, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Wait for Glue crawler completion.

        Parameters:
        - crawler_name: Name of the Glue crawler
        - aws_conn_id: AWS connection ID
        """

Glue Triggers

Asynchronous triggers for Glue operations.

class GlueJobTrigger(BaseTrigger):
    # Deferrable (async) trigger that polls a Glue job run in the triggerer
    # process instead of occupying a worker slot.
    def __init__(self, job_name: str, run_id: str, aws_conn_id: str = 'aws_default', poll_interval: int = 60, **kwargs):
        """
        Asynchronous trigger for Glue job monitoring.

        Parameters:
        - job_name: Name of the Glue job
        - run_id: Job run ID to monitor
        - aws_conn_id: AWS connection ID
        - poll_interval: Polling interval in seconds
        """

Usage Examples

Basic Glue Job Execution

# Example DAG: run a parameterized Glue ETL job.
# FIX: the original example called datetime(2023, 1, 1) without importing it,
# which raises NameError — the datetime import is now included.
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor

dag = DAG('glue_etl_job', start_date=datetime(2023, 1, 1))

# Run Glue ETL job; {{ ds }} is templated by Airflow with the logical date.
run_glue_job = GlueJobOperator(
    task_id='run_etl_job',
    job_name='customer-data-processing',
    script_location='s3://my-glue-scripts/customer_etl.py',
    script_args={
        '--input_path': 's3://raw-data/customers/{{ ds }}/',
        '--output_path': 's3://processed-data/customers/{{ ds }}/',
        '--database_name': 'analytics_db',
        '--table_name': 'customer_dim'
    },
    retry_limit=2,
    num_of_dpus=10,
    aws_conn_id='aws_default',
    dag=dag
)

Glue Crawler for Data Discovery

from airflow.providers.amazon.aws.operators.glue import GlueCrawlerOperator
from airflow.providers.amazon.aws.sensors.glue import GlueCrawlerSensor

# NOTE(review): this snippet assumes `dag` from the preceding example is in scope.
# Run crawler to discover new data
discover_data = GlueCrawlerOperator(
    task_id='discover_new_data',
    crawler_name='s3-data-crawler',
    aws_conn_id='aws_default',
    dag=dag
)

# Wait for crawler completion
wait_for_crawler = GlueCrawlerSensor(
    task_id='wait_for_discovery',
    crawler_name='s3-data-crawler',
    timeout=1800,  # 30 minutes
    dag=dag
)

# Set task ordering: crawl first, then wait for completion.
discover_data >> wait_for_crawler

Data Pipeline with Glue DataBrew

from airflow.providers.amazon.aws.operators.glue_databrew import GlueDataBrewStartJobRunOperator

# NOTE(review): this snippet assumes `dag` and GlueJobOperator from the earlier
# examples are in scope.
# Data preparation with DataBrew
prepare_data = GlueDataBrewStartJobRunOperator(
    task_id='prepare_customer_data',
    job_name='customer-data-preparation',
    aws_conn_id='aws_default',
    dag=dag
)

# Process prepared data with Glue job
process_data = GlueJobOperator(
    task_id='process_prepared_data',
    job_name='customer-analytics-job',
    script_location='s3://my-glue-scripts/analytics.py',
    script_args={
        '--input_path': 's3://prepared-data/customers/{{ ds }}/',
        '--output_path': 's3://analytics-data/customers/{{ ds }}/'
    },
    dag=dag
)

# Prepare first, then process.
prepare_data >> process_data

Types

# Glue job run states as returned by the Glue API.
# SUCCEEDED / FAILED / STOPPED / TIMEOUT are terminal; the rest are transitional.
class GlueJobState:
    STARTING = 'STARTING'
    RUNNING = 'RUNNING'
    STOPPING = 'STOPPING'
    STOPPED = 'STOPPED'
    SUCCEEDED = 'SUCCEEDED'
    FAILED = 'FAILED'
    TIMEOUT = 'TIMEOUT'

# Glue crawler states; READY indicates the crawler is idle and can be started.
class GlueCrawlerState:
    READY = 'READY'
    RUNNING = 'RUNNING'
    STOPPING = 'STOPPING'

# Glue job configuration — mirrors the Glue CreateJob API request shape.
class GlueJobConfig:
    name: str                                  # job name
    description: str                           # human-readable description
    role: str                                  # IAM role ARN/name the job runs as
    command: dict                              # job command (script location, language)
    default_arguments: dict | None = None      # default --key arguments passed to runs
    connections: dict | None = None            # Glue connections used by the job
    max_retries: int = 0                       # retries on failure
    allocated_capacity: int | None = None      # legacy DPU allocation
    timeout: int | None = None                 # job timeout (presumably minutes — TODO confirm)
    max_capacity: float | None = None          # max DPUs (mutually exclusive with worker settings)
    security_configuration: str | None = None  # Glue security configuration name
    tags: dict | None = None                   # resource tags
    notification_property: dict | None = None  # delay-notification settings
    glue_version: str | None = None            # see GlueVersion
    number_of_workers: int | None = None       # worker count (Glue 2.0+)
    worker_type: str | None = None             # see GlueWorkerType
    code_gen_configuration_nodes: dict | None = None  # visual-editor DAG nodes

# Glue crawler configuration — mirrors the Glue CreateCrawler API request shape.
class GlueCrawlerConfig:
    name: str                                          # crawler name
    role: str                                          # IAM role the crawler runs as
    database_name: str                                 # target catalog database
    targets: dict                                      # data stores to crawl (S3, JDBC, ...)
    description: str | None = None                     # human-readable description
    classifiers: list | None = None                    # custom classifier names
    table_prefix: str | None = None                    # prefix for created tables
    schema_change_policy: dict | None = None           # update/delete behavior on schema change
    recrawl_policy: dict | None = None                 # full vs incremental recrawl
    lineage_configuration: dict | None = None          # data-lineage settings
    lake_formation_configuration: dict | None = None   # Lake Formation credential settings
    configuration: str | None = None                   # JSON string of extra crawler options
    crawler_security_configuration: str | None = None  # Glue security configuration name
    tags: dict | None = None                           # resource tags
# Worker types for Glue 2.0+ (string values are the literal API identifiers).
class GlueWorkerType:
    STANDARD = 'Standard'
    G_1X = 'G.1X'
    G_2X = 'G.2X'
    G_025X = 'G.025X'

# Supported Glue runtime versions (passed as the GlueVersion job parameter).
class GlueVersion:
    VERSION_1_0 = '1.0'
    VERSION_2_0 = '2.0'
    VERSION_3_0 = '3.0'
    VERSION_4_0 = '4.0'

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-amazon

docs

athena-analytics.md

authentication.md

batch-processing.md

data-transfers.md

dms-migration.md

dynamodb-nosql.md

ecs-containers.md

eks-kubernetes.md

emr-clusters.md

glue-processing.md

index.md

lambda-functions.md

messaging-sns-sqs.md

rds-databases.md

redshift-warehouse.md

s3-storage.md

sagemaker-ml.md

tile.json