AHL Research Versioned TimeSeries and Tick store for high-performance financial data storage and analysis
Quality: Pending — not yet reviewed against best practices.
Impact: Pending — no eval scenarios have been run.
Asynchronous execution framework for Arctic operations enabling concurrent data processing and improved performance for batch operations. Provides thread pool management, request tracking, and concurrent execution capabilities for high-throughput data operations.
Functions for submitting Arctic operations for asynchronous execution with configurable thread pools.
def async_arctic_submit(store, fun, is_modifier, *args, **kwargs):
    """
    Submit an Arctic operation for asynchronous execution.

    Parameters:
    - store: Arctic store instance (VersionStore, TickStore, etc.)
    - fun: Function/method to execute asynchronously
    - is_modifier: Whether the operation modifies data (affects request tracking)
    - *args: Positional arguments for the function
    - **kwargs: Keyword arguments for the function

    Returns:
        AsyncRequest: Request object for tracking execution status

    Example:
        request = async_arctic_submit(version_store, 'read', False, 'AAPL')
    """

# Functions for waiting on and managing asynchronous request completion.
def async_wait_request(request, timeout=None):
    """
    Wait for a single asynchronous request to complete.

    Parameters:
    - request: AsyncRequest object from async_arctic_submit
    - timeout: Maximum time to wait in seconds (None = no timeout)

    Returns:
        Result of the asynchronous operation

    Raises:
    - RequestDurationException: If the operation times out
    - Original exception: If the async operation failed
    """
def async_wait_requests(requests, timeout=None):
    """
    Wait for multiple asynchronous requests to complete.

    Parameters:
    - requests: List of AsyncRequest objects
    - timeout: Maximum time to wait for all requests in seconds

    Returns:
        List of results in the same order as the input requests

    Raises:
    - RequestDurationException: If any operation times out
    - Original exceptions: If any async operations failed
    """

# Functions for managing the asynchronous execution thread pool and system resources.
def async_shutdown(timeout=None):
    """
    Shut down the asynchronous thread pool gracefully.

    Parameters:
    - timeout: Maximum time to wait for shutdown in seconds

    Stops accepting new requests and waits for current operations
    to complete before shutting down the thread pool.
    """
def async_await_termination(timeout=None):
    """
    Wait for all asynchronous operations to complete and terminate.

    Parameters:
    - timeout: Maximum time to wait for termination in seconds

    Blocks until all submitted requests have completed execution
    and the thread pool has been fully terminated.
    """
def async_reset_pool(pool_size=None, timeout=None):
    """
    Reset the asynchronous thread pool with a new configuration.

    Parameters:
    - pool_size: New thread pool size (None = use default)
    - timeout: Timeout for shutting down the existing pool

    Shuts down the current thread pool and creates a new one
    with the specified size for handling async operations.
    """

# Functions for monitoring asynchronous operation performance and usage.
def async_total_requests():
    """
    Get the total number of asynchronous requests processed.

    Returns:
        int: Total count of requests submitted since startup

    Useful for monitoring throughput and system usage patterns.
    """

# Singleton instance for managing global asynchronous operations.
# Global singleton instance for asynchronous Arctic operations.
# Provides centralized management of the thread pool and request
# tracking for all async operations across Arctic stores.
# (Comments used instead of a bare string literal after the assignment,
# which would be a no-op statement rather than a docstring.)
ASYNC_ARCTIC = AsyncArctic()

# Exception types specific to asynchronous operations and request handling.
class AsyncArcticException(ArcticException):
    """
    Base exception for asynchronous Arctic operations.

    Raised when async-specific errors occur during request
    submission, execution, or result retrieval.
    """
class RequestDurationException(AsyncArcticException):
    """
    Exception raised when asynchronous requests exceed timeout limits.

    Raised by async_wait_request and async_wait_requests when
    operations take longer than the specified timeout period.
    """

