0
# Experimental Features
1
2
Experimental integrations and features including Chalice framework support for serverless applications. These features are in development and may change in future versions.
3
4
## Capabilities
5
6
### AsyncChalice Framework Integration
7
8
Enhanced Chalice application class with async support and integrated aioboto3 session management for AWS Lambda functions.
9
10
```python { .api }
11
class AsyncChalice:
    """
    Chalice application class with async support and an integrated
    aioboto3 session, intended for AWS Lambda functions.
    """

    def __init__(
        self,
        *args,
        aioboto3_session: Session = None,
        **kwargs
    ):
        """
        Initialize AsyncChalice application with aioboto3 integration.

        Parameters:
        - *args: Standard Chalice initialization arguments
        - aioboto3_session: Optional aioboto3 Session instance
          (keyword-only; defaults to None)
        - **kwargs: Standard Chalice initialization keyword arguments
        """

    def __call__(self, event, context):
        """
        Lambda handler entry point.

        Parameters:
        - event: AWS Lambda event object
        - context: AWS Lambda context object

        Returns:
        Response from the Chalice application
        """

    @property
    def aioboto3(self) -> Session:
        """
        Access to the integrated aioboto3 session.

        Returns:
        Session: The aioboto3 session instance
        """
47
```
48
49
### Async REST API Event Handler
50
51
Enhanced REST API event handler that supports async view functions with automatic coroutine handling.
52
53
```python { .api }
54
class AsyncRestAPIEventHandler:
    """REST API event handler that supports async view functions."""

    def _get_view_function_response(self, view_function, function_args):
        """
        Handle both sync and async view functions.

        Automatically detects coroutines and runs them in event loop.

        Parameters:
        - view_function: The view function to execute
        - function_args: Arguments to pass to the view function

        Returns:
        Response from the view function
        """
68
```
69
70
## Usage Examples
71
72
### Basic AsyncChalice Application
73
74
```python
75
from chalice import Chalice
from aioboto3.experimental.async_chalice import AsyncChalice
from aioboto3 import Session
import asyncio

# Create AsyncChalice app with aioboto3 integration
app = AsyncChalice(app_name='my-async-app')


@app.route('/')
async def index():
    """Async route handler: greet the caller and list S3 bucket names."""
    # Access integrated aioboto3 session; the client is closed on exit
    async with app.aioboto3.client('s3') as s3:
        response = await s3.list_buckets()
        bucket_names = [bucket['Name'] for bucket in response['Buckets']]

    return {
        'message': 'Hello from async Chalice!',
        'buckets': bucket_names
    }


@app.route('/dynamo/{table_name}', methods=['GET'])
async def get_table_info(table_name):
    """Get DynamoDB table information (item count and status)."""
    async with app.aioboto3.resource('dynamodb') as dynamodb:
        # With aioboto3, creating a Table resource is awaited
        table = await dynamodb.Table(table_name)

        # Get table description
        response = await table.meta.client.describe_table(TableName=table_name)

    return {
        'table_name': table_name,
        'item_count': response['Table']['ItemCount'],
        'status': response['Table']['TableStatus']
    }
110
```
111
112
### Custom Session Configuration
113
114
```python
115
from aioboto3.experimental.async_chalice import AsyncChalice
from aioboto3 import Session

# Create custom aioboto3 session
custom_session = Session(
    region_name='us-west-2',
    profile_name='production'
)

# Initialize AsyncChalice with custom session
app = AsyncChalice(
    app_name='my-custom-app',
    aioboto3_session=custom_session
)


@app.route('/upload', methods=['POST'])
async def upload_file():
    """Upload the raw request body to S3 using the custom session."""
    request = app.current_request
    raw_body = request.raw_body

    # Use the custom session for S3 operations
    async with app.aioboto3.client('s3') as s3:
        await s3.put_object(
            Bucket='my-app-uploads',
            Key='uploaded-file.bin',
            Body=raw_body
        )

    return {'status': 'uploaded', 'size': len(raw_body)}
145
```
146
147
### Mixed Sync/Async Routes
148
149
```python
150
from aioboto3.experimental.async_chalice import AsyncChalice

app = AsyncChalice(app_name='mixed-app')


@app.route('/sync')
def sync_route():
    """Traditional synchronous route."""
    return {'type': 'sync', 'message': 'This is a sync route'}


@app.route('/async')
async def async_route():
    """Async route with AWS operations (reads one SSM parameter)."""
    async with app.aioboto3.client('ssm') as ssm:
        response = await ssm.get_parameter(
            Name='/my-app/config/database-url'
        )

    return {
        'type': 'async',
        'parameter_value': response['Parameter']['Value']
    }


@app.route('/batch-operations', methods=['POST'])
async def batch_operations():
    """Perform batch AWS operations: write the posted items to DynamoDB."""
    request = app.current_request
    items = request.json_body.get('items', [])

    async with app.aioboto3.resource('dynamodb') as dynamodb:
        table = await dynamodb.Table('my-items')

        # Use batch writer for efficient operations
        async with table.batch_writer() as batch:
            for item in items:
                await batch.put_item(Item=item)

    return {'processed': len(items)}
187
```
188
189
### Error Handling in Async Routes
190
191
```python
192
from aioboto3.experimental.async_chalice import AsyncChalice
from botocore.exceptions import ClientError
from chalice import BadRequestError, ChaliceViewError, NotFoundError

app = AsyncChalice(app_name='error-handling-app')


@app.route('/safe-operation/{resource_id}', methods=['GET'])
async def safe_operation(resource_id):
    """
    Route with comprehensive error handling.

    Looks up ``resource_id`` in the ``my-resources`` DynamoDB table and
    maps failures onto Chalice HTTP errors:

    - missing item -> NotFoundError (404)
    - missing table -> BadRequestError (400)
    - any other AWS or unexpected error -> ChaliceViewError (500)
    """
    # Keep the try body limited to the AWS call. The Chalice errors raised
    # after it must not be caught by the generic ``except Exception`` below,
    # which would turn a 404 into a 500.
    try:
        async with app.aioboto3.client('dynamodb') as dynamodb:
            response = await dynamodb.get_item(
                TableName='my-resources',
                Key={'id': {'S': resource_id}}
            )
    except ClientError as e:
        error_code = e.response['Error']['Code']

        if error_code == 'ResourceNotFoundException':
            raise BadRequestError("Table does not exist") from e
        elif error_code == 'AccessDeniedException':
            raise ChaliceViewError("Access denied to DynamoDB") from e
        else:
            raise ChaliceViewError(f"AWS error: {error_code}") from e
    except Exception as e:
        app.log.error(f"Unexpected error: {e}")
        raise ChaliceViewError("Internal server error") from e

    # get_item omits the 'Item' key when no row matches the key
    if 'Item' not in response:
        raise NotFoundError(f"Resource {resource_id} not found")

    return {
        'resource_id': resource_id,
        'data': response['Item']
    }
228
```
229
230
### Integration with Other AWS Services
231
232
```python
233
import json
from datetime import datetime, timezone

from aioboto3.experimental.async_chalice import AsyncChalice

app = AsyncChalice(app_name='multi-service-app')


@app.route('/process-data', methods=['POST'])
async def process_data():
    """
    Process data using multiple AWS services.

    Stores the posted JSON body in DynamoDB, queues it on SQS and uploads
    the stored item to S3. Returns a dict with one status entry per service.
    """
    request = app.current_request
    data = request.json_body

    results = {}

    # Store in DynamoDB
    async with app.aioboto3.resource('dynamodb') as dynamodb:
        table = await dynamodb.Table('processed-data')

        item = {
            'id': data.get('id'),
            # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated
            'processed_at': datetime.now(timezone.utc).isoformat(),
            'data': data
        }

        await table.put_item(Item=item)
        results['dynamodb'] = 'stored'

    # Send to SQS
    async with app.aioboto3.client('sqs') as sqs:
        queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789012/my-queue'

        await sqs.send_message(
            QueueUrl=queue_url,
            MessageBody=json.dumps(data)
        )
        results['sqs'] = 'queued'

    # Upload to S3
    async with app.aioboto3.client('s3') as s3:
        key = f"processed/{data.get('id')}.json"

        await s3.put_object(
            Bucket='my-processed-data',
            Key=key,
            Body=json.dumps(item),
            ContentType='application/json'
        )
        results['s3'] = key

    return results
282
```
283
284
### Lambda Context Access
285
286
```python
287
import asyncio
import time

from aioboto3.experimental.async_chalice import AsyncChalice

app = AsyncChalice(app_name='context-app')


@app.route('/lambda-info')
async def lambda_info():
    """Access Lambda context information."""
    # Access Lambda context through app.lambda_context
    context = app.lambda_context

    return {
        'function_name': context.function_name,
        'function_version': context.function_version,
        'memory_limit': context.memory_limit_in_mb,
        'request_id': context.aws_request_id,
        'remaining_time': context.get_remaining_time_in_millis()
    }


@app.route('/performance-test')
async def performance_test():
    """Test async performance in Lambda by running S3 calls concurrently."""
    # perf_counter is monotonic, so the measured duration cannot go
    # negative if the wall clock is adjusted mid-request.
    start_time = time.perf_counter()

    # Perform concurrent operations
    async with app.aioboto3.client('s3') as s3:
        # Create multiple concurrent S3 operations
        tasks = [
            s3.list_objects_v2(Bucket='my-test-bucket', MaxKeys=1)
            for _ in range(5)
        ]

        # Wait for all operations to complete. This must happen while the
        # client is still open, i.e. inside the ``async with`` block.
        results = await asyncio.gather(*tasks)

    end_time = time.perf_counter()

    return {
        'operations': len(results),
        'duration_seconds': end_time - start_time,
        'remaining_lambda_time': app.lambda_context.get_remaining_time_in_millis()
    }
328
```