Apache Airflow provider package offering comprehensive AWS service integrations for orchestrating cloud workflows and data pipelines
AWS Lambda integration for serverless function execution within Airflow workflows. Provides function invocation, creation, management, and monitoring capabilities for event-driven processing and microservices architectures.
Core Lambda client providing low-level AWS Lambda API access for function management and execution.
class LambdaHook(AwsBaseHook):
def __init__(self, aws_conn_id: str = 'aws_default', **kwargs):
"""
Initialize Lambda Hook.
Parameters:
- aws_conn_id: AWS connection ID
"""
def invoke_lambda(self, function_name: str, invocation_type: str = 'RequestResponse', payload: str = None, log_type: str = 'None', qualifier: str = '$LATEST', **kwargs) -> dict:
"""
Invoke a Lambda function.
Parameters:
- function_name: Name or ARN of the Lambda function
- invocation_type: Invocation type ('RequestResponse', 'Event', 'DryRun')
- payload: JSON payload for function input
- log_type: Log type ('None' or 'Tail')
- qualifier: Function version or alias
Returns:
Response from Lambda invocation
"""
def create_lambda(self, function_name: str, runtime: str, role: str, handler: str, zip_file: bytes = None, code: dict = None, description: str = '', timeout: int = 3, memory_size: int = 128, **kwargs) -> dict:
"""
Create a Lambda function.
Parameters:
- function_name: Name of the Lambda function
- runtime: Runtime environment (e.g., 'python3.9', 'nodejs18.x')
- role: ARN of the IAM role for the function
- handler: Entry point for the function
- zip_file: Deployment package as bytes
- code: Code configuration dictionary
- description: Function description
- timeout: Function timeout in seconds
- memory_size: Memory allocation in MB
Returns:
Function configuration
"""
def delete_lambda(self, function_name: str, qualifier: str = None) -> None:
"""
Delete a Lambda function.
Parameters:
- function_name: Name or ARN of the Lambda function
- qualifier: Function version or alias
"""
def update_lambda_code(self, function_name: str, zip_file: bytes = None, s3_bucket: str = None, s3_key: str = None, **kwargs) -> dict:
"""
Update Lambda function code.
Parameters:
- function_name: Name or ARN of the Lambda function
- zip_file: Deployment package as bytes
- s3_bucket: S3 bucket containing deployment package
- s3_key: S3 key for deployment package
Returns:
Updated function configuration
"""
def update_lambda_config(self, function_name: str, role: str = None, handler: str = None, description: str = None, timeout: int = None, memory_size: int = None, **kwargs) -> dict:
"""
Update Lambda function configuration.
Parameters:
- function_name: Name or ARN of the Lambda function
- role: ARN of the IAM role for the function
- handler: Entry point for the function
- description: Function description
- timeout: Function timeout in seconds
- memory_size: Memory allocation in MB
Returns:
Updated function configuration
"""
def get_function(self, function_name: str, qualifier: str = '$LATEST') -> dict:
"""
Get Lambda function configuration.
Parameters:
- function_name: Name or ARN of the Lambda function
- qualifier: Function version or alias
Returns:
Function configuration and metadata
"""
def list_functions(self, function_version: str = 'ALL', marker: str = None, max_items: int = None) -> list:
"""
List Lambda functions.
Parameters:
- function_version: Function version filter ('ALL', 'LATEST')
- marker: Pagination marker
- max_items: Maximum number of functions to return
Returns:
List of function configurations
"""
def list_versions_by_function(self, function_name: str, marker: str = None, max_items: int = None) -> list:
"""
List versions of a Lambda function.
Parameters:
- function_name: Name or ARN of the Lambda function
- marker: Pagination marker
- max_items: Maximum number of versions to return
Returns:
List of function versions
"""
def publish_version(self, function_name: str, code_sha256: str = None, description: str = '') -> dict:
"""
Publish a new version of a Lambda function.
Parameters:
- function_name: Name or ARN of the Lambda function
- code_sha256: SHA256 hash of deployment package
- description: Version description
Returns:
Published version configuration
"""
def create_alias(self, function_name: str, name: str, function_version: str, description: str = '') -> dict:
"""
Create an alias for a Lambda function version.
Parameters:
- function_name: Name or ARN of the Lambda function
- name: Alias name
- function_version: Function version for the alias
- description: Alias description
Returns:
Alias configuration
"""
def update_alias(self, function_name: str, name: str, function_version: str = None, description: str = None) -> dict:
"""
Update a Lambda function alias.
Parameters:
- function_name: Name or ARN of the Lambda function
- name: Alias name
- function_version: Function version for the alias
- description: Alias description
Returns:
Updated alias configuration
"""
def delete_alias(self, function_name: str, name: str) -> None:
"""
Delete a Lambda function alias.
Parameters:
- function_name: Name or ARN of the Lambda function
- name: Alias name
"""
def get_policy(self, function_name: str, qualifier: str = None) -> dict:
"""
Get Lambda function policy.
Parameters:
- function_name: Name or ARN of the Lambda function
- qualifier: Function version or alias
Returns:
Function policy
"""
def add_permission(self, function_name: str, statement_id: str, action: str, principal: str, source_arn: str = None, **kwargs) -> dict:
"""
Add permission to Lambda function policy.
Parameters:
- function_name: Name or ARN of the Lambda function
- statement_id: Unique statement identifier
- action: AWS Lambda action (e.g., 'lambda:InvokeFunction')
- principal: Principal being granted permission
- source_arn: ARN of the resource invoking the function
Returns:
Statement that was added
"""
def remove_permission(self, function_name: str, statement_id: str, qualifier: str = None) -> None:
"""
Remove permission from Lambda function policy.
Parameters:
- function_name: Name or ARN of the Lambda function
- statement_id: Statement identifier to remove
- qualifier: Function version or alias
"""Task implementations for Lambda operations that can be used directly in Airflow DAGs.
class LambdaInvokeFunctionOperator(BaseOperator):
def __init__(self, function_name: str, payload: str = None, invocation_type: str = 'RequestResponse', log_type: str = 'None', qualifier: str = '$LATEST', aws_conn_id: str = 'aws_default', **kwargs):
"""
Invoke a Lambda function.
Parameters:
- function_name: Name or ARN of the Lambda function
- payload: JSON payload for function input
- invocation_type: Invocation type ('RequestResponse', 'Event', 'DryRun')
- log_type: Log type ('None' or 'Tail')
- qualifier: Function version or alias
- aws_conn_id: AWS connection ID
"""
class LambdaCreateFunctionOperator(BaseOperator):
def __init__(self, function_name: str, runtime: str, role: str, handler: str, code: dict, description: str = '', timeout: int = 3, memory_size: int = 128, aws_conn_id: str = 'aws_default', **kwargs):
"""
Create a Lambda function.
Parameters:
- function_name: Name of the Lambda function
- runtime: Runtime environment (e.g., 'python3.9', 'nodejs18.x')
- role: ARN of the IAM role for the function
- handler: Entry point for the function
- code: Code configuration dictionary
- description: Function description
- timeout: Function timeout in seconds
- memory_size: Memory allocation in MB
- aws_conn_id: AWS connection ID
"""Monitoring tasks that wait for specific Lambda function states or execution conditions.
class LambdaFunctionStateSensor(BaseSensorOperator):
def __init__(self, function_name: str, qualifier: str = '$LATEST', target_states: list = None, aws_conn_id: str = 'aws_default', **kwargs):
"""
Wait for Lambda function to reach target state.
Parameters:
- function_name: Name or ARN of the Lambda function
- qualifier: Function version or alias
- target_states: List of target function states
- aws_conn_id: AWS connection ID
"""Asynchronous triggers for efficient Lambda function monitoring.
class LambdaInvokeFunctionTrigger(BaseTrigger):
def __init__(self, function_name: str, payload: str = None, invocation_type: str = 'RequestResponse', aws_conn_id: str = 'aws_default', **kwargs):
"""
Asynchronous trigger for Lambda function invocation.
Parameters:
- function_name: Name or ARN of the Lambda function
- payload: JSON payload for function input
- invocation_type: Invocation type ('RequestResponse', 'Event', 'DryRun')
- aws_conn_id: AWS connection ID
"""from airflow.providers.amazon.aws.hooks.lambda_function import LambdaHook
# Initialize hook
lambda_hook = LambdaHook(aws_conn_id='my_aws_conn')
# Invoke function synchronously
response = lambda_hook.invoke_lambda(
function_name='data-processor',
payload='{"input_data": "sample", "operation": "transform"}',
invocation_type='RequestResponse'
)
print(f"Function response: {response['Payload'].read()}")
print(f"Status code: {response['StatusCode']}")
# Invoke function asynchronously
lambda_hook.invoke_lambda(
function_name='notification-sender',
payload='{"message": "Processing complete", "recipient": "admin@example.com"}',
invocation_type='Event'
)

# Create a new function
function_config = lambda_hook.create_lambda(
function_name='my-data-processor',
runtime='python3.9',
role='arn:aws:iam::123456789012:role/lambda-execution-role',
handler='lambda_function.lambda_handler',
code={
'S3Bucket': 'my-lambda-deployments',
'S3Key': 'functions/data-processor-v1.0.0.zip'
},
description='Processes incoming data files',
timeout=300,
memory_size=512,
environment={'Variables': {'ENVIRONMENT': 'production'}}
)
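# Grant S3 permission to invoke the new function (illustrative ARNs;
# see add_permission in the hook reference above)
lambda_hook.add_permission(
    function_name='my-data-processor',
    statement_id='s3-invoke',
    action='lambda:InvokeFunction',
    principal='s3.amazonaws.com',
    source_arn='arn:aws:s3:::data-input-bucket'
)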
# Update function code
lambda_hook.update_lambda_code(
function_name='my-data-processor',
s3_bucket='my-lambda-deployments',
s3_key='functions/data-processor-v1.1.0.zip'
)
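# Adjust runtime settings independently of the code (illustrative values)
lambda_hook.update_lambda_config(
    function_name='my-data-processor',
    timeout=600,
    memory_size=1024
)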
# Publish a new version
version = lambda_hook.publish_version(
function_name='my-data-processor',
description='Bug fixes and performance improvements'
)
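# Review published versions before pointing an alias at one
# (assumes boto3-style response keys such as 'Version')
versions = lambda_hook.list_versions_by_function(function_name='my-data-processor')
print(f"Available versions: {[v['Version'] for v in versions]}")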
# Create alias for production
lambda_hook.create_alias(
function_name='my-data-processor',
name='PROD',
function_version=version['Version'],
description='Production alias'
)

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.lambda_function import LambdaInvokeFunctionOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
dag = DAG('lambda_workflow', start_date=datetime(2023, 1, 1))
wait_for_input = S3KeySensor(
task_id='wait_for_input',
bucket_name='data-input-bucket',
bucket_key='incoming/{{ ds }}/data.json',
timeout=3600,
dag=dag
)
process_data = LambdaInvokeFunctionOperator(
task_id='process_data',
function_name='data-processor:PROD',
payload='{"bucket": "data-input-bucket", "key": "incoming/{{ ds }}/data.json", "output_bucket": "data-output-bucket"}',
invocation_type='RequestResponse',
log_type='Tail',
aws_conn_id='aws_default',
dag=dag
)
send_notification = LambdaInvokeFunctionOperator(
task_id='send_notification',
function_name='notification-sender',
payload='{"message": "Data processing completed for {{ ds }}", "channel": "data-team"}',
invocation_type='Event',
dag=dag
)
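If the DAG also manages the function itself, LambdaCreateFunctionOperator fits the same pattern; a sketch with illustrative values, not wired into the dependency chain below:

from airflow.providers.amazon.aws.operators.lambda_function import LambdaCreateFunctionOperator

create_function = LambdaCreateFunctionOperator(
    task_id='create_function',
    function_name='data-processor',
    runtime='python3.11',
    role='arn:aws:iam::123456789012:role/lambda-execution-role',
    handler='lambda_function.lambda_handler',
    code={'S3Bucket': 'my-lambda-deployments', 'S3Key': 'functions/data-processor.zip'},
    dag=dag
)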
wait_for_input >> process_data >> send_notification

# Parallel function invocations with different configurations
parallel_processors = []
for region in ['us-east-1', 'us-west-2', 'eu-west-1']:
task = LambdaInvokeFunctionOperator(
task_id=f'process_data_{region}',
function_name=f'regional-processor-{region}',
payload=f'{{"region": "{region}", "date": "{{{{ ds }}}}"}}',
aws_conn_id=f'aws_{region}',
dag=dag
)
parallel_processors.append(task)
# Fan-out/fan-in pattern
fan_out = LambdaInvokeFunctionOperator(
task_id='distribute_work',
function_name='work-distributor',
payload='{"batch_size": 1000, "total_records": 50000}',
dag=dag
)
aggregate_results = LambdaInvokeFunctionOperator(
task_id='aggregate_results',
function_name='result-aggregator',
payload='{"batch_count": 50}',
trigger_rule='all_success',
dag=dag
)
fan_out >> parallel_processors >> aggregate_results

# Lambda function identifiers
FunctionName = str
FunctionArn = str
QualifiedFunctionName = str # function_name:qualifier
# Lambda runtime environments
class LambdaRuntime:
PYTHON_3_8 = 'python3.8'
PYTHON_3_9 = 'python3.9'
PYTHON_3_10 = 'python3.10'
PYTHON_3_11 = 'python3.11'
NODEJS_18_X = 'nodejs18.x'
NODEJS_20_X = 'nodejs20.x'
JAVA_8 = 'java8'
JAVA_11 = 'java11'
JAVA_17 = 'java17'
DOTNET_6 = 'dotnet6'
GO_1_X = 'go1.x'
RUBY_2_7 = 'ruby2.7'
PROVIDED = 'provided'
PROVIDED_AL2 = 'provided.al2'
# Invocation types
class InvocationType:
REQUEST_RESPONSE = 'RequestResponse' # Synchronous
EVENT = 'Event' # Asynchronous
DRY_RUN = 'DryRun' # Validate parameters and access
# Function states
class FunctionState:
PENDING = 'Pending'
ACTIVE = 'Active'
INACTIVE = 'Inactive'
FAILED = 'Failed'
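# Example: the state constants can replace raw strings in the sensor
# documented above (an illustrative sketch)
wait_active = LambdaFunctionStateSensor(
    task_id='wait_active',
    function_name='my-data-processor',
    target_states=[FunctionState.ACTIVE]
)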
# Function configuration
class LambdaFunctionConfig:
function_name: str
function_arn: str
runtime: str
role: str
handler: str
code_size: int
description: str
timeout: int
memory_size: int
last_modified: str
code_sha256: str
version: str
environment: dict
dead_letter_config: dict
kms_key_arn: str
tracing_config: dict
layers: list
state: str
state_reason: str
# Code configuration
class CodeConfig:
s3_bucket: str = None
s3_key: str = None
s3_object_version: str = None
zip_file: bytes = None
image_uri: str = None
# Environment variables
class EnvironmentConfig:
variables: dict
# Dead letter queue configuration
class DeadLetterConfig:
target_arn: str
# Tracing configuration
class TracingConfig:
mode: str # 'Active' or 'PassThrough'
# VPC configuration
class VpcConfig:
subnet_ids: list
security_group_ids: list

Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-amazon