or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

athena-analytics.mdauthentication.mdbatch-processing.mddata-transfers.mddms-migration.mddynamodb-nosql.mdecs-containers.mdeks-kubernetes.mdemr-clusters.mdglue-processing.mdindex.mdlambda-functions.mdmessaging-sns-sqs.mdrds-databases.mdredshift-warehouse.mds3-storage.mdsagemaker-ml.md

lambda-functions.mddocs/

0

# Lambda Function Management

1

2

AWS Lambda integration for serverless function execution within Airflow workflows. Provides function invocation, creation, management, and monitoring capabilities for event-driven processing and microservices architectures.

3

4

## Capabilities

5

6

### Lambda Hook

7

8

Core Lambda client providing low-level AWS Lambda API access for function management and execution.

9

10

```python { .api }

11

class LambdaHook(AwsBaseHook):

12

def __init__(self, aws_conn_id: str = 'aws_default', **kwargs):

13

"""

14

Initialize Lambda Hook.

15

16

Parameters:

17

- aws_conn_id: AWS connection ID

18

"""

19

20

def invoke_lambda(self, function_name: str, invocation_type: str = 'RequestResponse', payload: str = None, log_type: str = 'None', qualifier: str = '$LATEST', **kwargs) -> dict:

21

"""

22

Invoke a Lambda function.

23

24

Parameters:

25

- function_name: Name or ARN of the Lambda function

26

- invocation_type: Invocation type ('RequestResponse', 'Event', 'DryRun')

27

- payload: JSON payload for function input

28

- log_type: Log type ('None' or 'Tail')

29

- qualifier: Function version or alias

30

31

Returns:

32

Response from Lambda invocation

33

"""

34

35

def create_lambda(self, function_name: str, runtime: str, role: str, handler: str, zip_file: bytes = None, code: dict = None, description: str = '', timeout: int = 3, memory_size: int = 128, **kwargs) -> dict:

36

"""

37

Create a Lambda function.

38

39

Parameters:

40

- function_name: Name of the Lambda function

41

- runtime: Runtime environment (e.g., 'python3.9', 'nodejs18.x')

42

- role: ARN of the IAM role for the function

43

- handler: Entry point for the function

44

- zip_file: Deployment package as bytes

45

- code: Code configuration dictionary

46

- description: Function description

47

- timeout: Function timeout in seconds

48

- memory_size: Memory allocation in MB

49

50

Returns:

51

Function configuration

52

"""

53

54

def delete_lambda(self, function_name: str, qualifier: str = None) -> None:

55

"""

56

Delete a Lambda function.

57

58

Parameters:

59

- function_name: Name or ARN of the Lambda function

60

- qualifier: Function version or alias

61

"""

62

63

def update_lambda_code(self, function_name: str, zip_file: bytes = None, s3_bucket: str = None, s3_key: str = None, **kwargs) -> dict:

64

"""

65

Update Lambda function code.

66

67

Parameters:

68

- function_name: Name or ARN of the Lambda function

69

- zip_file: Deployment package as bytes

70

- s3_bucket: S3 bucket containing deployment package

71

- s3_key: S3 key for deployment package

72

73

Returns:

74

Updated function configuration

75

"""

76

77

def update_lambda_config(self, function_name: str, role: str = None, handler: str = None, description: str = None, timeout: int = None, memory_size: int = None, **kwargs) -> dict:

78

"""

79

Update Lambda function configuration.

80

81

Parameters:

82

- function_name: Name or ARN of the Lambda function

83

- role: ARN of the IAM role for the function

84

- handler: Entry point for the function

85

- description: Function description

86

- timeout: Function timeout in seconds

87

- memory_size: Memory allocation in MB

88

89

Returns:

90

Updated function configuration

91

"""

92

93

def get_function(self, function_name: str, qualifier: str = '$LATEST') -> dict:

94

"""

95

Get Lambda function configuration.

96

97

Parameters:

98

- function_name: Name or ARN of the Lambda function

99

- qualifier: Function version or alias

100

101

Returns:

102

Function configuration and metadata

103

"""

104

105

def list_functions(self, function_version: str = 'ALL', marker: str = None, max_items: int = None) -> list:

106

"""

107

List Lambda functions.

108

109

Parameters:

110

- function_version: Function version filter ('ALL', 'LATEST')

111

- marker: Pagination marker

112

- max_items: Maximum number of functions to return

113

114

Returns:

115

List of function configurations

116

"""

117

118

def list_versions_by_function(self, function_name: str, marker: str = None, max_items: int = None) -> list:

119

"""

120

List versions of a Lambda function.

121

122

Parameters:

123

- function_name: Name or ARN of the Lambda function

124

- marker: Pagination marker

125

- max_items: Maximum number of versions to return

126

127

Returns:

128

List of function versions

129

"""

130

131

def publish_version(self, function_name: str, code_sha256: str = None, description: str = '') -> dict:

132

"""

133

Publish a new version of a Lambda function.

134

135

Parameters:

136

- function_name: Name or ARN of the Lambda function

137

- code_sha256: SHA256 hash of deployment package

138

- description: Version description

139

140

Returns:

141

Published version configuration

142

"""

143

144

def create_alias(self, function_name: str, name: str, function_version: str, description: str = '') -> dict:

145

"""

146

Create an alias for a Lambda function version.

147

148

Parameters:

149

- function_name: Name or ARN of the Lambda function

150

- name: Alias name

151

- function_version: Function version for the alias

152

- description: Alias description

153

154

Returns:

155

Alias configuration

156

"""

157

158

def update_alias(self, function_name: str, name: str, function_version: str = None, description: str = None) -> dict:

159

"""

160

Update a Lambda function alias.

161

162

Parameters:

163

- function_name: Name or ARN of the Lambda function

164

- name: Alias name

165

- function_version: Function version for the alias

166

- description: Alias description

167

168

Returns:

169

Updated alias configuration

170

"""

171

172

def delete_alias(self, function_name: str, name: str) -> None:

173

"""

174

Delete a Lambda function alias.

175

176

Parameters:

177

- function_name: Name or ARN of the Lambda function

178

- name: Alias name

179

"""

180

181

def get_policy(self, function_name: str, qualifier: str = None) -> dict:

182

"""

183

Get Lambda function policy.

184

185

Parameters:

186

- function_name: Name or ARN of the Lambda function

187

- qualifier: Function version or alias

188

189

Returns:

190

Function policy

191

"""

192

193

def add_permission(self, function_name: str, statement_id: str, action: str, principal: str, source_arn: str = None, **kwargs) -> dict:

194

"""

195

Add permission to Lambda function policy.

196

197

Parameters:

198

- function_name: Name or ARN of the Lambda function

199

- statement_id: Unique statement identifier

200

- action: AWS Lambda action (e.g., 'lambda:InvokeFunction')

201

- principal: Principal being granted permission

202

- source_arn: ARN of the resource invoking the function

203

204

Returns:

205

Statement that was added

206

"""

207

208

def remove_permission(self, function_name: str, statement_id: str, qualifier: str = None) -> None:

209

"""

210

Remove permission from Lambda function policy.

211

212

Parameters:

213

- function_name: Name or ARN of the Lambda function

214

- statement_id: Statement identifier to remove

215

- qualifier: Function version or alias

216

"""

217

```

