Apache Airflow provider package that provides comprehensive AWS service integrations for orchestrating cloud workflows and data pipelines
—
Quality: pending — best-practices conformance has not yet been assessed.
Impact: pending — no eval scenarios have been run yet.
AWS Glue integration for serverless ETL (Extract, Transform, Load) operations and data catalog management. Provides job execution, crawler management, and data preparation capabilities for building scalable data processing workflows.
Core Glue client providing job execution and monitoring functionality.
class GlueJobHook(AwsBaseHook):
    """Core Glue client providing job execution and monitoring functionality."""

    def __init__(
        self,
        job_name: str | None = None,
        desc: str | None = None,
        concurrent_run_limit: int = 1,
        script_location: str | None = None,
        retry_limit: int = 0,
        num_of_dpus: int | None = None,
        aws_conn_id: str = 'aws_default',
        region_name: str | None = None,
        s3_bucket: str | None = None,
        iam_role_name: str | None = None,
        create_job_kwargs: dict | None = None,
        **kwargs,
    ):
        """
        Initialize Glue Job Hook.

        Parameters:
        - job_name: Name of the Glue job
        - desc: Job description
        - concurrent_run_limit: Maximum concurrent runs
        - script_location: S3 location of job script
        - retry_limit: Number of retries on failure
        - num_of_dpus: Number of DPUs allocated to job
        - aws_conn_id: AWS connection ID
        - region_name: AWS region name
        - s3_bucket: S3 bucket for job artifacts
        - iam_role_name: IAM role for job execution
        - create_job_kwargs: Additional job creation parameters
        """

    def list_jobs(self) -> list:
        """
        List all Glue jobs.

        Returns:
        List of job names
        """

    def get_job_state(self, job_name: str, run_id: str) -> str:
        """
        Get state of specific Glue job run.

        Parameters:
        - job_name: Name of the Glue job
        - run_id: Job run ID

        Returns:
        Current job run state
        """

    def initialize_job(self, job_name: str, arguments: dict | None = None) -> dict:
        """
        Initialize and start Glue job.

        Parameters:
        - job_name: Name of the Glue job
        - arguments: Job arguments dictionary

        Returns:
        Job run information
        """

    def get_or_create_glue_job(self) -> str:
        """
        Get existing Glue job or create new one.

        Returns:
        Job name
        """

    def get_job_run(self, run_id: str, job_name: str) -> dict:
        """
        Get details of specific job run.

        Parameters:
        - run_id: Job run ID
        - job_name: Name of the Glue job

        Returns:
        Job run details
        """


