# Asynchronous Operations

An asynchronous execution framework for Arctic operations. Store operations such as reads, writes, and metadata queries are submitted to a shared thread pool, so a batch of independent requests can run concurrently instead of waiting on the database one at a time. The module provides thread pool management, request tracking, and helpers for waiting on results.

## Capabilities

### Asynchronous Request Submission

Submit an Arctic store operation for asynchronous execution on the shared thread pool, whose size is set with `async_reset_pool`.

```python { .api }
def async_arctic_submit(store, fun, is_modifier, *args, **kwargs):
    """
    Submit an Arctic operation for asynchronous execution.

    Parameters:
    - store: Arctic store instance (VersionStore, TickStore, etc.)
    - fun: Function/method to execute asynchronously
    - is_modifier: Whether the operation modifies data (affects request tracking)
    - *args: Positional arguments for the function
    - **kwargs: Keyword arguments for the function

    Returns:
    AsyncRequest: Request object for tracking execution status

    Example:
    request = async_arctic_submit(version_store, 'read', False, 'AAPL')
    """
```
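To make the role of `is_modifier` concrete, here is a minimal sketch of a submit-and-track API built on the standard library's `concurrent.futures`. It is an analogy, not Arctic's implementation: the `ToyRequest` type and the `submit` helper are hypothetical, and the sketch passes callables directly rather than method names.

```python
import itertools
from concurrent.futures import Future, ThreadPoolExecutor
from dataclasses import dataclass, field


@dataclass
class ToyRequest:
    """Hypothetical stand-in for AsyncRequest: an id, a kind flag, and a future."""
    id: int
    is_modifier: bool
    future: Future = field(repr=False)


_ids = itertools.count(1)
_pool = ThreadPoolExecutor(max_workers=4)


def submit(fun, is_modifier, *args, **kwargs):
    """Run fun(*args, **kwargs) on the pool and return a tracking object."""
    future = _pool.submit(fun, *args, **kwargs)
    return ToyRequest(id=next(_ids), is_modifier=is_modifier, future=future)


# A read-only call (accessor) and a data-changing call (modifier).
store = {'AAPL': [1, 2, 3]}
read_req = submit(store.get, False, 'AAPL')
write_req = submit(store.__setitem__, True, 'MSFT', [4, 5])

print(read_req.future.result())  # [1, 2, 3]
write_req.future.result()        # block until the write has finished
print(sorted(store))             # ['AAPL', 'MSFT']
```

Keeping the modifier flag on the request object lets a coordinator treat writes differently from reads (for example, when deciding what must finish before a shutdown) without inspecting the function itself.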

### Request Management

Functions for waiting on asynchronous requests and collecting their results.

```python { .api }
def async_wait_request(request, timeout=None):
    """
    Wait for a single asynchronous request to complete.

    Parameters:
    - request: AsyncRequest object from async_arctic_submit
    - timeout: Maximum time to wait in seconds (None = no timeout)

    Returns:
    Result of the asynchronous operation

    Raises:
    - RequestDurationException: If the operation times out
    - Original exception: If the async operation failed
    """

def async_wait_requests(requests, timeout=None):
    """
    Wait for multiple asynchronous requests to complete.

    Parameters:
    - requests: List of AsyncRequest objects
    - timeout: Maximum time to wait for all requests in seconds

    Returns:
    List of results in the same order as the input requests

    Raises:
    - RequestDurationException: If any operation times out
    - Original exceptions: If any async operation failed
    """
```
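The documented waiting behavior (results in input order, a timeout error, and re-raising the operation's own exception) can be sketched with `concurrent.futures.wait`. `ToyRequestDurationError` and the two helpers below are hypothetical stand-ins for Arctic's names and operate on plain `Future` objects, not on `AsyncRequest`.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait


class ToyRequestDurationError(Exception):
    """Hypothetical stand-in for RequestDurationException."""


def wait_requests(futures, timeout=None):
    """Wait for every future; return their results in input order."""
    _, not_done = wait(futures, timeout=timeout)
    if not_done:
        raise ToyRequestDurationError(f"{len(not_done)} request(s) still running")
    # Future.result() re-raises any exception raised inside the task.
    return [future.result() for future in futures]


def wait_request(future, timeout=None):
    """Single-request convenience wrapper around wait_requests."""
    return wait_requests([future], timeout=timeout)[0]


def slow_identity(value, delay):
    time.sleep(delay)
    return value


with ThreadPoolExecutor(max_workers=3) as pool:
    # 'a' finishes last, yet results come back in submission order.
    futures = [pool.submit(slow_identity, v, d) for v, d in [('a', 0.3), ('b', 0.1), ('c', 0.2)]]
    print(wait_requests(futures, timeout=5))  # ['a', 'b', 'c']

    slow = pool.submit(slow_identity, 'late', 1.0)
    timed_out = False
    try:
        wait_request(slow, timeout=0.1)
    except ToyRequestDurationError as exc:
        timed_out = True
        print("timed out:", exc)
```

In this sketch a timeout only stops the *wait*: the slow task keeps running on its worker thread and its result can still be collected later.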

### Thread Pool Management

Functions for managing the asynchronous execution thread pool and its resources.

```python { .api }
def async_shutdown(timeout=None):
    """
    Shut down the asynchronous thread pool gracefully.

    Parameters:
    - timeout: Maximum time to wait for shutdown in seconds

    Stops accepting new requests and waits for current operations
    to complete before shutting down the thread pool.
    """

def async_await_termination(timeout=None):
    """
    Wait for all asynchronous operations to complete and the pool to terminate.

    Parameters:
    - timeout: Maximum time to wait for termination in seconds

    Blocks until all submitted requests have completed execution
    and the thread pool has been fully terminated.
    """

def async_reset_pool(pool_size=None, timeout=None):
    """
    Reset the asynchronous thread pool with a new configuration.

    Parameters:
    - pool_size: New thread pool size (None = use default)
    - timeout: Timeout for shutting down the existing pool

    Shuts down the current thread pool and creates a new one
    with the specified size for handling async operations.
    """
```
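The retire-and-recreate behavior described for `async_reset_pool` can be sketched with a `ThreadPoolExecutor` held behind a lock. `ResettablePool` and `DEFAULT_POOL_SIZE` are hypothetical. The standard `Executor.shutdown()` has no timeout parameter, so the sketch bounds the wait with `concurrent.futures.wait` on the outstanding futures instead.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor, wait

DEFAULT_POOL_SIZE = 4  # assumption: an arbitrary default for this sketch


class ResettablePool:
    """Hypothetical pool wrapper illustrating reset/shutdown; not Arctic's code."""

    def __init__(self, pool_size=None):
        self._lock = threading.Lock()
        self._pending = set()  # futures submitted but not yet finished
        self.pool_size = pool_size if pool_size is not None else DEFAULT_POOL_SIZE
        self._pool = ThreadPoolExecutor(max_workers=self.pool_size)

    def submit(self, fun, *args, **kwargs):
        with self._lock:
            future = self._pool.submit(fun, *args, **kwargs)
            self._pending.add(future)
        future.add_done_callback(self._discard)
        return future

    def _discard(self, future):
        with self._lock:
            self._pending.discard(future)

    @staticmethod
    def _drain(pool, pending, timeout):
        pool.shutdown(wait=False)  # refuse new work; already-queued tasks still run
        _, still_running = wait(pending, timeout=timeout)
        return len(still_running)  # 0 means everything finished in time

    def reset(self, pool_size=None, timeout=None):
        """Retire the current pool and start a fresh one of the requested size."""
        with self._lock:
            old_pool, old_pending = self._pool, set(self._pending)
            self.pool_size = pool_size if pool_size is not None else DEFAULT_POOL_SIZE
            self._pool = ThreadPoolExecutor(max_workers=self.pool_size)
        return self._drain(old_pool, old_pending, timeout)

    def shutdown(self, timeout=None):
        with self._lock:
            pool, pending = self._pool, set(self._pending)
        return self._drain(pool, pending, timeout)


pool = ResettablePool()
first = pool.submit(time.sleep, 0.2)
left = pool.reset(pool_size=8, timeout=2)
print(pool.pool_size, left, first.done())  # 8 0 True
pool.shutdown(timeout=2)
```

Swapping in the new pool before draining the old one means callers are never left without a pool to submit to; a stricter design could instead block new submissions until the old pool has drained.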

### Statistics and Monitoring

Functions for monitoring asynchronous operation throughput and usage.

```python { .api }
def async_total_requests():
    """
    Get the total number of asynchronous requests processed.

    Returns:
    int: Total count of requests submitted since startup

    Useful for monitoring throughput and system usage patterns.
    """
```
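A running total like the one `async_total_requests` reports needs to be thread-safe, because `+=` on a shared integer is not atomic across threads. The sketch below keeps a lock-protected counter (the `RequestCounter` class is hypothetical; Arctic's own bookkeeping may differ) and turns two samples of it into a requests-per-second figure.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class RequestCounter:
    """Hypothetical lock-protected counter of handled requests."""

    def __init__(self):
        self._lock = threading.Lock()
        self._total = 0

    def increment(self):
        with self._lock:  # read-modify-write must not interleave between threads
            self._total += 1

    def total(self):
        with self._lock:
            return self._total


counter = RequestCounter()


def handle(i):
    counter.increment()
    return i


# Sample the counter before and after a burst of work to estimate throughput.
before, t0 = counter.total(), time.perf_counter()
with ThreadPoolExecutor(max_workers=8) as pool:
    list(pool.map(handle, range(1000)))
elapsed = time.perf_counter() - t0
done = counter.total() - before
print(done)  # 1000
print(f"{done / elapsed:.0f} requests/second")
```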
123
124
### Async Arctic Instance
125
126
Singleton instance for managing global asynchronous operations.
127
128
```python { .api }
129
ASYNC_ARCTIC = AsyncArctic()
130
"""
131
Global singleton instance for asynchronous Arctic operations.
132
133
Provides centralized management of the thread pool and request
134
tracking for all async operations across Arctic stores.
135
"""
136
```
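A process-wide coordinator like `ASYNC_ARCTIC` is often created lazily, so no threads are started until the first request arrives. The sketch shows a generic thread-safe lazy singleton using double-checked locking; `LazyCoordinator` and `get_instance` are hypothetical names, and this is a common pattern rather than Arctic's code.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class LazyCoordinator:
    """Hypothetical lazily created singleton that owns a thread pool."""

    _instance = None
    _instance_lock = threading.Lock()

    def __init__(self):
        self.pool = None  # created on first submit, not at import time
        self._pool_lock = threading.Lock()

    @classmethod
    def get_instance(cls):
        # Double-checked locking: once created, the fast path skips the lock.
        if cls._instance is None:
            with cls._instance_lock:
                if cls._instance is None:
                    cls._instance = cls()
        return cls._instance

    def submit(self, fun, *args, **kwargs):
        with self._pool_lock:
            if self.pool is None:
                self.pool = ThreadPoolExecutor(max_workers=4)
        return self.pool.submit(fun, *args, **kwargs)


# Many threads racing to fetch the instance all receive the same object.
seen = []
threads = [threading.Thread(target=lambda: seen.append(LazyCoordinator.get_instance()))
           for _ in range(16)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len({id(obj) for obj in seen}))       # 1
print(LazyCoordinator.get_instance().pool)  # None: no pool until the first submit
```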
137
138
## Exception Types
139
140
### Asynchronous Operation Exceptions
141
142
Exception types specific to asynchronous operations and request handling.
143
144
```python { .api }
145
class AsyncArcticException(ArcticException):
146
"""
147
Base exception for asynchronous Arctic operations.
148
149
Raised when async-specific errors occur during request
150
submission, execution, or result retrieval.
151
"""
152
153
class RequestDurationException(AsyncArcticException):
154
"""
155
Exception raised when asynchronous requests exceed timeout limits.
156
157
Raised by async_wait_request and async_wait_requests when
158
operations take longer than the specified timeout period.
159
"""
160
```
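Because `RequestDurationException` subclasses `AsyncArcticException`, which subclasses `ArcticException`, handlers must list the most specific class first or the broader clause will swallow timeouts. The classes below are local stand-ins that mirror the documented hierarchy so the sketch runs without Arctic installed; `classify` is a hypothetical helper.

```python
class ArcticException(Exception):
    """Local stand-in for Arctic's ArcticException."""


class AsyncArcticException(ArcticException):
    """Local stand-in mirroring the documented hierarchy."""


class RequestDurationException(AsyncArcticException):
    """Local stand-in for the timeout exception."""


def classify(exc):
    """Return a handling label, testing the most specific class first."""
    try:
        raise exc
    except RequestDurationException:
        return "timeout"
    except AsyncArcticException:
        return "async failure"
    except ArcticException:
        return "arctic error"
    except Exception:
        return "operation error"


print(classify(RequestDurationException("took too long")))  # timeout
print(classify(AsyncArcticException("pool closed")))         # async failure
print(classify(KeyError("AAPL")))                            # operation error
```

Swapping the first two `except` clauses would make the `AsyncArcticException` handler catch timeouts as well.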
161
162
## Usage Examples
163
164
### Basic Asynchronous Operations
165
166
```python
167
from arctic import Arctic, VERSION_STORE
168
from arctic.asynchronous import (
169
async_arctic_submit, async_wait_request, async_wait_requests
170
)
171
import time
172
173
# Setup
174
arctic_conn = Arctic('mongodb://localhost:27017')
175
lib = arctic_conn['market_data']
176
177
# Submit multiple read requests asynchronously
178
symbols = ['AAPL', 'GOOGL', 'MSFT', 'AMZN', 'TSLA']
179
requests = []
180
181
start_time = time.time()
182
183
# Submit all requests for concurrent execution
184
for symbol in symbols:
185
request = async_arctic_submit(lib, 'read', symbol)
186
requests.append(request)
187
188
print(f"Submitted {len(requests)} requests in {time.time() - start_time:.3f}s")
189
190
# Wait for all requests to complete
191
results = async_wait_requests(requests, timeout=60)
192
print(f"Completed all requests in {time.time() - start_time:.3f}s")
193
194
# Process results
195
for symbol, result in zip(symbols, results):
196
print(f"{symbol}: {result.data.shape[0]} data points")
197
```
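The speed-up in the example above comes from overlapping waits: each read spends most of its time waiting on MongoDB, and Python threads release the GIL while blocked on I/O. The sketch below simulates that with `time.sleep` as a stand-in for a database round trip (`fake_read` and the 0.2 s latency are assumptions) and compares a sequential loop with a thread pool.

```python
import time
from concurrent.futures import ThreadPoolExecutor

LATENCY = 0.2  # assumed per-request round-trip time, in seconds


def fake_read(symbol):
    """Hypothetical stand-in for a store read: blocks like network I/O."""
    time.sleep(LATENCY)
    return symbol, 1000


symbols = ['AAPL', 'GOOGL', 'MSFT', 'AMZN', 'TSLA']

t0 = time.perf_counter()
sequential = [fake_read(s) for s in symbols]
sequential_time = time.perf_counter() - t0

t0 = time.perf_counter()
with ThreadPoolExecutor(max_workers=len(symbols)) as pool:
    parallel_results = list(pool.map(fake_read, symbols))
concurrent_time = time.perf_counter() - t0

# Sequential time is roughly len(symbols) * LATENCY; the pooled run is close to one LATENCY.
print(f"sequential: {sequential_time:.2f}s, concurrent: {concurrent_time:.2f}s")
```

CPU-bound work such as heavy pandas transformations would not speed up the same way, because only one thread can execute Python bytecode at a time.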
198
199
### Individual Request Handling
200
201
```python
202
# Submit single request
203
request = async_arctic_submit(lib, 'read', 'AAPL')
204
205
# Do other work while request executes
206
print("Request submitted, doing other work...")
207
time.sleep(1)
208
209
# Wait for specific request
210
try:
211
result = async_wait_request(request, timeout=30)
212
print(f"AAPL data: {result.data.shape}")
213
except RequestDurationException:
214
print("Request timed out")
215
except Exception as e:
216
print(f"Request failed: {e}")
217
```
218
219
### Batch Write Operations
220
221
```python
222
import pandas as pd
223
import numpy as np
224
225
# Generate sample data for multiple symbols
226
symbols_data = {}
227
for symbol in ['AAPL', 'GOOGL', 'MSFT']:
228
dates = pd.date_range('2020-01-01', periods=1000, freq='min')
229
data = pd.DataFrame({
230
'price': np.random.randn(1000).cumsum() + 100,
231
'volume': np.random.randint(100, 1000, 1000)
232
}, index=dates)
233
symbols_data[symbol] = data
234
235
# Submit write operations asynchronously
236
write_requests = []
237
for symbol, data in symbols_data.items():
238
request = async_arctic_submit(lib, 'write', symbol, data)
239
write_requests.append((symbol, request))
240
241
# Wait for all writes to complete
242
for symbol, request in write_requests:
243
try:
244
result = async_wait_request(request, timeout=60)
245
print(f"Successfully wrote {symbol}")
246
except Exception as e:
247
print(f"Failed to write {symbol}: {e}")
248
```
249
250
### Mixed Read and Write Operations
251
252
```python
253
from arctic.date import DateRange
254
from datetime import datetime, timedelta
255
256
# Submit mixed operations
257
operations = []
258
259
# Read operations
260
for symbol in ['AAPL', 'GOOGL']:
261
date_range = DateRange(
262
datetime(2020, 1, 1),
263
datetime(2020, 1, 31)
264
)
265
request = async_arctic_submit(lib, 'read', symbol, date_range=date_range)
266
operations.append(('read', symbol, request))
267
268
# Metadata operations
269
for symbol in ['MSFT', 'AMZN']:
270
request = async_arctic_submit(lib, 'read_metadata', symbol)
271
operations.append(('metadata', symbol, request))
272
273
# List operations
274
list_request = async_arctic_submit(lib, 'list_symbols')
275
operations.append(('list', 'all', list_request))
276
277
# Process all operations
278
results = []
279
for op_type, symbol, request in operations:
280
try:
281
result = async_wait_request(request, timeout=30)
282
results.append((op_type, symbol, result))
283
print(f"Completed {op_type} for {symbol}")
284
except Exception as e:
285
print(f"Failed {op_type} for {symbol}: {e}")
286
```
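Waiting on requests in submission order means one slow request delays reporting for every request queued behind it. When results can be handled in any order, a completion-order loop avoids that head-of-line blocking. The sketch uses the standard library's `concurrent.futures.as_completed` on plain futures, not on Arctic's `AsyncRequest`; `fake_op` and its delays are hypothetical.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def fake_op(name, delay):
    """Hypothetical operation that takes `delay` seconds."""
    time.sleep(delay)
    return name


jobs = {'read AAPL': 0.3, 'read GOOGL': 0.1, 'list symbols': 0.2}

completion_order = []
with ThreadPoolExecutor(max_workers=len(jobs)) as pool:
    # Map each future back to its label so results stay attributable.
    futures = {pool.submit(fake_op, name, delay): name for name, delay in jobs.items()}
    for future in as_completed(futures, timeout=5):
        label = futures[future]
        try:
            future.result()
            completion_order.append(label)
            print(f"Completed {label}")
        except Exception as e:
            print(f"Failed {label}: {e}")
```

Here the GOOGL read is reported first even though it was submitted second.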
287
288
### Thread Pool Management
289
290
```python
291
from arctic.asynchronous import (
292
async_reset_pool, async_shutdown, async_total_requests,
293
async_await_termination
294
)
295
296
# Check current usage
297
total_requests = async_total_requests()
298
print(f"Total requests processed: {total_requests}")
299
300
# Reset thread pool with custom size
301
async_reset_pool(pool_size=8, timeout=10)
302
print("Thread pool reset with 8 threads")
303
304
# Submit high-volume operations
305
batch_requests = []
306
for i in range(50):
307
symbol = f'SYM{i:03d}'
308
request = async_arctic_submit(lib, 'has_symbol', symbol)
309
batch_requests.append(request)
310
311
# Wait for batch completion
312
batch_results = async_wait_requests(batch_requests, timeout=120)
313
print(f"Processed {len(batch_results)} operations")
314
315
# Graceful shutdown
316
print("Shutting down async operations...")
317
async_shutdown(timeout=30)
318
async_await_termination(timeout=60)
319
print("Async system shutdown complete")
320
```

### Error Handling and Timeouts

```python
from arctic.exceptions import RequestDurationException

# Handle timeouts and errors gracefully
def safe_async_operation(store, operation, is_modifier, *args, **kwargs):
    """Submit an operation, wait up to 30 s, and return (result, error_message)."""
    try:
        request = async_arctic_submit(store, operation, is_modifier, *args, **kwargs)
        result = async_wait_request(request, timeout=30)
        return result, None
    except RequestDurationException:
        return None, "Operation timed out"
    except Exception as e:
        return None, f"Operation failed: {e}"

# Use the safe wrapper for critical operations (reads are not modifiers)
symbols = ['AAPL', 'INVALID_SYMBOL', 'GOOGL']
for symbol in symbols:
    result, error = safe_async_operation(lib, 'read', False, symbol)
    if error:
        print(f"{symbol}: {error}")
    else:
        print(f"{symbol}: Success - {result.data.shape[0]} rows")

# Batch operations with error handling: submit everything first, then collect
success_count = 0
error_count = 0

requests = []
for symbol in symbols:
    request = async_arctic_submit(lib, 'read', False, symbol)
    requests.append((symbol, request))

for symbol, request in requests:
    try:
        async_wait_request(request, timeout=10)
        success_count += 1
        print(f"{symbol}: Success")
    except RequestDurationException:
        error_count += 1
        print(f"{symbol}: Timeout")
    except Exception as e:
        error_count += 1
        print(f"{symbol}: Error - {e}")

print(f"Results: {success_count} success, {error_count} errors")
```
369
370
### Performance Optimization
371
372
```python
373
# Optimize for different workload patterns
374
375
# High-throughput metadata checks
376
metadata_requests = []
377
symbols_to_check = [f'SYMBOL_{i:04d}' for i in range(100)]
378
379
start_time = time.time()
380
for symbol in symbols_to_check:
381
request = async_arctic_submit(lib, 'has_symbol', symbol)
382
metadata_requests.append(request)
383
384
# Process in batches to avoid overwhelming the system
385
batch_size = 20
386
results = []
387
388
for i in range(0, len(metadata_requests), batch_size):
389
batch = metadata_requests[i:i + batch_size]
390
batch_results = async_wait_requests(batch, timeout=30)
391
results.extend(batch_results)
392
print(f"Processed batch {i//batch_size + 1}")
393
394
total_time = time.time() - start_time
395
print(f"Checked {len(symbols_to_check)} symbols in {total_time:.2f}s")
396
print(f"Rate: {len(symbols_to_check)/total_time:.1f} operations/second")
397
398
# Memory-efficient large data reads
399
large_symbols = ['LARGE_DATASET_1', 'LARGE_DATASET_2', 'LARGE_DATASET_3']
400
401
# Process one at a time to manage memory usage
402
for symbol in large_symbols:
403
request = async_arctic_submit(lib, 'read', symbol)
404
try:
405
result = async_wait_request(request, timeout=300) # Longer timeout
406
# Process result immediately to free memory
407
data_size = result.data.memory_usage().sum()
408
print(f"{symbol}: {data_size / 1024**2:.1f} MB")
409
del result # Explicit cleanup
410
except Exception as e:
411
print(f"Failed to process {symbol}: {e}")
412
```
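Fixed-size batches wait for the slowest request in each batch before the next batch can start. A sliding window instead keeps a fixed number of requests in flight and submits a new one as soon as any finishes. The sketch uses `concurrent.futures.wait` with `FIRST_COMPLETED` on plain futures; `run_windowed`, `check`, and the window size of 20 are illustrative assumptions, not Arctic API.

```python
import random
import threading
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def run_windowed(pool, fun, items, window):
    """Apply fun to each item with at most `window` calls in flight; keep input order."""
    results = [None] * len(items)
    in_flight = {}  # future -> index of its item
    item_iter = iter(enumerate(items))

    def top_up():
        while len(in_flight) < window:
            try:
                index, item = next(item_iter)
            except StopIteration:
                return
            in_flight[pool.submit(fun, item)] = index

    top_up()
    while in_flight:
        done, _ = wait(in_flight, return_when=FIRST_COMPLETED)
        for future in done:
            results[in_flight.pop(future)] = future.result()
        top_up()  # refill the window as soon as slots free up
    return results


active = 0
peak = 0
lock = threading.Lock()


def check(symbol):
    """Hypothetical stand-in for has_symbol that records concurrency."""
    global active, peak
    with lock:
        active += 1
        peak = max(peak, active)
    time.sleep(random.uniform(0.01, 0.05))  # simulated round trip
    with lock:
        active -= 1
    return symbol.endswith('0')


symbols = [f'SYMBOL_{i:04d}' for i in range(100)]
with ThreadPoolExecutor(max_workers=32) as pool:
    flags = run_windowed(pool, check, symbols, window=20)
print(sum(flags), peak)  # 10 matches; peak concurrency never exceeds 20
```

The pool is larger than the window on purpose, so the window (not the pool size) is what limits concurrent load on the database.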