AHL Research Versioned TimeSeries and Tick store for high-performance financial data storage and analysis.

Quality: Pending — best-practices review has not yet been performed.
Impact: Pending — no eval scenarios have been run.

Configurable chunked storage for large datasets with custom serialization strategies and date-based chunking. Supports append/update operations, audit trails, and flexible data organization patterns optimized for handling massive datasets that don't fit in memory.

Chunked storage system for large datasets with configurable chunking strategies and serialization options.
class ChunkStore:
    """
    Chunked storage for large datasets with configurable organization.

    Provides flexible data chunking strategies, custom serialization,
    and efficient append/update operations for datasets that exceed
    memory capacity or require specific organization patterns.

    NOTE(review): the methods below are documentation stubs — the bodies
    consist only of docstrings describing the library's contract.
    """

    # --- Symbol management: listing, existence checks, renaming, deletion ---

    def list_symbols(self, partial_match=None):
        """
        List available symbols with optional pattern matching.

        Parameters:
        - partial_match: Partial string to match symbol names

        Returns:
            List of symbol names in the chunk store
        """

    def has_symbol(self, symbol):
        """
        Check if symbol exists in chunk store.

        Parameters:
        - symbol: Symbol name to check

        Returns:
            bool: True if symbol exists
        """

    def rename(self, from_symbol, to_symbol, audit=None):
        """
        Rename symbol in chunk store.

        Parameters:
        - from_symbol: Current symbol name
        - to_symbol: New symbol name
        - audit: Optional audit metadata for the operation

        Raises:
        - NoDataFoundException: If source symbol doesn't exist
        - ArcticException: If target symbol already exists
        """

    def delete(self, symbol, chunk_range=None, audit=None):
        """
        Delete symbol or specific chunk range.

        Parameters:
        - symbol: Symbol name to delete
        - chunk_range: Specific chunk range to delete (default: all chunks)
        - audit: Optional audit metadata for the operation

        Raises:
        - NoDataFoundException: If symbol doesn't exist
        """

    # --- Retrieval: range-filtered reads, metadata, and audit log access ---

    def read(self, symbol, chunk_range=None, filter_data=True, **kwargs):
        """
        Read chunked data with optional filtering and range selection.

        Parameters:
        - symbol: Symbol name to read
        - chunk_range: Specific chunk range to read (default: all chunks)
        - filter_data: Apply data filtering during read (default: True)
        - **kwargs: Additional read parameters for chunker/serializer

        Returns:
            Reconstructed data object (type depends on serializer)

        Raises:
        - NoDataFoundException: If symbol or chunk range doesn't exist
        """

    def read_metadata(self, symbol):
        """
        Read symbol metadata without loading chunk data.

        Parameters:
        - symbol: Symbol name

        Returns:
            dict: Symbol metadata including chunk information

        Raises:
        - NoDataFoundException: If symbol doesn't exist
        """

    def read_audit_log(self, symbol=None):
        """
        Read audit log for chunk operations.

        Parameters:
        - symbol: Filter by specific symbol (default: all symbols)

        Returns:
            List of audit log entries with operation details
        """

    # --- Storage: writes, appends, updates with configurable chunking ---

    def write(self, symbol, item, metadata=None, chunker=None, audit=None, **kwargs):
        """
        Write data using specified chunking strategy.

        Parameters:
        - symbol: Symbol name to write
        - item: Data to store (DataFrame, array, or custom object)
        - metadata: Optional metadata dictionary
        - chunker: Chunking strategy object (default: DateChunker)
        - audit: Optional audit metadata for the operation
        - **kwargs: Additional parameters for chunker/serializer

        Raises:
        - QuotaExceededException: If write would exceed storage quota
        - ArcticException: If chunking or serialization fails
        """

    def append(self, symbol, item, upsert=False, metadata=None, audit=None, **kwargs):
        """
        Append data to existing symbol or create if doesn't exist.

        Parameters:
        - symbol: Symbol name
        - item: Data to append
        - upsert: Create symbol if doesn't exist (default: False)
        - metadata: Optional metadata dictionary
        - audit: Optional audit metadata for the operation
        - **kwargs: Additional parameters for append operation

        Returns:
            Operation result information

        Raises:
        - NoDataFoundException: If symbol doesn't exist and upsert=False
        - OverlappingDataException: If appended data overlaps existing chunks
        """

    def update(self, symbol, item, metadata=None, chunk_range=None,
               upsert=False, audit=None, **kwargs):
        """
        Update data in specific chunk range.

        Parameters:
        - symbol: Symbol name
        - item: Data to update with
        - metadata: Optional metadata dictionary
        - chunk_range: Specific chunks to update (default: auto-detect)
        - upsert: Create symbol if doesn't exist (default: False)
        - audit: Optional audit metadata for the operation
        - **kwargs: Additional parameters for update operation

        Returns:
            Operation result information

        Raises:
        - NoDataFoundException: If symbol doesn't exist and upsert=False
        """

    def write_metadata(self, symbol, metadata):
        """
        Write metadata for symbol without changing data.

        Parameters:
        - symbol: Symbol name
        - metadata: Metadata dictionary to write

        Raises:
        - NoDataFoundException: If symbol doesn't exist
        """

    # --- Chunk management: range queries and chunk iteration ---

    def get_chunk_ranges(self, symbol, chunk_range=None, reverse=False):
        """
        Get chunk ranges for symbol.

        Parameters:
        - symbol: Symbol name
        - chunk_range: Filter to specific range (default: all chunks)
        - reverse: Return ranges in reverse order (default: False)

        Returns:
            List of chunk range objects

        Raises:
        - NoDataFoundException: If symbol doesn't exist
        """

    def iterator(self, symbol, chunk_range=None, **kwargs):
        """
        Create iterator over symbol chunks.

        Parameters:
        - symbol: Symbol name
        - chunk_range: Iterate over specific range (default: all chunks)
        - **kwargs: Additional iterator parameters

        Returns:
            Generator yielding (chunk_range, data) tuples

        Raises:
        - NoDataFoundException: If symbol doesn't exist
        """

    def reverse_iterator(self, symbol, chunk_range=None, **kwargs):
        """
        Create reverse iterator over symbol chunks.

        Parameters:
        - symbol: Symbol name
        - chunk_range: Iterate over specific range (default: all chunks)
        - **kwargs: Additional iterator parameters

        Returns:
            Generator yielding (chunk_range, data) tuples in reverse order

        Raises:
        - NoDataFoundException: If symbol doesn't exist
        """

    # --- Introspection: symbol details and store-wide statistics ---

    def get_info(self, symbol):
        """
        Get detailed information about symbol's chunks and storage.

        Parameters:
        - symbol: Symbol name

        Returns:
            dict: Comprehensive information including chunk counts, sizes, ranges

        Raises:
        - NoDataFoundException: If symbol doesn't exist
        """

    def stats(self):
        """
        Get chunk store statistics and performance metrics.

        Returns:
            dict: Statistics including symbol counts, storage usage, chunk distribution
        """