# Example: concurrent batch reads with the async API.
from arctic import Arctic, VERSION_STORE
from arctic.asynchronous import (
    async_arctic_submit, async_wait_request, async_wait_requests
)
import time

# Setup
arctic_conn = Arctic('mongodb://localhost:27017')
lib = arctic_conn['market_data']

# Submit multiple read requests asynchronously
symbols = ['AAPL', 'GOOGL', 'MSFT', 'AMZN', 'TSLA']
requests = []
start_time = time.time()

# Submit all requests for concurrent execution.
# Reads do not modify data, so is_modifier=False (the documented third
# positional argument of async_arctic_submit).
for symbol in symbols:
    request = async_arctic_submit(lib, 'read', False, symbol)
    requests.append(request)
print(f"Submitted {len(requests)} requests in {time.time() - start_time:.3f}s")

# Wait for all requests to complete
results = async_wait_requests(requests, timeout=60)
print(f"Completed all requests in {time.time() - start_time:.3f}s")

# Process results
for symbol, result in zip(symbols, results):
    print(f"{symbol}: {result.data.shape[0]} data points")

# Submit single request
request = async_arctic_submit(lib, 'read', False, 'AAPL')

# Do other work while the request executes
print("Request submitted, doing other work...")
time.sleep(1)

# Wait for the specific request
try:
    result = async_wait_request(request, timeout=30)
    print(f"AAPL data: {result.data.shape}")
except RequestDurationException:
    print("Request timed out")
except Exception as e:
    print(f"Request failed: {e}")

# Example: asynchronous batch writes of generated data.
import pandas as pd
import numpy as np

# Generate sample data for multiple symbols
symbols_data = {}
for symbol in ['AAPL', 'GOOGL', 'MSFT']:
    dates = pd.date_range('2020-01-01', periods=1000, freq='min')
    data = pd.DataFrame({
        'price': np.random.randn(1000).cumsum() + 100,
        'volume': np.random.randint(100, 1000, 1000)
    }, index=dates)
    symbols_data[symbol] = data

# Submit write operations asynchronously.
# Writes modify data, so is_modifier=True per the async_arctic_submit signature.
write_requests = []
for symbol, data in symbols_data.items():
    request = async_arctic_submit(lib, 'write', True, symbol, data)
    write_requests.append((symbol, request))

# Wait for all writes to complete
for symbol, request in write_requests:
    try:
        result = async_wait_request(request, timeout=60)
        print(f"Successfully wrote {symbol}")
    except Exception as e:
        print(f"Failed to write {symbol}: {e}")