218

219

### Lambda Operators

220

221

Task implementations for Lambda operations that can be used directly in Airflow DAGs.

222

223

```python { .api }

224

class LambdaInvokeFunctionOperator(BaseOperator):

225

def __init__(self, function_name: str, payload: str = None, invocation_type: str = 'RequestResponse', log_type: str = 'None', qualifier: str = '$LATEST', aws_conn_id: str = 'aws_default', **kwargs):

226

"""

227

Invoke a Lambda function.

228

229

Parameters:

230

- function_name: Name or ARN of the Lambda function

231

- payload: JSON payload for function input

232

- invocation_type: Invocation type ('RequestResponse', 'Event', 'DryRun')

233

- log_type: Log type ('None' or 'Tail')

234

- qualifier: Function version or alias

235

- aws_conn_id: AWS connection ID

236

"""

237

238

class LambdaCreateFunctionOperator(BaseOperator):

239

def __init__(self, function_name: str, runtime: str, role: str, handler: str, code: dict, description: str = '', timeout: int = 3, memory_size: int = 128, aws_conn_id: str = 'aws_default', **kwargs):

240

"""

241

Create a Lambda function.

242

243

Parameters:

244

- function_name: Name of the Lambda function

245

- runtime: Runtime environment (e.g., 'python3.9', 'nodejs18.x')

246

- role: ARN of the IAM role for the function

247

- handler: Entry point for the function

248

- code: Code configuration dictionary

249

- description: Function description

250

- timeout: Function timeout in seconds

251

- memory_size: Memory allocation in MB

252

- aws_conn_id: AWS connection ID

253

"""

254

```

