Python client library for Alpaca's commission-free trading API with support for both REST and streaming data interfaces
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Asynchronous market data operations optimized for high-performance data retrieval with pandas DataFrame outputs. Designed for concurrent data fetching and analysis workflows.
Create asynchronous REST client for non-blocking market data operations.
class AsyncRest:
def __init__(
self,
key_id: str = None,
secret_key: str = None,
data_url: str = None,
api_version: str = None,
raw_data: bool = False
): ...

Usage Example:
from alpaca_trade_api import AsyncRest
import asyncio
# Initialize async client
async_api = AsyncRest('your-key-id', 'your-secret-key')

Retrieve historical market data asynchronously for improved performance when fetching data for multiple symbols.
async def get_bars_async(
symbol: str,
start: str,
end: str,
timeframe: TimeFrame,
limit: int = None,
adjustment: str = None
) -> Tuple[str, pd.DataFrame]:
"""Get historical bars asynchronously."""
async def get_trades_async(
symbol: str,
start: str,
end: str,
limit: int = None
) -> Tuple[str, pd.DataFrame]:
"""Get historical trades asynchronously."""
async def get_quotes_async(
symbol: str,
start: str,
end: str,
limit: int = None
) -> Tuple[str, pd.DataFrame]:
"""Get historical quotes asynchronously."""Usage Examples:
import asyncio
import pandas as pd
from alpaca_trade_api import AsyncRest, TimeFrame
async def fetch_multiple_symbols():
async_api = AsyncRest('key', 'secret')
symbols = ['AAPL', 'TSLA', 'GOOGL', 'MSFT', 'AMZN']
# Fetch bars for multiple symbols concurrently
tasks = [
async_api.get_bars_async(
symbol,
'2023-01-01',
'2023-01-31',
TimeFrame.Day
)
for symbol in symbols
]
results = await asyncio.gather(*tasks)
# Process results
for symbol, df in results:
print(f"{symbol}: {len(df)} bars, Avg Volume: {df['volume'].mean():,.0f}")
# Calculate daily returns
df['returns'] = df['close'].pct_change()
volatility = df['returns'].std() * (252 ** 0.5) # Annualized volatility
print(f" Annualized Volatility: {volatility:.2%}")
# Run the async function
asyncio.run(fetch_multiple_symbols())

Get the most recent market data asynchronously for real-time analysis.
async def get_latest_trade_async(symbol: str) -> Tuple[str, TradeV2]:
"""Get latest trade asynchronously."""
async def get_latest_quote_async(symbol: str) -> Tuple[str, QuoteV2]:
"""Get latest quote asynchronously."""Usage Examples:
async def monitor_portfolio():
async_api = AsyncRest('key', 'secret')
portfolio_symbols = ['AAPL', 'TSLA', 'NVDA', 'AMD', 'GOOGL']
while True:
# Fetch latest trades for all symbols concurrently
tasks = [async_api.get_latest_trade_async(symbol) for symbol in portfolio_symbols]
results = await asyncio.gather(*tasks)
print(f"Portfolio Update - {pd.Timestamp.now()}")
total_value = 0
for symbol, trade in results:
# Assume we hold 100 shares of each
position_value = trade.price * 100
total_value += position_value
print(f" {symbol}: ${trade.price:.2f} (Value: ${position_value:,.2f})")
print(f"Total Portfolio Value: ${total_value:,.2f}")
print("-" * 50)
# Wait 30 seconds before next update
await asyncio.sleep(30)
# Run portfolio monitor
asyncio.run(monitor_portfolio())

Combine async operations with pandas for efficient data analysis workflows.
async def gather_with_concurrency(n: int, *tasks) -> List:
"""Execute async tasks with concurrency limit."""Usage Examples:
async def sector_analysis():
async_api = AsyncRest('key', 'secret')
# Define sector ETFs and major stocks
sectors = {
'Technology': ['QQQ', 'AAPL', 'MSFT', 'GOOGL', 'META'],
'Energy': ['XLE', 'XOM', 'CVX', 'COP', 'SLB'],
'Healthcare': ['XLV', 'JNJ', 'PFE', 'UNH', 'ABBV'],
'Finance': ['XLF', 'JPM', 'BAC', 'WFC', 'GS']
}
all_symbols = []
for sector_symbols in sectors.values():
all_symbols.extend(sector_symbols)
# Limit concurrency to avoid rate limits
tasks = [
async_api.get_bars_async(
symbol,
'2023-01-01',
'2023-12-31',
TimeFrame.Day
)
for symbol in all_symbols
]
# Use concurrency control
results = await gather_with_concurrency(10, *tasks)
# Organize results by sector
sector_performance = {}
result_dict = dict(results)
for sector, symbols in sectors.items():
sector_returns = []
for symbol in symbols:
if symbol in result_dict:
df = result_dict[symbol]
if not df.empty:
# Calculate total return for the year
total_return = (df['close'].iloc[-1] / df['close'].iloc[0]) - 1
sector_returns.append(total_return)
if sector_returns:
avg_return = sum(sector_returns) / len(sector_returns)
sector_performance[sector] = avg_return
# Display sector performance
print("2023 Sector Performance:")
for sector, performance in sorted(sector_performance.items(), key=lambda x: x[1], reverse=True):
print(f" {sector}: {performance:.2%}")
asyncio.run(sector_analysis())

