CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-dataset

Toolkit for Python-based database access that makes reading and writing data in databases as simple as working with JSON files

Pending
Overview
Eval results
Files

docs/chunked-operations.md

Chunked Operations

High-performance batch processing for large datasets using context managers to automatically handle chunked inserts and updates with configurable batch sizes and callback support. These classes optimize memory usage and database performance for bulk operations.

Capabilities

Chunked Insert Operations

Batch insert operations with automatic chunking and memory management.

# Import pattern
from dataset import chunked

class ChunkedInsert:
    """Context manager that queues rows and writes them to a table in
    batches of ``chunksize``, invoking ``callback`` before each batch.
    (API stub — the method bodies live in the dataset package itself.)"""

    def __init__(self, table, chunksize=1000, callback=None):
        """
        Initialize chunked insert context manager.

        Parameters:
        - table: Table instance to insert into
        - chunksize: int, number of rows per batch (default 1000)
        - callback: callable, function called before each batch insert
                   receives the queue (list of rows) as parameter
        """

    def insert(self, item):
        """
        Add an item to the insert queue.

        Parameters:
        - item: dict, row data to insert
        """

    def flush(self):
        """Force processing of queued items (also happens on context exit)."""

    def __enter__(self):
        """Enter context manager; the usage examples bind the returned
        object via ``with ... as inserter``."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit context manager and flush remaining items."""

Chunked Update Operations

Batch update operations with automatic chunking and grouping by field sets.

class ChunkedUpdate:
    """Context manager that queues row updates and applies them to a table
    in batches of ``chunksize``, invoking ``callback`` before each batch.
    Rows are matched against existing data via the ``keys`` columns.
    (API stub — the method bodies live in the dataset package itself.)"""

    def __init__(self, table, keys, chunksize=1000, callback=None):
        """
        Initialize chunked update context manager.

        Parameters:
        - table: Table instance to update
        - keys: list, column names to use as update filters
        - chunksize: int, number of rows per batch (default 1000)
        - callback: callable, function called before each batch update
                   receives the queue (list of rows) as parameter
        """

    def update(self, item):
        """
        Add an item to the update queue.

        Parameters:
        - item: dict, row data to update (must include key columns)
        """

    def flush(self):
        """Force processing of queued items (also happens on context exit)."""

    def __enter__(self):
        """Enter context manager; the usage examples bind the returned
        object via ``with ... as updater``."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit context manager and flush remaining items."""

Exception Handling

Exception types for chunked operations error handling.

class InvalidCallback(ValueError):
    """Raised when an invalid callback is provided to chunked operations.

    Subclasses ValueError, so callers may catch either exception type.
    """

Usage Examples

Basic Chunked Insert

import dataset
from dataset import chunked

# Open (or create) the example SQLite database and grab the target table.
db = dataset.connect('sqlite:///example.db')
table = db['products']

# Basic chunked insert: queue 10,000 rows; the context manager batches them.
with chunked.ChunkedInsert(table) as inserter:
    for i in range(10000):
        product = {
            'name': f'Product {i}',
            'price': i * 0.99,
            'category': f'Category {i % 10}',
        }
        inserter.insert(product)
# Automatically flushes remaining items on context exit

Chunked Insert with Custom Chunk Size

# Custom chunk size for memory optimization
with chunked.ChunkedInsert(table, chunksize=500) as inserter:
    for record in large_dataset:
        inserter.insert({
            'name': record.name,
            'value': record.value,
            'timestamp': record.created_at
        })

Chunked Insert with Callback

def progress_callback(queue):
    """Report the size of the batch about to be inserted."""
    batch_size = len(queue)
    print(f"Inserting batch of {batch_size} records")
    # Could also log, validate, or transform data here

def validation_callback(queue):
    """Validate data before insertion: every queued row must carry
    'required_field', otherwise a ValueError is raised."""
    for item in queue:
        if 'required_field' in item:
            continue
        raise ValueError(f"Missing required field in {item}")

# Attach the progress reporter so each flushed batch gets logged.
with chunked.ChunkedInsert(table, callback=progress_callback) as batch_writer:
    for payload in data_source:
        batch_writer.insert(payload)

