The Official Python SDK for Alpaca APIs providing access to trading, market data, and broker services
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
The Data Client provides comprehensive access to historical and real-time market data across multiple asset classes including stocks, cryptocurrencies, options, and news. It supports both REST API calls for historical data and WebSocket streams for real-time data feeds.
from alpaca.data.historical import StockHistoricalDataClient
class StockHistoricalDataClient(RESTClient):
def __init__(
self,
api_key: Optional[str] = None,
secret_key: Optional[str] = None,
oauth_token: Optional[str] = None,
use_basic_auth: bool = False,
raw_data: bool = False,
url_override: Optional[str] = None,
sandbox: bool = False,
) -> None:
"""
Initialize stock historical data client.
Args:
api_key: Alpaca API key
secret_key: Alpaca API secret
oauth_token: OAuth token for user-on-behalf access
use_basic_auth: Use basic auth headers (for broker sandbox)
raw_data: Return raw API responses instead of models
url_override: Override base URL for testing
sandbox: Use sandbox environment
"""
# Standard data client
data_client = StockHistoricalDataClient(
api_key="your-api-key",
secret_key="your-secret-key"
)
# OAuth-based client
data_client = StockHistoricalDataClient(
oauth_token="user-oauth-token"
)
# Sandbox client (for broker API users)
data_client = StockHistoricalDataClient(
api_key="broker-sandbox-key",
secret_key="broker-sandbox-secret",
use_basic_auth=True,
sandbox=True
)
from alpaca.data.historical import CryptoHistoricalDataClient
class CryptoHistoricalDataClient(RESTClient):
def __init__(
self,
api_key: Optional[str] = None,
secret_key: Optional[str] = None,
oauth_token: Optional[str] = None,
use_basic_auth: bool = False,
raw_data: bool = False,
url_override: Optional[str] = None,
sandbox: bool = False,
) -> None:
"""Initialize crypto historical data client with same parameters as stock client."""
from alpaca.data.historical import OptionHistoricalDataClient
class OptionHistoricalDataClient(RESTClient):
def __init__(
self,
api_key: Optional[str] = None,
secret_key: Optional[str] = None,
oauth_token: Optional[str] = None,
use_basic_auth: bool = False,
raw_data: bool = False,
url_override: Optional[str] = None,
sandbox: bool = False,
) -> None:
"""Initialize options historical data client with same parameters as stock client."""
from alpaca.data.historical import NewsClient
class NewsClient(RESTClient):
def __init__(
self,
api_key: Optional[str] = None,
secret_key: Optional[str] = None,
oauth_token: Optional[str] = None,
use_basic_auth: bool = False,
raw_data: bool = False,
url_override: Optional[str] = None,
sandbox: bool = False,
) -> None:
"""Initialize news data client with same parameters as other clients."""
from alpaca.data.historical import ScreenerClient
class ScreenerClient(RESTClient):
def __init__(
self,
api_key: Optional[str] = None,
secret_key: Optional[str] = None,
oauth_token: Optional[str] = None,
use_basic_auth: bool = False,
raw_data: bool = False,
url_override: Optional[str] = None,
sandbox: bool = False,
) -> None:
"""Initialize screener client for market scanning data."""
from alpaca.data.timeframe import TimeFrame, TimeFrameUnit
class TimeFrame:
def __init__(self, amount: int, unit: TimeFrameUnit) -> None:
"""
Create a timeframe for data aggregation.
Args:
amount: Number of time units (1-59 for minutes, 1-23 for hours, etc.)
unit: Time unit (Minute, Hour, Day, Week, Month)
"""
@property
def amount(self) -> int:
"""Number of time units."""
@property
def unit(self) -> TimeFrameUnit:
"""Time unit type."""
@property
def value(self) -> str:
"""String representation for API (e.g., '5Min', '1Day')."""
# Predefined timeframes
TimeFrame.Minute # 1-minute bars
TimeFrame.Hour # 1-hour bars
TimeFrame.Day # 1-day bars
TimeFrame.Week # 1-week bars
TimeFrame.Month # 1-month bars
from alpaca.data.timeframe import TimeFrameUnit
class TimeFrameUnit(str, Enum):
Minute = "Min" # Minute intervals (1-59)
Hour = "Hour" # Hour intervals (1-23)
Day = "Day" # Daily intervals (1 only)
Week = "Week" # Weekly intervals (1 only)
Month = "Month" # Monthly intervals (1, 2, 3, 6, 12)
Usage Examples:
# Create custom timeframes
five_min = TimeFrame(5, TimeFrameUnit.Minute)
four_hour = TimeFrame(4, TimeFrameUnit.Hour)
quarterly = TimeFrame(3, TimeFrameUnit.Month)
# Use predefined timeframes
minute_tf = TimeFrame.Minute
daily_tf = TimeFrame.Day
print(f"Timeframe: {five_min.value}") # Output: "5Min"
def get_stock_bars(
self,
request: StockBarsRequest
) -> Union[BarSet, RawData]:
"""
Get historical bar data for stocks.
Args:
request: Bar request parameters
Returns:
BarSet: Time-series bar data with OHLCV information
"""
from alpaca.data.requests import StockBarsRequest
from alpaca.data.timeframe import TimeFrame
from alpaca.data.enums import Adjustment, DataFeed
from datetime import datetime
class StockBarsRequest(BaseBarsRequest):
def __init__(
self,
symbol_or_symbols: Union[str, List[str]], # Required: symbols to retrieve
timeframe: TimeFrame, # Required: bar aggregation period
start: Optional[datetime] = None, # Start time (UTC)
end: Optional[datetime] = None, # End time (UTC, defaults to now)
limit: Optional[int] = None, # Max bars to return
adjustment: Optional[Adjustment] = None, # Corporate action adjustment
feed: Optional[DataFeed] = None, # Data feed (IEX, SIP, etc.)
sort: Optional[Sort] = None, # ASC or DESC
currency: Optional[SupportedCurrencies] = None # Currency (defaults to USD)
):
from alpaca.data.enums import Adjustment
class Adjustment(str, Enum):
RAW = "raw" # No adjustments
SPLIT = "split" # Split-adjusted only
DIVIDEND = "dividend" # Dividend-adjusted only
ALL = "all" # Both split and dividend adjusted
from alpaca.data.enums import DataFeed
class DataFeed(str, Enum):
IEX = "iex" # IEX data feed
SIP = "sip" # Securities Information Processor (premium)
OTC = "otc" # Over-the-counter data
ERSX = "ersx" # ERSX data feed
MEMX = "memx" # Members Exchange
OPRA = "opra" # Options Price Reporting Authority
INDICATIVE = "indicative" # Indicative pricing
Usage Examples:
from datetime import datetime, timedelta, timezone

