
# Async Market Data Operations

Asynchronous market data operations optimized for high-performance data retrieval with pandas DataFrame outputs. Designed for concurrent data fetching and analysis workflows.

## Capabilities

### Async Client Initialization

Create an asynchronous REST client for non-blocking market data operations.

```python { .api }
class AsyncRest:
    def __init__(
        self,
        key_id: str = None,
        secret_key: str = None,
        data_url: str = None,
        api_version: str = None,
        raw_data: bool = False
    ): ...
```

**Usage Example:**

```python
from alpaca_trade_api import AsyncRest
import asyncio

# Initialize async client
async_api = AsyncRest('your-key-id', 'your-secret-key')
```
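
Hard-coded credentials are fine for quick experiments, but keys are usually read from the environment instead. A minimal sketch, assuming the conventional `APCA_API_KEY_ID` and `APCA_API_SECRET_KEY` variables are set:

```python
import os
from alpaca_trade_api import AsyncRest

# Pull credentials from the environment rather than hard-coding them
async_api = AsyncRest(
    key_id=os.getenv('APCA_API_KEY_ID'),
    secret_key=os.getenv('APCA_API_SECRET_KEY'),
)
```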

### Async Historical Data

Retrieve historical market data asynchronously for improved performance when fetching data for multiple symbols.

```python { .api }
async def get_bars_async(
    symbol: str,
    start: str,
    end: str,
    timeframe: TimeFrame,
    limit: int = None,
    adjustment: str = None
) -> Tuple[str, pd.DataFrame]:
    """Get historical bars asynchronously."""

async def get_trades_async(
    symbol: str,
    start: str,
    end: str,
    limit: int = None
) -> Tuple[str, pd.DataFrame]:
    """Get historical trades asynchronously."""

async def get_quotes_async(
    symbol: str,
    start: str,
    end: str,
    limit: int = None
) -> Tuple[str, pd.DataFrame]:
    """Get historical quotes asynchronously."""
```

**Usage Examples:**

```python
import asyncio
import pandas as pd
from alpaca_trade_api import AsyncRest, TimeFrame

async def fetch_multiple_symbols():
    async_api = AsyncRest('key', 'secret')

    symbols = ['AAPL', 'TSLA', 'GOOGL', 'MSFT', 'AMZN']

    # Fetch bars for multiple symbols concurrently
    tasks = [
        async_api.get_bars_async(
            symbol,
            '2023-01-01',
            '2023-01-31',
            TimeFrame.Day
        )
        for symbol in symbols
    ]

    results = await asyncio.gather(*tasks)

    # Process results
    for symbol, df in results:
        print(f"{symbol}: {len(df)} bars, Avg Volume: {df['volume'].mean():,.0f}")

        # Calculate daily returns
        df['returns'] = df['close'].pct_change()
        volatility = df['returns'].std() * (252 ** 0.5)  # Annualized volatility
        print(f"  Annualized Volatility: {volatility:.2%}")

# Run the async function
asyncio.run(fetch_multiple_symbols())
```

### Async Latest Data

Get the most recent market data asynchronously for real-time analysis.

```python { .api }
async def get_latest_trade_async(symbol: str) -> Tuple[str, TradeV2]:
    """Get latest trade asynchronously."""

async def get_latest_quote_async(symbol: str) -> Tuple[str, QuoteV2]:
    """Get latest quote asynchronously."""
```

**Usage Examples:**

```python
async def monitor_portfolio():
    async_api = AsyncRest('key', 'secret')
    portfolio_symbols = ['AAPL', 'TSLA', 'NVDA', 'AMD', 'GOOGL']

    while True:
        # Fetch latest trades for all symbols concurrently
        tasks = [async_api.get_latest_trade_async(symbol) for symbol in portfolio_symbols]
        results = await asyncio.gather(*tasks)

        print(f"Portfolio Update - {pd.Timestamp.now()}")
        total_value = 0

        for symbol, trade in results:
            # Assume we hold 100 shares of each
            position_value = trade.price * 100
            total_value += position_value
            print(f"  {symbol}: ${trade.price:.2f} (Value: ${position_value:,.2f})")

        print(f"Total Portfolio Value: ${total_value:,.2f}")
        print("-" * 50)

        # Wait 30 seconds before next update
        await asyncio.sleep(30)

# Run portfolio monitor
asyncio.run(monitor_portfolio())
```

144

145

### Concurrent Data Analysis

146

147

Combine async operations with pandas for efficient data analysis workflows.

148

149

```python { .api }
async def gather_with_concurrency(n: int, *tasks) -> List:
    """Execute async tasks with concurrency limit."""
```
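
A helper like this is typically built on `asyncio.Semaphore`. The sketch below is illustrative only and assumes that approach; it is not necessarily the library's exact implementation:

```python
import asyncio

async def gather_with_concurrency(n, *tasks):
    # Allow at most n tasks to run at the same time
    semaphore = asyncio.Semaphore(n)

    async def sem_task(task):
        async with semaphore:
            return await task

    # Results come back in the same order as the input tasks
    return await asyncio.gather(*(sem_task(task) for task in tasks))
```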

**Usage Examples:**

```python
async def sector_analysis():
    async_api = AsyncRest('key', 'secret')

    # Define sector ETFs and major stocks
    sectors = {
        'Technology': ['QQQ', 'AAPL', 'MSFT', 'GOOGL', 'META'],
        'Energy': ['XLE', 'XOM', 'CVX', 'COP', 'SLB'],
        'Healthcare': ['XLV', 'JNJ', 'PFE', 'UNH', 'ABBV'],
        'Finance': ['XLF', 'JPM', 'BAC', 'WFC', 'GS']
    }

    all_symbols = []
    for sector_symbols in sectors.values():
        all_symbols.extend(sector_symbols)

    # Limit concurrency to avoid rate limits
    tasks = [
        async_api.get_bars_async(
            symbol,
            '2023-01-01',
            '2023-12-31',
            TimeFrame.Day
        )
        for symbol in all_symbols
    ]

    # Use concurrency control
    results = await gather_with_concurrency(10, *tasks)

    # Organize results by sector
    sector_performance = {}
    result_dict = dict(results)

    for sector, symbols in sectors.items():
        sector_returns = []

        for symbol in symbols:
            if symbol in result_dict:
                df = result_dict[symbol]
                if not df.empty:
                    # Calculate total return for the year
                    total_return = (df['close'].iloc[-1] / df['close'].iloc[0]) - 1
                    sector_returns.append(total_return)

        if sector_returns:
            avg_return = sum(sector_returns) / len(sector_returns)
            sector_performance[sector] = avg_return

    # Display sector performance
    print("2023 Sector Performance:")
    for sector, performance in sorted(sector_performance.items(), key=lambda x: x[1], reverse=True):
        print(f"  {sector}: {performance:.2%}")

asyncio.run(sector_analysis())
```

### Error Handling in Async Operations

Handle errors in async operations gracefully by catching exceptions per task, so one failing symbol does not abort the whole batch.

**Usage Examples:**

```python
async def robust_data_fetch():
    async_api = AsyncRest('key', 'secret')
    symbols = ['AAPL', 'INVALID_SYMBOL', 'TSLA', 'GOOGL']

    async def safe_fetch(symbol):
        try:
            return await async_api.get_bars_async(
                symbol,
                '2023-01-01',
                '2023-01-31',
                TimeFrame.Day
            )
        except Exception as e:
            print(f"Error fetching {symbol}: {e}")
            return symbol, pd.DataFrame()  # Return empty DataFrame on error

    # Fetch data for all symbols, handling errors gracefully
    results = await asyncio.gather(*[safe_fetch(symbol) for symbol in symbols])

    # Process successful results
    successful_results = [(symbol, df) for symbol, df in results if not df.empty]

    print(f"Successfully fetched data for {len(successful_results)} out of {len(symbols)} symbols")
    for symbol, df in successful_results:
        print(f"  {symbol}: {len(df)} bars")
```
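
For transient failures such as rate limits or network timeouts, a retry wrapper can complement the empty-DataFrame fallback above. A minimal sketch with exponential backoff (the helper name and parameters are illustrative, not part of the library):

```python
import asyncio

async def fetch_with_retry(make_coro, retries=3, base_delay=1.0):
    """make_coro: zero-argument callable returning a fresh coroutine per attempt."""
    for attempt in range(retries):
        try:
            return await make_coro()
        except Exception:
            if attempt == retries - 1:
                raise
            # Exponential backoff between attempts
            await asyncio.sleep(base_delay * (2 ** attempt))
```

Calling it with, for example, `lambda: async_api.get_bars_async('AAPL', '2023-01-01', '2023-01-31', TimeFrame.Day)` retries the request up to three times before giving up.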

### Batch Processing

Efficiently process large datasets using async operations with batching.

**Usage Examples:**

```python
async def batch_historical_analysis():
    async_api = AsyncRest('key', 'secret')

    # Large list of symbols to analyze
    all_symbols = ['AAPL', 'MSFT', 'GOOGL', 'AMZN', 'TSLA', 'META', 'NVDA', 'AMD',
                   'NFLX', 'CRM', 'ORCL', 'ADBE', 'INTC', 'CSCO', 'IBM', 'HPE']

    batch_size = 5
    results = []

    # Process symbols in batches to manage memory and rate limits
    for i in range(0, len(all_symbols), batch_size):
        batch = all_symbols[i:i + batch_size]
        print(f"Processing batch {i//batch_size + 1}: {batch}")

        # Fetch data for current batch
        batch_tasks = [
            async_api.get_bars_async(
                symbol,
                '2023-01-01',
                '2023-12-31',
                TimeFrame.Day
            )
            for symbol in batch
        ]

        batch_results = await asyncio.gather(*batch_tasks, return_exceptions=True)

        # Process batch results; exceptions come back in place of (symbol, df) tuples
        for j, result in enumerate(batch_results):
            if isinstance(result, Exception):
                print(f"  Error with {batch[j]}: {result}")
                continue

            symbol, df = result
            if not df.empty:
                # Calculate key metrics
                annual_return = (df['close'].iloc[-1] / df['close'].iloc[0]) - 1
                volatility = df['close'].pct_change().std() * (252 ** 0.5)
                max_drawdown = ((df['close'] / df['close'].cummax()) - 1).min()

                results.append({
                    'symbol': symbol,
                    'annual_return': annual_return,
                    'volatility': volatility,
                    'max_drawdown': max_drawdown,
                    'sharpe_ratio': annual_return / volatility if volatility > 0 else 0
                })

        # Small delay between batches
        await asyncio.sleep(1)

    # Create summary DataFrame
    summary_df = pd.DataFrame(results)

    print("\nTop performers by Sharpe ratio:")
    top_performers = summary_df.nlargest(5, 'sharpe_ratio')
    for _, row in top_performers.iterrows():
        print(f"  {row['symbol']}: Sharpe={row['sharpe_ratio']:.2f}, "
              f"Return={row['annual_return']:.2%}, Vol={row['volatility']:.2%}")

asyncio.run(batch_historical_analysis())
```

## Types

Async operations return the same data types as synchronous operations, but wrapped in tuples with the symbol name:

```python { .api }
# Async return types
Tuple[str, pd.DataFrame]  # For bars, trades, quotes
Tuple[str, TradeV2]       # For latest trade
Tuple[str, QuoteV2]       # For latest quote
```

The returned pandas DataFrames have the same structure as those produced by the synchronous operations, so existing analysis code can be reused unchanged in async workflows.
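
Because each call returns a `(symbol, DataFrame)` tuple, the results of a concurrent fetch can be stacked into a single frame for cross-symbol analysis. A minimal sketch, assuming `results` is the list returned by `asyncio.gather` in the examples above:

```python
import pandas as pd

# results: list of (symbol, DataFrame) tuples from asyncio.gather(...)
# Stack the per-symbol frames under an outer "symbol" index level
combined = pd.concat({symbol: df for symbol, df in results if not df.empty})

# Example: closing prices pivoted to one column per symbol
closes = combined['close'].unstack(level=0)
print(closes.tail())
```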