Toolkit for Python-based database access that makes reading and writing data in databases as simple as working with JSON files.
—
High-performance batch processing for large datasets using context managers to automatically handle chunked inserts and updates with configurable batch sizes and callback support. These classes optimize memory usage and database performance for bulk operations.
Batch insert operations with automatic chunking and memory management.
# Import pattern
from dataset import chunked
class ChunkedInsert:
    def __init__(self, table, chunksize=1000, callback=None):
        """
        Initialize chunked insert context manager.

        Parameters:
        - table: Table instance to insert into
        - chunksize: int, number of rows per batch (default 1000)
        - callback: callable, function called before each batch insert;
          receives the queue (list of rows) as its only parameter
        """

    def insert(self, item):
        """
        Add an item to the insert queue.

        Queued rows are written in batches of ``chunksize``.

        Parameters:
        - item: dict, row data to insert
        """

    def flush(self):
        """Force processing of queued items, writing any pending rows now."""

    def __enter__(self):
        """Enter context manager; the ``with`` statement binds the inserter."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit context manager and flush remaining items."""

Batch update operations with automatic chunking and grouping by field sets.
class ChunkedUpdate:
    def __init__(self, table, keys, chunksize=1000, callback=None):
        """
        Initialize chunked update context manager.

        Parameters:
        - table: Table instance to update
        - keys: list, column names to use as update filters
        - chunksize: int, number of rows per batch (default 1000)
        - callback: callable, function called before each batch update;
          receives the queue (list of rows) as its only parameter
        """

    def update(self, item):
        """
        Add an item to the update queue.

        Parameters:
        - item: dict, row data to update (must include key columns)
        """

    def flush(self):
        """Force processing of queued items, applying any pending updates now."""

    def __enter__(self):
        """Enter context manager; the ``with`` statement binds the updater."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit context manager and flush remaining items."""

Exception types for chunked operations error handling.
# Subclasses ValueError, so callers may catch either exception type.
class InvalidCallback(ValueError):
    """Raised when an invalid callback is provided to chunked operations."""

import dataset
from dataset import chunked

db = dataset.connect('sqlite:///example.db')
table = db['products']

# Basic chunked insert: queue rows one at a time and let the manager batch them.
with chunked.ChunkedInsert(table) as inserter:
    for i in range(10000):
        row = {
            'name': f'Product {i}',
            'price': i * 0.99,
            'category': f'Category {i % 10}',
        }
        inserter.insert(row)
# Remaining queued rows are flushed automatically when the context exits.
# Custom chunk size for memory optimization
with chunked.ChunkedInsert(table, chunksize=500) as inserter:
    for record in large_dataset:
        payload = {
            'name': record.name,
            'value': record.value,
            'timestamp': record.created_at,
        }
        inserter.insert(payload)

def progress_callback(queue):
    """Report the size of each batch before it is inserted."""
    print(f"Inserting batch of {len(queue)} records")
    # Hook point: logging, validation, or data transformation could go here.

def validation_callback(queue):
    """Reject any queued row that lacks the required field."""
    for item in queue:
        if 'required_field' not in item:
            raise ValueError(f"Missing required field in {item}")

with chunked.ChunkedInsert(table, callback=progress_callback) as inserter:
    for data in data_source:
        inserter.insert(data)

# Compose several callbacks by delegating from a single wrapper function.
def combined_callback(queue):
    validation_callback(queue)
    progress_callback(queue)

with chunked.ChunkedInsert(table, callback=combined_callback) as inserter:
    for data in data_source:
        inserter.insert(data)