from alpaca.common.enums import Sort  # needed for Sort.DESC below
from alpaca.data.enums import Adjustment, DataFeed
from alpaca.data.requests import StockBarsRequest
from alpaca.data.timeframe import TimeFrame, TimeFrameUnit

# The request's start/end fields are documented as UTC, so anchor the rolling
# window on an explicit UTC "now" instead of the machine's naive local time.
now_utc = datetime.now(timezone.utc)

# Get 1-minute bars for single stock
bars_request = StockBarsRequest(
    symbol_or_symbols="AAPL",
    timeframe=TimeFrame.Minute,
    start=now_utc - timedelta(hours=6),
    end=now_utc,
)
bars = data_client.get_stock_bars(bars_request)

# Get daily bars for multiple stocks with adjustments
bars_request = StockBarsRequest(
    symbol_or_symbols=["AAPL", "TSLA", "MSFT", "GOOGL"],
    timeframe=TimeFrame.Day,
    start=datetime(2023, 1, 1),
    end=datetime(2023, 12, 31),
    adjustment=Adjustment.ALL,
    feed=DataFeed.SIP,
)
bars = data_client.get_stock_bars(bars_request)

# Get 5-minute bars with limit
bars_request = StockBarsRequest(
    symbol_or_symbols="SPY",
    timeframe=TimeFrame(5, TimeFrameUnit.Minute),
    limit=1000,
    sort=Sort.DESC,  # Most recent first
)
bars = data_client.get_stock_bars(bars_request)
from alpaca.data.models import Bar
class Bar:
"""OHLCV bar data for a single time period."""
symbol: str # Stock symbol
timestamp: datetime # Bar timestamp (UTC)
open: float # Opening price
high: float # High price
low: float # Low price
close: float # Closing price
volume: int # Trading volume
trade_count: Optional[int] # Number of trades
vwap: Optional[float] # Volume-weighted average price
from alpaca.data.models import BarSet
class BarSet:
"""Collection of bars with time-series functionality."""
# Dictionary access: bars["AAPL"] returns List[Bar]
# Pandas integration: bars.df returns DataFrame
# Time-series operations available
def __getitem__(self, symbol: str) -> List[Bar]:
"""Get bars for specific symbol."""
@property
def df(self) -> pd.DataFrame:
"""Convert to pandas DataFrame with MultiIndex."""
@property
def symbols(self) -> List[str]:
"""Get all symbols in the bar set."""
Working with Bar Data:
# Access bars by symbol
bars = data_client.get_stock_bars(bars_request)
aapl_bars = bars["AAPL"]
print(f"Retrieved {len(aapl_bars)} bars for AAPL")
for bar in aapl_bars[-5:]: # Last 5 bars
print(f"{bar.timestamp}: O:{bar.open} H:{bar.high} L:{bar.low} C:{bar.close} V:{bar.volume}")
# Convert to pandas for analysis
df = bars.df
print(df.head())
print(f"Symbols: {bars.symbols}")
# Calculate returns
df['returns'] = df.groupby('symbol')['close'].pct_change()
print(f"Average daily return: {df.groupby('symbol')['returns'].mean()}")
def get_stock_quotes(
self,
request: StockQuotesRequest
) -> Union[QuoteSet, RawData]:
"""
Get historical quote data (bid/ask) for stocks.
Args:
request: Quote request parameters
Returns:
QuoteSet: Time-series quote data
"""
from alpaca.data.requests import StockQuotesRequest
class StockQuotesRequest(BaseTimeseriesDataRequest):
def __init__(
self,
symbol_or_symbols: Union[str, List[str]], # Required: symbols
start: Optional[datetime] = None, # Start time (UTC)
end: Optional[datetime] = None, # End time (UTC)
limit: Optional[int] = None, # Max quotes to return
feed: Optional[DataFeed] = None, # Data feed
sort: Optional[Sort] = None, # Sort order
currency: Optional[SupportedCurrencies] = None
):
from alpaca.data.models import Quote
class Quote:
"""Bid/ask quote data."""
symbol: str # Stock symbol
timestamp: datetime # Quote timestamp (UTC)
bid_exchange: str # Bid exchange code
bid_price: float # Bid price
bid_size: int # Bid size (shares)
ask_exchange: str # Ask exchange code
ask_price: float # Ask price
ask_size: int # Ask size (shares)
conditions: Optional[List[str]] # Quote conditions/flags
Usage Examples:
# Get recent quotes
quotes_request = StockQuotesRequest(
symbol_or_symbols="AAPL",
start=datetime.now() - timedelta(hours=1),
limit=1000
)
quotes = data_client.get_stock_quotes(quotes_request)
# Analyze bid-ask spread
aapl_quotes = quotes["AAPL"]
for quote in aapl_quotes[-10:]:
spread = quote.ask_price - quote.bid_price
spread_pct = (spread / quote.ask_price) * 100
print(f"{quote.timestamp}: Bid ${quote.bid_price} Ask ${quote.ask_price} Spread {spread_pct:.3f}%")
def get_stock_trades(
self,
request: StockTradesRequest
) -> Union[TradeSet, RawData]:
"""
Get historical trade data for stocks.
Args:
request: Trade request parameters
Returns:
TradeSet: Time-series trade execution data
"""
from alpaca.data.requests import StockTradesRequest
class StockTradesRequest(BaseTimeseriesDataRequest):
def __init__(
self,
symbol_or_symbols: Union[str, List[str]], # Required: symbols
start: Optional[datetime] = None, # Start time (UTC)
end: Optional[datetime] = None, # End time (UTC)
limit: Optional[int] = None, # Max trades to return
feed: Optional[DataFeed] = None, # Data feed
sort: Optional[Sort] = None, # Sort order
currency: Optional[SupportedCurrencies] = None
):
from alpaca.data.models import Trade
class Trade:
"""Individual trade execution data."""
symbol: str # Stock symbol
timestamp: datetime # Trade timestamp (UTC)
exchange: str # Exchange code
price: float # Trade price
size: int # Trade size (shares)
trade_id: Optional[str] # Unique trade ID
conditions: Optional[List[str]] # Trade conditions/flags
Usage Examples:
from datetime import datetime, timedelta, timezone

# Get recent trades for analysis.
# The request's start field is documented as UTC, so build the 30-minute
# window from an explicit UTC "now" rather than naive local time.
trades_request = StockTradesRequest(
    symbol_or_symbols="TSLA",
    start=datetime.now(timezone.utc) - timedelta(minutes=30),
    limit=5000,
)
trades = data_client.get_stock_trades(trades_request)

