or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

client-side-encryption.md dynamodb.md experimental.md index.md s3-operations.md session-management.md

docs/experimental.md

0

# Experimental Features

1

2

Experimental integrations and features including Chalice framework support for serverless applications. These features are in development and may change in future versions.

3

4

## Capabilities

5

6

### AsyncChalice Framework Integration

7

8

Enhanced Chalice application class with async support and integrated aioboto3 session management for AWS Lambda functions.

9

10

```python { .api }

11

class AsyncChalice:

12

def __init__(

13

self,

14

*args,

15

aioboto3_session: Session | None = None,

16

**kwargs

17

):

18

"""

19

Initialize AsyncChalice application with aioboto3 integration.

20

21

Parameters:

22

- *args: Standard Chalice initialization arguments

23

- aioboto3_session: Optional aioboto3 Session instance

24

- **kwargs: Standard Chalice initialization keyword arguments

25

"""

26

27

def __call__(self, event, context):

28

"""

29

Lambda handler entry point.

30

31

Parameters:

32

- event: AWS Lambda event object

33

- context: AWS Lambda context object

34

35

Returns:

36

Response from the Chalice application

37

"""

38

39

@property

40

def aioboto3(self) -> Session:

41

"""

42

Access to the integrated aioboto3 session.

43

44

Returns:

45

Session: The aioboto3 session instance

46

"""

47

```

48

49

### Async REST API Event Handler

50

51

Enhanced REST API event handler that supports async view functions with automatic coroutine handling.

52

53

```python { .api }

54

class AsyncRestAPIEventHandler:

55

def _get_view_function_response(self, view_function, function_args):

56

"""

57

Handle both sync and async view functions.

58

59

Automatically detects coroutines and runs them in event loop.

60

61

Parameters:

62

- view_function: The view function to execute

63

- function_args: Arguments to pass to the view function

64

65

Returns:

66

Response from the view function

67

"""

68

```

69

70

## Usage Examples

71

72

### Basic AsyncChalice Application

73

74

```python

75

from chalice import Chalice

76

from aioboto3.experimental.async_chalice import AsyncChalice

77

from aioboto3 import Session

78

import asyncio

79

80

# Create AsyncChalice app with aioboto3 integration

81

app = AsyncChalice(app_name='my-async-app')

82

83

@app.route('/')

84

async def index():

85

"""Async route handler."""

86

# Access integrated aioboto3 session

87

async with app.aioboto3.client('s3') as s3:

88

response = await s3.list_buckets()

89

bucket_names = [bucket['Name'] for bucket in response['Buckets']]

90

91

return {

92

'message': 'Hello from async Chalice!',

93

'buckets': bucket_names

94

}

95

96

@app.route('/dynamo/{table_name}', methods=['GET'])

97

async def get_table_info(table_name):

98

"""Get DynamoDB table information."""

99

async with app.aioboto3.resource('dynamodb') as dynamodb:

100

table = await dynamodb.Table(table_name)

101

102

# Get table description

103

response = await table.meta.client.describe_table(TableName=table_name)

104

105

return {

106

'table_name': table_name,

107

'item_count': response['Table']['ItemCount'],

108

'status': response['Table']['TableStatus']

109

}

110

```

111

112

### Custom Session Configuration

113

114

```python

115

from aioboto3.experimental.async_chalice import AsyncChalice

116

from aioboto3 import Session

117

118

# Create custom aioboto3 session

119

custom_session = Session(

120

region_name='us-west-2',

121

profile_name='production'

122

)

123

124

# Initialize AsyncChalice with custom session

125

app = AsyncChalice(

126

app_name='my-custom-app',

127

aioboto3_session=custom_session

128

)

129

130

@app.route('/upload', methods=['POST'])

131

async def upload_file():

132

"""Upload file to S3 using custom session."""

133

request = app.current_request

134

raw_body = request.raw_body

135

136

# Use the custom session for S3 operations

137

async with app.aioboto3.client('s3') as s3:

138

await s3.put_object(

139

Bucket='my-app-uploads',

140

Key='uploaded-file.bin',

141

Body=raw_body

142

)

143

144

return {'status': 'uploaded', 'size': len(raw_body)}

145

```

146

147

### Mixed Sync/Async Routes

148

149

