CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-arctic

AHL Research Versioned TimeSeries and Tick store for high-performance financial data storage and analysis

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

chunk-store.mddocs/

Chunk Store Operations

Configurable chunked storage for large datasets with custom serialization strategies and date-based chunking. Supports append/update operations, audit trails, and flexible data organization patterns optimized for handling massive datasets that don't fit in memory.

Capabilities

ChunkStore Class

Chunked storage system for large datasets with configurable chunking strategies and serialization options.

class ChunkStore:
    """
    Chunked storage for large datasets with configurable organization.
    
    Provides flexible data chunking strategies, custom serialization,
    and efficient append/update operations for datasets that exceed
    memory capacity or require specific organization patterns.
    """

Symbol Management

Operations for managing chunked data symbols including listing, existence checking, and renaming.

def list_symbols(self, partial_match=None):
    """
    List available symbols with optional pattern matching.
    
    Parameters:
    - partial_match: Partial string to match symbol names
    
    Returns:
    List of symbol names in the chunk store
    """

def has_symbol(self, symbol):
    """
    Check if symbol exists in chunk store.
    
    Parameters:
    - symbol: Symbol name to check
    
    Returns:
    bool: True if symbol exists
    """

def rename(self, from_symbol, to_symbol, audit=None):
    """
    Rename symbol in chunk store.
    
    Parameters:
    - from_symbol: Current symbol name
    - to_symbol: New symbol name
    - audit: Optional audit metadata for the operation
    
    Raises:
    - NoDataFoundException: If source symbol doesn't exist
    - ArcticException: If target symbol already exists
    """

def delete(self, symbol, chunk_range=None, audit=None):
    """
    Delete symbol or specific chunk range.
    
    Parameters:
    - symbol: Symbol name to delete
    - chunk_range: Specific chunk range to delete (default: all chunks)
    - audit: Optional audit metadata for the operation
    
    Raises:
    - NoDataFoundException: If symbol doesn't exist
    """

Read Operations

Methods for retrieving chunked data with range filtering and chunk iteration.

def read(self, symbol, chunk_range=None, filter_data=True, **kwargs):
    """
    Read chunked data with optional filtering and range selection.
    
    Parameters:
    - symbol: Symbol name to read
    - chunk_range: Specific chunk range to read (default: all chunks)
    - filter_data: Apply data filtering during read (default: True)
    - **kwargs: Additional read parameters for chunker/serializer
    
    Returns:
    Reconstructed data object (type depends on serializer)
    
    Raises:
    - NoDataFoundException: If symbol or chunk range doesn't exist
    """

def read_metadata(self, symbol):
    """
    Read symbol metadata without loading chunk data.
    
    Parameters:
    - symbol: Symbol name
    
    Returns:
    dict: Symbol metadata including chunk information
    
    Raises:
    - NoDataFoundException: If symbol doesn't exist
    """

def read_audit_log(self, symbol=None):
    """
    Read audit log for chunk operations.
    
    Parameters:
    - symbol: Filter by specific symbol (default: all symbols)
    
    Returns:
    List of audit log entries with operation details
    """

Write Operations

Methods for storing chunked data with configurable chunking strategies and metadata support.

def write(self, symbol, item, metadata=None, chunker=None, audit=None, **kwargs):
    """
    Write data using specified chunking strategy.
    
    Parameters:
    - symbol: Symbol name to write
    - item: Data to store (DataFrame, array, or custom object)
    - metadata: Optional metadata dictionary
    - chunker: Chunking strategy object (default: DateChunker)
    - audit: Optional audit metadata for the operation
    - **kwargs: Additional parameters for chunker/serializer
    
    Raises:
    - QuotaExceededException: If write would exceed storage quota
    - ArcticException: If chunking or serialization fails
    """

def append(self, symbol, item, upsert=False, metadata=None, audit=None, **kwargs):
    """
    Append data to existing symbol or create if doesn't exist.
    
    Parameters:
    - symbol: Symbol name
    - item: Data to append
    - upsert: Create symbol if doesn't exist (default: False)
    - metadata: Optional metadata dictionary
    - audit: Optional audit metadata for the operation
    - **kwargs: Additional parameters for append operation
    
    Returns:
    Operation result information
    
    Raises:
    - NoDataFoundException: If symbol doesn't exist and upsert=False
    - OverlappingDataException: If appended data overlaps existing chunks
    """