# Analyze trade activity
tsla_trades = trades["TSLA"]
total_volume = sum(trade.size for trade in tsla_trades)
traded_notional = sum(trade.price * trade.size for trade in tsla_trades)
# A window with no trades (for example outside market hours) has zero volume;
# report VWAP as NaN instead of raising ZeroDivisionError.
vwap = traded_notional / total_volume if total_volume else float("nan")
print(f"Trades: {len(tsla_trades)}, Volume: {total_volume:,}, VWAP: ${vwap:.2f}")
def get_stock_latest_bar(
self,
request: StockLatestBarRequest
) -> Union[Dict[str, Bar], RawData]:
"""
Get latest bar for stocks.
Args:
request: Latest bar request
Returns:
Dict[str, Bar]: Latest bar for each symbol
"""
def get_stock_latest_quote(
self,
request: StockLatestQuoteRequest
) -> Union[Dict[str, Quote], RawData]:
"""
Get latest quote for stocks.
Args:
request: Latest quote request
Returns:
Dict[str, Quote]: Latest quote for each symbol
"""
def get_stock_latest_trade(
self,
request: StockLatestTradeRequest
) -> Union[Dict[str, Trade], RawData]:
"""
Get latest trade for stocks.
Args:
request: Latest trade request
Returns:
Dict[str, Trade]: Latest trade for each symbol
"""
from alpaca.data.requests import (
StockLatestBarRequest, StockLatestQuoteRequest, StockLatestTradeRequest
)
class StockLatestBarRequest(NonEmptyRequest):
def __init__(
self,
symbol_or_symbols: Union[str, List[str]], # Required: symbols
feed: Optional[DataFeed] = None, # Data feed
currency: Optional[SupportedCurrencies] = None
):
class StockLatestQuoteRequest(NonEmptyRequest):
def __init__(
self,
symbol_or_symbols: Union[str, List[str]], # Required: symbols
feed: Optional[DataFeed] = None, # Data feed
currency: Optional[SupportedCurrencies] = None
):
class StockLatestTradeRequest(NonEmptyRequest):
def __init__(
self,
symbol_or_symbols: Union[str, List[str]], # Required: symbols
feed: Optional[DataFeed] = None, # Data feed
currency: Optional[SupportedCurrencies] = None
):
Usage Examples:
# Get latest bars for portfolio monitoring
latest_bars = data_client.get_stock_latest_bar(
StockLatestBarRequest(
symbol_or_symbols=["AAPL", "TSLA", "MSFT", "GOOGL"],
feed=DataFeed.IEX
)
)
for symbol, bar in latest_bars.items():
print(f"{symbol}: ${bar.close} (Vol: {bar.volume:,})")
# Get latest quotes for order placement
latest_quotes = data_client.get_stock_latest_quote(
StockLatestQuoteRequest(symbol_or_symbols="AAPL")
)
aapl_quote = latest_quotes["AAPL"]
print(f"AAPL Bid: ${aapl_quote.bid_price} x {aapl_quote.bid_size}")
print(f"AAPL Ask: ${aapl_quote.ask_price} x {aapl_quote.ask_size}")
# Get latest trades for execution analysis
latest_trades = data_client.get_stock_latest_trade(
StockLatestTradeRequest(symbol_or_symbols="SPY")
)
spy_trade = latest_trades["SPY"]
print(f"SPY Last: ${spy_trade.price} @ {spy_trade.timestamp}")
def get_stock_snapshot(
self,
request: StockSnapshotRequest
) -> Union[Dict[str, Snapshot], RawData]:
"""
Get comprehensive market snapshot for stocks.
Args:
request: Snapshot request parameters
Returns:
Dict[str, Snapshot]: Complete market data snapshot for each symbol
"""
from alpaca.data.requests import StockSnapshotRequest
class StockSnapshotRequest(NonEmptyRequest):
def __init__(
self,
symbol_or_symbols: Union[str, List[str]], # Required: symbols
feed: Optional[DataFeed] = None, # Data feed
currency: Optional[SupportedCurrencies] = None
):
from alpaca.data.models import Snapshot
class Snapshot:
"""Comprehensive market snapshot combining all data types."""
symbol: str # Stock symbol
latest_trade: Optional[Trade] # Most recent trade
latest_quote: Optional[Quote] # Current bid/ask
minute_bar: Optional[Bar] # Current minute bar
daily_bar: Optional[Bar] # Current daily bar
prev_daily_bar: Optional[Bar] # Previous day's bar
Usage Examples:
# Get comprehensive market snapshot
snapshot = data_client.get_stock_snapshot(
StockSnapshotRequest(
symbol_or_symbols=["AAPL", "TSLA", "SPY"],
feed=DataFeed.IEX
)
)
for symbol, snap in snapshot.items():
print(f"\n{symbol} Snapshot:")
if snap.latest_trade:
print(f" Last Trade: ${snap.latest_trade.price} @ {snap.latest_trade.timestamp}")
if snap.latest_quote:
spread = snap.latest_quote.ask_price - snap.latest_quote.bid_price
print(f" Quote: ${snap.latest_quote.bid_price} x ${snap.latest_quote.ask_price} (${spread:.2f})")
if snap.daily_bar and snap.prev_daily_bar:
daily_change = snap.daily_bar.close - snap.prev_daily_bar.close
daily_change_pct = (daily_change / snap.prev_daily_bar.close) * 100
print(f" Daily: ${snap.daily_bar.close} ({daily_change_pct:+.2f}%)")
print(f" Volume: {snap.daily_bar.volume:,}")
The cryptocurrency clients follow the same patterns as stock data with crypto-specific symbols and features.
from alpaca.data.historical import CryptoHistoricalDataClient
from alpaca.data.requests import (
CryptoBarsRequest, CryptoQuotesRequest, CryptoTradesRequest,
CryptoLatestBarRequest, CryptoLatestQuoteRequest, CryptoLatestTradeRequest,
CryptoSnapshotRequest, CryptoLatestOrderbookRequest
)
crypto_client = CryptoHistoricalDataClient(api_key="key", secret_key="secret")
# Same methods as stock client but with crypto request types
crypto_client.get_crypto_bars(CryptoBarsRequest(...))
crypto_client.get_crypto_quotes(CryptoQuotesRequest(...))
crypto_client.get_crypto_trades(CryptoTradesRequest(...))
crypto_client.get_crypto_latest_bar(CryptoLatestBarRequest(...))
crypto_client.get_crypto_latest_quote(CryptoLatestQuoteRequest(...))
crypto_client.get_crypto_latest_trade(CryptoLatestTradeRequest(...))
crypto_client.get_crypto_snapshot(CryptoSnapshotRequest(...))
# Crypto-specific: order book data
crypto_client.get_crypto_latest_orderbook(CryptoLatestOrderbookRequest(...))
# Crypto symbols use / format: BASE/QUOTE
crypto_symbols = ["BTC/USD", "ETH/USD", "LTC/USD", "BCH/USD", "DOGE/USD"]
# Get crypto bars
crypto_bars = crypto_client.get_crypto_bars(
CryptoBarsRequest(
symbol_or_symbols=crypto_symbols,
timeframe=TimeFrame.Hour,
start=datetime.now() - timedelta(days=7)
)
)
# Get latest crypto quotes
crypto_quotes = crypto_client.get_crypto_latest_quote(
CryptoLatestQuoteRequest(symbol_or_symbols="BTC/USD")
)
btc_quote = crypto_quotes["BTC/USD"]
print(f"BTC/USD: ${btc_quote.bid_price:.2f} / ${btc_quote.ask_price:.2f}")
from alpaca.data.requests import CryptoLatestOrderbookRequest
# Get order book depth
orderbook = crypto_client.get_crypto_latest_orderbook(
CryptoLatestOrderbookRequest(symbol_or_symbols="BTC/USD")
)
btc_book = orderbook["BTC/USD"]
print("Top 5 Bids:")
for bid in btc_book.bids[:5]:
print(f" ${bid.price:.2f} x {bid.size}")
print("Top 5 Asks:")
for ask in btc_book.asks[:5]:
print(f" ${ask.price:.2f} x {ask.size}")
from alpaca.data.historical import OptionHistoricalDataClient
from alpaca.data.requests import (
OptionBarsRequest, OptionTradesRequest,
OptionLatestQuoteRequest, OptionLatestTradeRequest,
OptionSnapshotRequest, OptionChainRequest
)
options_client = OptionHistoricalDataClient(api_key="key", secret_key="secret")
# Options data methods
options_client.get_option_bars(OptionBarsRequest(...))
options_client.get_option_trades(OptionTradesRequest(...))
options_client.get_option_latest_quote(OptionLatestQuoteRequest(...))
options_client.get_option_latest_trade(OptionLatestTradeRequest(...))
options_client.get_option_snapshot(OptionSnapshotRequest(...))
# Options-specific: option chain data
options_client.get_option_chain(OptionChainRequest(...))
# Option symbols use OCC format: AAPL230317C00150000
# Format: [SYMBOL][YYMMDD][C/P][STRIKE_PRICE_PADDED]
# Get option bars
option_bars = options_client.get_option_bars(
OptionBarsRequest(
symbol_or_symbols="AAPL230317C00150000", # AAPL $150 Call expiring 3/17/23
timeframe=TimeFrame.Minute,
start=datetime.now() - timedelta(hours=6)
)
)
# Get option chain for underlying
option_chain = options_client.get_option_chain(
OptionChainRequest(
underlying_symbol="AAPL",
expiration_date=date(2023, 3, 17)
)
)
print("AAPL Option Chain 3/17/23:")
for contract in option_chain.options:
print(f"{contract.symbol}: ${contract.strike_price} {contract.type}")
from alpaca.data.historical import NewsClient
from alpaca.data.requests import NewsRequest
news_client = NewsClient(api_key="key", secret_key="secret")
def get_news(self, request: NewsRequest) -> Union[NewsSet, RawData]:
"""
Get news articles with filtering.
Args:
request: News query parameters
Returns:
NewsSet: Collection of news articles
"""
from alpaca.data.requests import NewsRequest
class NewsRequest(NonEmptyRequest):
def __init__(
self,
symbols: Optional[Union[str, List[str]]] = None, # Filter by symbols
start: Optional[datetime] = None, # Start time
end: Optional[datetime] = None, # End time
sort: Optional[Sort] = None, # Sort order
include_content: Optional[bool] = None, # Include article body
exclude_contentless: Optional[bool] = None, # Exclude articles without content
limit: Optional[int] = None # Max articles to return
):
from alpaca.data.models import News
class News:
"""News article information."""
id: str # Article ID
headline: str # Article headline
author: Optional[str] # Article author
created_at: datetime # Publication timestamp
updated_at: datetime # Last update timestamp
summary: Optional[str] # Article summary
content: Optional[str] # Full article content (if requested)
symbols: List[str] # Related symbols
url: Optional[str] # Source URL
images: Optional[List[dict]] # Article images
Usage Examples:
# Get recent news for specific stocks
news = news_client.get_news(
NewsRequest(
symbols=["AAPL", "TSLA", "MSFT"],
start=datetime.now() - timedelta(hours=24),
limit=50,
include_content=True
)
)
print(f"Found {len(news.news)} articles")
for article in news.news[:5]:
print(f"\n{article.headline}")
print(f"Symbols: {', '.join(article.symbols)}")
print(f"Published: {article.created_at}")
if article.summary:
print(f"Summary: {article.summary[:200]}...")
# Get all market news (no symbol filter)
market_news = news_client.get_news(
NewsRequest(
start=datetime.now() - timedelta(hours=6),
sort=Sort.DESC,
limit=20
)
)
from alpaca.data.historical import ScreenerClient
from alpaca.data.requests import MostActivesRequest, MarketMoversRequest
screener_client = ScreenerClient(api_key="key", secret_key="secret")
def get_most_actives(
self,
request: MostActivesRequest
) -> Union[MostActives, RawData]:
"""
Get most active stocks by volume or trade count.
Args:
request: Most actives query parameters
Returns:
MostActives: Most active stocks data
"""
def get_market_movers(
self,
request: MarketMoversRequest
) -> Union[Movers, RawData]:
"""
Get top gaining and losing stocks.
Args:
request: Market movers query parameters
Returns:
Movers: Top movers data with gainers and losers
"""
from alpaca.data.requests import MostActivesRequest, MarketMoversRequest
from alpaca.data.enums import MostActivesBy, MarketType
class MostActivesRequest(NonEmptyRequest):
def __init__(
self,
by: MostActivesBy, # VOLUME or TRADE_COUNT
top: int, # Number of stocks to return (max 50)
market_type: Optional[MarketType] = None # STOCKS (default)
):
class MarketMoversRequest(NonEmptyRequest):
def __init__(
self,
top: int, # Number of gainers/losers (max 50)
market_type: Optional[MarketType] = None # STOCKS (default)
):
Usage Examples:
from alpaca.data.enums import MostActivesBy
# Get most active stocks by volume
most_active = screener_client.get_most_actives(
MostActivesRequest(by=MostActivesBy.VOLUME, top=20)
)
print("Most Active Stocks by Volume:")
for stock in most_active.most_actives:
print(f"{stock.symbol}: {stock.volume:,} shares, ${stock.price:.2f}")
# Get top movers (gainers and losers)
movers = screener_client.get_market_movers(
MarketMoversRequest(top=10)
)
print("\nTop Gainers:")
for gainer in movers.gainers:
print(f"{gainer.symbol}: +{gainer.percent_change:.2f}% (${gainer.price:.2f})")
print("\nTop Losers:")
for loser in movers.losers:
print(f"{loser.symbol}: {loser.percent_change:.2f}% (${loser.price:.2f})")
from alpaca.data.live import StockDataStream
class StockDataStream:
def __init__(
self,
api_key: Optional[str] = None,
secret_key: Optional[str] = None,
raw_data: bool = False,
feed: Optional[DataFeed] = None, # IEX, SIP, etc.
websocket_params: Optional[dict] = None,
url_override: Optional[str] = None
):
"""
Initialize stock data stream for real-time data.
Args:
api_key: API key for authentication
secret_key: Secret key for authentication
raw_data: Return raw data instead of models
feed: Data feed selection
websocket_params: WebSocket configuration
url_override: Override WebSocket URL
"""
def subscribe_bars(self, handler: Callable, *symbols) -> None:
"""Subscribe to real-time bar updates."""
def subscribe_quotes(self, handler: Callable, *symbols) -> None:
"""Subscribe to real-time quote updates."""
def subscribe_trades(self, handler: Callable, *symbols) -> None:
"""Subscribe to real-time trade updates."""
def subscribe_updated_bars(self, handler: Callable, *symbols) -> None:
"""Subscribe to minute bar updates."""
def subscribe_daily_bars(self, handler: Callable, *symbols) -> None:
"""Subscribe to daily bar updates."""
def subscribe_statuses(self, handler: Callable, *symbols) -> None:
"""Subscribe to trading status updates."""
def subscribe_lulds(self, handler: Callable, *symbols) -> None:
"""Subscribe to limit up/limit down updates."""
def subscribe_corrections(self, handler: Callable, *symbols) -> None:
"""Subscribe to trade corrections."""
def subscribe_cancellations(self, handler: Callable, *symbols) -> None:
"""Subscribe to trade cancellations."""
async def run(self) -> None:
"""Start the WebSocket connection and begin streaming."""
import asyncio
from alpaca.data.live import StockDataStream
from alpaca.data.enums import DataFeed
# Initialize stream
stream = StockDataStream(
api_key="your-api-key",
secret_key="your-secret-key",
feed=DataFeed.IEX
)
# Define data handlers
async def trade_handler(data):
"""Handle real-time trade data."""
print(f"TRADE {data.symbol}: ${data.price} x {data.size} @ {data.timestamp}")
async def quote_handler(data):
"""Handle real-time quote data."""
spread = data.ask_price - data.bid_price
print(f"QUOTE {data.symbol}: ${data.bid_price} x {data.ask_price} (${spread:.2f})")
async def bar_handler(data):
"""Handle real-time bar data."""
print(f"BAR {data.symbol}: O:${data.open} H:${data.high} L:${data.low} C:${data.close} V:{data.volume}")
# Subscribe to data streams
symbols = ["AAPL", "TSLA", "SPY", "QQQ"]
stream.subscribe_trades(trade_handler, *symbols)
stream.subscribe_quotes(quote_handler, *symbols)
stream.subscribe_updated_bars(bar_handler, *symbols)
# Start streaming (this will run indefinitely)
asyncio.run(stream.run())
from alpaca.data.live import CryptoDataStream
# Similar interface to stock stream but for crypto symbols
crypto_stream = CryptoDataStream(
api_key="your-api-key",
secret_key="your-secret-key"
)
# Subscribe to crypto data
crypto_symbols = ["BTC/USD", "ETH/USD", "DOGE/USD"]
crypto_stream.subscribe_trades(trade_handler, *crypto_symbols)
crypto_stream.subscribe_quotes(quote_handler, *crypto_symbols)
crypto_stream.subscribe_bars(bar_handler, *crypto_symbols)
# Crypto-specific: order book updates
async def orderbook_handler(data):
print(f"ORDERBOOK {data.symbol}: {len(data.bids)} bids, {len(data.asks)} asks")
crypto_stream.subscribe_orderbooks(orderbook_handler, "BTC/USD")
from alpaca.data.live import OptionDataStream
# Stream real-time options data
option_stream = OptionDataStream(
api_key="your-api-key",
secret_key="your-secret-key"
)
async def option_trade_handler(data):
"""Handle real-time option trade data."""
print(f"OPTION TRADE {data.symbol}: ${data.price} x {data.size}")
async def option_quote_handler(data):
"""Handle real-time option quote data."""
print(f"OPTION QUOTE {data.symbol}: ${data.bid_price} x ${data.ask_price}")
# Subscribe to option data
option_symbols = ["AAPL230317C00150000", "TSLA230317P00200000"]
option_stream.subscribe_trades(option_trade_handler, *option_symbols)
option_stream.subscribe_quotes(option_quote_handler, *option_symbols)
from alpaca.data.live import NewsDataStream
# Stream real-time news
news_stream = NewsDataStream(
api_key="your-api-key",
secret_key="your-secret-key"
)
async def news_handler(data):
"""Handle real-time news."""
print(f"NEWS: {data.headline}")
print(f"Symbols: {', '.join(data.symbols)}")
print(f"Time: {data.created_at}")
# Subscribe to news for specific symbols
news_stream.subscribe_news(news_handler, "AAPL", "TSLA", "MSFT")
# Or subscribe to all news (no symbol filter)
news_stream.subscribe_news(news_handler)
import asyncio
from datetime import datetime
from collections import defaultdict
from alpaca.data.live import StockDataStream, NewsDataStream
from alpaca.data.enums import DataFeed
class RealTimeAnalyzer:
    """Console monitor driven by live stock trades, quotes, bars and news.

    Construction creates a ``StockDataStream`` (IEX feed) and a
    ``NewsDataStream`` and subscribes a fixed symbol list to both. The
    handlers print alerts as events arrive; ``run()`` drives both streams
    plus a once-a-minute summary.

    Attributes:
        data_stream: Stream delivering trades, quotes and updated bars.
        news_stream: Stream delivering news items.
        latest_prices: Last trade price per symbol. The previous bar close
            is also stored here under the key ``"<symbol>_prev"``.
        trade_volumes: Running sum of trade sizes per symbol.
        price_alerts: Per-symbol ``{"above": x, "below": y}`` thresholds.
    """

    # Lower-case headline substrings that mark a news item as important.
    # Kept as a class constant so it is not rebuilt for every news event.
    IMPORTANT_KEYWORDS = (
        "earnings",
        "merger",
        "acquisition",
        "fda approval",
        "bankruptcy",
    )

    def __init__(self, api_key: str, secret_key: str):
        """Create both streams, initialise state and register handlers.

        Args:
            api_key: API key passed to both streams.
            secret_key: Secret key passed to both streams.
        """
        # Initialize data streams
        self.data_stream = StockDataStream(
            api_key=api_key,
            secret_key=secret_key,
            feed=DataFeed.IEX,
        )
        self.news_stream = NewsDataStream(
            api_key=api_key,
            secret_key=secret_key,
        )

        # Data storage
        self.latest_prices = {}
        self.trade_volumes = defaultdict(int)
        self.price_alerts = {
            "AAPL": {"above": 180.00, "below": 170.00},
            "TSLA": {"above": 250.00, "below": 200.00},
        }

        # Subscribe to streams
        self.setup_subscriptions()

    def setup_subscriptions(self):
        """Set up data stream subscriptions."""
        symbols = ["AAPL", "TSLA", "SPY", "QQQ", "MSFT"]

        # Market data subscriptions
        self.data_stream.subscribe_trades(self.handle_trade, *symbols)
        self.data_stream.subscribe_quotes(self.handle_quote, *symbols)
        self.data_stream.subscribe_updated_bars(self.handle_bar, *symbols)

        # News subscriptions
        self.news_stream.subscribe_news(self.handle_news, *symbols)

    async def handle_trade(self, trade):
        """Record the trade and print price-threshold / large-trade alerts."""
        symbol = trade.symbol
        self.latest_prices[symbol] = trade.price
        self.trade_volumes[symbol] += trade.size

        # Check price alerts
        if symbol in self.price_alerts:
            alerts = self.price_alerts[symbol]
            if trade.price > alerts["above"]:
                print(f"🚀 {symbol} ABOVE ${alerts['above']}: ${trade.price}")
            elif trade.price < alerts["below"]:
                print(f"📉 {symbol} BELOW ${alerts['below']}: ${trade.price}")

        # Print large trades
        if trade.size >= 10000:
            print(f"🐋 LARGE TRADE {symbol}: ${trade.price} x {trade.size:,}")

    async def handle_quote(self, quote):
        """Print an alert when the bid/ask spread exceeds 0.5% of the ask."""
        # A quote without an ask side (ask price 0 or missing) has no
        # meaningful relative spread and would divide by zero below.
        if not quote.ask_price:
            return

        spread = quote.ask_price - quote.bid_price
        spread_pct = (spread / quote.ask_price) * 100

        # Alert on wide spreads
        if spread_pct > 0.5:  # 0.5% spread
            print(f"⚠️ WIDE SPREAD {quote.symbol}: {spread_pct:.2f}%")

    async def handle_bar(self, bar):
        """Print an alert on a >2% move versus the previous bar close.

        The comparison only runs for symbols that already have a trade
        price recorded. The first bar for a symbol is compared against its
        own open.
        """
        prev_key = f"{bar.symbol}_prev"

        # Calculate intraday returns
        if bar.symbol in self.latest_prices:
            prev_price = self.latest_prices.get(prev_key, bar.open)
            # Skip the percentage when the reference price is zero/missing
            # rather than raising ZeroDivisionError inside the stream handler.
            if prev_price:
                change_pct = ((bar.close - prev_price) / prev_price) * 100
                if abs(change_pct) > 2.0:  # 2% move
                    direction = "UP" if change_pct > 0 else "DOWN"
                    print(f"📊 {bar.symbol} {direction} {change_pct:+.2f}% - ${bar.close} (Vol: {bar.volume:,})")

        self.latest_prices[prev_key] = bar.close

    async def handle_news(self, news):
        """Print the news item and flag the first important keyword found."""
        print(f"📰 NEWS: {news.headline}")
        print(f" Symbols: {', '.join(news.symbols)}")
        print(f" Time: {news.created_at}")

        # Check for important keywords
        headline_lower = news.headline.lower()
        matched = next(
            (kw for kw in self.IMPORTANT_KEYWORDS if kw in headline_lower),
            None,
        )
        if matched is not None:
            print(f"🔥 IMPORTANT: {matched.upper()} mentioned in news!")

    async def print_summary(self):
        """Print last price and cumulative volume per symbol every minute."""
        while True:
            await asyncio.sleep(60)  # Every minute
            print(f"\n=== MARKET SUMMARY ({datetime.now().strftime('%H:%M:%S')}) ===")
            for symbol, price in self.latest_prices.items():
                # "<symbol>_prev" entries are bookkeeping for handle_bar.
                if symbol.endswith("_prev"):
                    continue
                volume = self.trade_volumes.get(symbol, 0)
                print(f"{symbol}: ${price:.2f} (Vol: {volume:,})")
            print("=" * 50)

    async def run(self):
        """Run both streams and the summary loop until stopped.

        All three tasks are cancelled on the way out, whatever the reason
        (interrupt, cancellation, or one task failing), so nothing is left
        running in the background.
        """
        print("🚀 Starting real-time market analyzer...")

        # NOTE(review): this awaits ``stream.run()`` as a coroutine, as the
        # API listing on this page declares it. Confirm against the installed
        # alpaca-py version; if ``run()`` is a blocking synchronous call
        # there, this scheduling needs to change.
        tasks = [
            asyncio.create_task(self.data_stream.run()),
            asyncio.create_task(self.news_stream.run()),
            asyncio.create_task(self.print_summary()),
        ]

        try:
            # Run all tasks concurrently
            await asyncio.gather(*tasks)
        except KeyboardInterrupt:
            print("\n👋 Shutting down analyzer...")
        except asyncio.CancelledError:
            # asyncio.run() turns Ctrl-C into a cancellation of this
            # coroutine on newer Python versions; it must be re-raised.
            print("\n👋 Shutting down analyzer...")
            raise
        finally:
            for task in tasks:
                task.cancel()