```python

150

from aioboto3.experimental.async_chalice import AsyncChalice

151

152

app = AsyncChalice(app_name='mixed-app')

153

154

@app.route('/sync')

155

def sync_route():

156

"""Traditional synchronous route."""

157

return {'type': 'sync', 'message': 'This is a sync route'}

158

159

@app.route('/async')

160

async def async_route():

161

"""Async route with AWS operations."""

162

async with app.aioboto3.client('ssm') as ssm:

163

response = await ssm.get_parameter(

164

Name='/my-app/config/database-url'

165

)

166

167

return {

168

'type': 'async',

169

'parameter_value': response['Parameter']['Value']

170

}

171

172

@app.route('/batch-operations', methods=['POST'])

173

async def batch_operations():

174

"""Perform batch AWS operations."""

175

request = app.current_request

176

items = request.json_body.get('items', [])

177

178

async with app.aioboto3.resource('dynamodb') as dynamodb:

179

table = await dynamodb.Table('my-items')

180

181

# Use batch writer for efficient operations

182

async with table.batch_writer() as batch:

183

for item in items:

184

await batch.put_item(Item=item)

185

186

return {'processed': len(items)}

187

```

188

189

### Error Handling in Async Routes

190

191

```python

192

from chalice import BadRequestError, InternalServerError

193

from botocore.exceptions import ClientError

194

195

app = AsyncChalice(app_name='error-handling-app')

196

197

@app.route('/safe-operation/{resource_id}', methods=['GET'])

198

async def safe_operation(resource_id):

199

"""Route with comprehensive error handling."""

200

try:

201

async with app.aioboto3.client('dynamodb') as dynamodb:

202

response = await dynamodb.get_item(

203

TableName='my-resources',

204

Key={'id': {'S': resource_id}}

205

)

206

207

if 'Item' not in response:

208

raise BadRequestError(f"Resource {resource_id} not found")

209

210

return {

211

'resource_id': resource_id,

212

'data': response['Item']

213

}

214

215

except ClientError as e:

216

error_code = e.response['Error']['Code']

217

218

if error_code == 'ResourceNotFoundException':

219

raise BadRequestError("Table does not exist")

220

elif error_code == 'AccessDeniedException':

221

raise InternalServerError("Access denied to DynamoDB")

222

else:

223

raise InternalServerError(f"AWS error: {error_code}")

224

225

except Exception as e:

226

app.log.error(f"Unexpected error: {e}")

227

raise InternalServerError("Internal server error")

228

```

229

230

### Integration with Other AWS Services

231

232

```python

233

import json

234

from datetime import datetime

235

236

app = AsyncChalice(app_name='multi-service-app')

237

238

@app.route('/process-data', methods=['POST'])

239

async def process_data():

240

"""Process data using multiple AWS services."""

241

request = app.current_request

242

data = request.json_body

243

244

results = {}

245

246

# Store in DynamoDB

247

async with app.aioboto3.resource('dynamodb') as dynamodb:

248

table = await dynamodb.Table('processed-data')

249

250

item = {

251

'id': data.get('id'),

252

'processed_at': datetime.utcnow().isoformat(),

253

'data': data

254

}

255

256

await table.put_item(Item=item)

257

results['dynamodb'] = 'stored'

258

259

# Send to SQS

260

async with app.aioboto3.client('sqs') as sqs:

261

queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789012/my-queue'

262

263

await sqs.send_message(

264

QueueUrl=queue_url,

265

MessageBody=json.dumps(data)

266

)

267

results['sqs'] = 'queued'

268

269

# Upload to S3

270

async with app.aioboto3.client('s3') as s3:

271

key = f"processed/{data.get('id')}.json"

272

273

await s3.put_object(

274

Bucket='my-processed-data',

275

Key=key,

276

Body=json.dumps(item),

277

ContentType='application/json'

278

)

279

results['s3'] = key

280

281

return results

282

```

283

284

### Lambda Context Access

285

286

```python

287

app = AsyncChalice(app_name='context-app')

288

289

@app.route('/lambda-info')

290

async def lambda_info():

291

"""Access Lambda context information."""

292

# Access Lambda context through app.lambda_context

293

context = app.lambda_context

294

295

return {

296

'function_name': context.function_name,

297

'function_version': context.function_version,

298

'memory_limit': context.memory_limit_in_mb,

299

'request_id': context.aws_request_id,

300

'remaining_time': context.get_remaining_time_in_millis()

301

}

302

303

@app.route('/performance-test')

304

async def performance_test():

305

"""Test async performance in Lambda."""

306

import asyncio
import time

307

308

start_time = time.time()

309

310

# Perform concurrent operations

311

tasks = []

312

async with app.aioboto3.client('s3') as s3:

313

# Create multiple concurrent S3 operations

314

for i in range(5):

315

task = s3.list_objects_v2(Bucket='my-test-bucket', MaxKeys=1)

316

tasks.append(task)

317

318

# Wait for all operations to complete

319

results = await asyncio.gather(*tasks)

320

321

end_time = time.time()

322

323

return {

324

'operations': len(results),

325

'duration_seconds': end_time - start_time,

326

'remaining_lambda_time': app.lambda_context.get_remaining_time_in_millis()

327

}

328

```