CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-alpaca-trade-api

Python client library for Alpaca's commission-free trading API with support for both REST and streaming data interfaces

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

streaming.mddocs/

Real-time Streaming

WebSocket streaming for real-time market data feeds and trading account updates. Supports subscription-based data streams with handler functions and decorator patterns for event processing.

Capabilities

Stream Initialization

Create and configure streaming connections with authentication and data feed selection.

class Stream:
    def __init__(
        self,
        key_id: str = None,
        secret_key: str = None,
        base_url: str = None,
        data_stream_url: str = None,
        data_feed: str = 'iex',
        raw_data: bool = False,
        crypto_exchanges: List[str] = None,
        websocket_params: Dict = None
    ): ...

Usage Example:

import alpaca_trade_api as tradeapi

# Initialize stream with credentials
stream = tradeapi.Stream('your-key-id', 'your-secret-key', data_feed='iex')

Market Data Subscriptions

Subscribe to real-time market data feeds including trades, quotes, and bars.

def subscribe_trades(handler: Callable, *symbols: str, 
                    handler_cancel_errors: Callable = None,
                    handler_corrections: Callable = None) -> None:
    """Subscribe to trade data streams."""

def subscribe_quotes(handler: Callable, *symbols: str) -> None:
    """Subscribe to quote data streams."""

def subscribe_bars(handler: Callable, *symbols: str) -> None:
    """Subscribe to minute bar data streams."""

def subscribe_updated_bars(handler: Callable, *symbols: str) -> None:
    """Subscribe to updated bar data streams."""

def subscribe_daily_bars(handler: Callable, *symbols: str) -> None:
    """Subscribe to daily bar data streams."""

def subscribe_statuses(handler: Callable, *symbols: str) -> None:
    """Subscribe to trading status updates."""

def subscribe_lulds(handler: Callable, *symbols: str) -> None:
    """Subscribe to Limit Up Limit Down updates."""

Usage Examples:

# Define handler functions
def handle_trade(trade):
    print(f"Trade: {trade.symbol} {trade.size} @ ${trade.price}")

def handle_quote(quote):
    spread = quote.ask_price - quote.bid_price
    print(f"Quote: {quote.symbol} Bid=${quote.bid_price} Ask=${quote.ask_price} Spread=${spread:.2f}")

def handle_bar(bar):
    print(f"Bar: {bar.symbol} O=${bar.open} H=${bar.high} L=${bar.low} C=${bar.close} V={bar.volume}")

# Subscribe to streams
stream.subscribe_trades(handle_trade, 'AAPL', 'TSLA')
stream.subscribe_quotes(handle_quote, 'AAPL', 'TSLA', 'GOOGL')
stream.subscribe_bars(handle_bar, 'SPY')

Trading Account Updates

Subscribe to real-time updates for trading account events and order status changes.

def subscribe_trade_updates(handler: Callable) -> None:
    """Subscribe to trading account updates."""

Usage Example:

def handle_trade_update(update):
    print(f"Trade Update: {update.event} - Order {update.order.id}")
    if update.event == 'fill':
        print(f"  Filled: {update.order.symbol} {update.order.side} {update.order.filled_qty}")
    elif update.event == 'partial_fill':
        print(f"  Partial Fill: {update.order.filled_qty}/{update.order.qty}")
    elif update.event == 'canceled':
        print(f"  Canceled: {update.order.symbol}")

stream.subscribe_trade_updates(handle_trade_update)

Decorator Pattern

Use decorators for clean event handler registration.

def on_trade(*symbols: str) -> Callable:
    """Decorator for trade event handlers."""

def on_quote(*symbols: str) -> Callable:
    """Decorator for quote event handlers."""

def on_bar(*symbols: str) -> Callable:
    """Decorator for bar event handlers."""

def on_updated_bar(*symbols: str) -> Callable:
    """Decorator for updated bar event handlers."""

def on_daily_bar(*symbols: str) -> Callable:
    """Decorator for daily bar event handlers."""

def on_status(*symbols: str) -> Callable:
    """Decorator for status event handlers."""

def on_luld(*symbols: str) -> Callable:
    """Decorator for LULD event handlers."""

def on_trade_update() -> Callable:
    """Decorator for trade update handlers."""

def on_cancel_error(*symbols: str) -> Callable:
    """Decorator for cancel error handlers."""

def on_correction(*symbols: str) -> Callable:
    """Decorator for correction handlers."""

Usage Examples:

# Using decorators for clean handler registration
@stream.on_trade('AAPL', 'TSLA')
def trade_handler(trade):
    if trade.size >= 1000:  # Large trades only
        print(f"Large trade: {trade.symbol} {trade.size} shares @ ${trade.price}")

@stream.on_quote('AAPL', 'MSFT', 'GOOGL')
def quote_handler(quote):
    spread_bps = ((quote.ask_price - quote.bid_price) / quote.bid_price) * 10000
    if spread_bps > 10:  # Wide spreads
        print(f"Wide spread: {quote.symbol} {spread_bps:.1f} bps")

@stream.on_bar('SPY', 'QQQ')  
def bar_handler(bar):
    # Calculate momentum
    body = abs(bar.close - bar.open)
    range_size = bar.high - bar.low
    momentum = body / range_size if range_size > 0 else 0
    print(f"Bar momentum: {bar.symbol} {momentum:.2f}")

