CtrlK
Blog Docs Log in Get started
Tessl Logo

tessl/pypi-alpaca-trade-api

Python client library for Alpaca's commission-free trading API with support for both REST and streaming data interfaces

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

async-operations.mddocs/

Async Market Data Operations

Asynchronous market data operations optimized for high-performance data retrieval with pandas DataFrame outputs. Designed for concurrent data fetching and analysis workflows.

Capabilities

Async Client Initialization

Create an asynchronous REST client for non-blocking market data operations.

class AsyncRest:
    """Asynchronous REST client for non-blocking market data operations."""

    def __init__(
        self,
        # API key ID; the usage example passes it as the first positional argument.
        key_id: str = None,
        # API secret key; the usage example passes it as the second positional argument.
        secret_key: str = None,
        # NOTE(review): presumably the base URL of the market data service - confirm against the library.
        data_url: str = None,
        # NOTE(review): presumably selects the API version to call - confirm against the library.
        api_version: str = None,
        # Defaults to False. NOTE(review): presumably toggles raw versus wrapped responses - confirm.
        raw_data: bool = False
    ): ...

Usage Example:

from alpaca_trade_api import AsyncRest
import asyncio

# Initialize the async client with the API key ID and secret key (placeholder values shown)
async_api = AsyncRest('your-key-id', 'your-secret-key')

Async Historical Data

Retrieve historical market data asynchronously for improved performance when fetching data for multiple symbols.

async def get_bars_async(
    symbol: str,
    start: str,
    end: str,
    timeframe: TimeFrame,
    limit: int = None,
    adjustment: str = None
) -> Tuple[str, pd.DataFrame]:
    """Get historical bars asynchronously.

    Args:
        symbol: Ticker symbol to fetch, e.g. ``'AAPL'``.
        start: Start of the requested range; the usage examples pass
            ``'YYYY-MM-DD'`` date strings such as ``'2023-01-01'``.
        end: End of the requested range, in the same format as ``start``.
        timeframe: Bar size; the usage examples pass ``TimeFrame.Day``.
        limit: Optional, defaults to ``None``. Presumably caps the number of
            bars returned (TODO confirm against the library).
        adjustment: Optional, defaults to ``None``. Presumably selects the
            price adjustment mode (TODO confirm against the library).

    Returns:
        A ``(symbol, DataFrame)`` tuple pairing the symbol name with its
        bars. The usage examples read the ``'close'`` and ``'volume'``
        columns of that DataFrame.
    """

async def get_trades_async(
    symbol: str,
    start: str,
    end: str,
    limit: int = None
) -> Tuple[str, pd.DataFrame]:
    """Get historical trades asynchronously.

    Args:
        symbol: Ticker symbol to fetch.
        start: Start of the requested range, given as a string.
        end: End of the requested range, given as a string.
        limit: Optional, defaults to ``None``. Presumably caps the number of
            trades returned (TODO confirm against the library).

    Returns:
        A ``(symbol, DataFrame)`` tuple pairing the symbol name with its
        trades.
    """

async def get_quotes_async(
    symbol: str,
    start: str,
    end: str,
    limit: int = None
) -> Tuple[str, pd.DataFrame]:
    """Get historical quotes asynchronously.

    Args:
        symbol: Ticker symbol to fetch.
        start: Start of the requested range, given as a string.
        end: End of the requested range, given as a string.
        limit: Optional, defaults to ``None``. Presumably caps the number of
            quotes returned (TODO confirm against the library).

    Returns:
        A ``(symbol, DataFrame)`` tuple pairing the symbol name with its
        quotes.
    """

Usage Examples:

import asyncio
import pandas as pd
from alpaca_trade_api import AsyncRest, TimeFrame

async def fetch_multiple_symbols():
    """Fetch daily bars for several symbols at once and print basic statistics.

    For every symbol this prints the bar count, the mean volume and the
    annualized volatility of daily close-to-close returns. A ``'returns'``
    column is added to each returned DataFrame as a side effect.
    """
    client = AsyncRest('key', 'secret')
    tickers = ['AAPL', 'TSLA', 'GOOGL', 'MSFT', 'AMZN']
    range_start = '2023-01-01'
    range_end = '2023-01-31'
    trading_days_per_year = 252

    # One request per ticker, all awaited together so they run concurrently.
    bar_sets = await asyncio.gather(*(
        client.get_bars_async(ticker, range_start, range_end, TimeFrame.Day)
        for ticker in tickers
    ))

    for ticker, bars in bar_sets:
        bar_count = len(bars)
        mean_volume = bars['volume'].mean()
        print(f"{ticker}: {bar_count} bars, Avg Volume: {mean_volume:,.0f}")

        # Daily returns, then scale their standard deviation to a yearly figure.
        bars['returns'] = bars['close'].pct_change()
        annualized = bars['returns'].std() * (trading_days_per_year ** 0.5)
        print(f"  Annualized Volatility: {annualized:.2%}")


# Entry point for the example.
asyncio.run(fetch_multiple_symbols())

Async Latest Data

Get the most recent market data asynchronously for real-time analysis.

