tessl/pypi-arctic

AHL Research Versioned TimeSeries and Tick store for high-performance financial data storage and analysis

Asynchronous Operations

An asynchronous execution framework that runs Arctic operations concurrently on a shared thread pool. It covers request submission, request tracking, and thread pool management for batch and high-throughput workloads.

Capabilities

Asynchronous Request Submission

Functions for submitting Arctic operations for asynchronous execution with configurable thread pools.

def async_arctic_submit(store, fun, is_modifier, *args, **kwargs):
    """
    Submit Arctic operation for asynchronous execution.
    
    Parameters:
    - store: Arctic store instance (VersionStore, TickStore, etc.)
    - fun: Function/method to execute asynchronously
    - is_modifier: Whether the operation modifies data (affects request tracking)
    - *args: Positional arguments for the function
    - **kwargs: Keyword arguments for the function
    
    Returns:
    AsyncRequest: Request object for tracking execution status
    
    Example:
    request = async_arctic_submit(version_store, 'read', False, 'AAPL')
    """
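
To make the submission contract concrete, here is a minimal sketch of the same pattern built on the standard library's `concurrent.futures`. It is not Arctic's implementation: `FakeStore`, `submit_by_name`, and `modifier_count` are hypothetical names introduced for illustration, and the sketch assumes the operation is identified by method name, as in the docstring example above.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor


class FakeStore:
    """Hypothetical in-memory stand-in for an Arctic store."""

    def __init__(self):
        self._data = {}

    def write(self, symbol, value):
        self._data[symbol] = value
        return symbol

    def read(self, symbol):
        return self._data[symbol]


_pool = ThreadPoolExecutor(max_workers=4)
_lock = threading.Lock()
modifier_count = 0  # how many submitted requests were flagged as modifiers


def submit_by_name(store, fun, is_modifier, *args, **kwargs) -> Future:
    """Look up store.<fun> and schedule it on the pool (illustrative only)."""
    global modifier_count
    if is_modifier:
        with _lock:
            modifier_count += 1
    return _pool.submit(getattr(store, fun), *args, **kwargs)


store = FakeStore()
# Finish the write before reading so the read cannot race it.
submit_by_name(store, "write", True, "AAPL", [1, 2, 3]).result()
read_future = submit_by_name(store, "read", False, "AAPL")
print(read_future.result())
```

The caller gets a handle back immediately and only blocks when it asks for the result, which is the same split between submitting and waiting that the functions in this document describe.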

Request Management

Functions for waiting on and managing asynchronous request completion.

def async_wait_request(request, timeout=None):
    """
    Wait for single asynchronous request to complete.
    
    Parameters:
    - request: AsyncRequest object from async_arctic_submit
    - timeout: Maximum time to wait in seconds (None = no timeout)
    
    Returns:
    Result of the asynchronous operation
    
    Raises:
    - RequestDurationException: If operation times out
    - Original exception: If the async operation failed
    """

def async_wait_requests(requests, timeout=None):
    """
    Wait for multiple asynchronous requests to complete.
    
    Parameters:
    - requests: List of AsyncRequest objects
    - timeout: Maximum time to wait for all requests in seconds
    
    Returns:
    List of results in the same order as input requests
    
    Raises:
    - RequestDurationException: If any operation times out
    - Original exceptions: If any async operations failed
    """
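
The waiting semantics described above (one shared timeout, results in input order, original exceptions re-raised) can be sketched with standard-library futures. This is an illustration under assumptions, not Arctic's code: `wait_all` and `DurationExceeded` are hypothetical stand-ins for `async_wait_requests` and `RequestDurationException`.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait


class DurationExceeded(Exception):
    """Hypothetical stand-in for RequestDurationException."""


def wait_all(futures, timeout=None):
    """Wait for every future under one shared deadline; keep input order."""
    done, not_done = wait(futures, timeout=timeout)
    if not_done:
        raise DurationExceeded(f"{len(not_done)} request(s) still running")
    # Future.result() re-raises the original exception of a failed task.
    return [f.result() for f in futures]


def slow_square(n, delay):
    time.sleep(delay)
    return n * n


with ThreadPoolExecutor(max_workers=3) as pool:
    # Later inputs finish first, yet results follow submission order.
    jobs = [(2, 0.2), (3, 0.1), (4, 0.0)]
    futures = [pool.submit(slow_square, n, d) for n, d in jobs]
    ordered = wait_all(futures, timeout=5)
    print(ordered)

    # A deadline shorter than the task triggers the timeout path.
    slow = [pool.submit(slow_square, 5, 0.5)]
    try:
        wait_all(slow, timeout=0.05)
        timed_out = False
    except DurationExceeded:
        timed_out = True
    print(timed_out)
```

Note that a timeout here only stops the waiting; the underlying task keeps running on its worker thread until it finishes.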

Thread Pool Management

Functions for managing the asynchronous execution thread pool and system resources.

def async_shutdown(timeout=None):
    """
    Shutdown the asynchronous thread pool gracefully.
    
    Parameters:
    - timeout: Maximum time to wait for shutdown in seconds
    
    Stops accepting new requests and waits for current operations
    to complete before shutting down the thread pool.
    """

def async_await_termination(timeout=None):
    """
    Wait for all asynchronous operations to complete and terminate.
    
    Parameters:
    - timeout: Maximum time to wait for termination in seconds
    
    Blocks until all submitted requests have completed execution
    and the thread pool has been fully terminated.
    """

def async_reset_pool(pool_size=None, timeout=None):
    """
    Reset the asynchronous thread pool with new configuration.
    
    Parameters:
    - pool_size: New thread pool size (None = use default)
    - timeout: Timeout for shutting down existing pool
    
    Shuts down the current thread pool and creates a new one
    with the specified size for handling async operations.
    """
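
The shutdown-then-recreate behaviour described for `async_reset_pool` can be illustrated with a small wrapper around `ThreadPoolExecutor`. This is a sketch under assumptions: `PoolManager` and its default size are hypothetical, and the `timeout` parameter is not modelled because `ThreadPoolExecutor.shutdown` has no timeout argument.

```python
from concurrent.futures import ThreadPoolExecutor


class PoolManager:
    """Hypothetical owner of a replaceable thread pool."""

    DEFAULT_SIZE = 4  # assumed default, chosen only for this sketch

    def __init__(self, pool_size=None):
        self.pool_size = pool_size or self.DEFAULT_SIZE
        self._pool = ThreadPoolExecutor(max_workers=self.pool_size)

    def submit(self, fn, *args, **kwargs):
        return self._pool.submit(fn, *args, **kwargs)

    def shutdown(self):
        # wait=True blocks until queued and running tasks have finished.
        self._pool.shutdown(wait=True)

    def reset(self, pool_size=None):
        """Drain the current pool, then replace it with a new one."""
        self.shutdown()
        self.pool_size = pool_size or self.DEFAULT_SIZE
        self._pool = ThreadPoolExecutor(max_workers=self.pool_size)


manager = PoolManager()
before = manager.submit(sum, [1, 2, 3])
manager.reset(pool_size=8)  # work submitted earlier still completes
after = manager.submit(sum, [4, 5, 6])
print(before.result(), after.result(), manager.pool_size)

manager.shutdown()
try:
    manager.submit(sum, [1])
    rejected = False
except RuntimeError:
    # A shut-down executor refuses new work.
    rejected = True
print(rejected)
```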

Statistics and Monitoring

Functions for monitoring asynchronous operation performance and usage.

def async_total_requests():
    """
    Get total number of asynchronous requests processed.
    
    Returns:
    int: Total count of requests submitted since startup
    
    Useful for monitoring throughput and system usage patterns.
    """
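
A request counter of this kind has to be safe to update from several worker threads at once. The sketch below shows one way to do that with a lock; `RequestCounter` and `tracked` are hypothetical names, and nothing here claims to mirror how Arctic keeps its own count.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class RequestCounter:
    """Hypothetical thread-safe counter of processed requests."""

    def __init__(self):
        self._lock = threading.Lock()
        self._total = 0

    def increment(self):
        with self._lock:
            self._total += 1

    def total(self):
        with self._lock:
            return self._total


counter = RequestCounter()


def tracked(fn, *args):
    """Count the request, then run it."""
    counter.increment()
    return fn(*args)


with ThreadPoolExecutor(max_workers=4) as pool:
    # 100 requests spread across 4 worker threads.
    values = list(pool.map(lambda n: tracked(abs, n), range(-50, 50)))

print(counter.total())
```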

Async Arctic Instance

Singleton instance for managing global asynchronous operations.

ASYNC_ARCTIC = AsyncArctic()
"""
Global singleton instance for asynchronous Arctic operations.

Provides centralized management of the thread pool and request
tracking for all async operations across Arctic stores.
"""

Exception Types

Asynchronous Operation Exceptions

Exception types specific to asynchronous operations and request handling.

class AsyncArcticException(ArcticException):
    """
    Base exception for asynchronous Arctic operations.
    
    Raised when async-specific errors occur during request
    submission, execution, or result retrieval.
    """

class RequestDurationException(AsyncArcticException):
    """
    Exception raised when asynchronous requests exceed timeout limits.
    
    Raised by async_wait_request and async_wait_requests when
    operations take longer than the specified timeout period.
    """
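
Because `RequestDurationException` derives from `AsyncArcticException`, the order of `except` clauses matters: the more specific class has to be caught first. The sketch below redefines the hierarchy locally so that it runs without Arctic installed; `classify` is a hypothetical helper.

```python
class ArcticException(Exception):
    """Local stand-in for Arctic's base exception."""


class AsyncArcticException(ArcticException):
    """Local stand-in mirroring the hierarchy documented above."""


class RequestDurationException(AsyncArcticException):
    """Local stand-in mirroring the hierarchy documented above."""


def classify(exc):
    """Return a label showing which except clause handled the exception."""
    try:
        raise exc
    except RequestDurationException:
        return "timeout"
    except AsyncArcticException:
        return "async error"
    except ArcticException:
        return "arctic error"
    except Exception:
        return "other"


labels = [
    classify(RequestDurationException("too slow")),
    classify(AsyncArcticException("pool problem")),
    classify(ArcticException("store problem")),
    classify(ValueError("bad input")),
]
print(labels)
```

Placing `except AsyncArcticException` before `except RequestDurationException` would swallow timeouts under the broader label.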

Usage Examples

Basic Asynchronous Operations

from arctic import Arctic, VERSION_STORE
from arctic.asynchronous import (
    async_arctic_submit, async_wait_request, async_wait_requests
)
import time

# Setup
arctic_conn = Arctic('mongodb://localhost:27017')
lib = arctic_conn['market_data']

# Submit multiple read requests asynchronously
symbols = ['AAPL', 'GOOGL', 'MSFT', 'AMZN', 'TSLA']
requests = []

start_time = time.time()

# Submit all requests for concurrent execution
# (reads do not modify data, so is_modifier is False)
for symbol in symbols:
    request = async_arctic_submit(lib, 'read', False, symbol)
    requests.append(request)

print(f"Submitted {len(requests)} requests in {time.time() - start_time:.3f}s")

# Wait for all requests to complete
results = async_wait_requests(requests, timeout=60)
print(f"Completed all requests in {time.time() - start_time:.3f}s")

# Process results
for symbol, result in zip(symbols, results):
    print(f"{symbol}: {result.data.shape[0]} data points")

Individual Request Handling

from arctic.exceptions import RequestDurationException

# Submit single request (is_modifier=False for a read)
request = async_arctic_submit(lib, 'read', False, 'AAPL')

# Do other work while request executes
print("Request submitted, doing other work...")
time.sleep(1)

# Wait for specific request
try:
    result = async_wait_request(request, timeout=30)
    print(f"AAPL data: {result.data.shape}")
except RequestDurationException:
    print("Request timed out")
except Exception as e:
    print(f"Request failed: {e}")

Batch Write Operations

import pandas as pd
import numpy as np

# Generate sample data for multiple symbols
symbols_data = {}
for symbol in ['AAPL', 'GOOGL', 'MSFT']:
    dates = pd.date_range('2020-01-01', periods=1000, freq='min')
    data = pd.DataFrame({
        'price': np.random.randn(1000).cumsum() + 100,
        'volume': np.random.randint(100, 1000, 1000)
    }, index=dates)
    symbols_data[symbol] = data

# Submit write operations asynchronously
# (writes modify data, so is_modifier is True)
write_requests = []
for symbol, data in symbols_data.items():
    request = async_arctic_submit(lib, 'write', True, symbol, data)
    write_requests.append((symbol, request))

# Wait for all writes to complete
for symbol, request in write_requests:
    try:
        result = async_wait_request(request, timeout=60)
        print(f"Successfully wrote {symbol}")
    except Exception as e:
        print(f"Failed to write {symbol}: {e}")

Mixed Read and Write Operations

from arctic.date import DateRange
from datetime import datetime

# Submit mixed operations
operations = []

# Read operations
for symbol in ['AAPL', 'GOOGL']:
    date_range = DateRange(
        datetime(2020, 1, 1),
        datetime(2020, 1, 31)
    )
    request = async_arctic_submit(lib, 'read', False, symbol, date_range=date_range)
    operations.append(('read', symbol, request))

# Metadata operations
for symbol in ['MSFT', 'AMZN']:
    request = async_arctic_submit(lib, 'read_metadata', False, symbol)
    operations.append(('metadata', symbol, request))

# List operations
list_request = async_arctic_submit(lib, 'list_symbols', False)
operations.append(('list', 'all', list_request))

# Process all operations
results = []
for op_type, symbol, request in operations:
    try:
        result = async_wait_request(request, timeout=30)
        results.append((op_type, symbol, result))
        print(f"Completed {op_type} for {symbol}")
    except Exception as e:
        print(f"Failed {op_type} for {symbol}: {e}")

Thread Pool Management

from arctic.asynchronous import (
    async_reset_pool, async_shutdown, async_total_requests,
    async_await_termination
)

# Check current usage
total_requests = async_total_requests()
print(f"Total requests processed: {total_requests}")

# Reset thread pool with custom size
async_reset_pool(pool_size=8, timeout=10)
print("Thread pool reset with 8 threads")

# Submit high-volume operations
batch_requests = []
for i in range(50):
    symbol = f'SYM{i:03d}'
    request = async_arctic_submit(lib, 'has_symbol', False, symbol)
    batch_requests.append(request)

# Wait for batch completion
batch_results = async_wait_requests(batch_requests, timeout=120)
print(f"Processed {len(batch_results)} operations")

# Graceful shutdown
print("Shutting down async operations...")
async_shutdown(timeout=30)
async_await_termination(timeout=60)
print("Async system shutdown complete")

Error Handling and Timeouts

from arctic.exceptions import RequestDurationException

# Handle timeouts and errors gracefully
def safe_async_operation(store, operation, *args, is_modifier=False, **kwargs):
    """Safely execute an async operation; returns (result, error_message)."""
    try:
        request = async_arctic_submit(store, operation, is_modifier, *args, **kwargs)
        result = async_wait_request(request, timeout=30)
        return result, None
    except RequestDurationException:
        return None, "Operation timed out"
    except Exception as e:
        return None, f"Operation failed: {e}"

# Use safe wrapper for critical operations
symbols = ['AAPL', 'INVALID_SYMBOL', 'GOOGL']
for symbol in symbols:
    result, error = safe_async_operation(lib, 'read', symbol)
    if error:
        print(f"{symbol}: {error}")
    else:
        print(f"{symbol}: Success - {result.data.shape[0]} rows")

# Batch operations with error handling
success_count = 0
error_count = 0

requests = []
for symbol in symbols:
    request = async_arctic_submit(lib, 'read', False, symbol)
    requests.append((symbol, request))

for symbol, request in requests:
    try:
        result = async_wait_request(request, timeout=10)
        success_count += 1
        print(f"{symbol}: Success")
    except RequestDurationException:
        error_count += 1
        print(f"{symbol}: Timeout")
    except Exception as e:
        error_count += 1
        print(f"{symbol}: Error - {e}")

print(f"Results: {success_count} success, {error_count} errors")

Performance Optimization

# Optimize for different workload patterns

# High-throughput metadata checks
metadata_requests = []
symbols_to_check = [f'SYMBOL_{i:04d}' for i in range(100)]

start_time = time.time()
for symbol in symbols_to_check:
    request = async_arctic_submit(lib, 'has_symbol', False, symbol)
    metadata_requests.append(request)

# Process in batches to avoid overwhelming the system
batch_size = 20
results = []

for i in range(0, len(metadata_requests), batch_size):
    batch = metadata_requests[i:i + batch_size]
    batch_results = async_wait_requests(batch, timeout=30)
    results.extend(batch_results)
    print(f"Processed batch {i//batch_size + 1}")

total_time = time.time() - start_time
print(f"Checked {len(symbols_to_check)} symbols in {total_time:.2f}s")
print(f"Rate: {len(symbols_to_check)/total_time:.1f} operations/second")

# Memory-efficient large data reads
large_symbols = ['LARGE_DATASET_1', 'LARGE_DATASET_2', 'LARGE_DATASET_3']

# Process one at a time to manage memory usage
for symbol in large_symbols:
    request = async_arctic_submit(lib, 'read', False, symbol)
    try:
        result = async_wait_request(request, timeout=300)  # Longer timeout
        # Process result immediately to free memory
        data_size = result.data.memory_usage().sum()
        print(f"{symbol}: {data_size / 1024**2:.1f} MB")
        del result  # Explicit cleanup
    except Exception as e:
        print(f"Failed to process {symbol}: {e}")
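
The metadata loop above waits in batches, but by then every request has already been submitted. If the goal is to cap the number of in-flight requests, one option is to submit and wait one batch at a time. The following is a minimal sketch of that idea using a standard-library pool as a stand-in for Arctic's; `chunked`, `run_in_batches`, and `fake_has_symbol` are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor


def chunked(items, size):
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def run_in_batches(pool, fn, items, batch_size):
    """Submit one batch, wait for it, then move on to the next."""
    results = []
    for batch in chunked(items, batch_size):
        futures = [pool.submit(fn, item) for item in batch]
        # Collect in submission order before the next batch is queued.
        results.extend(f.result() for f in futures)
    return results


def fake_has_symbol(symbol):
    """Stand-in check: pretend only symbols ending in 0 exist."""
    return symbol.endswith("0")


symbols_to_check = [f"SYMBOL_{i:04d}" for i in range(100)]
with ThreadPoolExecutor(max_workers=8) as pool:
    found = run_in_batches(pool, fake_has_symbol, symbols_to_check, batch_size=20)

print(sum(found))
```

The trade-off is that each batch waits for its slowest request before the next one starts, so throughput drops slightly in exchange for a bounded queue.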

Install with Tessl CLI

npx tessl i tessl/pypi-arctic
