
# Asynchronous Operations


An asynchronous execution framework for Arctic that runs store operations concurrently on a shared thread pool. It provides thread pool management, request tracking, and wait primitives for batch and high-throughput data operations.

## Capabilities

### Asynchronous Request Submission

Functions for submitting Arctic operations for asynchronous execution on a configurable thread pool.

```python { .api }
def async_arctic_submit(store, fun, is_modifier, *args, **kwargs):
    """
    Submit Arctic operation for asynchronous execution.

    Parameters:
    - store: Arctic store instance (VersionStore, TickStore, etc.)
    - fun: Function/method to execute asynchronously
    - is_modifier: Whether the operation modifies data (affects request tracking)
    - *args: Positional arguments for the function
    - **kwargs: Keyword arguments for the function

    Returns:
    AsyncRequest: Request object for tracking execution status

    Example:
    request = async_arctic_submit(version_store, version_store.read, False, 'AAPL')
    """
```
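
To make the submission contract above concrete, the following is a minimal stand-in sketch built on `concurrent.futures`. It is not Arctic's implementation: `SketchRequest`, `sketch_submit`, and `FakeStore` are hypothetical names introduced only for illustration, and the in-memory store replaces a real MongoDB-backed library so the snippet runs on its own.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from typing import Any, Callable

# Hypothetical stand-ins for illustration only; these are not Arctic classes.
_POOL = ThreadPoolExecutor(max_workers=4)


@dataclass
class SketchRequest:
    store: Any
    fun: Callable[..., Any]
    is_modifier: bool
    args: tuple = ()
    kwargs: dict = field(default_factory=dict)
    future: Any = None  # set once the request has been queued


def sketch_submit(store, fun, is_modifier, *args, **kwargs):
    """Queue fun(*args, **kwargs) on the pool and return a tracking object."""
    request = SketchRequest(store, fun, is_modifier, args, kwargs)
    request.future = _POOL.submit(fun, *args, **kwargs)
    return request


class FakeStore:
    """In-memory stand-in for a store such as VersionStore."""

    def __init__(self):
        self._data = {}

    def write(self, symbol, value):
        self._data[symbol] = value

    def read(self, symbol):
        return self._data[symbol]


store = FakeStore()
# A write is a modifier; block on it so the read below sees the data
sketch_submit(store, store.write, True, 'AAPL', [1, 2, 3]).future.result()
read_request = sketch_submit(store, store.read, False, 'AAPL')
print(read_request.future.result())  # [1, 2, 3]
```

Note that the sketch passes the bound method (`store.read`) rather than a method name, matching the "Function/method" description of `fun`.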

### Request Management

Functions for waiting on and managing asynchronous request completion.

```python { .api }
def async_wait_request(request, timeout=None):
    """
    Wait for a single asynchronous request to complete.

    Parameters:
    - request: AsyncRequest object from async_arctic_submit
    - timeout: Maximum time to wait in seconds (None = no timeout)

    Returns:
    Result of the asynchronous operation

    Raises:
    - RequestDurationException: If operation times out
    - Original exception: If the async operation failed
    """

def async_wait_requests(requests, timeout=None):
    """
    Wait for multiple asynchronous requests to complete.

    Parameters:
    - requests: List of AsyncRequest objects
    - timeout: Maximum time to wait for all requests in seconds

    Returns:
    List of results in the same order as input requests

    Raises:
    - RequestDurationException: If any operation times out
    - Original exceptions: If any async operations failed
    """
```
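
The two properties described above for waiting on several requests (results returned in input order, one timeout covering the whole batch) can be sketched with plain futures. This is an assumption-laden illustration rather than Arctic's code: `sketch_wait_all`, `SketchTimeout`, and `slow_echo` are hypothetical names, and `SketchTimeout` merely plays the role of `RequestDurationException`.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout


class SketchTimeout(Exception):
    """Hypothetical stand-in for RequestDurationException."""


def sketch_wait_all(futures, timeout=None):
    """Return results in input order, sharing one deadline across all futures."""
    deadline = None if timeout is None else time.monotonic() + timeout
    results = []
    for future in futures:
        remaining = None if deadline is None else max(0.0, deadline - time.monotonic())
        try:
            # result() re-raises any exception raised inside the worker
            results.append(future.result(timeout=remaining))
        except FutureTimeout:
            raise SketchTimeout(f"timed out after {timeout}s") from None
    return results


def slow_echo(value, delay):
    time.sleep(delay)
    return value


with ThreadPoolExecutor(max_workers=3) as pool:
    # Later submissions finish first, but results keep submission order
    jobs = [('a', 0.15), ('b', 0.10), ('c', 0.05)]
    futures = [pool.submit(slow_echo, name, delay) for name, delay in jobs]
    ordered = sketch_wait_all(futures, timeout=5)

print(ordered)  # ['a', 'b', 'c']
```

Computing the remaining time per future means a slow first request eats into the budget of the ones after it, which is one reasonable reading of "maximum time to wait for all requests".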

### Thread Pool Management

Functions for managing the asynchronous execution thread pool and system resources.

```python { .api }
def async_shutdown(timeout=None):
    """
    Shut down the asynchronous thread pool gracefully.

    Parameters:
    - timeout: Maximum time to wait for shutdown in seconds

    Stops accepting new requests and waits for current operations
    to complete before shutting down the thread pool.
    """

def async_await_termination(timeout=None):
    """
    Wait for all asynchronous operations to complete and terminate.

    Parameters:
    - timeout: Maximum time to wait for termination in seconds

    Blocks until all submitted requests have completed execution
    and the thread pool has been fully terminated.
    """

def async_reset_pool(pool_size=None, timeout=None):
    """
    Reset the asynchronous thread pool with new configuration.

    Parameters:
    - pool_size: New thread pool size (None = use default)
    - timeout: Timeout for shutting down existing pool

    Shuts down the current thread pool and creates a new one
    with the specified size for handling async operations.
    """
```
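
The "shut down, then replace" behaviour described for resetting the pool can be illustrated with a small wrapper around `ThreadPoolExecutor`. This is only a sketch under stated assumptions: `SketchPool` is a hypothetical class, not part of Arctic, and it omits the `timeout` parameter because the standard library's `shutdown` does not accept one.

```python
from concurrent.futures import ThreadPoolExecutor


class SketchPool:
    """Hypothetical pool wrapper; illustrates shutdown-then-replace only."""

    def __init__(self, pool_size=4):
        self.pool_size = pool_size
        self._executor = ThreadPoolExecutor(max_workers=pool_size)

    def submit(self, fun, *args, **kwargs):
        return self._executor.submit(fun, *args, **kwargs)

    def shutdown(self):
        # Stop accepting new work and block until queued work has finished
        self._executor.shutdown(wait=True)

    def reset(self, pool_size=None):
        # Drain the old executor first so no in-flight work is lost
        self.shutdown()
        self.pool_size = pool_size or self.pool_size
        self._executor = ThreadPoolExecutor(max_workers=self.pool_size)


pool = SketchPool(pool_size=2)
before = pool.submit(pow, 2, 10)   # runs on the original 2-thread pool
pool.reset(pool_size=8)
after = pool.submit(pow, 2, 5)     # runs on the new 8-thread pool
print(before.result(), after.result(), pool.pool_size)  # 1024 32 8
pool.shutdown()
```

Once `shutdown()` has been called, the underlying executor rejects further submissions with a `RuntimeError`, which mirrors the "stops accepting new requests" behaviour described above.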

### Statistics and Monitoring

Functions for monitoring asynchronous operation performance and usage.

```python { .api }
def async_total_requests():
    """
    Get total number of asynchronous requests processed.

    Returns:
    int: Total count of requests submitted since startup

    Useful for monitoring throughput and system usage patterns.
    """
```
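
As a rough illustration of how a request total like the one above can feed a throughput figure, the sketch below keeps a lock-protected counter and divides the change between two samples by the elapsed time. `SketchCounter`, `tracked_submit`, and `throughput` are hypothetical helpers, not Arctic functions.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class SketchCounter:
    """Hypothetical thread-safe counter of submitted requests."""

    def __init__(self):
        self._lock = threading.Lock()
        self._total = 0

    def record(self):
        with self._lock:
            self._total += 1

    def total(self):
        with self._lock:
            return self._total


def throughput(count_before, count_after, elapsed_seconds):
    """Requests per second between two samples of the counter."""
    return (count_after - count_before) / elapsed_seconds


counter = SketchCounter()


def tracked_submit(pool, fun, *args):
    # Count the request at submission time, then hand it to the pool
    counter.record()
    return pool.submit(fun, *args)


sample_before = counter.total()
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [tracked_submit(pool, abs, -i) for i in range(50)]
    values = [f.result() for f in futures]
sample_after = counter.total()

print(sample_after)                                  # 50
print(throughput(sample_before, sample_after, 2.0))  # 25.0
```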

### Async Arctic Instance

Singleton instance for managing global asynchronous operations.

```python { .api }
ASYNC_ARCTIC = AsyncArctic()
"""
Global singleton instance for asynchronous Arctic operations.

Provides centralized management of the thread pool and request
tracking for all async operations across Arctic stores.
"""
```
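
One common way to get a process-wide instance with plain-function entry points is a lazily created singleton plus module-level aliases. The sketch below shows that pattern in isolation; `SketchExecutor`, `SKETCH_EXECUTOR`, and `sketch_track` are hypothetical names, and nothing here is taken from Arctic's source.

```python
import threading


class SketchExecutor:
    """Hypothetical class showing a lazily created, process-wide singleton."""

    _instance = None
    _instance_lock = threading.Lock()

    def __init__(self):
        self.requests = []

    @classmethod
    def get_instance(cls):
        # Double-checked locking: only the first caller pays for construction
        if cls._instance is None:
            with cls._instance_lock:
                if cls._instance is None:
                    cls._instance = cls()
        return cls._instance


SKETCH_EXECUTOR = SketchExecutor.get_instance()
# Module-level alias gives callers a plain function instead of a method
sketch_track = SKETCH_EXECUTOR.requests.append

sketch_track('read AAPL')
print(SketchExecutor.get_instance() is SKETCH_EXECUTOR)  # True
print(SKETCH_EXECUTOR.requests)                          # ['read AAPL']
```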

## Exception Types

### Asynchronous Operation Exceptions

Exception types specific to asynchronous operations and request handling.

```python { .api }
class AsyncArcticException(ArcticException):
    """
    Base exception for asynchronous Arctic operations.

    Raised when async-specific errors occur during request
    submission, execution, or result retrieval.
    """

class RequestDurationException(AsyncArcticException):
    """
    Exception raised when asynchronous requests exceed timeout limits.

    Raised by async_wait_request and async_wait_requests when
    operations take longer than the specified timeout period.
    """
```
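
Because the timeout exception is a subclass of the async base exception, the order of `except` clauses matters: the most specific class has to come first. The sketch below mirrors the hierarchy with hypothetical stand-in classes (`SketchArcticException`, `SketchAsyncException`, `SketchDurationException`) so it runs without Arctic installed.

```python
class SketchArcticException(Exception):
    """Hypothetical stand-in for ArcticException."""


class SketchAsyncException(SketchArcticException):
    """Hypothetical stand-in for AsyncArcticException."""


class SketchDurationException(SketchAsyncException):
    """Hypothetical stand-in for RequestDurationException."""


def classify(exc):
    """Show which handler catches a given exception."""
    try:
        raise exc
    except SketchDurationException:
        # Most specific first; otherwise the base-class handler would win
        return 'timeout'
    except SketchAsyncException:
        return 'async error'
    except Exception:
        return 'operation error'


print(classify(SketchDurationException()))  # timeout
print(classify(SketchAsyncException()))     # async error
print(classify(KeyError('AAPL')))           # operation error
```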

## Usage Examples

### Basic Asynchronous Operations

```python
from arctic import Arctic, VERSION_STORE
from arctic.asynchronous import (
    async_arctic_submit, async_wait_request, async_wait_requests
)
import time

# Setup
arctic_conn = Arctic('mongodb://localhost:27017')
lib = arctic_conn['market_data']

# Submit multiple read requests asynchronously
symbols = ['AAPL', 'GOOGL', 'MSFT', 'AMZN', 'TSLA']
requests = []

start_time = time.time()

# Submit all requests for concurrent execution.
# Pass the bound method and is_modifier=False, since a read does not modify data.
for symbol in symbols:
    request = async_arctic_submit(lib, lib.read, False, symbol)
    requests.append(request)

print(f"Submitted {len(requests)} requests in {time.time() - start_time:.3f}s")

# Wait for all requests to complete
results = async_wait_requests(requests, timeout=60)
print(f"Completed all requests in {time.time() - start_time:.3f}s")

# Process results
for symbol, result in zip(symbols, results):
    print(f"{symbol}: {result.data.shape[0]} data points")
```

### Individual Request Handling

```python
from arctic.exceptions import RequestDurationException

# Submit single request (is_modifier=False for a read)
request = async_arctic_submit(lib, lib.read, False, 'AAPL')

# Do other work while request executes
print("Request submitted, doing other work...")
time.sleep(1)

# Wait for specific request
try:
    result = async_wait_request(request, timeout=30)
    print(f"AAPL data: {result.data.shape}")
except RequestDurationException:
    print("Request timed out")
except Exception as e:
    print(f"Request failed: {e}")
```

### Batch Write Operations

```python
import pandas as pd
import numpy as np

# Generate sample data for multiple symbols
symbols_data = {}
for symbol in ['AAPL', 'GOOGL', 'MSFT']:
    dates = pd.date_range('2020-01-01', periods=1000, freq='min')
    data = pd.DataFrame({
        'price': np.random.randn(1000).cumsum() + 100,
        'volume': np.random.randint(100, 1000, 1000)
    }, index=dates)
    symbols_data[symbol] = data

# Submit write operations asynchronously (is_modifier=True for writes)
write_requests = []
for symbol, data in symbols_data.items():
    request = async_arctic_submit(lib, lib.write, True, symbol, data)
    write_requests.append((symbol, request))

# Wait for all writes to complete
for symbol, request in write_requests:
    try:
        result = async_wait_request(request, timeout=60)
        print(f"Successfully wrote {symbol}")
    except Exception as e:
        print(f"Failed to write {symbol}: {e}")
```

### Mixed Read and Write Operations

```python
from arctic.date import DateRange
from datetime import datetime, timedelta

# Submit mixed operations
operations = []

# Read operations
for symbol in ['AAPL', 'GOOGL']:
    date_range = DateRange(
        datetime(2020, 1, 1),
        datetime(2020, 1, 31)
    )
    request = async_arctic_submit(lib, lib.read, False, symbol, date_range=date_range)
    operations.append(('read', symbol, request))

# Metadata operations
for symbol in ['MSFT', 'AMZN']:
    request = async_arctic_submit(lib, lib.read_metadata, False, symbol)
    operations.append(('metadata', symbol, request))

# List operations
list_request = async_arctic_submit(lib, lib.list_symbols, False)
operations.append(('list', 'all', list_request))

# Process all operations
results = []
for op_type, symbol, request in operations:
    try:
        result = async_wait_request(request, timeout=30)
        results.append((op_type, symbol, result))
        print(f"Completed {op_type} for {symbol}")
    except Exception as e:
        print(f"Failed {op_type} for {symbol}: {e}")
```

### Thread Pool Management

```python
from arctic.asynchronous import (
    async_reset_pool, async_shutdown, async_total_requests,
    async_await_termination
)

# Check current usage
total_requests = async_total_requests()
print(f"Total requests processed: {total_requests}")

# Reset thread pool with custom size
async_reset_pool(pool_size=8, timeout=10)
print("Thread pool reset with 8 threads")

# Submit high-volume operations
batch_requests = []
for i in range(50):
    symbol = f'SYM{i:03d}'
    request = async_arctic_submit(lib, lib.has_symbol, False, symbol)
    batch_requests.append(request)

# Wait for batch completion
batch_results = async_wait_requests(batch_requests, timeout=120)
print(f"Processed {len(batch_results)} operations")

# Graceful shutdown
print("Shutting down async operations...")
async_shutdown(timeout=30)
async_await_termination(timeout=60)
print("Async system shutdown complete")
```

### Error Handling and Timeouts

```python
from arctic.exceptions import RequestDurationException

# Handle timeouts and errors gracefully
def safe_async_operation(store, operation, is_modifier, *args, **kwargs):
    """Safely execute async operation with error handling."""
    try:
        request = async_arctic_submit(store, operation, is_modifier, *args, **kwargs)
        result = async_wait_request(request, timeout=30)
        return result, None
    except RequestDurationException:
        return None, "Operation timed out"
    except Exception as e:
        return None, f"Operation failed: {str(e)}"

# Use safe wrapper for critical operations
symbols = ['AAPL', 'INVALID_SYMBOL', 'GOOGL']
for symbol in symbols:
    result, error = safe_async_operation(lib, lib.read, False, symbol)
    if error:
        print(f"{symbol}: {error}")
    else:
        print(f"{symbol}: Success - {result.data.shape[0]} rows")

# Batch operations with error handling
success_count = 0
error_count = 0

requests = []
for symbol in symbols:
    request = async_arctic_submit(lib, lib.read, False, symbol)
    requests.append((symbol, request))

for symbol, request in requests:
    try:
        result = async_wait_request(request, timeout=10)
        success_count += 1
        print(f"{symbol}: Success")
    except RequestDurationException:
        error_count += 1
        print(f"{symbol}: Timeout")
    except Exception as e:
        error_count += 1
        print(f"{symbol}: Error - {e}")

print(f"Results: {success_count} success, {error_count} errors")
```

### Performance Optimization

```python
# Optimize for different workload patterns

# High-throughput metadata checks
metadata_requests = []
symbols_to_check = [f'SYMBOL_{i:04d}' for i in range(100)]

start_time = time.time()
for symbol in symbols_to_check:
    request = async_arctic_submit(lib, lib.has_symbol, False, symbol)
    metadata_requests.append(request)

# Collect results in batches so each wait call has its own bounded timeout.
# All requests are already queued at this point; batching only affects waiting.
batch_size = 20
results = []

for i in range(0, len(metadata_requests), batch_size):
    batch = metadata_requests[i:i + batch_size]
    batch_results = async_wait_requests(batch, timeout=30)
    results.extend(batch_results)
    print(f"Processed batch {i//batch_size + 1}")

total_time = time.time() - start_time
print(f"Checked {len(symbols_to_check)} symbols in {total_time:.2f}s")
print(f"Rate: {len(symbols_to_check)/total_time:.1f} operations/second")

# Memory-efficient large data reads
large_symbols = ['LARGE_DATASET_1', 'LARGE_DATASET_2', 'LARGE_DATASET_3']

# Process one at a time to manage memory usage
for symbol in large_symbols:
    request = async_arctic_submit(lib, lib.read, False, symbol)
    try:
        result = async_wait_request(request, timeout=300)  # Longer timeout
        # Process result immediately to free memory
        data_size = result.data.memory_usage().sum()
        print(f"{symbol}: {data_size / 1024**2:.1f} MB")
        del result  # Explicit cleanup
    except Exception as e:
        print(f"Failed to process {symbol}: {e}")
```