# Update records based on ID
with chunked.ChunkedUpdate(table, keys=['id']) as updater:
for record in updated_records:
updater.update({
'id': record.id,
'name': record.new_name,
'price': record.new_price,
'updated_at': datetime.now()
})# Update using composite key (category + name)
with chunked.ChunkedUpdate(table, keys=['category', 'name']) as updater:
for update_data in updates:
updater.update({
'category': update_data.category,
'name': update_data.name,
'price': update_data.new_price,
'stock': update_data.new_stock
})def update_callback(queue):
"""Called before each batch update."""
print(f"Updating batch of {len(queue)} records")
# Group by operation type for logging
operations = {}
for item in queue:
op_type = item.get('operation_type', 'unknown')
operations[op_type] = operations.get(op_type, 0) + 1
for op_type, count in operations.items():
print(f" {op_type}: {count} records")
with chunked.ChunkedUpdate(table, keys=['id'], callback=update_callback) as updater:
for update in batch_updates:
updater.update(update)def process_large_csv(filename, table):
"""Process a large CSV file with minimal memory usage."""
import csv
with open(filename, 'r') as file:
reader = csv.DictReader(file)
with chunked.ChunkedInsert(table, chunksize=1000) as inserter:
for row in reader:
# Transform data as needed
processed_row = {
'name': row['Name'].strip(),
'email': row['Email'].lower(),
'age': int(row['Age']) if row['Age'] else None,
'created_at': datetime.now()
}
inserter.insert(processed_row)
# Process million-record CSV with constant memory usage
process_large_csv('large_dataset.csv', db['users'])def sync_external_data(external_data, table):
"""Sync data from external source with progress tracking."""
def progress_callback(queue):
# Log progress
print(f"Processing {len(queue)} records")
# Could also:
# - Update progress bar
# - Log to file
# - Send metrics to monitoring system
# - Validate data integrity
# Use upsert pattern with chunked operations
with chunked.ChunkedInsert(table, callback=progress_callback) as inserter:
for external_record in external_data:
# Transform external format to internal format
internal_record = transform_record(external_record)
# Insert new or update existing based on external_id
table.upsert(internal_record, ['external_id'])
def transform_record(external_record):
"""Transform external record format to internal format."""
return {
'external_id': external_record['id'],
'name': external_record['full_name'],
'email': external_record['email_address'],
'last_sync': datetime.now()
}def robust_bulk_insert(data, table):
"""Bulk insert with error handling and recovery."""
failed_records = []
def error_tracking_callback(queue):
"""Track successful batches for recovery."""
try:
# This gets called before the actual insert
print(f"About to process batch of {len(queue)} records")
except Exception as e:
print(f"Callback error: {e}")
# Could log problematic records
failed_records.extend(queue)
try:
with chunked.ChunkedInsert(table,
chunksize=100, # Smaller chunks for easier recovery
callback=error_tracking_callback) as inserter:
for record in data:
try:
# Validate record before queuing
validate_record(record)
inserter.insert(record)
except ValidationError as e:
print(f"Skipping invalid record: {e}")
failed_records.append(record)
except Exception as e:
print(f"Bulk insert failed: {e}")
print(f"Failed records count: {len(failed_records)}")
# Could retry failed records individually
for record in failed_records:
try:
table.insert(record)
except Exception as record_error:
print(f"Individual insert failed: {record_error}")
def validate_record(record):
    """Ensure the record carries a non-empty value for every required field.

    Raises ValidationError when a required field is absent or falsy.
    """
    required_fields = ['name', 'email']
    for field in required_fields:
        # Absent keys and empty/falsy values are treated the same way.
        if not record.get(field):
            raise ValidationError(f"Missing required field: {field}")

class ValidationError(Exception):
    """Raised when a record fails pre-insert validation."""

import time
def performance_comparison(data, table):
"""Compare performance of different insertion methods."""
# Method 1: Individual inserts (slowest)
start = time.time()
for record in data[:1000]: # Small sample for timing
table.insert(record)
individual_time = time.time() - start
print(f"Individual inserts: {individual_time:.2f}s")
# Method 2: insert_many (faster)
start = time.time()
table.insert_many(data[:1000])
bulk_time = time.time() - start
print(f"Bulk insert_many: {bulk_time:.2f}s")
# Method 3: ChunkedInsert (memory efficient for large datasets)
start = time.time()
with chunked.ChunkedInsert(table, chunksize=100) as inserter:
for record in data[:1000]:
inserter.insert(record)
chunked_time = time.time() - start
print(f"Chunked insert: {chunked_time:.2f}s")
print(f"Speedup (individual vs bulk): {individual_time/bulk_time:.1f}x")
print(f"Speedup (individual vs chunked): {individual_time/chunked_time:.1f}x")Install with Tessl CLI
npx tessl i tessl/pypi-dataset