async def get_latest_trade_async(symbol: str) -> Tuple[str, TradeV2]:
    """Get latest trade asynchronously.

    Args:
        symbol: Ticker symbol to look up.

    Returns:
        A ``(symbol, TradeV2)`` tuple pairing the symbol name with its most
        recent trade. The usage example reads ``.price`` from the trade.
    """

async def get_latest_quote_async(symbol: str) -> Tuple[str, QuoteV2]:
    """Get latest quote asynchronously.

    Args:
        symbol: Ticker symbol to look up.

    Returns:
        A ``(symbol, QuoteV2)`` tuple pairing the symbol name with its most
        recent quote.
    """

Usage Examples:

async def monitor_portfolio():
    """Print a portfolio valuation every 30 seconds, forever.

    Each cycle requests the latest trade for every symbol concurrently and
    values the portfolio on the assumption of 100 shares per symbol. The
    loop never exits on its own.
    """
    client = AsyncRest('key', 'secret')
    holdings = ['AAPL', 'TSLA', 'NVDA', 'AMD', 'GOOGL']
    shares_per_position = 100  # assumed position size for every symbol
    refresh_seconds = 30

    while True:
        # All latest-trade lookups are awaited together.
        latest_trades = await asyncio.gather(*(
            client.get_latest_trade_async(ticker) for ticker in holdings
        ))

        print(f"Portfolio Update - {pd.Timestamp.now()}")

        running_total = 0
        for ticker, last_trade in latest_trades:
            holding_value = last_trade.price * shares_per_position
            running_total += holding_value
            print(f"  {ticker}: ${last_trade.price:.2f} (Value: ${holding_value:,.2f})")

        print(f"Total Portfolio Value: ${running_total:,.2f}")
        print("-" * 50)

        # Pause before the next refresh.
        await asyncio.sleep(refresh_seconds)


# Entry point for the example.
asyncio.run(monitor_portfolio())

Concurrent Data Analysis

Combine async operations with pandas for efficient data analysis workflows.

async def gather_with_concurrency(n: int, *tasks) -> List:
    """Execute async tasks with concurrency limit.

    Args:
        n: The concurrency limit; the usage example passes ``10``.
        *tasks: The awaitables to run; the usage example passes the
            coroutines produced by ``get_bars_async``.

    Returns:
        A list of the tasks' results. The usage example feeds it to
        ``dict()``, so each ``get_bars_async`` result there is a
        ``(symbol, DataFrame)`` pair.
        NOTE(review): presumably results keep the order of ``tasks`` -
        confirm against the library.
    """

Usage Examples:

async def sector_analysis():
    """Rank four market sectors by their average 2023 total return.

    Daily bars are fetched for every symbol of every sector with at most 10
    requests in flight. Each sector's score is the mean of its symbols'
    first-close to last-close returns; symbols with no data are ignored and
    sectors with no usable symbols are left out of the ranking.
    """
    client = AsyncRest('key', 'secret')

    # Sector ETFs together with major constituents.
    sectors = {
        'Technology': ['QQQ', 'AAPL', 'MSFT', 'GOOGL', 'META'],
        'Energy': ['XLE', 'XOM', 'CVX', 'COP', 'SLB'],
        'Healthcare': ['XLV', 'JNJ', 'PFE', 'UNH', 'ABBV'],
        'Finance': ['XLF', 'JPM', 'BAC', 'WFC', 'GS']
    }

    # Flatten the per-sector lists into one request list.
    every_ticker = [ticker for members in sectors.values() for ticker in members]

    requests = [
        client.get_bars_async(ticker, '2023-01-01', '2023-12-31', TimeFrame.Day)
        for ticker in every_ticker
    ]

    # Cap the number of simultaneous requests to stay under rate limits.
    fetched = await gather_with_concurrency(10, *requests)
    bars_by_ticker = dict(fetched)

    sector_performance = {}
    for sector_name, members in sectors.items():
        member_returns = []

        for ticker in members:
            if ticker not in bars_by_ticker:
                continue
            bars = bars_by_ticker[ticker]
            if bars.empty:
                continue
            # Total return over the year: last close relative to first close.
            closes = bars['close']
            member_returns.append((closes.iloc[-1] / closes.iloc[0]) - 1)

        if member_returns:
            sector_performance[sector_name] = sum(member_returns) / len(member_returns)

    # Best-performing sector first.
    ranking = sorted(
        sector_performance.items(),
        key=lambda entry: entry[1],
        reverse=True,
    )

    print("2023 Sector Performance:")
    for sector_name, average_return in ranking:
        print(f"  {sector_name}: {average_return:.2%}")


asyncio.run(sector_analysis())

Error Handling in Async Operations

Handle errors gracefully in async operations with proper exception handling.

Usage Examples:

async def robust_data_fetch():
    """Fetch bars for several symbols, tolerating failures of individual requests.

    A request that raises is reported and replaced by an empty DataFrame, so
    one bad symbol does not abort the others. Only non-empty results are
    counted as successful in the printed summary.
    """
    client = AsyncRest('key', 'secret')
    requested = ['AAPL', 'INVALID_SYMBOL', 'TSLA', 'GOOGL']

    async def safe_fetch(symbol):
        """Return ``(symbol, bars)``, or ``(symbol, empty DataFrame)`` on error."""
        try:
            outcome = await client.get_bars_async(
                symbol, '2023-01-01', '2023-01-31', TimeFrame.Day
            )
        except Exception as e:
            print(f"Error fetching {symbol}: {e}")
            # An empty frame marks the failure for the filter below.
            outcome = symbol, pd.DataFrame()
        return outcome

    # Every request is wrapped, so gather itself never sees an exception.
    fetched = await asyncio.gather(*map(safe_fetch, requested))

    # Keep only the symbols that actually produced data.
    usable = []
    for ticker, bars in fetched:
        if not bars.empty:
            usable.append((ticker, bars))

    print(f"Successfully fetched data for {len(usable)} out of {len(requested)} symbols")
    for ticker, bars in usable:
        print(f"  {ticker}: {len(bars)} bars")

Batch Processing

Efficiently process large datasets using async operations with batching.

Usage Examples:

async def batch_historical_analysis():
    """Analyze a list of symbols in fixed-size batches and rank them.

    Daily bars for 2023 are fetched five symbols at a time. For every symbol
    that returns data, the total return, annualized volatility, maximum
    drawdown and a simple return/volatility ("Sharpe") ratio are computed.
    The five best symbols by that ratio are printed.

    Requests that fail are reported and skipped rather than aborting the
    run. If no symbol yields any data, a notice is printed instead of the
    ranking.
    """
    async_api = AsyncRest('key', 'secret')

    # Large list of symbols to analyze
    all_symbols = ['AAPL', 'MSFT', 'GOOGL', 'AMZN', 'TSLA', 'META', 'NVDA', 'AMD',
                   'NFLX', 'CRM', 'ORCL', 'ADBE', 'INTC', 'CSCO', 'IBM', 'HPE']

    batch_size = 5
    results = []

    # Process symbols in batches to manage memory and rate limits
    for i in range(0, len(all_symbols), batch_size):
        batch = all_symbols[i:i + batch_size]
        print(f"Processing batch {i//batch_size + 1}: {batch}")

        # Fetch data for current batch
        batch_tasks = [
            async_api.get_bars_async(
                symbol,
                '2023-01-01',
                '2023-12-31',
                TimeFrame.Day
            )
            for symbol in batch
        ]

        # return_exceptions=True puts exception objects in the result list
        # in place of the (symbol, DataFrame) tuples of failed requests.
        batch_results = await asyncio.gather(*batch_tasks, return_exceptions=True)

        # gather preserves input order, so zip pairs each outcome with the
        # symbol it was requested for.
        for requested_symbol, outcome in zip(batch, batch_results):
            # Check for failure BEFORE unpacking: an exception object cannot
            # be unpacked into (symbol, df).
            if isinstance(outcome, BaseException):
                print(f"  Error with {requested_symbol}: {outcome}")
                continue

            symbol, df = outcome
            if df.empty:
                continue

            # Calculate key metrics
            closes = df['close']
            annual_return = (closes.iloc[-1] / closes.iloc[0]) - 1
            volatility = closes.pct_change().std() * (252 ** 0.5)
            max_drawdown = ((closes / closes.cummax()) - 1).min()

            results.append({
                'symbol': symbol,
                'annual_return': annual_return,
                'volatility': volatility,
                'max_drawdown': max_drawdown,
                # Guard against division by zero (and NaN volatility).
                'sharpe_ratio': annual_return / volatility if volatility > 0 else 0
            })

        # Small delay between batches; nothing to wait for after the last one.
        if i + batch_size < len(all_symbols):
            await asyncio.sleep(1)

    # Without any rows the summary frame would have no 'sharpe_ratio' column
    # and nlargest() would raise KeyError.
    if not results:
        print("\nNo data available to rank.")
        return

    # Create summary DataFrame
    summary_df = pd.DataFrame(results)

    print("\nTop performers by Sharpe ratio:")
    top_performers = summary_df.nlargest(5, 'sharpe_ratio')
    for _, row in top_performers.iterrows():
        print(f"  {row['symbol']}: Sharpe={row['sharpe_ratio']:.2f}, "
              f"Return={row['annual_return']:.2%}, Vol={row['volatility']:.2%}")


asyncio.run(batch_historical_analysis())

Types

Async operations return the same data types as synchronous operations, but wrapped in tuples with the symbol name:

# Async return types: every result is paired with the symbol it belongs to
Tuple[str, pd.DataFrame]  # (symbol, DataFrame) for bars, trades, quotes
Tuple[str, TradeV2]       # (symbol, latest trade)
Tuple[str, QuoteV2]       # (symbol, latest quote)

The pandas DataFrames returned have the same structure as synchronous operations but are optimized for async workflows and analysis.

Install with Tessl CLI

npx tessl i tessl/pypi-alpaca-trade-api

docs

async-operations.md

cryptocurrency.md

index.md

market-data.md

streaming.md

trading-operations.md

tile.json