# Run the analyzer
if __name__ == "__main__":
analyzer = RealTimeAnalyzer(
api_key="your-api-key",
secret_key="your-secret-key"
)
asyncio.run(analyzer.run())
import pandas as pd
# BarSet, QuoteSet, TradeSet all have .df property
bars = data_client.get_stock_bars(bars_request)
df = bars.df
# DataFrame has MultiIndex: (timestamp, symbol)
print(df.index.names) # ['timestamp', 'symbol']
print(df.columns) # ['open', 'high', 'low', 'close', 'volume', ...]
# Access data for specific symbol
aapl_data = df.xs('AAPL', level='symbol')
print(aapl_data.tail())
# Pivot for time-series analysis
pivot_df = df.reset_index().pivot(index='timestamp', columns='symbol', values='close')
print(pivot_df.head())
# Calculate returns
returns = pivot_df.pct_change().dropna()
print(f"Correlations:\n{returns.corr()}")
# Export to CSV
bars_df = bars.df
bars_df.to_csv('market_data.csv')

# Export to Parquet (more efficient for large datasets)
bars_df.to_parquet('market_data.parquet')

# Store in database (example with SQLite)
import sqlite3

conn = sqlite3.connect('market_data.db')
try:
    bars_df.to_sql('stock_bars', conn, if_exists='append', index=True)
