0
# DynamoDB Operations
1
2
Async DynamoDB operations, including batch writing and async context management for table resources. aioboto3 provides async versions of boto3's high-level DynamoDB interface and adds an async batch writer for more efficient bulk writes.
3
4
## Capabilities
5
6
### Table Resource Enhancement
7
8
Table resources gain async batch writing. `CustomTableResource` extends the standard boto3 table interface with a `batch_writer()` method that returns an async context manager.
9
10
```python { .api }
11
class CustomTableResource:
    def batch_writer(
        self,
        overwrite_by_pkeys=None,
        flush_amount: int = 25,
        on_exit_loop_sleep: int = 0,
    ):
        """
        Create an async batch writer for efficient bulk operations.

        Parameters:
        - overwrite_by_pkeys: List of primary key names to use for overwrite
          detection. Defaults to None (no overwrite detection).
        - flush_amount: Number of items to buffer before automatic flush.
          Defaults to 25.
        - on_exit_loop_sleep: Sleep time in seconds applied after each flush
          while the remaining buffer is drained on context manager exit.
          Defaults to 0.

        Returns:
        BatchWriter: Async context manager for batch operations
        """
29
```
30
31
### Async Batch Writing
32
33
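`BatchWriter` is an async batch writer for efficient bulk DynamoDB operations. It buffers put and delete requests, sends them in batches automatically (when the buffer reaches `flush_amount` and again when the context manager exits), and includes retry logic for requests that were not processed.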
34
35
```python { .api }
36
class BatchWriter:
    def __init__(
        self,
        table_name: str,
        client,
        flush_amount: int = 25,
        overwrite_by_pkeys=None,
        on_exit_loop_sleep: int = 0,
    ):
        """
        Initialize batch writer.

        Parameters:
        - table_name: Name of the DynamoDB table
        - client: DynamoDB client instance
        - flush_amount: Number of items to buffer before flushing
          (default 25)
        - overwrite_by_pkeys: Primary key names for overwrite detection
          (default None, meaning no overwrite detection)
        - on_exit_loop_sleep: Sleep time in seconds applied after each
          flush while the buffer is drained on context exit (default 0)
        """

    async def __aenter__(self):
        """
        Async context manager entry.

        Returns:
        self: The BatchWriter instance
        """

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """
        Async context manager exit. Flushes any remaining operations.

        Parameters:
        - exc_type: Exception type if an exception occurred
        - exc_val: Exception value if an exception occurred
        - exc_tb: Exception traceback if an exception occurred
        """

    async def put_item(self, Item: dict, **kwargs):
        """
        Add a put item operation to the batch.

        The request is buffered; it is sent to DynamoDB on the next flush.

        Parameters:
        - Item: Dictionary representing the item to put
        - **kwargs: Additional parameters for the put operation
        """

    async def delete_item(self, Key: dict, **kwargs):
        """
        Add a delete item operation to the batch.

        The request is buffered; it is sent to DynamoDB on the next flush.

        Parameters:
        - Key: Dictionary representing the key of the item to delete
        - **kwargs: Additional parameters for the delete operation
        """

    async def _flush(self):
        """
        Send the next batch of buffered operations (up to flush_amount)
        to DynamoDB.

        Internal method called automatically when buffer is full and when
        the context manager exits; not intended to be called directly.
        """
97
```
98
99
### Table Operations
100
101
Standard table operations inherited from boto3, exposed as coroutines that must be awaited.
102
103
```python { .api }
104
# These are the standard boto3 table methods, now with async support
105
async def put_item(self, Item: dict, **kwargs):
    """
    Put an item into the table.

    Parameters:
    - Item: Dictionary representing the item to put
    - **kwargs: Additional parameters for the put operation
    """
107
108
async def get_item(self, Key: dict, **kwargs):
    """
    Get an item from the table.

    Parameters:
    - Key: Dictionary representing the primary key of the item to fetch
    - **kwargs: Additional parameters for the get operation

    Returns:
    dict: Response dictionary; the item, when one is returned, is stored
    under the 'Item' key
    """
110
111
async def update_item(self, Key: dict, **kwargs):
    """
    Update an item in the table.

    Parameters:
    - Key: Dictionary representing the primary key of the item to update
    - **kwargs: Additional parameters for the update operation, e.g.
      UpdateExpression, ExpressionAttributeNames and
      ExpressionAttributeValues
    """
113
114
async def delete_item(self, Key: dict, **kwargs):
    """
    Delete an item from the table.

    Parameters:
    - Key: Dictionary representing the primary key of the item to delete
    - **kwargs: Additional parameters for the delete operation
    """
116
117
async def query(self, **kwargs):
    """
    Query items from the table.

    Parameters:
    - **kwargs: Parameters for the query operation, e.g.
      KeyConditionExpression

    Returns:
    dict: Response dictionary; matching items are stored under the
    'Items' key
    """
119
120
async def scan(self, **kwargs):
    """
    Scan items from the table.

    Parameters:
    - **kwargs: Parameters for the scan operation
    """
122
123
async def batch_get_item(self, **kwargs):
    """
    Batch get multiple items.

    NOTE(review): boto3 exposes batch_get_item on the DynamoDB service
    resource rather than on Table -- confirm which object this method
    belongs to before relying on it as a table method.

    Parameters:
    - **kwargs: Parameters for the batch get operation
    """
125
```
126
127
## Usage Examples
128
129
### Basic Table Operations
130
131
```python
132
import aioboto3
133
from boto3.dynamodb.conditions import Key
134
135
async def basic_table_operations():
    """Run put, get, query, update and delete against a single table."""
    session = aioboto3.Session()

    async with session.resource('dynamodb', region_name='us-east-1') as dynamodb:
        # Table() is awaited here, unlike in plain boto3
        table = await dynamodb.Table('my-table')

        # Put an item
        await table.put_item(
            Item={
                'id': '123',
                'name': 'John Doe',
                'email': 'john@example.com'
            }
        )

        # Get an item
        response = await table.get_item(Key={'id': '123'})
        item = response.get('Item')

        # Query items
        response = await table.query(
            KeyConditionExpression=Key('id').eq('123')
        )
        items = response['Items']

        # Update an item; '#name' is a placeholder for the 'name' attribute
        await table.update_item(
            Key={'id': '123'},
            UpdateExpression='SET #name = :name',
            ExpressionAttributeNames={'#name': 'name'},
            ExpressionAttributeValues={':name': 'Jane Doe'}
        )

        # Delete an item
        await table.delete_item(Key={'id': '123'})
170
```
171
172
### Batch Writing Operations
173
174
```python
175
import time


async def batch_operations():
    """Write 100 items and delete one of them through a batch writer."""
    session = aioboto3.Session()

    async with session.resource('dynamodb', region_name='us-east-1') as dynamodb:
        table = await dynamodb.Table('my-table')

        # Use batch writer for efficient bulk operations
        async with table.batch_writer() as batch:
            # Add multiple items
            for i in range(100):
                await batch.put_item(
                    Item={
                        'id': str(i),
                        'data': f'item-{i}',
                        'timestamp': int(time.time())
                    }
                )

            # Mix puts and deletes
            await batch.delete_item(Key={'id': '50'})

        # Items are automatically flushed when buffer reaches flush_amount
        # or when exiting the context manager
198
```
199
200
### Advanced Batch Writer Configuration
201
202
```python
203
async def advanced_batch_writer():
    """Use a batch writer with non-default buffering and overwrite detection."""
    session = aioboto3.Session()

    async with session.resource('dynamodb', region_name='us-east-1') as dynamodb:
        table = await dynamodb.Table('my-table')

        # Configure batch writer with custom settings.
        # flush_amount stays at or below 25 because a single DynamoDB
        # BatchWriteItem call accepts at most 25 put/delete requests.
        async with table.batch_writer(
            flush_amount=10,            # Flush every 10 items instead of 25
            overwrite_by_pkeys=['id'],  # Detect overwrites by 'id' field
            on_exit_loop_sleep=1        # Sleep 1 second per flush on exit
        ) as batch:

            items = [
                {'id': '1', 'name': 'Alice'},
                {'id': '2', 'name': 'Bob'},
                {'id': '1', 'name': 'Alice Updated'}  # Will overwrite first item
            ]

            for item in items:
                await batch.put_item(Item=item)
224
```
225
226
### Error Handling
227
228
```python
229
import botocore.exceptions
230
231
async def handle_dynamodb_errors():
    """Put one item and report DynamoDB client errors by error code."""
    session = aioboto3.Session()

    try:
        async with session.resource('dynamodb', region_name='us-east-1') as dynamodb:
            table = await dynamodb.Table('my-table')

            await table.put_item(Item={'id': '123', 'data': 'test'})

    except botocore.exceptions.ClientError as e:
        # The service error code is carried in the parsed error response
        error_code = e.response['Error']['Code']

        if error_code == 'ResourceNotFoundException':
            print("Table does not exist")
        elif error_code == 'ValidationException':
            print("Invalid request parameters")
        elif error_code == 'ProvisionedThroughputExceededException':
            print("Request rate too high")
        else:
            print(f"Unexpected error: {error_code}")
251
```