# Multiple callbacks via wrapper
def combined_callback(queue):
    """Chain validation first, then progress reporting, over one queue."""
    for hook in (validation_callback, progress_callback):
        hook(queue)

# Every batch is validated and logged before it is written.
with chunked.ChunkedInsert(table, callback=combined_callback) as writer:
    for row in data_source:
        writer.insert(row)

Basic Chunked Update

# Update records based on ID
with chunked.ChunkedUpdate(table, keys=['id']) as updater:
    for record in updated_records:
        updater.update({
            'id': record.id,
            'name': record.new_name,
            'price': record.new_price,
            'updated_at': datetime.now()
        })

Chunked Update with Multiple Keys

# Update using composite key (category + name)
with chunked.ChunkedUpdate(table, keys=['category', 'name']) as updater:
    for update_data in updates:
        updater.update({
            'category': update_data.category,
            'name': update_data.name,
            'price': update_data.new_price,
            'stock': update_data.new_stock
        })

Chunked Update with Callback

def update_callback(queue):
    """Log the batch size plus a per-operation-type breakdown before each
    batch update runs."""
    print(f"Updating batch of {len(queue)} records")

    # Tally rows by their 'operation_type' tag; untagged rows count as 'unknown'.
    operations = {}
    for item in queue:
        op_type = item.get('operation_type', 'unknown')
        if op_type not in operations:
            operations[op_type] = 0
        operations[op_type] += 1

    for op_type, count in operations.items():
        print(f"  {op_type}: {count} records")

# Apply queued changes keyed on 'id', logging each flushed batch.
with chunked.ChunkedUpdate(table, keys=['id'], callback=update_callback) as batcher:
    for pending in batch_updates:
        batcher.update(pending)

Memory-Efficient Large Dataset Processing

def process_large_csv(filename, table):
    """Stream a large CSV file into *table* with minimal memory usage.

    Parameters:
    - filename: path to a CSV file with Name/Email/Age header columns
    - table: dataset Table instance receiving the processed rows
    """
    import csv
    from datetime import datetime  # fix: datetime was used below but never imported

    # newline='' is the csv-module requirement for file objects: it stops
    # universal-newline translation from corrupting quoted embedded newlines.
    with open(filename, 'r', newline='') as file:
        reader = csv.DictReader(file)

        with chunked.ChunkedInsert(table, chunksize=1000) as inserter:
            for row in reader:
                # Transform data as needed
                processed_row = {
                    'name': row['Name'].strip(),
                    'email': row['Email'].lower(),
                    'age': int(row['Age']) if row['Age'] else None,
                    'created_at': datetime.now()
                }
                inserter.insert(processed_row)

# Process million-record CSV with constant memory usage
users_table = db['users']
process_large_csv('large_dataset.csv', users_table)

Data Synchronization Pattern

def sync_external_data(external_data, table):
    """Sync data from an external source with progress tracking.

    Rows are upserted keyed on 'external_id', so re-running the sync
    updates existing rows instead of duplicating them.

    Parameters:
    - external_data: iterable of external-format record dicts
    - table: dataset Table instance to sync into
    """

    def progress_callback(queue):
        # Log progress
        print(f"Processing {len(queue)} records")

        # Could also:
        # - Update progress bar
        # - Log to file
        # - Send metrics to monitoring system
        # - Validate data integrity

    def flush_batch(batch):
        """Report, then upsert, one batch of transformed records."""
        progress_callback(batch)
        for internal_record in batch:
            # Insert new or update existing based on external_id
            table.upsert(internal_record, ['external_id'])

    # BUG FIX: the original wrapped this loop in a ChunkedInsert whose
    # inserter was never used, so the progress callback never fired and no
    # chunking occurred; ChunkedInsert also cannot express upserts. Batch
    # the upserts manually instead.
    batch = []
    for external_record in external_data:
        # Transform external format to internal format
        batch.append(transform_record(external_record))
        if len(batch) >= 1000:
            flush_batch(batch)
            batch = []
    if batch:
        flush_batch(batch)