finally:
    # Close the connection even when the write fails.
    conn.close()

# Export individual symbols
# NOTE(review): assumes the result set exposes a `symbols` attribute —
# confirm against your alpaca-py version.
for symbol in bars.symbols:
    symbol_bars = bars[symbol]
    # One row per bar, restricted to the timestamp and OHLCV fields
    symbol_df = pd.DataFrame([
        {
            'timestamp': bar.timestamp,
            'open': bar.open,
            'high': bar.high,
            'low': bar.low,
            'close': bar.close,
            'volume': bar.volume,
        }
        for bar in symbol_bars
    ])
    symbol_df.to_csv(f'{symbol}_bars.csv', index=False)

from alpaca.common.requests import NonEmptyRequest
from alpaca.common.enums import Sort, SupportedCurrencies
class NonEmptyRequest:
    """Base class for API request objects with validation.

    NOTE(review): this declaration shares its name with the class imported
    from alpaca.common.requests in the same snippet and has no body of its
    own, so it reads as a reference stub rather than a working
    implementation — confirm before relying on it.
    """
class Sort(str, Enum):
    """Ordering direction; members are also plain strings."""

    # Ascending order
    ASC = "asc"

    # Descending order
    DESC = "desc"
class SupportedCurrencies(str, Enum):
    """Currency codes; members are also plain strings."""

    USD = "USD"  # US Dollar (default)
    GBP = "GBP"  # British Pound
    CHF = "CHF"  # Swiss Franc
    EUR = "EUR"  # Euro
    CAD = "CAD"  # Canadian Dollar


