Python client library for Alpaca's commission-free trading API with support for both REST and streaming data interfaces
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
WebSocket streaming for real-time market data feeds and trading account updates. Supports subscription-based data streams with handler functions and decorator patterns for event processing.
Create and configure streaming connections with authentication and data feed selection.
class Stream:
def __init__(
self,
key_id: str = None,
secret_key: str = None,
base_url: str = None,
data_stream_url: str = None,
data_feed: str = 'iex',
raw_data: bool = False,
crypto_exchanges: List[str] = None,
websocket_params: Dict = None
): ...Usage Example:
import alpaca_trade_api as tradeapi
# Initialize stream with credentials
stream = tradeapi.Stream('your-key-id', 'your-secret-key', data_feed='iex')Subscribe to real-time market data feeds including trades, quotes, and bars.
def subscribe_trades(handler: Callable, *symbols: str,
handler_cancel_errors: Callable = None,
handler_corrections: Callable = None) -> None:
"""Subscribe to trade data streams."""
def subscribe_quotes(handler: Callable, *symbols: str) -> None:
"""Subscribe to quote data streams."""
def subscribe_bars(handler: Callable, *symbols: str) -> None:
"""Subscribe to minute bar data streams."""
def subscribe_updated_bars(handler: Callable, *symbols: str) -> None:
"""Subscribe to updated bar data streams."""
def subscribe_daily_bars(handler: Callable, *symbols: str) -> None:
"""Subscribe to daily bar data streams."""
def subscribe_statuses(handler: Callable, *symbols: str) -> None:
"""Subscribe to trading status updates."""
def subscribe_lulds(handler: Callable, *symbols: str) -> None:
"""Subscribe to Limit Up Limit Down updates."""Usage Examples:
# Define handler functions
def handle_trade(trade):
print(f"Trade: {trade.symbol} {trade.size} @ ${trade.price}")
def handle_quote(quote):
spread = quote.ask_price - quote.bid_price
print(f"Quote: {quote.symbol} Bid=${quote.bid_price} Ask=${quote.ask_price} Spread=${spread:.2f}")
def handle_bar(bar):
print(f"Bar: {bar.symbol} O=${bar.open} H=${bar.high} L=${bar.low} C=${bar.close} V={bar.volume}")
# Subscribe to streams
stream.subscribe_trades(handle_trade, 'AAPL', 'TSLA')
stream.subscribe_quotes(handle_quote, 'AAPL', 'TSLA', 'GOOGL')
stream.subscribe_bars(handle_bar, 'SPY')Subscribe to real-time updates for trading account events and order status changes.
def subscribe_trade_updates(handler: Callable) -> None:
"""Subscribe to trading account updates."""Usage Example:
def handle_trade_update(update):
print(f"Trade Update: {update.event} - Order {update.order.id}")
if update.event == 'fill':
print(f" Filled: {update.order.symbol} {update.order.side} {update.order.filled_qty}")
elif update.event == 'partial_fill':
print(f" Partial Fill: {update.order.filled_qty}/{update.order.qty}")
elif update.event == 'canceled':
print(f" Canceled: {update.order.symbol}")
stream.subscribe_trade_updates(handle_trade_update)Use decorators for clean event handler registration.
def on_trade(*symbols: str) -> Callable:
"""Decorator for trade event handlers."""
def on_quote(*symbols: str) -> Callable:
"""Decorator for quote event handlers."""
def on_bar(*symbols: str) -> Callable:
"""Decorator for bar event handlers."""
def on_updated_bar(*symbols: str) -> Callable:
"""Decorator for updated bar event handlers."""
def on_daily_bar(*symbols: str) -> Callable:
"""Decorator for daily bar event handlers."""
def on_status(*symbols: str) -> Callable:
"""Decorator for status event handlers."""
def on_luld(*symbols: str) -> Callable:
"""Decorator for LULD event handlers."""
def on_trade_update() -> Callable:
"""Decorator for trade update handlers."""
def on_cancel_error(*symbols: str) -> Callable:
"""Decorator for cancel error handlers."""
def on_correction(*symbols: str) -> Callable:
"""Decorator for correction handlers."""Usage Examples:
# Using decorators for clean handler registration
@stream.on_trade('AAPL', 'TSLA')
def trade_handler(trade):
if trade.size >= 1000: # Large trades only
print(f"Large trade: {trade.symbol} {trade.size} shares @ ${trade.price}")
@stream.on_quote('AAPL', 'MSFT', 'GOOGL')
def quote_handler(quote):
spread_bps = ((quote.ask_price - quote.bid_price) / quote.bid_price) * 10000
if spread_bps > 10: # Wide spreads
print(f"Wide spread: {quote.symbol} {spread_bps:.1f} bps")
@stream.on_bar('SPY', 'QQQ')
def bar_handler(bar):
# Calculate momentum
body = abs(bar.close - bar.open)
range_size = bar.high - bar.low
momentum = body / range_size if range_size > 0 else 0
print(f"Bar momentum: {bar.symbol} {momentum:.2f}")
@stream.on_trade_update()
def trade_update_handler(update):
if update.event in ['fill', 'partial_fill']:
print(f"Execution: {update.order.symbol} {update.order.side} {update.order.filled_qty} @ ${update.price}")Control streaming connections and manage subscriptions.
def run() -> None:
"""Start the streaming connection (blocking)."""
def stop() -> None:
"""Stop the streaming connection."""
async def stop_ws() -> None:
"""Stop WebSocket connections (async)."""
def is_open() -> bool:
"""Check if WebSocket connection is open."""Usage Examples:
# Start streaming (blocking)
try:
print("Starting stream...")
stream.run()
except KeyboardInterrupt:
print("Stream stopped by user")
# Non-blocking usage with threading
import threading
def start_stream():
stream.run()
stream_thread = threading.Thread(target=start_stream)
stream_thread.daemon = True
stream_thread.start()
# Your main application logic here
time.sleep(60)
stream.stop()Remove subscriptions when no longer needed.
def unsubscribe_trades(*symbols: str) -> None:
"""Unsubscribe from trade streams."""
def unsubscribe_quotes(*symbols: str) -> None:
"""Unsubscribe from quote streams."""
def unsubscribe_bars(*symbols: str) -> None:
"""Unsubscribe from bar streams."""
def unsubscribe_updated_bars(*symbols: str) -> None:
"""Unsubscribe from updated bar streams."""
def unsubscribe_daily_bars(*symbols: str) -> None:
"""Unsubscribe from daily bar streams."""
def unsubscribe_statuses(*symbols: str) -> None:
"""Unsubscribe from status streams."""
def unsubscribe_lulds(*symbols: str) -> None:
"""Unsubscribe from LULD streams."""
def unsubscribe_crypto_trades(*symbols: str) -> None:
"""Unsubscribe from crypto trade streams."""
def unsubscribe_crypto_quotes(*symbols: str) -> None:
"""Unsubscribe from crypto quote streams."""
def unsubscribe_crypto_bars(*symbols: str) -> None:
"""Unsubscribe from crypto bar streams."""
def unsubscribe_crypto_updated_bars(*symbols: str) -> None:
"""Unsubscribe from crypto updated bar streams."""
def unsubscribe_crypto_daily_bars(*symbols: str) -> None:
"""Unsubscribe from crypto daily bar streams."""
def unsubscribe_crypto_orderbooks(*symbols: str) -> None:
"""Unsubscribe from crypto orderbook streams."""
def unsubscribe_news(*symbols: str) -> None:
"""Unsubscribe from news streams."""Usage Example:
# Dynamic subscription management
def manage_subscriptions(portfolio_symbols):
# Unsubscribe from old symbols
stream.unsubscribe_trades('OLD1', 'OLD2')
stream.unsubscribe_quotes('OLD1', 'OLD2')
# Subscribe to new portfolio
stream.subscribe_trades(handle_trade, *portfolio_symbols)
stream.subscribe_quotes(handle_quote, *portfolio_symbols)
# Update subscriptions based on portfolio changes
new_portfolio = ['AAPL', 'TSLA', 'NVDA']
manage_subscriptions(new_portfolio)Subscribe to real-time financial news feeds.
def subscribe_news(handler: Callable, *symbols: str) -> None:
"""Subscribe to news data streams."""
def on_news(*symbols: str) -> Callable:
"""Decorator for news event handlers."""
def unsubscribe_news(*symbols: str) -> None:
"""Unsubscribe from news streams."""Usage Examples:
@stream.on_news('AAPL', 'TSLA')
def news_handler(news):
print(f"News Alert: {news.headline}")
print(f"Symbols: {', '.join(news.symbols)}")
if any(word in news.headline.lower() for word in ['earnings', 'acquisition', 'merger']):
print("⚠️ Market-moving news detected!")
# Or with handler function
def handle_market_news(news):
# Process news for sentiment analysis or trading signals
sentiment_score = analyze_sentiment(news.summary) # Your function
for symbol in news.symbols:
update_symbol_sentiment(symbol, sentiment_score) # Your function
stream.subscribe_news(handle_market_news, 'AAPL', 'TSLA', 'GOOGL')class TradeUpdate:
@property
def event(self) -> str: ... # 'new', 'fill', 'partial_fill', 'canceled', 'expired', etc.
@property
def order(self) -> Order: ...
@property
def timestamp(self) -> pd.Timestamp: ...
@property
def position_qty(self) -> int: ...
@property
def price(self) -> float: ...
@property
def qty(self) -> int: ...
# Market data types are the same as in Market Data section:
# TradeV2, QuoteV2, BarV2, StatusV2, LULDV2, NewsV2, etc.Install with Tessl CLI
npx tessl i tessl/pypi-alpaca-trade-api