# Lambda Function Management

AWS Lambda integration for serverless function execution within Airflow workflows. Provides function invocation, creation, management, and state-monitoring capabilities for event-driven processing and microservice architectures.

## Capabilities

### Lambda Hook

Hook that wraps the AWS Lambda API, giving low-level access for function management and invocation.

```python { .api }
class LambdaHook(AwsBaseHook):
    def __init__(self, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Initialize Lambda Hook.

        Parameters:
        - aws_conn_id: AWS connection ID
        """

    def invoke_lambda(self, function_name: str, invocation_type: str = 'RequestResponse', payload: str = None, log_type: str = 'None', qualifier: str = '$LATEST', **kwargs) -> dict:
        """
        Invoke a Lambda function.

        Parameters:
        - function_name: Name or ARN of the Lambda function
        - invocation_type: Invocation type ('RequestResponse', 'Event', 'DryRun')
        - payload: JSON payload for function input
        - log_type: Log type ('None' or 'Tail'); note the default is the string 'None',
          not Python None
        - qualifier: Function version or alias. If function_name already carries a
          qualifier (e.g. 'my-func:PROD'), this value must match it or AWS rejects the
          request, so prefer passing the alias here and an unqualified function_name.

        Returns:
        Response from Lambda invocation. response['Payload'] is a stream holding the
        function output and response['StatusCode'] is the HTTP status code.
        """

    def create_lambda(self, function_name: str, runtime: str, role: str, handler: str, zip_file: bytes = None, code: dict = None, description: str = '', timeout: int = 3, memory_size: int = 128, **kwargs) -> dict:
        """
        Create a Lambda function.

        Parameters:
        - function_name: Name of the Lambda function
        - runtime: Runtime environment (e.g., 'python3.9', 'nodejs18.x'; see LambdaRuntime)
        - role: ARN of the IAM role for the function
        - handler: Entry point for the function
        - zip_file: Deployment package as bytes (alternative to code)
        - code: Code configuration dictionary, e.g.
          {'S3Bucket': 'my-bucket', 'S3Key': 'path/package.zip'} (see CodeConfig)
        - description: Function description
        - timeout: Function timeout in seconds
        - memory_size: Memory allocation in MB
        - **kwargs: Additional function settings, e.g.
          environment={'Variables': {'ENVIRONMENT': 'production'}}

        Returns:
        Function configuration
        """

    def delete_lambda(self, function_name: str, qualifier: str = None) -> None:
        """
        Delete a Lambda function.

        Parameters:
        - function_name: Name or ARN of the Lambda function
        - qualifier: Function version or alias
        """

    def update_lambda_code(self, function_name: str, zip_file: bytes = None, s3_bucket: str = None, s3_key: str = None, **kwargs) -> dict:
        """
        Update Lambda function code.

        Parameters:
        - function_name: Name or ARN of the Lambda function
        - zip_file: Deployment package as bytes (alternative to s3_bucket/s3_key)
        - s3_bucket: S3 bucket containing deployment package
        - s3_key: S3 key for deployment package

        Returns:
        Updated function configuration
        """

    def update_lambda_config(self, function_name: str, role: str = None, handler: str = None, description: str = None, timeout: int = None, memory_size: int = None, **kwargs) -> dict:
        """
        Update Lambda function configuration.

        Parameters:
        - function_name: Name or ARN of the Lambda function
        - role: ARN of the IAM role for the function
        - handler: Entry point for the function
        - description: Function description
        - timeout: Function timeout in seconds
        - memory_size: Memory allocation in MB

        Returns:
        Updated function configuration
        """

    def get_function(self, function_name: str, qualifier: str = '$LATEST') -> dict:
        """
        Get Lambda function configuration.

        Parameters:
        - function_name: Name or ARN of the Lambda function
        - qualifier: Function version or alias

        Returns:
        Function configuration and metadata
        """

    def list_functions(self, function_version: str = 'ALL', marker: str = None, max_items: int = None) -> list:
        """
        List Lambda functions.

        Parameters:
        - function_version: Function version filter ('ALL', 'LATEST')
        - marker: Pagination marker
        - max_items: Maximum number of functions to return

        Returns:
        List of function configurations
        """

    def list_versions_by_function(self, function_name: str, marker: str = None, max_items: int = None) -> list:
        """
        List versions of a Lambda function.

        Parameters:
        - function_name: Name or ARN of the Lambda function
        - marker: Pagination marker
        - max_items: Maximum number of versions to return

        Returns:
        List of function versions
        """

    def publish_version(self, function_name: str, code_sha256: str = None, description: str = '') -> dict:
        """
        Publish a new version of a Lambda function.

        Parameters:
        - function_name: Name or ARN of the Lambda function
        - code_sha256: SHA256 hash of deployment package
        - description: Version description

        Returns:
        Published version configuration (the new version number is under 'Version')
        """

    def create_alias(self, function_name: str, name: str, function_version: str, description: str = '') -> dict:
        """
        Create an alias for a Lambda function version.

        Parameters:
        - function_name: Name or ARN of the Lambda function
        - name: Alias name
        - function_version: Function version for the alias
        - description: Alias description

        Returns:
        Alias configuration
        """

    def update_alias(self, function_name: str, name: str, function_version: str = None, description: str = None) -> dict:
        """
        Update a Lambda function alias.

        Parameters:
        - function_name: Name or ARN of the Lambda function
        - name: Alias name
        - function_version: Function version for the alias
        - description: Alias description

        Returns:
        Updated alias configuration
        """

    def delete_alias(self, function_name: str, name: str) -> None:
        """
        Delete a Lambda function alias.

        Parameters:
        - function_name: Name or ARN of the Lambda function
        - name: Alias name
        """

    def get_policy(self, function_name: str, qualifier: str = None) -> dict:
        """
        Get Lambda function policy.

        Parameters:
        - function_name: Name or ARN of the Lambda function
        - qualifier: Function version or alias

        Returns:
        Function policy
        """

    def add_permission(self, function_name: str, statement_id: str, action: str, principal: str, source_arn: str = None, **kwargs) -> dict:
        """
        Add permission to Lambda function policy.

        Parameters:
        - function_name: Name or ARN of the Lambda function
        - statement_id: Unique statement identifier
        - action: AWS Lambda action (e.g., 'lambda:InvokeFunction')
        - principal: Principal being granted permission
        - source_arn: ARN of the resource invoking the function

        Returns:
        Statement that was added
        """

    def remove_permission(self, function_name: str, statement_id: str, qualifier: str = None) -> None:
        """
        Remove permission from Lambda function policy.

        Parameters:
        - function_name: Name or ARN of the Lambda function
        - statement_id: Statement identifier to remove
        - qualifier: Function version or alias
        """
```

### Lambda Operators

Operator classes that perform Lambda operations and can be used directly as tasks in Airflow DAGs.

```python { .api }
class LambdaInvokeFunctionOperator(BaseOperator):
    def __init__(self, function_name: str, payload: str = None, invocation_type: str = 'RequestResponse', log_type: str = 'None', qualifier: str = '$LATEST', aws_conn_id: str = 'aws_default', **kwargs):
        """
        Invoke a Lambda function.

        Parameters:
        - function_name: Name or ARN of the Lambda function
        - payload: JSON payload for function input (Jinja templates such as
          '{{ ds }}' can be used, as in the DAG examples)
        - invocation_type: Invocation type ('RequestResponse', 'Event', 'DryRun')
        - log_type: Log type ('None' or 'Tail'); the default is the string 'None'
        - qualifier: Function version or alias. Do not combine a qualified
          function_name (e.g. 'my-func:PROD') with a different qualifier; pass the
          alias here and an unqualified function_name instead.
        - aws_conn_id: AWS connection ID
        """

class LambdaCreateFunctionOperator(BaseOperator):
    def __init__(self, function_name: str, runtime: str, role: str, handler: str, code: dict, description: str = '', timeout: int = 3, memory_size: int = 128, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Create a Lambda function.

        Parameters:
        - function_name: Name of the Lambda function
        - runtime: Runtime environment (e.g., 'python3.9', 'nodejs18.x'; see LambdaRuntime)
        - role: ARN of the IAM role for the function
        - handler: Entry point for the function
        - code: Code configuration dictionary, e.g.
          {'S3Bucket': 'my-bucket', 'S3Key': 'path/package.zip'} (see CodeConfig)
        - description: Function description
        - timeout: Function timeout in seconds
        - memory_size: Memory allocation in MB
        - aws_conn_id: AWS connection ID
        """
```

### Lambda Sensors

Sensor that waits for a Lambda function to reach one of a set of target states.

```python { .api }
class LambdaFunctionStateSensor(BaseSensorOperator):
    def __init__(self, function_name: str, qualifier: str = '$LATEST', target_states: list = None, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Wait for Lambda function to reach target state.

        Parameters:
        - function_name: Name or ARN of the Lambda function
        - qualifier: Function version or alias
        - target_states: List of target function states (values from FunctionState,
          e.g. 'Active')
        - aws_conn_id: AWS connection ID
        """
```

### Lambda Triggers

Asynchronous trigger for Lambda function invocation.

```python { .api }
class LambdaInvokeFunctionTrigger(BaseTrigger):
    def __init__(self, function_name: str, payload: str = None, invocation_type: str = 'RequestResponse', aws_conn_id: str = 'aws_default', **kwargs):
        """
        Asynchronous trigger for Lambda function invocation.

        Parameters:
        - function_name: Name or ARN of the Lambda function
        - payload: JSON payload for function input
        - invocation_type: Invocation type ('RequestResponse', 'Event', 'DryRun')
        - aws_conn_id: AWS connection ID
        """
```

## Usage Examples

### Basic Lambda Invocation

```python
import json

from airflow.providers.amazon.aws.hooks.lambda_function import LambdaHook

# Initialize hook
lambda_hook = LambdaHook(aws_conn_id='my_aws_conn')

# Invoke function synchronously
response = lambda_hook.invoke_lambda(
    function_name='data-processor',
    payload='{"input_data": "sample", "operation": "transform"}',
    invocation_type='RequestResponse',
)

# 'Payload' is a stream: read it exactly once and keep the decoded result.
result = json.loads(response['Payload'].read())
print(f"Status code: {response['StatusCode']}")

# A 200 status code only means the invocation itself succeeded; an error raised
# inside the function is reported through the 'FunctionError' key instead.
if 'FunctionError' in response:
    raise RuntimeError(f"Lambda function failed: {result}")
print(f"Function response: {result}")

# Invoke function asynchronously (the function's output is not returned)
lambda_hook.invoke_lambda(
    function_name='notification-sender',
    payload='{"message": "Processing complete", "recipient": "admin@example.com"}',
    invocation_type='Event',
)
```

### Lambda Function Management

These snippets reuse the `lambda_hook` created in the previous example.

```python
# Create a new function
function_config = lambda_hook.create_lambda(
    function_name='my-data-processor',
    runtime='python3.9',
    role='arn:aws:iam::123456789012:role/lambda-execution-role',
    handler='lambda_function.lambda_handler',
    code={
        'S3Bucket': 'my-lambda-deployments',
        'S3Key': 'functions/data-processor-v1.0.0.zip',
    },
    description='Processes incoming data files',
    timeout=300,
    memory_size=512,
    environment={'Variables': {'ENVIRONMENT': 'production'}},
)

# A newly created function starts in the 'Pending' state and rejects code
# updates until it is 'Active'; block on the boto3 Lambda waiter first.
lambda_client = lambda_hook.conn
lambda_client.get_waiter('function_active_v2').wait(FunctionName='my-data-processor')

# Update function code
lambda_hook.update_lambda_code(
    function_name='my-data-processor',
    s3_bucket='my-lambda-deployments',
    s3_key='functions/data-processor-v1.1.0.zip',
)

# Code updates are applied asynchronously; publishing a version while the
# update is still in progress fails, so wait for it to finish.
lambda_client.get_waiter('function_updated_v2').wait(FunctionName='my-data-processor')

# Publish a new version
version = lambda_hook.publish_version(
    function_name='my-data-processor',
    description='Bug fixes and performance improvements',
)

# Create alias for production
lambda_hook.create_alias(
    function_name='my-data-processor',
    name='PROD',
    function_version=version['Version'],
    description='Production alias',
)
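```

### Lambda DAG Operations

A DAG that waits for an input file in S3, processes it with the `PROD` alias of a Lambda function, then sends an asynchronous notification.

```python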
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.lambda_function import LambdaInvokeFunctionOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

dag = DAG(
    'lambda_workflow',
    start_date=datetime(2023, 1, 1),
    schedule='@daily',
    catchup=False,  # do not backfill one run per day since start_date
)

wait_for_input = S3KeySensor(
    task_id='wait_for_input',
    bucket_name='data-input-bucket',
    bucket_key='incoming/{{ ds }}/data.json',
    timeout=3600,
    dag=dag,
)

process_data = LambdaInvokeFunctionOperator(
    task_id='process_data',
    # Select the alias through `qualifier`: a qualified name such as
    # 'data-processor:PROD' would conflict with the default qualifier='$LATEST'.
    function_name='data-processor',
    qualifier='PROD',
    payload='{"bucket": "data-input-bucket", "key": "incoming/{{ ds }}/data.json", "output_bucket": "data-output-bucket"}',
    invocation_type='RequestResponse',
    log_type='Tail',
    aws_conn_id='aws_default',
    dag=dag,
)

send_notification = LambdaInvokeFunctionOperator(
    task_id='send_notification',
    function_name='notification-sender',
    payload='{"message": "Data processing completed for {{ ds }}", "channel": "data-team"}',
    invocation_type='Event',  # asynchronous: the task does not wait for the function
    dag=dag,
)

wait_for_input >> process_data >> send_notification
```

### Advanced Lambda Usage

These snippets reuse `dag` and the imports from the previous example.

```python
import json

# Parallel function invocations, one per region, each with its own AWS connection
parallel_processors = []
for region in ['us-east-1', 'us-west-2', 'eu-west-1']:
    task = LambdaInvokeFunctionOperator(
        task_id=f'process_data_{region}',
        function_name=f'regional-processor-{region}',
        # json.dumps avoids hand-escaping braces; Airflow renders '{{ ds }}' at run time
        payload=json.dumps({"region": region, "date": "{{ ds }}"}),
        aws_conn_id=f'aws_{region}',
        dag=dag,
    )
    parallel_processors.append(task)

# Fan-out/fan-in pattern
fan_out = LambdaInvokeFunctionOperator(
    task_id='distribute_work',
    function_name='work-distributor',
    payload='{"batch_size": 1000, "total_records": 50000}',
    dag=dag,
)

aggregate_results = LambdaInvokeFunctionOperator(
    task_id='aggregate_results',
    function_name='result-aggregator',
    payload='{"batch_count": 50}',
    trigger_rule='all_success',  # run only after every regional processor succeeds
    dag=dag,
)

fan_out >> parallel_processors >> aggregate_results
```

## Types

```python { .api }
# Lambda function identifiers
FunctionName = str
FunctionArn = str
QualifiedFunctionName = str  # function_name:qualifier

# Lambda runtime environments
# NOTE: python3.8, java8, dotnet6, go1.x, ruby2.7 and provided have been deprecated
# by AWS Lambda; prefer the newer identifiers when creating new functions.
class LambdaRuntime:
    PYTHON_3_8 = 'python3.8'
    PYTHON_3_9 = 'python3.9'
    PYTHON_3_10 = 'python3.10'
    PYTHON_3_11 = 'python3.11'
    PYTHON_3_12 = 'python3.12'
    PYTHON_3_13 = 'python3.13'
    NODEJS_18_X = 'nodejs18.x'
    NODEJS_20_X = 'nodejs20.x'
    NODEJS_22_X = 'nodejs22.x'
    JAVA_8 = 'java8'
    JAVA_11 = 'java11'
    JAVA_17 = 'java17'
    JAVA_21 = 'java21'
    DOTNET_6 = 'dotnet6'
    DOTNET_8 = 'dotnet8'
    GO_1_X = 'go1.x'
    RUBY_2_7 = 'ruby2.7'
    RUBY_3_3 = 'ruby3.3'
    PROVIDED = 'provided'
    PROVIDED_AL2 = 'provided.al2'
    PROVIDED_AL2023 = 'provided.al2023'

# Invocation types
class InvocationType:
    REQUEST_RESPONSE = 'RequestResponse'  # Synchronous
    EVENT = 'Event'  # Asynchronous
    DRY_RUN = 'DryRun'  # Validate parameters and access

# Function states
class FunctionState:
    PENDING = 'Pending'
    ACTIVE = 'Active'
    INACTIVE = 'Inactive'
    FAILED = 'Failed'

# Function configuration
class LambdaFunctionConfig:
    function_name: str
    function_arn: str
    runtime: str
    role: str
    handler: str
    code_size: int
    description: str
    timeout: int
    memory_size: int
    last_modified: str
    code_sha256: str
    version: str
    environment: dict
    dead_letter_config: dict
    kms_key_arn: str
    tracing_config: dict
    layers: list
    state: str  # one of the FunctionState values
    state_reason: str

# Code configuration: set the S3 fields, zip_file, or image_uri
class CodeConfig:
    s3_bucket: str = None
    s3_key: str = None
    s3_object_version: str = None
    zip_file: bytes = None
    image_uri: str = None

# Environment variables
class EnvironmentConfig:
    variables: dict

# Dead letter queue configuration
class DeadLetterConfig:
    target_arn: str

# Tracing configuration
class TracingConfig:
    mode: str  # 'Active' or 'PassThrough'

# VPC configuration
class VpcConfig:
    subnet_ids: list
    security_group_ids: list
```