# This import had been fused into the trailing comment of the last enum
# member, so APIError was never actually imported.
from alpaca.common.exceptions import APIError
import time
def robust_data_fetch(client, request, max_retries=3):
    """Fetch stock bars, retrying on rate limits and transient errors.

    Args:
        client: Object exposing ``get_stock_bars(request)``.
        request: Request object passed unchanged to ``get_stock_bars``.
        max_retries: Maximum number of attempts.

    Returns:
        Whatever ``client.get_stock_bars`` returns, or ``None`` when the
        request is rejected with HTTP 422, when every attempt is rate
        limited (HTTP 429), or when ``max_retries`` is not positive.

    Raises:
        APIError: If the final attempt fails with a status other than
            429 or 422.
        Exception: If the final attempt fails with any other error.
    """
    for attempt in range(max_retries):
        is_last_attempt = attempt == max_retries - 1
        try:
            return client.get_stock_bars(request)
        except APIError as e:
            if e.status_code == 429:  # Rate limit
                if is_last_attempt:
                    # No attempt follows, so backing off here would only
                    # delay the failure result.
                    print(f"Rate limited on final attempt {attempt + 1}, giving up")
                    break
                wait_time = 2 ** attempt  # Exponential backoff
                print(f"Rate limited, waiting {wait_time}s (attempt {attempt + 1})")
                time.sleep(wait_time)
            elif e.status_code == 422:  # Invalid parameters
                # Retrying an invalid request cannot succeed.
                print(f"Invalid request parameters: {e.message}")
                break
            else:
                print(f"API error: {e.message} (status: {e.status_code})")
                if is_last_attempt:
                    raise
                time.sleep(1)
        except Exception as e:
            print(f"Unexpected error: {e}")
            if is_last_attempt:
                raise
            time.sleep(1)
    return None