def update(self, symbol, item, metadata=None, chunk_range=None, 
           upsert=False, audit=None, **kwargs):
    """
    Update data in specific chunk range.
    
    Parameters:
    - symbol: Symbol name
    - item: Data to update with
    - metadata: Optional metadata dictionary
    - chunk_range: Specific chunks to update (default: auto-detect)
    - upsert: Create symbol if doesn't exist (default: False)
    - audit: Optional audit metadata for the operation
    - **kwargs: Additional parameters for update operation
    
    Returns:
    Operation result information
    
    Raises:
    - NoDataFoundException: If symbol doesn't exist and upsert=False
    """

def write_metadata(self, symbol, metadata):
    """
    Write metadata for symbol without changing data.
    
    Parameters:
    - symbol: Symbol name
    - metadata: Metadata dictionary to write
    
    Raises:
    - NoDataFoundException: If symbol doesn't exist
    """

Chunk Management

Methods for managing and iterating over data chunks with range queries.

def get_chunk_ranges(self, symbol, chunk_range=None, reverse=False):
    """
    Get chunk ranges for symbol.
    
    Parameters:
    - symbol: Symbol name
    - chunk_range: Filter to specific range (default: all chunks)
    - reverse: Return ranges in reverse order (default: False)
    
    Returns:
    List of chunk range objects
    
    Raises:
    - NoDataFoundException: If symbol doesn't exist
    """

def iterator(self, symbol, chunk_range=None, **kwargs):
    """
    Create iterator over symbol chunks.
    
    Parameters:
    - symbol: Symbol name
    - chunk_range: Iterate over specific range (default: all chunks)
    - **kwargs: Additional iterator parameters
    
    Returns:
    Generator yielding (chunk_range, data) tuples
    
    Raises:
    - NoDataFoundException: If symbol doesn't exist
    """

def reverse_iterator(self, symbol, chunk_range=None, **kwargs):
    """
    Create reverse iterator over symbol chunks.
    
    Parameters:
    - symbol: Symbol name  
    - chunk_range: Iterate over specific range (default: all chunks)
    - **kwargs: Additional iterator parameters
    
    Returns:
    Generator yielding (chunk_range, data) tuples in reverse order
    
    Raises:
    - NoDataFoundException: If symbol doesn't exist
    """

Information and Statistics

Methods for retrieving detailed information about symbols and storage statistics.

def get_info(self, symbol):
    """
    Get detailed information about symbol's chunks and storage.
    
    Parameters:
    - symbol: Symbol name
    
    Returns:
    dict: Comprehensive information including chunk counts, sizes, ranges
    
    Raises:
    - NoDataFoundException: If symbol doesn't exist
    """

def stats(self):
    """
    Get chunk store statistics and performance metrics.
    
    Returns:
    dict: Statistics including symbol counts, storage usage, chunk distribution
    """

Chunking Strategies

DateChunker

Default chunking strategy that organizes data by date ranges.

class DateChunker:
    """
    Date-based chunking strategy for time series data.
    
    Automatically partitions data based on date boundaries,
    enabling efficient date range queries and updates.
    """
    TYPE = 'date'

PassthroughChunker

Simple chunking strategy that stores data as single chunks.

class PassthroughChunker:
    """
    Pass-through chunking strategy with no automatic partitioning.
    
    Stores data as provided without automatic chunking,
    suitable for data that doesn't benefit from partitioning.
    """
    TYPE = 'passthrough'

Usage Examples

Basic Chunked Storage

from arctic import Arctic, CHUNK_STORE
from arctic.chunkstore.date_chunker import DateChunker
import pandas as pd
import numpy as np

# Setup chunk store
arctic_conn = Arctic('mongodb://localhost:27017')
arctic_conn.initialize_library('chunks', CHUNK_STORE)
chunk_lib = arctic_conn['chunks']

# Create large dataset
dates = pd.date_range('2020-01-01', periods=1000000, freq='min')
large_data = pd.DataFrame({
    'value1': np.random.randn(1000000),
    'value2': np.random.randn(1000000),
    'category': np.random.choice(['A', 'B', 'C'], 1000000)
}, index=dates)

