# Chunked Operations

High-performance batch processing for large datasets: context managers automatically handle chunked inserts and updates with configurable batch sizes and callback support. These classes optimize memory usage and database performance for bulk operations.

## Capabilities

### Chunked Insert Operations

Batch insert operations with automatic chunking and memory management.

```python { .api }
# Import pattern
from dataset import chunked

class ChunkedInsert:
    def __init__(self, table, chunksize=1000, callback=None):
        """
        Initialize chunked insert context manager.

        Parameters:
        - table: Table instance to insert into
        - chunksize: int, number of rows per batch (default 1000)
        - callback: callable, function called before each batch insert;
          receives the queue (list of rows) as its parameter
        """

    def insert(self, item):
        """
        Add an item to the insert queue.

        Parameters:
        - item: dict, row data to insert
        """

    def flush(self):
        """Force processing of queued items."""

    def __enter__(self):
        """Enter context manager."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit context manager and flush remaining items."""
```

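Rows queued together do not need to share the same keys; columns absent from a row simply end up as None/NULL in the resulting table. A minimal sketch of this behavior, using an in-memory SQLite database:

```python
import dataset
from dataset import chunked

db = dataset.connect('sqlite:///:memory:')
table = db['people']

# Rows may carry different key sets; missing fields end up as None/NULL
with chunked.ChunkedInsert(table, chunksize=100) as inserter:
    inserter.insert({'name': 'Ada'})
    inserter.insert({'name': 'Grace', 'email': 'grace@example.com'})

for row in table.all():
    print(row['name'], row['email'])
# Ada None
# Grace grace@example.com
```
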
### Chunked Update Operations

Batch update operations with automatic chunking and grouping by field sets.

```python { .api }
class ChunkedUpdate:
    def __init__(self, table, keys, chunksize=1000, callback=None):
        """
        Initialize chunked update context manager.

        Parameters:
        - table: Table instance to update
        - keys: list, column names to use as update filters
        - chunksize: int, number of rows per batch (default 1000)
        - callback: callable, function called before each batch update;
          receives the queue (list of rows) as its parameter
        """

    def update(self, item):
        """
        Add an item to the update queue.

        Parameters:
        - item: dict, row data to update (must include key columns)
        """

    def flush(self):
        """Force processing of queued items."""

    def __enter__(self):
        """Enter context manager."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit context manager and flush remaining items."""
```

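Because queued rows are grouped by their field sets before each write (as noted above), one `ChunkedUpdate` context can mix rows that touch different columns. A brief sketch, assuming `table` already holds rows with ids 1 through 3:

```python
from dataset import chunked

# Rows that update different column sets can share one context manager;
# rows with the same field set are written together in one batch.
with chunked.ChunkedUpdate(table, keys=['id']) as updater:
    updater.update({'id': 1, 'price': 9.99})                # price only
    updater.update({'id': 2, 'price': 14.99, 'stock': 3})   # price and stock
    updater.update({'id': 3, 'stock': 0})                   # stock only
```
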
### Exception Handling

Exception type raised when chunked operations are misconfigured.

```python { .api }
class InvalidCallback(ValueError):
    """Raised when an invalid callback is provided to chunked operations."""
```

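The callback passed to `ChunkedInsert` or `ChunkedUpdate` must be callable; passing a non-callable value such as a string raises `InvalidCallback`. A minimal sketch, assuming `table` is an open dataset table:

```python
from dataset import chunked

try:
    # A string is not callable, so the chunker rejects it
    chunked.ChunkedInsert(table, callback="not-a-function")
except chunked.InvalidCallback:
    print("callback must be a callable that accepts the queued rows")
```
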
## Usage Examples

### Basic Chunked Insert

```python
import dataset
from dataset import chunked

db = dataset.connect('sqlite:///example.db')
table = db['products']

# Basic chunked insert
with chunked.ChunkedInsert(table) as inserter:
    for i in range(10000):
        inserter.insert({
            'name': f'Product {i}',
            'price': i * 0.99,
            'category': f'Category {i % 10}'
        })
# Automatically flushes remaining items on context exit
```

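Batches are written automatically once the queue reaches `chunksize`, but `flush()` can also be called explicitly to force a write at a logical checkpoint. A short sketch (the `rows` iterable here is a placeholder):

```python
with chunked.ChunkedInsert(table, chunksize=1000) as inserter:
    for i, row in enumerate(rows):
        inserter.insert(row)
        if (i + 1) % 250 == 0:
            # Force the queued rows to be written before the chunk is full
            inserter.flush()
```
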
### Chunked Insert with Custom Chunk Size

```python
# Custom chunk size for memory optimization
with chunked.ChunkedInsert(table, chunksize=500) as inserter:
    for record in large_dataset:
        inserter.insert({
            'name': record.name,
            'value': record.value,
            'timestamp': record.created_at
        })
```

### Chunked Insert with Callback

```python
def progress_callback(queue):
    """Called before each batch insert."""
    print(f"Inserting batch of {len(queue)} records")
    # Could also log, validate, or transform data here

def validation_callback(queue):
    """Validate data before insertion."""
    for item in queue:
        # Missing fields may already be padded with None by the time the
        # callback runs, so check the value rather than key presence.
        if item.get('required_field') is None:
            raise ValueError(f"Missing required field in {item}")

with chunked.ChunkedInsert(table, callback=progress_callback) as inserter:
    for data in data_source:
        inserter.insert(data)

# Multiple callbacks via a wrapper
def combined_callback(queue):
    validation_callback(queue)
    progress_callback(queue)

with chunked.ChunkedInsert(table, callback=combined_callback) as inserter:
    for data in data_source:
        inserter.insert(data)
```

### Basic Chunked Update

```python
from datetime import datetime

# Update records based on ID
with chunked.ChunkedUpdate(table, keys=['id']) as updater:
    for record in updated_records:
        updater.update({
            'id': record.id,
            'name': record.new_name,
            'price': record.new_price,
            'updated_at': datetime.now()
        })
```

### Chunked Update with Multiple Keys

```python
# Update using a composite key (category + name)
with chunked.ChunkedUpdate(table, keys=['category', 'name']) as updater:
    for update_data in updates:
        updater.update({
            'category': update_data.category,
            'name': update_data.name,
            'price': update_data.new_price,
            'stock': update_data.new_stock
        })
```

### Chunked Update with Callback

```python
def update_callback(queue):
    """Called before each batch update."""
    print(f"Updating batch of {len(queue)} records")

    # Group by operation type for logging
    operations = {}
    for item in queue:
        op_type = item.get('operation_type', 'unknown')
        operations[op_type] = operations.get(op_type, 0) + 1

    for op_type, count in operations.items():
        print(f"  {op_type}: {count} records")

with chunked.ChunkedUpdate(table, keys=['id'], callback=update_callback) as updater:
    for update in batch_updates:
        updater.update(update)
```

### Memory-Efficient Large Dataset Processing

```python
import csv
from datetime import datetime

def process_large_csv(filename, table):
    """Process a large CSV file with minimal memory usage."""
    with open(filename, 'r') as file:
        reader = csv.DictReader(file)

        with chunked.ChunkedInsert(table, chunksize=1000) as inserter:
            for row in reader:
                # Transform data as needed
                processed_row = {
                    'name': row['Name'].strip(),
                    'email': row['Email'].lower(),
                    'age': int(row['Age']) if row['Age'] else None,
                    'created_at': datetime.now()
                }
                inserter.insert(processed_row)

# Process a million-record CSV with roughly constant memory usage
process_large_csv('large_dataset.csv', db['users'])
```

### Data Synchronization Pattern

```python
from datetime import datetime

def sync_external_data(external_data, table):
    """Sync data from an external source with progress tracking."""

    def progress_callback(queue):
        # Log progress
        print(f"Processing {len(queue)} records")

        # Could also:
        # - Update a progress bar
        # - Log to a file
        # - Send metrics to a monitoring system
        # - Validate data integrity

    # ChunkedInsert batches plain inserts; there is no chunked upsert.
    # Use it when the incoming records are new. For insert-or-update
    # semantics, call table.upsert(record, ['external_id']) per record
    # outside a chunked context instead.
    with chunked.ChunkedInsert(table, callback=progress_callback) as inserter:
        for external_record in external_data:
            # Transform external format to internal format
            internal_record = transform_record(external_record)
            inserter.insert(internal_record)

def transform_record(external_record):
    """Transform external record format to internal format."""
    return {
        'external_id': external_record['id'],
        'name': external_record['full_name'],
        'email': external_record['email_address'],
        'last_sync': datetime.now()
    }
```

### Error Handling and Recovery

```python
def robust_bulk_insert(data, table):
    """Bulk insert with error handling and recovery."""
    failed_records = []

    def error_tracking_callback(queue):
        """Report each batch before it is written; record it if the callback fails."""
        try:
            # This gets called before the actual insert
            print(f"About to process batch of {len(queue)} records")
        except Exception as e:
            print(f"Callback error: {e}")
            # Could log problematic records
            failed_records.extend(queue)

    try:
        with chunked.ChunkedInsert(table,
                                   chunksize=100,  # Smaller chunks for easier recovery
                                   callback=error_tracking_callback) as inserter:
            for record in data:
                try:
                    # Validate record before queuing
                    validate_record(record)
                    inserter.insert(record)
                except ValidationError as e:
                    print(f"Skipping invalid record: {e}")
                    failed_records.append(record)

    except Exception as e:
        print(f"Bulk insert failed: {e}")
        print(f"Failed records count: {len(failed_records)}")

        # Could retry failed records individually
        for record in failed_records:
            try:
                table.insert(record)
            except Exception as record_error:
                print(f"Individual insert failed: {record_error}")

def validate_record(record):
    """Validate record before insertion."""
    required_fields = ['name', 'email']
    for field in required_fields:
        if field not in record or not record[field]:
            raise ValidationError(f"Missing required field: {field}")

class ValidationError(Exception):
    pass
```

### Performance Comparison

```python
import time

def performance_comparison(data, table):
    """Compare performance of different insertion methods."""

    # Method 1: Individual inserts (slowest)
    start = time.time()
    for record in data[:1000]:  # Small sample for timing
        table.insert(record)
    individual_time = time.time() - start
    print(f"Individual inserts: {individual_time:.2f}s")

    # Method 2: insert_many (faster)
    start = time.time()
    table.insert_many(data[:1000])
    bulk_time = time.time() - start
    print(f"Bulk insert_many: {bulk_time:.2f}s")

    # Method 3: ChunkedInsert (memory efficient for large datasets)
    start = time.time()
    with chunked.ChunkedInsert(table, chunksize=100) as inserter:
        for record in data[:1000]:
            inserter.insert(record)
    chunked_time = time.time() - start
    print(f"Chunked insert: {chunked_time:.2f}s")

    print(f"Speedup (individual vs bulk): {individual_time / bulk_time:.1f}x")
    print(f"Speedup (individual vs chunked): {individual_time / chunked_time:.1f}x")
```