# Usage
bars = robust_data_fetch(data_client, bars_request)
# robust_data_fetch signals failure with None, so test for exactly that
# instead of relying on the truthiness of the result object.
if bars is not None:
    print(f"Successfully retrieved {len(bars.df)} bars")
else:
    print("Failed to retrieve data after all retries")

import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from alpaca.data.historical import StockHistoricalDataClient
from alpaca.data.requests import StockBarsRequest, StockLatestQuoteRequest
from alpaca.data.timeframe import TimeFrame
from alpaca.data.enums import Adjustment
class MarketAnalyzer:
    """Derive summary statistics and a printed report from daily stock bars.

    Wraps a ``StockHistoricalDataClient``: it downloads daily bars and the
    latest quotes for a list of symbols, computes per-symbol return,
    volatility, Sharpe ratio and drawdown figures, and prints a report.
    """

    def __init__(self, api_key: str, secret_key: str):
        """Create the underlying historical data client from API credentials."""
        self.client = StockHistoricalDataClient(api_key, secret_key)

    def get_historical_data(self, symbols: list, days: int = 30) -> pd.DataFrame:
        """Get historical daily data for analysis.

        Args:
            symbols: Symbols to request.
            days: How many calendar days back from now the request starts.

        Returns:
            DataFrame indexed by timestamp with one close-price column per
            symbol; gaps are filled with the previous available close.
        """
        request = StockBarsRequest(
            symbol_or_symbols=symbols,
            timeframe=TimeFrame.Day,
            start=datetime.now() - timedelta(days=days),
            adjustment=Adjustment.ALL,
        )
        bars = self.client.get_stock_bars(request)
        return self._pivot_close_prices(bars.df)

    @staticmethod
    def _pivot_close_prices(bars_df: pd.DataFrame) -> pd.DataFrame:
        """Reshape symbol/timestamp-indexed bars into a timestamp x symbol close table."""
        flat = bars_df.reset_index()
        # Pivot for analysis
        pivot_df = flat.pivot(index='timestamp', columns='symbol', values='close')
        # 'forward' is not a fill method pandas recognises, and passing
        # method= to fillna is deprecated; ffill() is the supported way
        # to carry the previous close forward.
        return pivot_df.ffill()

    def calculate_metrics(self, df: pd.DataFrame) -> dict:
        """Calculate key financial metrics.

        Args:
            df: Close prices, one column per symbol.

        Returns:
            Mapping of symbol to a dict with ``total_return`` (percent),
            ``volatility`` (annualized, percent), ``sharpe_ratio``
            (annualized, NaN when the returns have no usable variance),
            ``max_drawdown`` (percent) and ``current_price``.
        """
        # Daily returns
        returns = df.pct_change().dropna()
        annualization = np.sqrt(252)

        metrics = {}
        for symbol in df.columns:
            prices = df[symbol]
            symbol_returns = returns[symbol].dropna()
            daily_std = symbol_returns.std()
            if daily_std > 0:
                sharpe = (symbol_returns.mean() / daily_std) * annualization
            else:
                # Zero or undefined variance: the ratio is meaningless, so
                # report NaN instead of dividing by zero.
                sharpe = float('nan')
            metrics[symbol] = {
                'total_return': (prices.iloc[-1] / prices.iloc[0] - 1) * 100,
                'volatility': daily_std * annualization * 100,  # Annualized
                'sharpe_ratio': sharpe,
                'max_drawdown': self.calculate_max_drawdown(prices),
                'current_price': prices.iloc[-1],
            }
        return metrics

    def calculate_max_drawdown(self, price_series: pd.Series) -> float:
        """Calculate maximum drawdown.

        Returns the largest decline from a running peak, in percent
        (zero or negative).
        """
        peak = price_series.expanding().max()
        drawdown = (price_series - peak) / peak
        return drawdown.min() * 100

    def get_correlation_matrix(self, df: pd.DataFrame) -> pd.DataFrame:
        """Calculate the correlation matrix of period-over-period returns."""
        returns = df.pct_change().dropna()
        return returns.corr()

    def screen_stocks(self, symbols: list) -> pd.DataFrame:
        """Screen stocks based on various criteria.

        Combines the latest quote of each symbol with metrics computed
        over the last 90 days of daily closes. Symbols missing from either
        the quotes or the metrics are left out.

        Returns:
            DataFrame indexed by symbol; empty (but with all columns) when
            no symbol has both a quote and metrics.
        """
        columns = [
            'symbol', 'price', 'bid', 'ask', 'spread_pct',
            'total_return_pct', 'volatility_pct', 'sharpe_ratio',
            'max_drawdown_pct',
        ]
        # Get current quotes
        quotes = self.client.get_stock_latest_quote(
            StockLatestQuoteRequest(symbol_or_symbols=symbols)
        )
        # Get historical data for calculations
        df = self.get_historical_data(symbols, days=90)
        metrics = self.calculate_metrics(df)

        # Build screening results
        screening_data = []
        for symbol in symbols:
            if symbol not in quotes or symbol not in metrics:
                continue
            quote = quotes[symbol]
            metric = metrics[symbol]
            bid, ask = quote.bid_price, quote.ask_price
            # A zero or missing ask would make the relative spread
            # undefined; report NaN rather than raising.
            spread_pct = ((ask - bid) / ask) * 100 if ask else float('nan')
            screening_data.append({
                'symbol': symbol,
                'price': metric['current_price'],
                'bid': bid,
                'ask': ask,
                'spread_pct': spread_pct,
                'total_return_pct': metric['total_return'],
                'volatility_pct': metric['volatility'],
                'sharpe_ratio': metric['sharpe_ratio'],
                'max_drawdown_pct': metric['max_drawdown'],
            })
        # Passing the columns explicitly keeps set_index working when no
        # symbol qualified.
        return pd.DataFrame(screening_data, columns=columns).set_index('symbol')

    def generate_report(self, symbols: list) -> None:
        """Generate comprehensive market report.

        Prints a performance summary based on 60 days of data, followed by
        rankings, risk flags, correlations and insights. The ranking and
        risk sections use the figures from ``screen_stocks``, which are
        computed over 90 days.
        """
        print("📊 MARKET ANALYSIS REPORT")
        print("=" * 50)

        # Get data and metrics
        df = self.get_historical_data(symbols, days=60)
        metrics = self.calculate_metrics(df)
        correlation = self.get_correlation_matrix(df)
        screening = self.screen_stocks(symbols)
        if screening.empty:
            # Every later section ranks or filters the screening table.
            print("\nNo symbols returned both quote and price data; nothing to report.")
            return

        # Performance summary
        print("\n📈 PERFORMANCE SUMMARY (60 Days)")
        print("-" * 30)
        perf_df = pd.DataFrame({
            symbol: {
                'Return %': f"{metrics[symbol]['total_return']:.2f}%",
                'Volatility %': f"{metrics[symbol]['volatility']:.2f}%",
                'Sharpe': f"{metrics[symbol]['sharpe_ratio']:.2f}",
                'Max DD %': f"{metrics[symbol]['max_drawdown']:.2f}%",
            }
            for symbol in symbols
            if symbol in metrics
        }).T
        print(perf_df)

        # Top performers
        print("\n🏆 TOP PERFORMERS")
        print("-" * 20)
        sorted_by_return = screening.sort_values('total_return_pct', ascending=False)
        print(sorted_by_return[['price', 'total_return_pct', 'volatility_pct']].head())

        # Risk analysis
        print("\n⚠️ RISK ANALYSIS")
        print("-" * 15)
        high_vol = screening[screening['volatility_pct'] > 30]
        if not high_vol.empty:
            print(f"High Volatility Stocks (>30%): {', '.join(high_vol.index)}")
        wide_spreads = screening[screening['spread_pct'] > 0.5]
        if not wide_spreads.empty:
            print(f"Wide Spreads (>0.5%): {', '.join(wide_spreads.index)}")

        # Correlation insights
        print("\n🔗 CORRELATION INSIGHTS")
        print("-" * 22)
        # Keep only the strict upper triangle so each pair appears once
        # and self-correlations are excluded.
        upper_triangle = np.triu(np.ones(correlation.shape), k=1).astype(bool)
        corr_matrix = correlation.where(upper_triangle)
        high_corr = corr_matrix.stack().sort_values(ascending=False).head(3)
        print("Highest Correlations:")
        for (stock1, stock2), corr_val in high_corr.items():
            print(f" {stock1} - {stock2}: {corr_val:.3f}")

        # Investment recommendations
        print("\n💡 INVESTMENT INSIGHTS")
        print("-" * 21)
        # Best risk-adjusted returns
        best_sharpe = screening.nlargest(3, 'sharpe_ratio')
        print("Best Risk-Adjusted Returns (Sharpe Ratio):")
        for symbol, data in best_sharpe.iterrows():
            print(f" {symbol}: {data['sharpe_ratio']:.2f}")
        # Momentum stocks
        momentum = screening[
            (screening['total_return_pct'] > 10) & (screening['volatility_pct'] < 40)
        ]
        if not momentum.empty:
            print(f"\nMomentum Candidates: {', '.join(momentum.index)}")

        print("\n" + "=" * 50)
        print(f"Report generated at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
# Run comprehensive analysis
if __name__ == "__main__":
    credentials = {
        "api_key": "your-api-key",
        "secret_key": "your-secret-key",
    }
    analyzer = MarketAnalyzer(**credentials)
    # Analyze major tech stocks
    tech_stocks = [
        "AAPL", "MSFT", "GOOGL", "AMZN",
        "TSLA", "META", "NVDA", "NFLX",
    ]
    analyzer.generate_report(tech_stocks)

Install with Tessl CLI
npx tessl i tessl/pypi-alpaca-py