255

256

### Lambda Sensors

257

258

Monitoring tasks that wait for specific Lambda function states or execution conditions.

259

260

```python { .api }

261

class LambdaFunctionStateSensor(BaseSensorOperator):

262

def __init__(self, function_name: str, qualifier: str = '$LATEST', target_states: list = None, aws_conn_id: str = 'aws_default', **kwargs):

263

"""

264

Wait for Lambda function to reach target state.

265

266

Parameters:

267

- function_name: Name or ARN of the Lambda function

268

- qualifier: Function version or alias

269

- target_states: List of target function states

270

- aws_conn_id: AWS connection ID

271

"""

272

```

273

274

### Lambda Triggers

275

276

Asynchronous triggers for efficient Lambda function monitoring.

277

278

```python { .api }

279

class LambdaInvokeFunctionTrigger(BaseTrigger):

280

def __init__(self, function_name: str, payload: str = None, invocation_type: str = 'RequestResponse', aws_conn_id: str = 'aws_default', **kwargs):

281

"""

282

Asynchronous trigger for Lambda function invocation.

283

284

Parameters:

285

- function_name: Name or ARN of the Lambda function

286

- payload: JSON payload for function input

287

- invocation_type: Invocation type ('RequestResponse', 'Event', 'DryRun')

288

- aws_conn_id: AWS connection ID

289

"""

290

```

291

292

## Usage Examples

293

294

### Basic Lambda Invocation

295

296

```python

297

from airflow.providers.amazon.aws.hooks.lambda_function import LambdaHook

298

299

# Initialize hook

300

lambda_hook = LambdaHook(aws_conn_id='my_aws_conn')

301

302

# Invoke function synchronously

303

response = lambda_hook.invoke_lambda(

304

function_name='data-processor',

305

payload='{"input_data": "sample", "operation": "transform"}',

306

invocation_type='RequestResponse'

307

)

308

309

print(f"Function response: {response['Payload'].read()}")

310

print(f"Status code: {response['StatusCode']}")

311

312

# Invoke function asynchronously

313

lambda_hook.invoke_lambda(

314

function_name='notification-sender',

315

payload='{"message": "Processing complete", "recipient": "admin@example.com"}',

316

invocation_type='Event'

317

)

318

```

319

320

### Lambda Function Management

321

322

```python

323

# Create a new function

324

function_config = lambda_hook.create_lambda(

325

function_name='my-data-processor',

326

runtime='python3.9',

327

role='arn:aws:iam::123456789012:role/lambda-execution-role',

328

handler='lambda_function.lambda_handler',

329

code={

330

'S3Bucket': 'my-lambda-deployments',

331

'S3Key': 'functions/data-processor-v1.0.0.zip'

332

},

333

description='Processes incoming data files',

334

timeout=300,

335

memory_size=512,

336

environment={'Variables': {'ENVIRONMENT': 'production'}}

337

)

338

339

# Update function code

340

lambda_hook.update_lambda_code(

341

function_name='my-data-processor',

342

s3_bucket='my-lambda-deployments',

343

s3_key='functions/data-processor-v1.1.0.zip'

344

)

345

346

# Publish a new version

347

version = lambda_hook.publish_version(

348

function_name='my-data-processor',

349

description='Bug fixes and performance improvements'

350

)

351

352

# Create alias for production

353

lambda_hook.create_alias(

354

function_name='my-data-processor',

355

name='PROD',

356

function_version=version['Version'],

357

description='Production alias'

358

)

359

```

360

361

### Lambda DAG Operations

362

363