# Example: mixed read, metadata, and list operations.
from arctic.date import DateRange
from datetime import datetime, timedelta
# Submit mixed operations
operations = []
# Read operations
for symbol in ['AAPL', 'GOOGL']:
date_range = DateRange(
datetime(2020, 1, 1),
datetime(2020, 1, 31)
)
request = async_arctic_submit(lib, 'read', symbol, date_range=date_range)
operations.append(('read', symbol, request))
# Metadata operations
for symbol in ['MSFT', 'AMZN']:
request = async_arctic_submit(lib, 'read_metadata', symbol)
operations.append(('metadata', symbol, request))
# List operations
list_request = async_arctic_submit(lib, 'list_symbols')
operations.append(('list', 'all', list_request))
# Process all operations
results = []
for op_type, symbol, request in operations:
try:
result = async_wait_request(request, timeout=30)
results.append((op_type, symbol, result))
print(f"Completed {op_type} for {symbol}")
except Exception as e:
print(f"Failed {op_type} for {symbol}: {e}")from arctic.asynchronous import (
async_reset_pool, async_shutdown, async_total_requests,
async_await_termination
)
# Check current usage
total_requests = async_total_requests()
print(f"Total requests processed: {total_requests}")
# Reset thread pool with custom size
async_reset_pool(pool_size=8, timeout=10)
print("Thread pool reset with 8 threads")
# Submit high-volume operations
batch_requests = []
for i in range(50):
symbol = f'SYM{i:03d}'
request = async_arctic_submit(lib, 'has_symbol', symbol)
batch_requests.append(request)
# Wait for batch completion
batch_results = async_wait_requests(batch_requests, timeout=120)
print(f"Processed {len(batch_results)} operations")
# Graceful shutdown
print("Shutting down async operations...")
async_shutdown(timeout=30)
async_await_termination(timeout=60)
print("Async system shutdown complete")# Handle timeouts and errors gracefully
def safe_async_operation(store, operation, *args, is_modifier=False, **kwargs):
    """
    Safely execute an async Arctic operation with error handling.

    Parameters:
    - store: Arctic store instance
    - operation: Name of the store operation to run (e.g. 'read')
    - *args: Positional arguments forwarded to the operation
    - is_modifier: Keyword-only; whether the operation modifies data.
      Forwarded as the documented third positional argument of
      async_arctic_submit (the original passed *args straight through,
      which let the first operation argument land in the is_modifier slot).
    - **kwargs: Keyword arguments forwarded to the operation

    Returns:
        (result, None) on success, or (None, error_message) on failure.
    """
    try:
        request = async_arctic_submit(store, operation, is_modifier, *args, **kwargs)
        result = async_wait_request(request, timeout=30)
        return result, None
    except RequestDurationException:
        return None, "Operation timed out"
    except Exception as e:
        return None, f"Operation failed: {str(e)}"
# Use the safe wrapper for critical operations
symbols = ['AAPL', 'INVALID_SYMBOL', 'GOOGL']
for symbol in symbols:
    result, error = safe_async_operation(lib, 'read', symbol)
    if error:
        print(f"{symbol}: {error}")
    else:
        print(f"{symbol}: Success - {result.data.shape[0]} rows")

# Batch operations with error handling
success_count = 0
error_count = 0
requests = []
for symbol in symbols:
    request = async_arctic_submit(lib, 'read', False, symbol)
    requests.append((symbol, request))
for symbol, request in requests:
    try:
        result = async_wait_request(request, timeout=10)
        success_count += 1
        print(f"{symbol}: Success")
    except RequestDurationException:
        error_count += 1
        print(f"{symbol}: Timeout")
    except Exception as e:
        error_count += 1
        print(f"{symbol}: Error - {e}")
print(f"Results: {success_count} success, {error_count} errors")

# Example: optimizing for different workload patterns.
# High-throughput metadata checks
metadata_requests = []
symbols_to_check = [f'SYMBOL_{i:04d}' for i in range(100)]
start_time = time.time()
for symbol in symbols_to_check:
    request = async_arctic_submit(lib, 'has_symbol', False, symbol)
    metadata_requests.append(request)

# Process in batches to avoid overwhelming the system
batch_size = 20
results = []
for i in range(0, len(metadata_requests), batch_size):
    batch = metadata_requests[i:i + batch_size]
    batch_results = async_wait_requests(batch, timeout=30)
    results.extend(batch_results)
    print(f"Processed batch {i//batch_size + 1}")
total_time = time.time() - start_time
print(f"Checked {len(symbols_to_check)} symbols in {total_time:.2f}s")
print(f"Rate: {len(symbols_to_check)/total_time:.1f} operations/second")

# Memory-efficient large data reads
large_symbols = ['LARGE_DATASET_1', 'LARGE_DATASET_2', 'LARGE_DATASET_3']

# Process one at a time to manage memory usage
for symbol in large_symbols:
    request = async_arctic_submit(lib, 'read', False, symbol)
    try:
        result = async_wait_request(request, timeout=300)  # Longer timeout
        # Process the result immediately to free memory
        data_size = result.data.memory_usage().sum()
        print(f"{symbol}: {data_size / 1024**2:.1f} MB")
        del result  # Explicit cleanup
    except Exception as e:
        print(f"Failed to process {symbol}: {e}")

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-arctic