@stream.on_trade_update()
def trade_update_handler(update):
    if update.event in ['fill', 'partial_fill']:
        print(f"Execution: {update.order.symbol} {update.order.side} {update.order.filled_qty} @ ${update.price}")

Stream Management

Control streaming connections and manage subscriptions.

def run() -> None:
    """Start the streaming connection (blocking)."""

def stop() -> None:
    """Stop the streaming connection."""

async def stop_ws() -> None:
    """Stop WebSocket connections (async)."""

def is_open() -> bool:
    """Check if WebSocket connection is open."""

Usage Examples:

# Start streaming (blocking)
try:
    print("Starting stream...")
    stream.run()
except KeyboardInterrupt:
    print("Stream stopped by user")

# Non-blocking usage with threading
import threading

def start_stream():
    stream.run()

stream_thread = threading.Thread(target=start_stream)
stream_thread.daemon = True
stream_thread.start()

# Your main application logic here
time.sleep(60)
stream.stop()

Unsubscription

Remove subscriptions when no longer needed.

def unsubscribe_trades(*symbols: str) -> None:
    """Unsubscribe from trade streams."""

def unsubscribe_quotes(*symbols: str) -> None:
    """Unsubscribe from quote streams."""

def unsubscribe_bars(*symbols: str) -> None:
    """Unsubscribe from bar streams."""

def unsubscribe_updated_bars(*symbols: str) -> None:
    """Unsubscribe from updated bar streams."""

def unsubscribe_daily_bars(*symbols: str) -> None:
    """Unsubscribe from daily bar streams."""

def unsubscribe_statuses(*symbols: str) -> None:
    """Unsubscribe from status streams."""

def unsubscribe_lulds(*symbols: str) -> None:
    """Unsubscribe from LULD streams."""

def unsubscribe_crypto_trades(*symbols: str) -> None:
    """Unsubscribe from crypto trade streams."""

def unsubscribe_crypto_quotes(*symbols: str) -> None:
    """Unsubscribe from crypto quote streams."""

def unsubscribe_crypto_bars(*symbols: str) -> None:
    """Unsubscribe from crypto bar streams."""

def unsubscribe_crypto_updated_bars(*symbols: str) -> None:
    """Unsubscribe from crypto updated bar streams."""

def unsubscribe_crypto_daily_bars(*symbols: str) -> None:
    """Unsubscribe from crypto daily bar streams."""

def unsubscribe_crypto_orderbooks(*symbols: str) -> None:
    """Unsubscribe from crypto orderbook streams."""

def unsubscribe_news(*symbols: str) -> None:
    """Unsubscribe from news streams."""

Usage Example:

# Dynamic subscription management
def manage_subscriptions(portfolio_symbols):
    # Unsubscribe from old symbols
    stream.unsubscribe_trades('OLD1', 'OLD2')
    stream.unsubscribe_quotes('OLD1', 'OLD2')
    
    # Subscribe to new portfolio
    stream.subscribe_trades(handle_trade, *portfolio_symbols)
    stream.subscribe_quotes(handle_quote, *portfolio_symbols)

# Update subscriptions based on portfolio changes
new_portfolio = ['AAPL', 'TSLA', 'NVDA']
manage_subscriptions(new_portfolio)

News Streaming

Subscribe to real-time financial news feeds.

def subscribe_news(handler: Callable, *symbols: str) -> None:
    """Subscribe to news data streams."""

def on_news(*symbols: str) -> Callable:
    """Decorator for news event handlers."""

def unsubscribe_news(*symbols: str) -> None:
    """Unsubscribe from news streams."""

Usage Examples:

@stream.on_news('AAPL', 'TSLA')
def news_handler(news):
    print(f"News Alert: {news.headline}")
    print(f"Symbols: {', '.join(news.symbols)}")
    if any(word in news.headline.lower() for word in ['earnings', 'acquisition', 'merger']):
        print("⚠️  Market-moving news detected!")

# Or with handler function
def handle_market_news(news):
    # Process news for sentiment analysis or trading signals
    sentiment_score = analyze_sentiment(news.summary)  # Your function
    for symbol in news.symbols:
        update_symbol_sentiment(symbol, sentiment_score)  # Your function

stream.subscribe_news(handle_market_news, 'AAPL', 'TSLA', 'GOOGL')

Types

class TradeUpdate:
    @property
    def event(self) -> str: ...  # 'new', 'fill', 'partial_fill', 'canceled', 'expired', etc.
    @property
    def order(self) -> Order: ...
    @property
    def timestamp(self) -> pd.Timestamp: ...
    @property
    def position_qty(self) -> int: ...
    @property
    def price(self) -> float: ...
    @property
    def qty(self) -> int: ...

# Market data types are the same as in Market Data section:
# TradeV2, QuoteV2, BarV2, StatusV2, LULDV2, NewsV2, etc.

Install with Tessl CLI

npx tessl i tessl/pypi-alpaca-trade-api

docs

async-operations.md

cryptocurrency.md

index.md

market-data.md

streaming.md

trading-operations.md

tile.json