```python

364

from airflow import DAG

365

from airflow.providers.amazon.aws.operators.lambda_function import LambdaInvokeFunctionOperator

366

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

367

368

dag = DAG('lambda_workflow', start_date=datetime(2023, 1, 1))

369

370

wait_for_input = S3KeySensor(

371

task_id='wait_for_input',

372

bucket_name='data-input-bucket',

373

bucket_key='incoming/{{ ds }}/data.json',

374

timeout=3600,

375

dag=dag

376

)

377

378

process_data = LambdaInvokeFunctionOperator(

379

task_id='process_data',

380

function_name='data-processor:PROD',

381

payload='{"bucket": "data-input-bucket", "key": "incoming/{{ ds }}/data.json", "output_bucket": "data-output-bucket"}',

382

invocation_type='RequestResponse',

383

log_type='Tail',

384

aws_conn_id='aws_default',

385

dag=dag

386

)

387

388

send_notification = LambdaInvokeFunctionOperator(

389

task_id='send_notification',

390

function_name='notification-sender',

391

payload='{"message": "Data processing completed for {{ ds }}", "channel": "data-team"}',

392

invocation_type='Event',

393

dag=dag

394

)

395

396

wait_for_input >> process_data >> send_notification

397

```

398

399

### Advanced Lambda Usage

400

401

```python

402

# Parallel function invocations with different configurations

403

parallel_processors = []

404

for region in ['us-east-1', 'us-west-2', 'eu-west-1']:

405

task = LambdaInvokeFunctionOperator(

406

task_id=f'process_data_{region}',

407

function_name=f'regional-processor-{region}',

408

payload=f'{{"region": "{region}", "date": "{{{{ ds }}}}"}}',

409

aws_conn_id=f'aws_{region}',

410

dag=dag

411

)

412

parallel_processors.append(task)

413

414

# Fan-out/fan-in pattern

415

fan_out = LambdaInvokeFunctionOperator(

416

task_id='distribute_work',

417

function_name='work-distributor',

418

payload='{"batch_size": 1000, "total_records": 50000}',

419

dag=dag

420

)

421

422

aggregate_results = LambdaInvokeFunctionOperator(

423

task_id='aggregate_results',

424

function_name='result-aggregator',

425

payload='{"batch_count": 50}',

426

trigger_rule='all_success',

427

dag=dag

428

)

429

430

fan_out >> parallel_processors >> aggregate_results

431

```

432

433

## Types

434

435

```python { .api }

436

# Lambda function identifiers

437

FunctionName = str

438

FunctionArn = str

439

QualifiedFunctionName = str # function_name:qualifier

440

441

# Lambda runtime environments

442

class LambdaRuntime:

443

PYTHON_3_8 = 'python3.8'

444

PYTHON_3_9 = 'python3.9'

445

PYTHON_3_10 = 'python3.10'

446

PYTHON_3_11 = 'python3.11'

447

NODEJS_18_X = 'nodejs18.x'

448

NODEJS_20_X = 'nodejs20.x'

449

JAVA_8 = 'java8'

450

JAVA_11 = 'java11'

451

JAVA_17 = 'java17'

452

DOTNET_6 = 'dotnet6'

453

GO_1_X = 'go1.x'

454

RUBY_2_7 = 'ruby2.7'

455

PROVIDED = 'provided'

456

PROVIDED_AL2 = 'provided.al2'

457

458

# Invocation types

459

class InvocationType:

460

REQUEST_RESPONSE = 'RequestResponse' # Synchronous

461

EVENT = 'Event' # Asynchronous

462

DRY_RUN = 'DryRun' # Validate parameters and access

463

464

# Function states

465

class FunctionState:

466

PENDING = 'Pending'

467

ACTIVE = 'Active'

468

INACTIVE = 'Inactive'

469

FAILED = 'Failed'

470

471

# Function configuration

472

class LambdaFunctionConfig:

473

function_name: str

474

function_arn: str

475

runtime: str

476

role: str

477

handler: str

478

code_size: int

479

description: str

480

timeout: int

481

memory_size: int

482

last_modified: str

483

code_sha256: str

484

version: str

485

environment: dict

486

dead_letter_config: dict

487

kms_key_arn: str

488

tracing_config: dict

489

layers: list

490

state: str

491

state_reason: str

492

493

# Code configuration

494

class CodeConfig:

495

s3_bucket: str = None

496

s3_key: str = None

497

s3_object_version: str = None

498

zip_file: bytes = None

499

image_uri: str = None

500

501

# Environment variables

502

class EnvironmentConfig:

503

variables: dict

504

505

# Dead letter queue configuration

506

class DeadLetterConfig:

507

target_arn: str

508

509

# Tracing configuration

510

class TracingConfig:

511

mode: str # 'Active' or 'PassThrough'

512

513

# VPC configuration

514

class VpcConfig:

515

subnet_ids: list

516

security_group_ids: list

517

```