# Hook for managing Glue crawlers that discover and catalog data.
class GlueCrawlerHook(AwsBaseHook):
    """Hook for managing Glue crawlers that discover and catalog data."""

    def __init__(self, crawler_name: str, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Initialize Glue Crawler Hook.

        Parameters:
        - crawler_name: Name of the Glue crawler
        - aws_conn_id: AWS connection ID
        """

    def get_crawler(self, name: str) -> dict:
        """
        Get crawler configuration.

        Parameters:
        - name: Crawler name

        Returns:
        Crawler configuration and state
        """

    def start_crawler(self, name: str) -> dict:
        """
        Start Glue crawler.

        Parameters:
        - name: Crawler name

        Returns:
        Start crawler response
        """

    def stop_crawler(self, name: str) -> dict:
        """
        Stop running crawler.

        Parameters:
        - name: Crawler name

        Returns:
        Stop crawler response
        """

    def get_crawler_metrics(self, crawler_names: list | None = None) -> dict:
        """
        Get crawler metrics.

        Parameters:
        - crawler_names: List of crawler names

        Returns:
        Crawler metrics
        """


# Task implementations for Glue job and crawler operations.
class GlueJobOperator(BaseOperator):
    """Execute an AWS Glue job."""

    def __init__(
        self,
        job_name: str = 'aws_glue_default_job',
        job_desc: str = 'AWS Glue Job with Airflow',
        script_location: str | None = None,
        concurrent_run_limit: int = 1,
        script_args: dict | None = None,
        retry_limit: int = 0,
        num_of_dpus: int = 10,
        aws_conn_id: str = 'aws_default',
        region_name: str | None = None,
        s3_bucket: str | None = None,
        iam_role_name: str | None = None,
        create_job_kwargs: dict | None = None,
        **kwargs,
    ):
        """
        Execute AWS Glue job.

        Parameters:
        - job_name: Name of the Glue job
        - job_desc: Job description
        - script_location: S3 location of job script
        - concurrent_run_limit: Maximum concurrent runs
        - script_args: Arguments passed to the job script
        - retry_limit: Number of retries on failure
        - num_of_dpus: Number of DPUs allocated to job
        - aws_conn_id: AWS connection ID
        - region_name: AWS region name
        - s3_bucket: S3 bucket for job artifacts
        - iam_role_name: IAM role for job execution
        - create_job_kwargs: Additional job creation parameters
        """
class GlueCrawlerOperator(BaseOperator):
    """Run an AWS Glue crawler."""

    def __init__(
        self,
        crawler_name: str,
        aws_conn_id: str = 'aws_default',
        poll_interval: int = 5,
        **kwargs,
    ):
        """
        Run AWS Glue crawler.

        Parameters:
        - crawler_name: Name of the Glue crawler
        - aws_conn_id: AWS connection ID
        - poll_interval: Polling interval in seconds
        """


# Monitoring tasks for Glue job and crawler completion.
class GlueJobSensor(BaseSensorOperator):
    """Sensor that waits until a specific Glue job run completes."""

    def __init__(self, job_name: str, run_id: str, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Wait for Glue job completion.

        Parameters:
        - job_name: Name of the Glue job
        - run_id: Job run ID to monitor
        - aws_conn_id: AWS connection ID
        """
class GlueCrawlerSensor(BaseSensorOperator):
    """Sensor that waits until a Glue crawler run completes."""

    def __init__(self, crawler_name: str, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Wait for Glue crawler completion.

        Parameters:
        - crawler_name: Name of the Glue crawler
        - aws_conn_id: AWS connection ID
        """


# Asynchronous triggers for Glue operations.
class GlueJobTrigger(BaseTrigger):
    """Deferrable trigger that polls a Glue job run asynchronously."""

    def __init__(
        self,
        job_name: str,
        run_id: str,
        aws_conn_id: str = 'aws_default',
        poll_interval: int = 60,
        **kwargs,
    ):
        """
        Asynchronous trigger for Glue job monitoring.

        Parameters:
        - job_name: Name of the Glue job
        - run_id: Job run ID to monitor
        - aws_conn_id: AWS connection ID
        - poll_interval: Polling interval in seconds
        """


# --- Usage examples ---
from airflow import DAG
# Example: run a Glue ETL job from a DAG.
# Bug fix: `datetime` was used below without ever being imported.
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor

dag = DAG('glue_etl_job', start_date=datetime(2023, 1, 1))

# Run Glue ETL job
run_glue_job = GlueJobOperator(
    task_id='run_etl_job',
    job_name='customer-data-processing',
    script_location='s3://my-glue-scripts/customer_etl.py',
    script_args={
        '--input_path': 's3://raw-data/customers/{{ ds }}/',
        '--output_path': 's3://processed-data/customers/{{ ds }}/',
        '--database_name': 'analytics_db',
        '--table_name': 'customer_dim'
    },
    retry_limit=2,
    num_of_dpus=10,
    aws_conn_id='aws_default',
    dag=dag
)

from airflow.providers.amazon.aws.operators.glue import GlueCrawlerOperator
# Example: discover new data with a crawler, then wait for it to finish.
from airflow.providers.amazon.aws.operators.glue import GlueCrawlerOperator
from airflow.providers.amazon.aws.sensors.glue import GlueCrawlerSensor

# Run crawler to discover new data
discover_data = GlueCrawlerOperator(
    task_id='discover_new_data',
    crawler_name='s3-data-crawler',
    aws_conn_id='aws_default',
    dag=dag
)

# Wait for crawler completion
wait_for_crawler = GlueCrawlerSensor(
    task_id='wait_for_discovery',
    crawler_name='s3-data-crawler',
    timeout=1800,  # 30 minutes
    dag=dag
)

discover_data >> wait_for_crawler

from airflow.providers.amazon.aws.operators.glue_databrew import GlueDataBrewStartJobRunOperator
# Example: prepare data with DataBrew, then process it with a Glue job.
from airflow.providers.amazon.aws.operators.glue_databrew import GlueDataBrewStartJobRunOperator

# Data preparation with DataBrew
prepare_data = GlueDataBrewStartJobRunOperator(
    task_id='prepare_customer_data',
    job_name='customer-data-preparation',
    aws_conn_id='aws_default',
    dag=dag
)

# Process prepared data with Glue job
process_data = GlueJobOperator(
    task_id='process_prepared_data',
    job_name='customer-analytics-job',
    script_location='s3://my-glue-scripts/analytics.py',
    script_args={
        '--input_path': 's3://prepared-data/customers/{{ ds }}/',
        '--output_path': 's3://analytics-data/customers/{{ ds }}/'
    },
    dag=dag
)

prepare_data >> process_data

# Glue job states
class GlueJobState:
    """String constants for the states a Glue job run can report."""

    # In-flight states
    STARTING = 'STARTING'
    RUNNING = 'RUNNING'
    STOPPING = 'STOPPING'
    # Terminal states
    STOPPED = 'STOPPED'
    SUCCEEDED = 'SUCCEEDED'
    FAILED = 'FAILED'
    TIMEOUT = 'TIMEOUT'
# Glue crawler states
class GlueCrawlerState:
    """String constants for the states a Glue crawler can report."""

    READY = 'READY'
    RUNNING = 'RUNNING'
    STOPPING = 'STOPPING'
# Glue job configuration
class GlueJobConfig:
name: str
description: str
role: str
command: dict
default_arguments: dict = None
connections: dict = None
max_retries: int = 0
allocated_capacity: int = None
timeout: int = None
max_capacity: float = None
security_configuration: str = None
tags: dict = None
notification_property: dict = None
glue_version: str = None
number_of_workers: int = None
worker_type: str = None
code_gen_configuration_nodes: dict = None
# Glue crawler configuration
class GlueCrawlerConfig:
name: str
role: str
database_name: str
targets: dict
description: str = None
classifiers: list = None
table_prefix: str = None
schema_change_policy: dict = None
recrawl_policy: dict = None
lineage_configuration: dict = None
lake_formation_configuration: dict = None
configuration: str = None
crawler_security_configuration: str = None
tags: dict = None
# Worker types for Glue 2.0+
class GlueWorkerType:
    """String constants naming the Glue worker types (Glue 2.0 and later)."""

    STANDARD = 'Standard'
    G_1X = 'G.1X'
    G_2X = 'G.2X'
    G_025X = 'G.025X'
# Glue versions
class GlueVersion:
    """String constants for the supported AWS Glue runtime versions."""

    VERSION_1_0 = '1.0'
    VERSION_2_0 = '2.0'
    VERSION_3_0 = '3.0'
    VERSION_4_0 = '4.0'


# Install with Tessl CLI:
#   npx tessl i tessl/pypi-apache-airflow-providers-amazon