0
# Async Market Data Operations
1
2
These asynchronous market data operations are optimized for high-performance data retrieval and return pandas DataFrames. They are designed for concurrent data fetching and analysis workflows.
3
4
## Capabilities
5
6
### Async Client Initialization
7
8
Create an asynchronous REST client for non-blocking market data operations.
9
10
```python { .api }
11
class AsyncRest:
    """Asynchronous REST client for non-blocking market data operations."""

    def __init__(
        self,
        key_id: Optional[str] = None,
        secret_key: Optional[str] = None,
        data_url: Optional[str] = None,
        api_version: Optional[str] = None,
        raw_data: bool = False,
    ):
        """Create the client.

        Args:
            key_id: API key ID (passed positionally first in the usage example).
            secret_key: API secret key (passed positionally second).
            data_url: Optional override; presumably the market data endpoint
                base URL -- confirm against the library.
            api_version: Optional override; presumably the API version string
                -- confirm against the library.
            raw_data: Defaults to ``False``.
        """
        ...
20
```
21
22
**Usage Example:**
23
24
```python
25
from alpaca_trade_api import AsyncRest
26
import asyncio
27
28
# Initialize async client
29
async_api = AsyncRest('your-key-id', 'your-secret-key')
30
```
31
32
### Async Historical Data
33
34
Retrieve historical market data asynchronously for improved performance when fetching data for multiple symbols.
35
36
```python { .api }
37
async def get_bars_async(
    symbol: str,
    start: str,
    end: str,
    timeframe: TimeFrame,
    limit: Optional[int] = None,
    adjustment: Optional[str] = None,
) -> Tuple[str, pd.DataFrame]:
    """Get historical bars asynchronously.

    Args:
        symbol: Ticker symbol to fetch, e.g. ``'AAPL'``.
        start: Start of the range as a string, e.g. ``'2023-01-01'``.
        end: End of the range as a string, e.g. ``'2023-01-31'``.
        timeframe: Bar size, e.g. ``TimeFrame.Day``.
        limit: Optional limit; omitted when ``None``.
        adjustment: Optional adjustment setting; omitted when ``None``.

    Returns:
        A ``(symbol, DataFrame)`` tuple, so results gathered concurrently can
        be matched back to the symbol they belong to.
    """
46
47
async def get_trades_async(
    symbol: str,
    start: str,
    end: str,
    limit: Optional[int] = None,
) -> Tuple[str, pd.DataFrame]:
    """Get historical trades asynchronously.

    Args:
        symbol: Ticker symbol to fetch.
        start: Start of the range as a string.
        end: End of the range as a string.
        limit: Optional limit; omitted when ``None``.

    Returns:
        A ``(symbol, DataFrame)`` tuple.
    """
54
55
async def get_quotes_async(
    symbol: str,
    start: str,
    end: str,
    limit: Optional[int] = None,
) -> Tuple[str, pd.DataFrame]:
    """Get historical quotes asynchronously.

    Args:
        symbol: Ticker symbol to fetch.
        start: Start of the range as a string.
        end: End of the range as a string.
        limit: Optional limit; omitted when ``None``.

    Returns:
        A ``(symbol, DataFrame)`` tuple.
    """
62
```
63
64
**Usage Examples:**
65
66
```python
67
import asyncio
68
import pandas as pd
69
from alpaca_trade_api import AsyncRest, TimeFrame
70
71
async def fetch_multiple_symbols():
    """Fetch daily bars for several symbols concurrently and print summary stats."""
    async_api = AsyncRest('key', 'secret')
    symbols = ['AAPL', 'TSLA', 'GOOGL', 'MSFT', 'AMZN']

    # Build one coroutine per symbol and let gather() run them concurrently;
    # results come back in the same order as the symbols.
    results = await asyncio.gather(*(
        async_api.get_bars_async(symbol, '2023-01-01', '2023-01-31', TimeFrame.Day)
        for symbol in symbols
    ))

    # Scale factor that turns the per-bar standard deviation into an
    # annualized figure.
    annualization = 252 ** 0.5

    for symbol, df in results:
        print(f"{symbol}: {len(df)} bars, Avg Volume: {df['volume'].mean():,.0f}")

        # Daily returns are stored on the frame as a new 'returns' column.
        df['returns'] = df['close'].pct_change()
        volatility = df['returns'].std() * annualization
        print(f" Annualized Volatility: {volatility:.2%}")
97
98
# Run the async function
99
asyncio.run(fetch_multiple_symbols())
100
```
101
102
### Async Latest Data
103
104
Get the most recent market data asynchronously for real-time analysis.
105
106
```python { .api }
107
async def get_latest_trade_async(symbol: str) -> Tuple[str, TradeV2]:
    """Get latest trade asynchronously.

    Args:
        symbol: Ticker symbol to look up.

    Returns:
        A ``(symbol, trade)`` tuple; the usage example reads ``trade.price``.
    """
109
110
async def get_latest_quote_async(symbol: str) -> Tuple[str, QuoteV2]:
    """Get latest quote asynchronously.

    Args:
        symbol: Ticker symbol to look up.

    Returns:
        A ``(symbol, quote)`` tuple.
    """
112
```
113
114
**Usage Examples:**
115
116
```python
117
async def monitor_portfolio():
    """Print latest prices and position values for a fixed watchlist every 30 seconds.

    Runs until cancelled: the loop has no exit condition.
    """
    async_api = AsyncRest('key', 'secret')
    portfolio_symbols = ['AAPL', 'TSLA', 'NVDA', 'AMD', 'GOOGL']
    shares_held = 100  # Assume we hold 100 shares of each

    while True:
        # Request the latest trade for every symbol concurrently.
        results = await asyncio.gather(*(
            async_api.get_latest_trade_async(symbol)
            for symbol in portfolio_symbols
        ))

        print(f"Portfolio Update - {pd.Timestamp.now()}")
        total_value = 0

        for symbol, trade in results:
            position_value = trade.price * shares_held
            total_value += position_value
            print(f" {symbol}: ${trade.price:.2f} (Value: ${position_value:,.2f})")

        print(f"Total Portfolio Value: ${total_value:,.2f}")
        print("-" * 50)

        # Wait 30 seconds before the next update.
        await asyncio.sleep(30)
140
141
# Run portfolio monitor
142
asyncio.run(monitor_portfolio())
143
```
144
145
### Concurrent Data Analysis
146
147
Combine async operations with pandas for efficient data analysis workflows.
148
149
```python { .api }
150
async def gather_with_concurrency(n: int, *tasks) -> List:
    """Execute async tasks with concurrency limit.

    Args:
        n: The concurrency limit -- presumably the maximum number of tasks
            allowed to run at once; confirm against the implementation.
        *tasks: The awaitables to execute, passed as positional arguments.

    Returns:
        A list of results.
    """
152
```
153
154
**Usage Examples:**
155
156
```python
157
async def sector_analysis():
    """Compare the average 2023 total return of several sectors and print a ranking."""
    async_api = AsyncRest('key', 'secret')

    # Sector ETFs and major stocks, keyed by sector name.
    sectors = {
        'Technology': ['QQQ', 'AAPL', 'MSFT', 'GOOGL', 'META'],
        'Energy': ['XLE', 'XOM', 'CVX', 'COP', 'SLB'],
        'Healthcare': ['XLV', 'JNJ', 'PFE', 'UNH', 'ABBV'],
        'Finance': ['XLF', 'JPM', 'BAC', 'WFC', 'GS']
    }

    # Flatten the per-sector lists into a single list of symbols to request.
    all_symbols = [ticker for members in sectors.values() for ticker in members]

    tasks = [
        async_api.get_bars_async(symbol, '2023-01-01', '2023-12-31', TimeFrame.Day)
        for symbol in all_symbols
    ]

    # Limit concurrency to avoid rate limits.
    results = await gather_with_concurrency(10, *tasks)

    # Each result is a (symbol, DataFrame) pair, so dict() indexes them by symbol.
    result_dict = dict(results)

    sector_performance = {}
    for sector, symbols in sectors.items():
        sector_returns = []

        for symbol in symbols:
            if symbol not in result_dict:
                continue
            df = result_dict[symbol]
            if df.empty:
                continue
            # Total return for the year: last close relative to first close.
            closes = df['close']
            sector_returns.append((closes.iloc[-1] / closes.iloc[0]) - 1)

        # Sectors with no usable data are left out of the ranking entirely.
        if sector_returns:
            sector_performance[sector] = sum(sector_returns) / len(sector_returns)

    print("2023 Sector Performance:")
    ranked = sorted(sector_performance.items(), key=lambda x: x[1], reverse=True)
    for sector, performance in ranked:
        print(f" {sector}: {performance:.2%}")
209
210
asyncio.run(sector_analysis())
211
```
212
213
### Error Handling in Async Operations
214
215
Handle errors gracefully in async operations with proper exception handling.
216
217
**Usage Examples:**
218
219
```python
220
async def robust_data_fetch():
    """Fetch bars for several symbols, tolerating failures for individual symbols."""
    async_api = AsyncRest('key', 'secret')
    symbols = ['AAPL', 'INVALID_SYMBOL', 'TSLA', 'GOOGL']

    async def safe_fetch(symbol):
        """Return (symbol, DataFrame); on any error, log it and return an empty frame."""
        try:
            fetched = await async_api.get_bars_async(
                symbol, '2023-01-01', '2023-01-31', TimeFrame.Day
            )
        except Exception as e:
            print(f"Error fetching {symbol}: {e}")
            # An empty DataFrame marks the failure so callers can filter it out.
            return symbol, pd.DataFrame()
        else:
            return fetched

    # Errors are absorbed inside safe_fetch, so gather() itself never raises here.
    pending = [safe_fetch(symbol) for symbol in symbols]
    results = await asyncio.gather(*pending)

    # Keep only the symbols that actually returned data.
    successful_results = [(symbol, df) for symbol, df in results if not df.empty]

    print(f"Successfully fetched data for {len(successful_results)} out of {len(symbols)} symbols")
    for symbol, df in successful_results:
        print(f" {symbol}: {len(df)} bars")
245
```
246
247
### Batch Processing
248
249
Efficiently process large datasets using async operations with batching.
250
251
**Usage Examples:**
252
253
```python
254
async def batch_historical_analysis():
    """Fetch 2023 daily bars in batches and rank symbols by a simple Sharpe ratio.

    Symbols are requested ``batch_size`` at a time with a short pause between
    batches. A failed request is reported and skipped, so one bad symbol does
    not abort the whole run.
    """
    async_api = AsyncRest('key', 'secret')

    # Large list of symbols to analyze
    all_symbols = ['AAPL', 'MSFT', 'GOOGL', 'AMZN', 'TSLA', 'META', 'NVDA', 'AMD',
                   'NFLX', 'CRM', 'ORCL', 'ADBE', 'INTC', 'CSCO', 'IBM', 'HPE']

    batch_size = 5
    results = []

    # Process symbols in batches to manage memory and rate limits
    for i in range(0, len(all_symbols), batch_size):
        batch = all_symbols[i:i + batch_size]
        print(f"Processing batch {i//batch_size + 1}: {batch}")

        # Fetch data for current batch
        batch_tasks = [
            async_api.get_bars_async(
                symbol,
                '2023-01-01',
                '2023-12-31',
                TimeFrame.Day
            )
            for symbol in batch
        ]

        batch_results = await asyncio.gather(*batch_tasks, return_exceptions=True)

        # gather() returns outcomes in task order, so zip() pairs each outcome
        # with the symbol that was requested.
        for requested, outcome in zip(batch, batch_results):
            # With return_exceptions=True a failed task yields the exception
            # object itself, not a (symbol, df) tuple. Check for that BEFORE
            # unpacking, otherwise the unpack raises TypeError.
            if isinstance(outcome, BaseException):
                print(f" Error with {requested}: {outcome}")
                continue

            symbol, df = outcome
            if df.empty:
                continue

            # Calculate key metrics
            closes = df['close']
            annual_return = (closes.iloc[-1] / closes.iloc[0]) - 1
            volatility = closes.pct_change().std() * (252 ** 0.5)
            max_drawdown = ((closes / closes.cummax()) - 1).min()

            results.append({
                'symbol': symbol,
                'annual_return': annual_return,
                'volatility': volatility,
                'max_drawdown': max_drawdown,
                # A NaN or zero volatility falls through to 0 rather than dividing.
                'sharpe_ratio': annual_return / volatility if volatility > 0 else 0
            })

        # Small delay between batches
        await asyncio.sleep(1)

    # With no rows the DataFrame has no 'sharpe_ratio' column and nlargest()
    # would raise KeyError, so stop early.
    if not results:
        print("\nNo data available to rank.")
        return

    # Create summary DataFrame
    summary_df = pd.DataFrame(results)

    print("\nTop performers by Sharpe ratio:")
    top_performers = summary_df.nlargest(5, 'sharpe_ratio')
    for _, row in top_performers.iterrows():
        print(f" {row['symbol']}: Sharpe={row['sharpe_ratio']:.2f}, "
              f"Return={row['annual_return']:.2%}, Vol={row['volatility']:.2%}")
313
314
asyncio.run(batch_historical_analysis())
315
```
316
317
## Types
318
319
Async operations return the same data types as synchronous operations, but each result is wrapped in a tuple whose first element is the symbol name:
320
321
```python { .api }
322
# Async return types
323
Tuple[str, pd.DataFrame] # For bars, trades, quotes
324
Tuple[str, TradeV2] # For latest trade
325
Tuple[str, QuoteV2] # For latest quote
326
```
327
328
The pandas DataFrames returned have the same structure as those returned by the synchronous operations, but are optimized for async workflows and analysis.