# Write with date-based chunking
metadata = {'source': 'simulation', 'data_type': 'time_series'}
chunk_lib.write('large_dataset', large_data, 
                metadata=metadata,
                chunker=DateChunker())

Reading and Chunk Iteration

from arctic.date import DateRange
from datetime import datetime

# Read entire dataset (automatically reconstructed from chunks)
full_data = chunk_lib.read('large_dataset')
print(f"Full dataset shape: {full_data.shape}")

# Read specific date range
jan_range = DateRange(datetime(2020, 1, 1), datetime(2020, 2, 1))
jan_data = chunk_lib.read('large_dataset', chunk_range=jan_range)
print(f"January data shape: {jan_data.shape}")

# Iterate over chunks
for chunk_range, chunk_data in chunk_lib.iterator('large_dataset'):
    print(f"Chunk {chunk_range}: {chunk_data.shape}")
    # Process chunk individually to save memory
    
# Get chunk information
chunks = chunk_lib.get_chunk_ranges('large_dataset')
print(f"Total chunks: {len(chunks)}")
for chunk in chunks[:5]:  # First 5 chunks
    print(f"Chunk range: {chunk}")

Append and Update Operations

# Create additional data to append
new_dates = pd.date_range('2020-02-01', periods=100000, freq='min')
new_data = pd.DataFrame({
    'value1': np.random.randn(100000),
    'value2': np.random.randn(100000), 
    'category': np.random.choice(['A', 'B', 'C'], 100000)
}, index=new_dates)

# Append new data
chunk_lib.append('large_dataset', new_data, 
                 audit={'operation': 'monthly_update', 'user': 'system'})

# Update specific chunk range
update_range = DateRange(datetime(2020, 1, 15), datetime(2020, 1, 16))
update_data = chunk_lib.read('large_dataset', chunk_range=update_range)
update_data['value1'] *= 1.1  # Apply 10% adjustment

chunk_lib.update('large_dataset', update_data, 
                 chunk_range=update_range,
                 audit={'operation': 'correction', 'reason': 'data_adjustment'})

Symbol Management and Metadata

# List all symbols
symbols = chunk_lib.list_symbols()
print(f"Available symbols: {symbols}")

# Check if symbol exists
exists = chunk_lib.has_symbol('large_dataset')
print(f"Symbol exists: {exists}")

# Get detailed symbol information
info = chunk_lib.get_info('large_dataset')
print(f"Symbol info: {info}")

# Read metadata
metadata = chunk_lib.read_metadata('large_dataset')
print(f"Metadata: {metadata}")

# Update metadata
chunk_lib.write_metadata('large_dataset', {
    'source': 'simulation',
    'data_type': 'time_series',
    'last_updated': datetime.now().isoformat(),
    'version': '2.0'
})

# Rename symbol
chunk_lib.rename('large_dataset', 'historical_data',
                 audit={'operation': 'rename', 'reason': 'restructuring'})

Audit Trail and Statistics

# Read audit log
audit_entries = chunk_lib.read_audit_log('historical_data')
for entry in audit_entries[-10:]:  # Last 10 entries
    print(f"{entry['date']}: {entry['operation']}")

# Get store statistics
stats = chunk_lib.stats()
print(f"Store statistics: {stats}")

# Clean up
chunk_lib.delete('historical_data', 
                 audit={'operation': 'cleanup', 'reason': 'demo_complete'})

Advanced Chunking Strategies

from arctic.chunkstore.passthrough_chunker import PassthroughChunker

# Use passthrough chunker for non-time-series data
static_data = pd.DataFrame({
    'id': range(10000),
    'name': [f'item_{i}' for i in range(10000)],
    'value': np.random.randn(10000)
})

chunk_lib.write('static_reference', static_data,
                chunker=PassthroughChunker(),
                metadata={'type': 'reference_data'})

# Custom chunking parameters
chunk_lib.write('custom_chunks', large_data,
                chunker=DateChunker(),
                chunk_size='1D',  # Daily chunks
                metadata={'chunking': 'daily'})

Install with Tessl CLI

npx tessl i tessl/pypi-arctic

docs

arctic-connection.md

async-operations.md

bson-store.md

chunk-store.md

date-utilities.md

index.md

tick-store.md

version-store.md

tile.json