Handle errors gracefully in async operations with proper exception handling.
Usage Examples:
async def robust_data_fetch():
async_api = AsyncRest('key', 'secret')
symbols = ['AAPL', 'INVALID_SYMBOL', 'TSLA', 'GOOGL']
async def safe_fetch(symbol):
try:
return await async_api.get_bars_async(
symbol,
'2023-01-01',
'2023-01-31',
TimeFrame.Day
)
except Exception as e:
print(f"Error fetching {symbol}: {e}")
return symbol, pd.DataFrame() # Return empty DataFrame on error
# Fetch data for all symbols, handling errors gracefully
results = await asyncio.gather(*[safe_fetch(symbol) for symbol in symbols])
# Process successful results
successful_results = [(symbol, df) for symbol, df in results if not df.empty]
print(f"Successfully fetched data for {len(successful_results)} out of {len(symbols)} symbols")
for symbol, df in successful_results:
print(f" {symbol}: {len(df)} bars")Efficiently process large datasets using async operations with batching.
Usage Examples:
async def batch_historical_analysis():
async_api = AsyncRest('key', 'secret')
# Large list of symbols to analyze
all_symbols = ['AAPL', 'MSFT', 'GOOGL', 'AMZN', 'TSLA', 'META', 'NVDA', 'AMD',
'NFLX', 'CRM', 'ORCL', 'ADBE', 'INTC', 'CSCO', 'IBM', 'HPE']
batch_size = 5
results = []
# Process symbols in batches to manage memory and rate limits
for i in range(0, len(all_symbols), batch_size):
batch = all_symbols[i:i + batch_size]
print(f"Processing batch {i//batch_size + 1}: {batch}")
# Fetch data for current batch
batch_tasks = [
async_api.get_bars_async(
symbol,
'2023-01-01',
'2023-12-31',
TimeFrame.Day
)
for symbol in batch
]
batch_results = await asyncio.gather(*batch_tasks, return_exceptions=True)
# Process batch results. Note: with return_exceptions=True, a failed task
# yields the exception object itself (not a (symbol, df) tuple), so check
# before unpacking.
for j, item in enumerate(batch_results):
if isinstance(item, Exception):
print(f" Error with {batch[j]}: {item}")
continue
symbol, df = item
if not df.empty:
# Calculate key metrics
annual_return = (df['close'].iloc[-1] / df['close'].iloc[0]) - 1
volatility = df['close'].pct_change().std() * (252 ** 0.5)
max_drawdown = ((df['close'] / df['close'].cummax()) - 1).min()
results.append({
'symbol': symbol,
'annual_return': annual_return,
'volatility': volatility,
'max_drawdown': max_drawdown,
'sharpe_ratio': annual_return / volatility if volatility > 0 else 0
})
# Small delay between batches
await asyncio.sleep(1)
# Create summary DataFrame
summary_df = pd.DataFrame(results)
print("\nTop performers by Sharpe ratio:")
top_performers = summary_df.nlargest(5, 'sharpe_ratio')
for _, row in top_performers.iterrows():
print(f" {row['symbol']}: Sharpe={row['sharpe_ratio']:.2f}, "
f"Return={row['annual_return']:.2%}, Vol={row['volatility']:.2%}")
asyncio.run(batch_historical_analysis())

Async operations return the same data types as synchronous operations, but wrapped in tuples with the symbol name:
# Async return types
Tuple[str, pd.DataFrame] # For bars, trades, quotes
Tuple[str, TradeV2] # For latest trade
Tuple[str, QuoteV2] # For latest quote

The pandas DataFrames returned have the same structure as synchronous operations but are optimized for async workflows and analysis.
Install with Tessl CLI
npx tessl i tessl/pypi-alpaca-trade-api