# Default chunking strategy that organizes data by date ranges.
class DateChunker:
    """
    Date-based chunking strategy for time series data.

    Automatically partitions data based on date boundaries,
    enabling efficient date range queries and updates.
    """

    # Identifier used to select this chunking strategy.
    TYPE = 'date'


# Simple chunking strategy that stores data as single chunks.
class PassthroughChunker:
    """
    Pass-through chunking strategy with no automatic partitioning.

    Stores data as provided without automatic chunking,
    suitable for data that doesn't benefit from partitioning.
    """

    # Identifier used to select this chunking strategy.
    TYPE = 'passthrough'


from arctic import Arctic, CHUNK_STORE
from arctic.chunkstore.date_chunker import DateChunker
import pandas as pd
import numpy as np

# --- Setup chunk store ---
arctic_conn = Arctic('mongodb://localhost:27017')
arctic_conn.initialize_library('chunks', CHUNK_STORE)
chunk_lib = arctic_conn['chunks']

# --- Create a large dataset (1M rows of minute-frequency data) ---
dates = pd.date_range('2020-01-01', periods=1000000, freq='min')
large_data = pd.DataFrame({
    'value1': np.random.randn(1000000),
    'value2': np.random.randn(1000000),
    'category': np.random.choice(['A', 'B', 'C'], 1000000)
}, index=dates)