def transform_record(external_record):
    """Map one external-source record onto the internal column schema.

    Parameters:
    - external_record: dict with 'id', 'full_name' and 'email_address' keys

    Returns:
    - dict keyed by 'external_id', stamped with the current sync time
    """
    from datetime import datetime  # fix: datetime was used but never imported in the snippet

    return {
        'external_id': external_record['id'],
        'name': external_record['full_name'],
        'email': external_record['email_address'],
        'last_sync': datetime.now()
    }

Error Handling and Recovery

def robust_bulk_insert(data, table):
    """Bulk insert with error handling and recovery.

    Parameters:
    - data: iterable of row dicts to insert
    - table: dataset Table instance to insert into

    Rows failing validation are skipped and collected in failed_records;
    if the bulk insert itself raises, the collected rows are retried
    individually.
    """

    failed_records = []

    def error_tracking_callback(queue):
        """Track successful batches for recovery."""
        try:
            # This gets called before the actual insert
            print(f"About to process batch of {len(queue)} records")
        except Exception as e:
            # NOTE(review): this branch only runs if the print above raises,
            # which is unlikely — real insert failures surface in the outer
            # try/except below, not here.
            print(f"Callback error: {e}")
            # Could log problematic records
            failed_records.extend(queue)

    try:
        with chunked.ChunkedInsert(table,
                                 chunksize=100,  # Smaller chunks for easier recovery
                                 callback=error_tracking_callback) as inserter:
            for record in data:
                try:
                    # Validate record before queuing
                    validate_record(record)
                    inserter.insert(record)
                except ValidationError as e:
                    print(f"Skipping invalid record: {e}")
                    failed_records.append(record)

    except Exception as e:
        print(f"Bulk insert failed: {e}")
        print(f"Failed records count: {len(failed_records)}")

        # Could retry failed records individually
        # NOTE(review): failed_records also contains validation failures,
        # which will likely fail again; rows still queued inside the
        # inserter when the exception fired are not captured here.
        for record in failed_records:
            try:
                table.insert(record)
            except Exception as record_error:
                print(f"Individual insert failed: {record_error}")

def validate_record(record):
    """Ensure *record* carries non-empty 'name' and 'email' values.

    Raises ValidationError naming the first missing or empty field.
    """
    for field in ('name', 'email'):
        if not record.get(field):
            raise ValidationError(f"Missing required field: {field}")

class ValidationError(Exception):
    """Application-level validation failure for a single record."""
    pass

Performance Comparison

import time

def performance_comparison(data, table):
    """Compare wall time of three insertion strategies on a 1000-row sample.

    Parameters:
    - data: sequence of row dicts (the first 1000 are used for timing)
    - table: dataset Table instance to insert into

    Uses time.perf_counter — the monotonic high-resolution clock intended
    for interval timing — instead of time.time, which has coarse ticks on
    some platforms and can jump with wall-clock adjustments.
    """
    sample = data[:1000]  # Small sample for timing

    # Method 1: Individual inserts (slowest)
    start = time.perf_counter()
    for record in sample:
        table.insert(record)
    individual_time = time.perf_counter() - start
    print(f"Individual inserts: {individual_time:.2f}s")

    # Method 2: insert_many (faster)
    start = time.perf_counter()
    table.insert_many(sample)
    bulk_time = time.perf_counter() - start
    print(f"Bulk insert_many: {bulk_time:.2f}s")

    # Method 3: ChunkedInsert (memory efficient for large datasets)
    start = time.perf_counter()
    with chunked.ChunkedInsert(table, chunksize=100) as inserter:
        for record in sample:
            inserter.insert(record)
    chunked_time = time.perf_counter() - start
    print(f"Chunked insert: {chunked_time:.2f}s")

    # Guard against a zero denominator from sub-resolution timings.
    if bulk_time > 0:
        print(f"Speedup (individual vs bulk): {individual_time/bulk_time:.1f}x")
    if chunked_time > 0:
        print(f"Speedup (individual vs chunked): {individual_time/chunked_time:.1f}x")

Install with Tessl CLI

npx tessl i tessl/pypi-dataset

docs

chunked-operations.md

database-operations.md

index.md

table-operations.md

tile.json