tessl/pypi-apache-airflow-providers-amazon

Apache Airflow provider package offering comprehensive AWS service integrations for orchestrating cloud workflows and data pipelines


Lambda Function Management

AWS Lambda integration for serverless function execution within Airflow workflows. Provides function invocation, creation, management, and monitoring capabilities for event-driven processing and microservices architectures.

Capabilities

Lambda Hook

Core Lambda client providing low-level AWS Lambda API access for function management and execution.

class LambdaHook(AwsBaseHook):
    def __init__(self, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Initialize Lambda Hook.
        
        Parameters:
        - aws_conn_id: AWS connection ID
        """

    def invoke_lambda(self, function_name: str, invocation_type: str = 'RequestResponse', payload: str = None, log_type: str = 'None', qualifier: str = '$LATEST', **kwargs) -> dict:
        """
        Invoke a Lambda function.
        
        Parameters:
        - function_name: Name or ARN of the Lambda function
        - invocation_type: Invocation type ('RequestResponse', 'Event', 'DryRun')
        - payload: JSON payload for function input
        - log_type: Log type ('None' or 'Tail')
        - qualifier: Function version or alias
        
        Returns:
        Response from Lambda invocation
        """

    def create_lambda(self, function_name: str, runtime: str, role: str, handler: str, zip_file: bytes = None, code: dict = None, description: str = '', timeout: int = 3, memory_size: int = 128, **kwargs) -> dict:
        """
        Create a Lambda function.
        
        Parameters:
        - function_name: Name of the Lambda function
        - runtime: Runtime environment (e.g., 'python3.9', 'nodejs18.x')
        - role: ARN of the IAM role for the function
        - handler: Entry point for the function
        - zip_file: Deployment package as bytes
        - code: Code configuration dictionary
        - description: Function description
        - timeout: Function timeout in seconds
        - memory_size: Memory allocation in MB
        
        Returns:
        Function configuration
        """

    def delete_lambda(self, function_name: str, qualifier: str = None) -> None:
        """
        Delete a Lambda function.
        
        Parameters:
        - function_name: Name or ARN of the Lambda function
        - qualifier: Function version or alias
        """

    def update_lambda_code(self, function_name: str, zip_file: bytes = None, s3_bucket: str = None, s3_key: str = None, **kwargs) -> dict:
        """
        Update Lambda function code.
        
        Parameters:
        - function_name: Name or ARN of the Lambda function
        - zip_file: Deployment package as bytes
        - s3_bucket: S3 bucket containing deployment package
        - s3_key: S3 key for deployment package
        
        Returns:
        Updated function configuration
        """

    def update_lambda_config(self, function_name: str, role: str = None, handler: str = None, description: str = None, timeout: int = None, memory_size: int = None, **kwargs) -> dict:
        """
        Update Lambda function configuration.
        
        Parameters:
        - function_name: Name or ARN of the Lambda function
        - role: ARN of the IAM role for the function
        - handler: Entry point for the function
        - description: Function description
        - timeout: Function timeout in seconds
        - memory_size: Memory allocation in MB
        
        Returns:
        Updated function configuration
        """

    def get_function(self, function_name: str, qualifier: str = '$LATEST') -> dict:
        """
        Get Lambda function configuration.
        
        Parameters:
        - function_name: Name or ARN of the Lambda function
        - qualifier: Function version or alias
        
        Returns:
        Function configuration and metadata
        """

    def list_functions(self, function_version: str = 'ALL', marker: str = None, max_items: int = None) -> list:
        """
        List Lambda functions.
        
        Parameters:
        - function_version: Function version filter ('ALL', 'LATEST')
        - marker: Pagination marker
        - max_items: Maximum number of functions to return
        
        Returns:
        List of function configurations
        """

    def list_versions_by_function(self, function_name: str, marker: str = None, max_items: int = None) -> list:
        """
        List versions of a Lambda function.
        
        Parameters:
        - function_name: Name or ARN of the Lambda function
        - marker: Pagination marker
        - max_items: Maximum number of versions to return
        
        Returns:
        List of function versions
        """

    def publish_version(self, function_name: str, code_sha256: str = None, description: str = '') -> dict:
        """
        Publish a new version of a Lambda function.
        
        Parameters:
        - function_name: Name or ARN of the Lambda function
        - code_sha256: SHA256 hash of deployment package
        - description: Version description
        
        Returns:
        Published version configuration
        """

    def create_alias(self, function_name: str, name: str, function_version: str, description: str = '') -> dict:
        """
        Create an alias for a Lambda function version.
        
        Parameters:
        - function_name: Name or ARN of the Lambda function
        - name: Alias name
        - function_version: Function version for the alias
        - description: Alias description
        
        Returns:
        Alias configuration
        """

    def update_alias(self, function_name: str, name: str, function_version: str = None, description: str = None) -> dict:
        """
        Update a Lambda function alias.
        
        Parameters:
        - function_name: Name or ARN of the Lambda function
        - name: Alias name
        - function_version: Function version for the alias
        - description: Alias description
        
        Returns:
        Updated alias configuration
        """

    def delete_alias(self, function_name: str, name: str) -> None:
        """
        Delete a Lambda function alias.
        
        Parameters:
        - function_name: Name or ARN of the Lambda function
        - name: Alias name
        """

    def get_policy(self, function_name: str, qualifier: str = None) -> dict:
        """
        Get Lambda function policy.
        
        Parameters:
        - function_name: Name or ARN of the Lambda function
        - qualifier: Function version or alias
        
        Returns:
        Function policy
        """

    def add_permission(self, function_name: str, statement_id: str, action: str, principal: str, source_arn: str = None, **kwargs) -> dict:
        """
        Add permission to Lambda function policy.
        
        Parameters:
        - function_name: Name or ARN of the Lambda function
        - statement_id: Unique statement identifier
        - action: AWS Lambda action (e.g., 'lambda:InvokeFunction')
        - principal: Principal being granted permission
        - source_arn: ARN of the resource invoking the function
        
        Returns:
        Statement that was added
        """

    def remove_permission(self, function_name: str, statement_id: str, qualifier: str = None) -> None:
        """
        Remove permission from Lambda function policy.
        
        Parameters:
        - function_name: Name or ARN of the Lambda function
        - statement_id: Statement identifier to remove
        - qualifier: Function version or alias
        """

Lambda Operators

Task implementations for Lambda operations that can be used directly in Airflow DAGs.

class LambdaInvokeFunctionOperator(BaseOperator):
    def __init__(self, function_name: str, payload: str = None, invocation_type: str = 'RequestResponse', log_type: str = 'None', qualifier: str = '$LATEST', aws_conn_id: str = 'aws_default', **kwargs):
        """
        Invoke a Lambda function.
        
        Parameters:
        - function_name: Name or ARN of the Lambda function
        - payload: JSON payload for function input
        - invocation_type: Invocation type ('RequestResponse', 'Event', 'DryRun')
        - log_type: Log type ('None' or 'Tail')
        - qualifier: Function version or alias
        - aws_conn_id: AWS connection ID
        """

class LambdaCreateFunctionOperator(BaseOperator):
    def __init__(self, function_name: str, runtime: str, role: str, handler: str, code: dict, description: str = '', timeout: int = 3, memory_size: int = 128, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Create a Lambda function.
        
        Parameters:
        - function_name: Name of the Lambda function
        - runtime: Runtime environment (e.g., 'python3.9', 'nodejs18.x')
        - role: ARN of the IAM role for the function
        - handler: Entry point for the function
        - code: Code configuration dictionary
        - description: Function description
        - timeout: Function timeout in seconds
        - memory_size: Memory allocation in MB
        - aws_conn_id: AWS connection ID
        """

Lambda Sensors

Monitoring tasks that wait for specific Lambda function states or execution conditions.

class LambdaFunctionStateSensor(BaseSensorOperator):
    def __init__(self, function_name: str, qualifier: str = '$LATEST', target_states: list = None, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Wait for Lambda function to reach target state.
        
        Parameters:
        - function_name: Name or ARN of the Lambda function
        - qualifier: Function version or alias
        - target_states: List of target function states
        - aws_conn_id: AWS connection ID
        """

Lambda Triggers

Asynchronous triggers for efficient Lambda function monitoring.

class LambdaInvokeFunctionTrigger(BaseTrigger):
    def __init__(self, function_name: str, payload: str = None, invocation_type: str = 'RequestResponse', aws_conn_id: str = 'aws_default', **kwargs):
        """
        Asynchronous trigger for Lambda function invocation.
        
        Parameters:
        - function_name: Name or ARN of the Lambda function
        - payload: JSON payload for function input
        - invocation_type: Invocation type ('RequestResponse', 'Event', 'DryRun')
        - aws_conn_id: AWS connection ID
        """

Usage Examples

Basic Lambda Invocation

from airflow.providers.amazon.aws.hooks.lambda_function import LambdaHook

# Initialize hook
lambda_hook = LambdaHook(aws_conn_id='my_aws_conn')

# Invoke function synchronously
response = lambda_hook.invoke_lambda(
    function_name='data-processor',
    payload='{"input_data": "sample", "operation": "transform"}',
    invocation_type='RequestResponse'
)

print(f"Function response: {response['Payload'].read()}")
print(f"Status code: {response['StatusCode']}")

# Invoke function asynchronously
lambda_hook.invoke_lambda(
    function_name='notification-sender',
    payload='{"message": "Processing complete", "recipient": "admin@example.com"}',
    invocation_type='Event'
)
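
Note that a synchronous invocation can return StatusCode 200 even when the function itself raised an exception; boto3 reports that case through a FunctionError key in the response, with the error details in the payload. A defensive check:

import json

response = lambda_hook.invoke_lambda(
    function_name='data-processor',
    payload=json.dumps({'input_data': 'sample'}),
)

# Read the streaming payload once; it is consumed on first read
payload_bytes = response['Payload'].read()
if response.get('FunctionError'):
    raise RuntimeError(f"Lambda failed: {payload_bytes.decode()}")
result = json.loads(payload_bytes)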

Lambda Function Management

# Create a new function
function_config = lambda_hook.create_lambda(
    function_name='my-data-processor',
    runtime='python3.9',
    role='arn:aws:iam::123456789012:role/lambda-execution-role',
    handler='lambda_function.lambda_handler',
    code={
        'S3Bucket': 'my-lambda-deployments',
        'S3Key': 'functions/data-processor-v1.0.0.zip'
    },
    description='Processes incoming data files',
    timeout=300,
    memory_size=512,
    environment={'Variables': {'ENVIRONMENT': 'production'}}
)

# Update function code
lambda_hook.update_lambda_code(
    function_name='my-data-processor',
    s3_bucket='my-lambda-deployments',
    s3_key='functions/data-processor-v1.1.0.zip'
)

# Publish a new version
version = lambda_hook.publish_version(
    function_name='my-data-processor',
    description='Bug fixes and performance improvements'
)

# Create alias for production
lambda_hook.create_alias(
    function_name='my-data-processor',
    name='PROD',
    function_version=version['Version'],
    description='Production alias'
)
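
The policy methods cover the common chore of granting another AWS service invoke rights, for example letting S3 bucket notifications call the function. A sketch using the add_permission signature above; the bucket ARN is a placeholder:

# Allow S3 to invoke the function from bucket notifications
lambda_hook.add_permission(
    function_name='my-data-processor',
    statement_id='s3-invoke-permission',
    action='lambda:InvokeFunction',
    principal='s3.amazonaws.com',
    source_arn='arn:aws:s3:::data-input-bucket',  # placeholder bucket ARN
)

# Inspect the resulting resource-based policy
policy = lambda_hook.get_policy(function_name='my-data-processor')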

Lambda DAG Operations

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.lambda_function import LambdaInvokeFunctionOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

dag = DAG('lambda_workflow', start_date=datetime(2023, 1, 1))

wait_for_input = S3KeySensor(
    task_id='wait_for_input',
    bucket_name='data-input-bucket',
    bucket_key='incoming/{{ ds }}/data.json',
    timeout=3600,
    dag=dag
)

process_data = LambdaInvokeFunctionOperator(
    task_id='process_data',
    function_name='data-processor:PROD',
    payload='{"bucket": "data-input-bucket", "key": "incoming/{{ ds }}/data.json", "output_bucket": "data-output-bucket"}',
    invocation_type='RequestResponse',
    log_type='Tail',
    aws_conn_id='aws_default',
    dag=dag
)

send_notification = LambdaInvokeFunctionOperator(
    task_id='send_notification',
    function_name='notification-sender',
    payload='{"message": "Data processing completed for {{ ds }}", "channel": "data-team"}',
    invocation_type='Event',
    dag=dag
)

wait_for_input >> process_data >> send_notification
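
With invocation_type='RequestResponse' the operator has a response to hand back, and, assuming it pushes that response to XCom as its return value, a downstream task can consume the function's output. A sketch using the TaskFlow API (Airflow 2.x):

from airflow.decorators import task

@task(dag=dag)
def handle_result(lambda_response):
    # Inspect whatever the Lambda function returned
    print(f"Processor output: {lambda_response}")

# .output is the operator's XComArg; passing it also sets the dependency
handle_result(process_data.output)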

Advanced Lambda Usage

# Parallel function invocations with different configurations
parallel_processors = []
for region in ['us-east-1', 'us-west-2', 'eu-west-1']:
    task = LambdaInvokeFunctionOperator(
        task_id=f'process_data_{region}',
        function_name=f'regional-processor-{region}',
        payload=f'{{"region": "{region}", "date": "{{{{ ds }}}}"}}',
        aws_conn_id=f'aws_{region}',
        dag=dag
    )
    parallel_processors.append(task)

# Fan-out/fan-in pattern
fan_out = LambdaInvokeFunctionOperator(
    task_id='distribute_work',
    function_name='work-distributor',
    payload='{"batch_size": 1000, "total_records": 50000}',
    dag=dag
)

aggregate_results = LambdaInvokeFunctionOperator(
    task_id='aggregate_results',
    function_name='result-aggregator',
    payload='{"batch_count": 50}',
    trigger_rule='all_success',
    dag=dag
)

# Route the regional tasks between the distributor and the aggregator
fan_out >> parallel_processors >> aggregate_results
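
On Airflow 2.3+ the per-region loop above can also be written with dynamic task mapping, which creates one mapped task instance per payload at runtime instead of a fixed fan-out:

import json

# One mapped invocation per batch payload
process_batches = LambdaInvokeFunctionOperator.partial(
    task_id='process_batch',
    function_name='data-processor',
    dag=dag,
).expand(
    payload=[json.dumps({'batch': i}) for i in range(5)]
)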

Types

# Lambda function identifiers
FunctionName = str
FunctionArn = str
QualifiedFunctionName = str  # function_name:qualifier

# Lambda runtime environments
class LambdaRuntime:
    PYTHON_3_8 = 'python3.8'
    PYTHON_3_9 = 'python3.9'
    PYTHON_3_10 = 'python3.10'
    PYTHON_3_11 = 'python3.11'
    NODEJS_18_X = 'nodejs18.x'
    NODEJS_20_X = 'nodejs20.x'
    JAVA_8 = 'java8'
    JAVA_11 = 'java11'
    JAVA_17 = 'java17'
    DOTNET_6 = 'dotnet6'
    GO_1_X = 'go1.x'
    RUBY_2_7 = 'ruby2.7'
    PROVIDED = 'provided'
    PROVIDED_AL2 = 'provided.al2'

# Invocation types
class InvocationType:
    REQUEST_RESPONSE = 'RequestResponse'  # Synchronous
    EVENT = 'Event'  # Asynchronous
    DRY_RUN = 'DryRun'  # Validate parameters and access

# Function states
class FunctionState:
    PENDING = 'Pending'
    ACTIVE = 'Active'
    INACTIVE = 'Inactive'
    FAILED = 'Failed'

# Function configuration
class LambdaFunctionConfig:
    function_name: str
    function_arn: str
    runtime: str
    role: str
    handler: str
    code_size: int
    description: str
    timeout: int
    memory_size: int
    last_modified: str
    code_sha256: str
    version: str
    environment: dict
    dead_letter_config: dict
    kms_key_arn: str
    tracing_config: dict
    layers: list
    state: str
    state_reason: str

# Code configuration
class CodeConfig:
    s3_bucket: str = None
    s3_key: str = None
    s3_object_version: str = None
    zip_file: bytes = None
    image_uri: str = None

# Environment variables
class EnvironmentConfig:
    variables: dict

# Dead letter queue configuration
class DeadLetterConfig:
    target_arn: str

# Tracing configuration
class TracingConfig:
    mode: str  # 'Active' or 'PassThrough'

# VPC configuration
class VpcConfig:
    subnet_ids: list
    security_group_ids: list
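
These listings are descriptive; at the call level the nested structures are plain dictionaries with boto3's CamelCase keys. For example, assuming the hook forwards vpc_config and environment to the underlying create_function call, VPC and environment settings would be passed like this (IDs and ARNs are placeholders):

lambda_hook.create_lambda(
    function_name='vpc-bound-processor',
    runtime='python3.9',
    role='arn:aws:iam::123456789012:role/lambda-execution-role',
    handler='lambda_function.lambda_handler',
    code={'S3Bucket': 'my-lambda-deployments', 'S3Key': 'functions/app.zip'},
    vpc_config={
        'SubnetIds': ['subnet-0abc1234'],
        'SecurityGroupIds': ['sg-0abc1234'],
    },
    environment={'Variables': {'ENVIRONMENT': 'staging'}},
)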

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-amazon