# --- Write with date-based chunking ---
metadata = {'source': 'simulation', 'data_type': 'time_series'}
chunk_lib.write('large_dataset', large_data,
                metadata=metadata,
                chunker=DateChunker())

from arctic.date import DateRange
from datetime import datetime

# Read entire dataset (automatically reconstructed from chunks)
full_data = chunk_lib.read('large_dataset')
print(f"Full dataset shape: {full_data.shape}")

# Read a specific date range
jan_range = DateRange(datetime(2020, 1, 1), datetime(2020, 2, 1))
jan_data = chunk_lib.read('large_dataset', chunk_range=jan_range)
print(f"January data shape: {jan_data.shape}")

# Iterate over chunks, processing each individually to save memory
for chunk_range, chunk_data in chunk_lib.iterator('large_dataset'):
    print(f"Chunk {chunk_range}: {chunk_data.shape}")

# Get chunk information
chunks = chunk_lib.get_chunk_ranges('large_dataset')
print(f"Total chunks: {len(chunks)}")
for chunk in chunks[:5]:  # First 5 chunks
    print(f"Chunk range: {chunk}")

# --- Create additional data to append ---
new_dates = pd.date_range('2020-02-01', periods=100000, freq='min')
new_data = pd.DataFrame({
    'value1': np.random.randn(100000),
    'value2': np.random.randn(100000),
    'category': np.random.choice(['A', 'B', 'C'], 100000)
}, index=new_dates)

# Append new data
chunk_lib.append('large_dataset', new_data,
                 audit={'operation': 'monthly_update', 'user': 'system'})

# Update a specific chunk range
update_range = DateRange(datetime(2020, 1, 15), datetime(2020, 1, 16))
update_data = chunk_lib.read('large_dataset', chunk_range=update_range)
update_data['value1'] *= 1.1  # Apply 10% adjustment
chunk_lib.update('large_dataset', update_data,
                 chunk_range=update_range,
                 audit={'operation': 'correction', 'reason': 'data_adjustment'})

# --- Symbol management ---

# List all symbols
symbols = chunk_lib.list_symbols()
print(f"Available symbols: {symbols}")

# Check if a symbol exists
exists = chunk_lib.has_symbol('large_dataset')
print(f"Symbol exists: {exists}")

# Get detailed symbol information
info = chunk_lib.get_info('large_dataset')
print(f"Symbol info: {info}")

# Read metadata
metadata = chunk_lib.read_metadata('large_dataset')
print(f"Metadata: {metadata}")

# Update metadata without rewriting data
chunk_lib.write_metadata('large_dataset', {
    'source': 'simulation',
    'data_type': 'time_series',
    'last_updated': datetime.now().isoformat(),
    'version': '2.0'
})

# Rename symbol
chunk_lib.rename('large_dataset', 'historical_data',
                 audit={'operation': 'rename', 'reason': 'restructuring'})

# --- Audit and statistics ---

# Read audit log
audit_entries = chunk_lib.read_audit_log('historical_data')
for entry in audit_entries[-10:]:  # Last 10 entries
    print(f"{entry['date']}: {entry['operation']}")

# Get store statistics
stats = chunk_lib.stats()
print(f"Store statistics: {stats}")

# Clean up
chunk_lib.delete('historical_data',
                 audit={'operation': 'cleanup', 'reason': 'demo_complete'})

from arctic.chunkstore.passthrough_chunker import PassthroughChunker

# Use the passthrough chunker for non-time-series data
static_data = pd.DataFrame({
    'id': range(10000),
    'name': [f'item_{i}' for i in range(10000)],
    'value': np.random.randn(10000)
})
chunk_lib.write('static_reference', static_data,
                chunker=PassthroughChunker(),
                metadata={'type': 'reference_data'})

# Custom chunking parameters
chunk_lib.write('custom_chunks', large_data,
                chunker=DateChunker(),
                chunk_size='1D',  # Daily chunks
                metadata={'chunking': 'daily'})

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-arctic