CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-alpaca-py

The Official Python SDK for Alpaca APIs providing access to trading, market data, and broker services

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

data-client.mddocs/

Data Client

The Data Client provides comprehensive access to historical and real-time market data across multiple asset classes including stocks, cryptocurrencies, options, and news. It supports both REST API calls for historical data and WebSocket streams for real-time data feeds.

Historical Data Clients

StockHistoricalDataClient

from alpaca.data.historical import StockHistoricalDataClient

class StockHistoricalDataClient(RESTClient):
    def __init__(
        self,
        api_key: Optional[str] = None,
        secret_key: Optional[str] = None,
        oauth_token: Optional[str] = None,
        use_basic_auth: bool = False,
        raw_data: bool = False,
        url_override: Optional[str] = None,
        sandbox: bool = False,
    ) -> None:
        """
        Initialize stock historical data client.
        
        Args:
            api_key: Alpaca API key
            secret_key: Alpaca API secret  
            oauth_token: OAuth token for user-on-behalf access
            use_basic_auth: Use basic auth headers (for broker sandbox)
            raw_data: Return raw API responses instead of models
            url_override: Override base URL for testing
            sandbox: Use sandbox environment
        """

Basic Setup

# Standard data client
data_client = StockHistoricalDataClient(
    api_key="your-api-key",
    secret_key="your-secret-key"
)

# OAuth-based client
data_client = StockHistoricalDataClient(
    oauth_token="user-oauth-token"
)

# Sandbox client (for broker API users)
data_client = StockHistoricalDataClient(
    api_key="broker-sandbox-key",
    secret_key="broker-sandbox-secret",
    use_basic_auth=True,
    sandbox=True
)

CryptoHistoricalDataClient

from alpaca.data.historical import CryptoHistoricalDataClient

class CryptoHistoricalDataClient(RESTClient):
    def __init__(
        self,
        api_key: Optional[str] = None,
        secret_key: Optional[str] = None,
        oauth_token: Optional[str] = None,
        use_basic_auth: bool = False,
        raw_data: bool = False,
        url_override: Optional[str] = None,
        sandbox: bool = False,
    ) -> None:
        """Initialize crypto historical data client with same parameters as stock client."""

OptionHistoricalDataClient

from alpaca.data.historical import OptionHistoricalDataClient

class OptionHistoricalDataClient(RESTClient):
    def __init__(
        self,
        api_key: Optional[str] = None,
        secret_key: Optional[str] = None,
        oauth_token: Optional[str] = None,
        use_basic_auth: bool = False,
        raw_data: bool = False,
        url_override: Optional[str] = None,
        sandbox: bool = False,
    ) -> None:
        """Initialize options historical data client with same parameters as stock client."""

NewsClient

from alpaca.data.historical import NewsClient

class NewsClient(RESTClient):
    def __init__(
        self,
        api_key: Optional[str] = None,
        secret_key: Optional[str] = None,
        oauth_token: Optional[str] = None,
        use_basic_auth: bool = False,
        raw_data: bool = False,
        url_override: Optional[str] = None,
        sandbox: bool = False,
    ) -> None:
        """Initialize news data client with same parameters as other clients."""

ScreenerClient

from alpaca.data.historical import ScreenerClient

class ScreenerClient(RESTClient):
    def __init__(
        self,
        api_key: Optional[str] = None,
        secret_key: Optional[str] = None,
        oauth_token: Optional[str] = None,
        use_basic_auth: bool = False,
        raw_data: bool = False,
        url_override: Optional[str] = None,
        sandbox: bool = False,
    ) -> None:
        """Initialize screener client for market scanning data."""

TimeFrame System

TimeFrame Class

from alpaca.data.timeframe import TimeFrame, TimeFrameUnit

class TimeFrame:
    def __init__(self, amount: int, unit: TimeFrameUnit) -> None:
        """
        Create a timeframe for data aggregation.
        
        Args:
            amount: Number of time units (1-59 for minutes, 1-23 for hours, etc.)
            unit: Time unit (Minute, Hour, Day, Week, Month)
        """
    
    @property
    def amount(self) -> int:
        """Number of time units."""
    
    @property  
    def unit(self) -> TimeFrameUnit:
        """Time unit type."""
    
    @property
    def value(self) -> str:
        """String representation for API (e.g., '5Min', '1Day')."""

# Predefined timeframes
TimeFrame.Minute    # 1-minute bars
TimeFrame.Hour      # 1-hour bars  
TimeFrame.Day       # 1-day bars
TimeFrame.Week      # 1-week bars
TimeFrame.Month     # 1-month bars

TimeFrameUnit Enum

from alpaca.data.timeframe import TimeFrameUnit

class TimeFrameUnit(str, Enum):
    Minute = "Min"     # Minute intervals (1-59)
    Hour = "Hour"      # Hour intervals (1-23)
    Day = "Day"        # Daily intervals (1 only)
    Week = "Week"      # Weekly intervals (1 only)  
    Month = "Month"    # Monthly intervals (1, 2, 3, 6, 12)

Usage Examples:

# Create custom timeframes
five_min = TimeFrame(5, TimeFrameUnit.Minute)
four_hour = TimeFrame(4, TimeFrameUnit.Hour)
quarterly = TimeFrame(3, TimeFrameUnit.Month)

# Use predefined timeframes
minute_tf = TimeFrame.Minute
daily_tf = TimeFrame.Day

print(f"Timeframe: {five_min.value}")  # Output: "5Min"

Stock Market Data

Historical Bars (OHLCV)

get_stock_bars()

def get_stock_bars(
    self, 
    request: StockBarsRequest
) -> Union[BarSet, RawData]:
    """
    Get historical bar data for stocks.
    
    Args:
        request: Bar request parameters
        
    Returns:
        BarSet: Time-series bar data with OHLCV information
    """

StockBarsRequest

from alpaca.data.requests import StockBarsRequest
from alpaca.data.timeframe import TimeFrame
from alpaca.data.enums import Adjustment, DataFeed
from datetime import datetime

class StockBarsRequest(BaseBarsRequest):
    def __init__(
        self,
        symbol_or_symbols: Union[str, List[str]],  # Required: symbols to retrieve
        timeframe: TimeFrame,                       # Required: bar aggregation period
        start: Optional[datetime] = None,           # Start time (UTC)
        end: Optional[datetime] = None,             # End time (UTC, defaults to now)
        limit: Optional[int] = None,                # Max bars to return
        adjustment: Optional[Adjustment] = None,    # Corporate action adjustment
        feed: Optional[DataFeed] = None,           # Data feed (IEX, SIP, etc.)
        sort: Optional[Sort] = None,               # ASC or DESC
        currency: Optional[SupportedCurrencies] = None  # Currency (defaults to USD)
    ):

Adjustment Enum

from alpaca.data.enums import Adjustment

class Adjustment(str, Enum):
    RAW = "raw"           # No adjustments
    SPLIT = "split"       # Split-adjusted only
    DIVIDEND = "dividend" # Dividend-adjusted only  
    ALL = "all"          # Both split and dividend adjusted

DataFeed Enum

from alpaca.data.enums import DataFeed

class DataFeed(str, Enum):
    IEX = "iex"          # IEX data feed
    SIP = "sip"          # Securities Information Processor (premium)
    OTC = "otc"          # Over-the-counter data
    ERSX = "ersx"        # ERSX data feed
    MEMX = "memx"        # Members Exchange
    OPRA = "opra"        # Options Price Reporting Authority
    INDICATIVE = "indicative"  # Indicative pricing

Usage Examples:

from alpaca.data.requests import StockBarsRequest
from alpaca.data.timeframe import TimeFrame, TimeFrameUnit
from alpaca.data.enums import Adjustment, DataFeed
from datetime import datetime, timedelta

# Get 1-minute bars for single stock
bars_request = StockBarsRequest(
    symbol_or_symbols="AAPL",
    timeframe=TimeFrame.Minute,
    start=datetime.now() - timedelta(hours=6),
    end=datetime.now()
)
bars = data_client.get_stock_bars(bars_request)

# Get daily bars for multiple stocks with adjustments
bars_request = StockBarsRequest(
    symbol_or_symbols=["AAPL", "TSLA", "MSFT", "GOOGL"],
    timeframe=TimeFrame.Day,
    start=datetime(2023, 1, 1),
    end=datetime(2023, 12, 31),
    adjustment=Adjustment.ALL,
    feed=DataFeed.SIP
)
bars = data_client.get_stock_bars(bars_request)

# Get 5-minute bars with limit
bars_request = StockBarsRequest(
    symbol_or_symbols="SPY",
    timeframe=TimeFrame(5, TimeFrameUnit.Minute),
    limit=1000,
    sort=Sort.DESC  # Most recent first
)
bars = data_client.get_stock_bars(bars_request)

Bar Model

from alpaca.data.models import Bar

class Bar:
    """OHLCV bar data for a single time period."""
    
    symbol: str              # Stock symbol
    timestamp: datetime      # Bar timestamp (UTC)
    open: float             # Opening price
    high: float             # High price
    low: float              # Low price  
    close: float            # Closing price
    volume: int             # Trading volume
    trade_count: Optional[int]  # Number of trades
    vwap: Optional[float]      # Volume-weighted average price

BarSet Model

from alpaca.data.models import BarSet

class BarSet:
    """Collection of bars with time-series functionality."""
    
    # Dictionary access: bars["AAPL"] returns List[Bar]
    # Pandas integration: bars.df returns DataFrame
    # Time-series operations available
    
    def __getitem__(self, symbol: str) -> List[Bar]:
        """Get bars for specific symbol."""
    
    @property
    def df(self) -> pd.DataFrame:
        """Convert to pandas DataFrame with MultiIndex."""
    
    @property
    def symbols(self) -> List[str]:
        """Get all symbols in the bar set."""

Working with Bar Data:

# Access bars by symbol
bars = data_client.get_stock_bars(bars_request)
aapl_bars = bars["AAPL"]

print(f"Retrieved {len(aapl_bars)} bars for AAPL")
for bar in aapl_bars[-5:]:  # Last 5 bars
    print(f"{bar.timestamp}: O:{bar.open} H:{bar.high} L:{bar.low} C:{bar.close} V:{bar.volume}")

# Convert to pandas for analysis
df = bars.df
print(df.head())
print(f"Symbols: {bars.symbols}")

# Calculate returns
df['returns'] = df.groupby('symbol')['close'].pct_change()
print(f"Average daily return: {df.groupby('symbol')['returns'].mean()}")

Quotes (Bid/Ask Data)

get_stock_quotes()

def get_stock_quotes(
    self, 
    request: StockQuotesRequest
) -> Union[QuoteSet, RawData]:
    """
    Get historical quote data (bid/ask) for stocks.
    
    Args:
        request: Quote request parameters
        
    Returns:
        QuoteSet: Time-series quote data
    """

StockQuotesRequest

from alpaca.data.requests import StockQuotesRequest

class StockQuotesRequest(BaseTimeseriesDataRequest):
    def __init__(
        self,
        symbol_or_symbols: Union[str, List[str]],  # Required: symbols
        start: Optional[datetime] = None,           # Start time (UTC)
        end: Optional[datetime] = None,             # End time (UTC)  
        limit: Optional[int] = None,                # Max quotes to return
        feed: Optional[DataFeed] = None,           # Data feed
        sort: Optional[Sort] = None,               # Sort order
        currency: Optional[SupportedCurrencies] = None
    ):

Quote Model

from alpaca.data.models import Quote

class Quote:
    """Bid/ask quote data."""
    
    symbol: str              # Stock symbol
    timestamp: datetime      # Quote timestamp (UTC)
    bid_exchange: str        # Bid exchange code
    bid_price: float         # Bid price
    bid_size: int           # Bid size (shares)
    ask_exchange: str        # Ask exchange code  
    ask_price: float         # Ask price
    ask_size: int           # Ask size (shares)
    conditions: Optional[List[str]]  # Quote conditions/flags

Usage Examples:

# Get recent quotes
quotes_request = StockQuotesRequest(
    symbol_or_symbols="AAPL",
    start=datetime.now() - timedelta(hours=1),
    limit=1000
)
quotes = data_client.get_stock_quotes(quotes_request)

# Analyze bid-ask spread
aapl_quotes = quotes["AAPL"]
for quote in aapl_quotes[-10:]:
    spread = quote.ask_price - quote.bid_price
    spread_pct = (spread / quote.ask_price) * 100
    print(f"{quote.timestamp}: Bid ${quote.bid_price} Ask ${quote.ask_price} Spread {spread_pct:.3f}%")

Trades (Individual Transactions)

get_stock_trades()

def get_stock_trades(
    self, 
    request: StockTradesRequest
) -> Union[TradeSet, RawData]:
    """
    Get historical trade data for stocks.
    
    Args:
        request: Trade request parameters
        
    Returns:
        TradeSet: Time-series trade execution data
    """

StockTradesRequest

from alpaca.data.requests import StockTradesRequest

class StockTradesRequest(BaseTimeseriesDataRequest):
    def __init__(
        self,
        symbol_or_symbols: Union[str, List[str]],  # Required: symbols
        start: Optional[datetime] = None,           # Start time (UTC)
        end: Optional[datetime] = None,             # End time (UTC)
        limit: Optional[int] = None,                # Max trades to return  
        feed: Optional[DataFeed] = None,           # Data feed
        sort: Optional[Sort] = None,               # Sort order
        currency: Optional[SupportedCurrencies] = None
    ):

Trade Model

from alpaca.data.models import Trade

class Trade:
    """Individual trade execution data."""
    
    symbol: str              # Stock symbol
    timestamp: datetime      # Trade timestamp (UTC)
    exchange: str            # Exchange code
    price: float            # Trade price
    size: int               # Trade size (shares)
    trade_id: Optional[str]  # Unique trade ID
    conditions: Optional[List[str]]  # Trade conditions/flags

Usage Examples:

# Get recent trades for analysis
trades_request = StockTradesRequest(
    symbol_or_symbols="TSLA",
    start=datetime.now() - timedelta(minutes=30),
    limit=5000
)
trades = data_client.get_stock_trades(trades_request)

# Analyze trade activity
tsla_trades = trades["TSLA"]
total_volume = sum(trade.size for trade in tsla_trades)
vwap = sum(trade.price * trade.size for trade in tsla_trades) / total_volume

print(f"Trades: {len(tsla_trades)}, Volume: {total_volume:,}, VWAP: ${vwap:.2f}")

Latest Market Data

get_stock_latest_bar()

def get_stock_latest_bar(
    self, 
    request: StockLatestBarRequest
) -> Union[Dict[str, Bar], RawData]:
    """
    Get latest bar for stocks.
    
    Args:
        request: Latest bar request
        
    Returns:
        Dict[str, Bar]: Latest bar for each symbol
    """

get_stock_latest_quote()

def get_stock_latest_quote(
    self, 
    request: StockLatestQuoteRequest
) -> Union[Dict[str, Quote], RawData]:
    """
    Get latest quote for stocks.
    
    Args:
        request: Latest quote request
        
    Returns:
        Dict[str, Quote]: Latest quote for each symbol  
    """

get_stock_latest_trade()

def get_stock_latest_trade(
    self, 
    request: StockLatestTradeRequest
) -> Union[Dict[str, Trade], RawData]:
    """
    Get latest trade for stocks.
    
    Args:
        request: Latest trade request
        
    Returns:
        Dict[str, Trade]: Latest trade for each symbol
    """

Latest Data Request Classes

from alpaca.data.requests import (
    StockLatestBarRequest, StockLatestQuoteRequest, StockLatestTradeRequest
)

class StockLatestBarRequest(NonEmptyRequest):
    def __init__(
        self,
        symbol_or_symbols: Union[str, List[str]],  # Required: symbols
        feed: Optional[DataFeed] = None,           # Data feed
        currency: Optional[SupportedCurrencies] = None
    ):

class StockLatestQuoteRequest(NonEmptyRequest):
    def __init__(
        self,
        symbol_or_symbols: Union[str, List[str]],  # Required: symbols
        feed: Optional[DataFeed] = None,           # Data feed  
        currency: Optional[SupportedCurrencies] = None
    ):

class StockLatestTradeRequest(NonEmptyRequest):
    def __init__(
        self,
        symbol_or_symbols: Union[str, List[str]],  # Required: symbols
        feed: Optional[DataFeed] = None,           # Data feed
        currency: Optional[SupportedCurrencies] = None
    ):

Usage Examples:

# Get latest bars for portfolio monitoring
latest_bars = data_client.get_stock_latest_bar(
    StockLatestBarRequest(
        symbol_or_symbols=["AAPL", "TSLA", "MSFT", "GOOGL"],
        feed=DataFeed.IEX
    )
)

for symbol, bar in latest_bars.items():
    print(f"{symbol}: ${bar.close} (Vol: {bar.volume:,})")

# Get latest quotes for order placement
latest_quotes = data_client.get_stock_latest_quote(
    StockLatestQuoteRequest(symbol_or_symbols="AAPL")
)

aapl_quote = latest_quotes["AAPL"]
print(f"AAPL Bid: ${aapl_quote.bid_price} x {aapl_quote.bid_size}")
print(f"AAPL Ask: ${aapl_quote.ask_price} x {aapl_quote.ask_size}")

# Get latest trades for execution analysis  
latest_trades = data_client.get_stock_latest_trade(
    StockLatestTradeRequest(symbol_or_symbols="SPY")
)

spy_trade = latest_trades["SPY"]
print(f"SPY Last: ${spy_trade.price} @ {spy_trade.timestamp}")

Market Snapshots

get_stock_snapshot()

def get_stock_snapshot(
    self, 
    request: StockSnapshotRequest
) -> Union[Dict[str, Snapshot], RawData]:
    """
    Get comprehensive market snapshot for stocks.
    
    Args:
        request: Snapshot request parameters
        
    Returns:
        Dict[str, Snapshot]: Complete market data snapshot for each symbol
    """

StockSnapshotRequest

from alpaca.data.requests import StockSnapshotRequest

class StockSnapshotRequest(NonEmptyRequest):
    def __init__(
        self,
        symbol_or_symbols: Union[str, List[str]],  # Required: symbols
        feed: Optional[DataFeed] = None,           # Data feed
        currency: Optional[SupportedCurrencies] = None
    ):

Snapshot Model

from alpaca.data.models import Snapshot

class Snapshot:
    """Comprehensive market snapshot combining all data types."""
    
    symbol: str                    # Stock symbol
    latest_trade: Optional[Trade]  # Most recent trade
    latest_quote: Optional[Quote]  # Current bid/ask
    minute_bar: Optional[Bar]      # Current minute bar
    daily_bar: Optional[Bar]       # Current daily bar
    prev_daily_bar: Optional[Bar]  # Previous day's bar

Usage Examples:

# Get comprehensive market snapshot
snapshot = data_client.get_stock_snapshot(
    StockSnapshotRequest(
        symbol_or_symbols=["AAPL", "TSLA", "SPY"],
        feed=DataFeed.IEX
    )
)

for symbol, snap in snapshot.items():
    print(f"\n{symbol} Snapshot:")
    
    if snap.latest_trade:
        print(f"  Last Trade: ${snap.latest_trade.price} @ {snap.latest_trade.timestamp}")
    
    if snap.latest_quote:
        spread = snap.latest_quote.ask_price - snap.latest_quote.bid_price
        print(f"  Quote: ${snap.latest_quote.bid_price} x ${snap.latest_quote.ask_price} (${spread:.2f})")
    
    if snap.daily_bar and snap.prev_daily_bar:
        daily_change = snap.daily_bar.close - snap.prev_daily_bar.close
        daily_change_pct = (daily_change / snap.prev_daily_bar.close) * 100
        print(f"  Daily: ${snap.daily_bar.close} ({daily_change_pct:+.2f}%)")
        print(f"  Volume: {snap.daily_bar.volume:,}")

Cryptocurrency Data

The cryptocurrency clients follow the same patterns as stock data with crypto-specific symbols and features.

CryptoHistoricalDataClient Methods

from alpaca.data.historical import CryptoHistoricalDataClient
from alpaca.data.requests import (
    CryptoBarsRequest, CryptoQuotesRequest, CryptoTradesRequest,
    CryptoLatestBarRequest, CryptoLatestQuoteRequest, CryptoLatestTradeRequest,
    CryptoSnapshotRequest, CryptoLatestOrderbookRequest
)

crypto_client = CryptoHistoricalDataClient(api_key="key", secret_key="secret")

# Same methods as stock client but with crypto request types
crypto_client.get_crypto_bars(CryptoBarsRequest(...))
crypto_client.get_crypto_quotes(CryptoQuotesRequest(...))
crypto_client.get_crypto_trades(CryptoTradesRequest(...))
crypto_client.get_crypto_latest_bar(CryptoLatestBarRequest(...))
crypto_client.get_crypto_latest_quote(CryptoLatestQuoteRequest(...))
crypto_client.get_crypto_latest_trade(CryptoLatestTradeRequest(...))
crypto_client.get_crypto_snapshot(CryptoSnapshotRequest(...))

# Crypto-specific: order book data
crypto_client.get_crypto_latest_orderbook(CryptoLatestOrderbookRequest(...))

Crypto Symbols and Usage

# Crypto symbols use / format: BASE/QUOTE
crypto_symbols = ["BTC/USD", "ETH/USD", "LTC/USD", "BCH/USD", "DOGE/USD"]

# Get crypto bars
crypto_bars = crypto_client.get_crypto_bars(
    CryptoBarsRequest(
        symbol_or_symbols=crypto_symbols,
        timeframe=TimeFrame.Hour,
        start=datetime.now() - timedelta(days=7)
    )
)

# Get latest crypto quotes  
crypto_quotes = crypto_client.get_crypto_latest_quote(
    CryptoLatestQuoteRequest(symbol_or_symbols="BTC/USD")
)

btc_quote = crypto_quotes["BTC/USD"]
print(f"BTC/USD: ${btc_quote.bid_price:.2f} / ${btc_quote.ask_price:.2f}")

Crypto Order Book Data

from alpaca.data.requests import CryptoLatestOrderbookRequest

# Get order book depth
orderbook = crypto_client.get_crypto_latest_orderbook(
    CryptoLatestOrderbookRequest(symbol_or_symbols="BTC/USD")
)

btc_book = orderbook["BTC/USD"] 
print("Top 5 Bids:")
for bid in btc_book.bids[:5]:
    print(f"  ${bid.price:.2f} x {bid.size}")

print("Top 5 Asks:")  
for ask in btc_book.asks[:5]:
    print(f"  ${ask.price:.2f} x {ask.size}")

Options Data

OptionHistoricalDataClient Methods

from alpaca.data.historical import OptionHistoricalDataClient
from alpaca.data.requests import (
    OptionBarsRequest, OptionTradesRequest,
    OptionLatestQuoteRequest, OptionLatestTradeRequest,
    OptionSnapshotRequest, OptionChainRequest
)

options_client = OptionHistoricalDataClient(api_key="key", secret_key="secret")

# Options data methods  
options_client.get_option_bars(OptionBarsRequest(...))
options_client.get_option_trades(OptionTradesRequest(...))
options_client.get_option_latest_quote(OptionLatestQuoteRequest(...))
options_client.get_option_latest_trade(OptionLatestTradeRequest(...))  
options_client.get_option_snapshot(OptionSnapshotRequest(...))

# Options-specific: option chain data
options_client.get_option_chain(OptionChainRequest(...))

Option Data Usage

# Option symbols use OCC format: AAPL230317C00150000
# Format: [SYMBOL][YYMMDD][C/P][STRIKE_PRICE_PADDED]

# Get option bars
option_bars = options_client.get_option_bars(
    OptionBarsRequest(
        symbol_or_symbols="AAPL230317C00150000",  # AAPL $150 Call expiring 3/17/23
        timeframe=TimeFrame.Minute,
        start=datetime.now() - timedelta(hours=6)
    )
)

# Get option chain for underlying
option_chain = options_client.get_option_chain(
    OptionChainRequest(
        underlying_symbol="AAPL",
        expiration_date=date(2023, 3, 17)
    )
)

print("AAPL Option Chain 3/17/23:")
for contract in option_chain.options:
    print(f"{contract.symbol}: ${contract.strike_price} {contract.type}")

News Data

News Client

from alpaca.data.historical import NewsClient
from alpaca.data.requests import NewsRequest

news_client = NewsClient(api_key="key", secret_key="secret")

def get_news(self, request: NewsRequest) -> Union[NewsSet, RawData]:
    """
    Get news articles with filtering.
    
    Args:
        request: News query parameters
        
    Returns:
        NewsSet: Collection of news articles
    """

NewsRequest

from alpaca.data.requests import NewsRequest

class NewsRequest(NonEmptyRequest):
    def __init__(
        self,
        symbols: Optional[Union[str, List[str]]] = None,     # Filter by symbols
        start: Optional[datetime] = None,                    # Start time
        end: Optional[datetime] = None,                      # End time
        sort: Optional[Sort] = None,                         # Sort order
        include_content: Optional[bool] = None,              # Include article body
        exclude_contentless: Optional[bool] = None,          # Exclude articles without content
        limit: Optional[int] = None                          # Max articles to return
    ):

News Model

from alpaca.data.models import News

class News:
    """News article information."""
    
    id: str                      # Article ID
    headline: str                # Article headline
    author: Optional[str]        # Article author
    created_at: datetime         # Publication timestamp
    updated_at: datetime         # Last update timestamp
    summary: Optional[str]       # Article summary
    content: Optional[str]       # Full article content (if requested)
    symbols: List[str]           # Related symbols
    url: Optional[str]          # Source URL
    images: Optional[List[dict]] # Article images

Usage Examples:

# Get recent news for specific stocks
news = news_client.get_news(
    NewsRequest(
        symbols=["AAPL", "TSLA", "MSFT"],
        start=datetime.now() - timedelta(hours=24),
        limit=50,
        include_content=True
    )
)

print(f"Found {len(news.news)} articles")
for article in news.news[:5]:
    print(f"\n{article.headline}")
    print(f"Symbols: {', '.join(article.symbols)}")
    print(f"Published: {article.created_at}")
    if article.summary:
        print(f"Summary: {article.summary[:200]}...")

# Get all market news (no symbol filter)
market_news = news_client.get_news(
    NewsRequest(
        start=datetime.now() - timedelta(hours=6),
        sort=Sort.DESC,
        limit=20
    )
)

Market Screening

ScreenerClient

from alpaca.data.historical import ScreenerClient
from alpaca.data.requests import MostActivesRequest, MarketMoversRequest

screener_client = ScreenerClient(api_key="key", secret_key="secret")

def get_most_actives(
    self, 
    request: MostActivesRequest
) -> Union[MostActives, RawData]:
    """
    Get most active stocks by volume or trade count.
    
    Args:
        request: Most actives query parameters
        
    Returns:
        MostActives: Most active stocks data
    """

def get_market_movers(
    self, 
    request: MarketMoversRequest  
) -> Union[Movers, RawData]:
    """
    Get top gaining and losing stocks.
    
    Args:
        request: Market movers query parameters
        
    Returns:
        Movers: Top movers data with gainers and losers
    """

Screener Request Classes

from alpaca.data.requests import MostActivesRequest, MarketMoversRequest
from alpaca.data.enums import MostActivesBy, MarketType

class MostActivesRequest(NonEmptyRequest):
    def __init__(
        self,
        by: MostActivesBy,                    # VOLUME or TRADE_COUNT
        top: int,                            # Number of stocks to return (max 50)  
        market_type: Optional[MarketType] = None  # STOCKS (default)
    ):

class MarketMoversRequest(NonEmptyRequest):
    def __init__(
        self,
        top: int,                            # Number of gainers/losers (max 50)
        market_type: Optional[MarketType] = None  # STOCKS (default)
    ):

Usage Examples:

from alpaca.data.enums import MostActivesBy

# Get most active stocks by volume
most_active = screener_client.get_most_actives(
    MostActivesRequest(by=MostActivesBy.VOLUME, top=20)
)

print("Most Active Stocks by Volume:")
for stock in most_active.most_actives:
    print(f"{stock.symbol}: {stock.volume:,} shares, ${stock.price:.2f}")

# Get top movers (gainers and losers)
movers = screener_client.get_market_movers(
    MarketMoversRequest(top=10)
)

print("\nTop Gainers:")
for gainer in movers.gainers:
    print(f"{gainer.symbol}: +{gainer.percent_change:.2f}% (${gainer.price:.2f})")

print("\nTop Losers:")  
for loser in movers.losers:
    print(f"{loser.symbol}: {loser.percent_change:.2f}% (${loser.price:.2f})")

Real-time Data Streaming

Stock Data Stream

from alpaca.data.live import StockDataStream

class StockDataStream:
    def __init__(
        self,
        api_key: Optional[str] = None,
        secret_key: Optional[str] = None,
        raw_data: bool = False,
        feed: Optional[DataFeed] = None,        # IEX, SIP, etc.
        websocket_params: Optional[dict] = None,
        url_override: Optional[str] = None
    ):
        """
        Initialize stock data stream for real-time data.
        
        Args:
            api_key: API key for authentication
            secret_key: Secret key for authentication
            raw_data: Return raw data instead of models
            feed: Data feed selection
            websocket_params: WebSocket configuration
            url_override: Override WebSocket URL
        """
    
    def subscribe_bars(self, handler: Callable, *symbols) -> None:
        """Subscribe to real-time bar updates."""
    
    def subscribe_quotes(self, handler: Callable, *symbols) -> None:
        """Subscribe to real-time quote updates."""
    
    def subscribe_trades(self, handler: Callable, *symbols) -> None:
        """Subscribe to real-time trade updates."""
    
    def subscribe_updated_bars(self, handler: Callable, *symbols) -> None:
        """Subscribe to minute bar updates."""
    
    def subscribe_daily_bars(self, handler: Callable, *symbols) -> None:
        """Subscribe to daily bar updates."""
    
    def subscribe_statuses(self, handler: Callable, *symbols) -> None:
        """Subscribe to trading status updates."""
    
    def subscribe_lulds(self, handler: Callable, *symbols) -> None:
        """Subscribe to limit up/limit down updates."""
    
    def subscribe_corrections(self, handler: Callable, *symbols) -> None:
        """Subscribe to trade corrections."""
    
    def subscribe_cancellations(self, handler: Callable, *symbols) -> None:
        """Subscribe to trade cancellations."""
    
    async def run(self) -> None:
        """Start the WebSocket connection and begin streaming."""

Real-time Data Example

import asyncio
from alpaca.data.live import StockDataStream
from alpaca.data.enums import DataFeed

# Initialize stream
stream = StockDataStream(
    api_key="your-api-key",
    secret_key="your-secret-key",
    feed=DataFeed.IEX
)

# Define data handlers
async def trade_handler(data):
    """Handle real-time trade data."""
    print(f"TRADE {data.symbol}: ${data.price} x {data.size} @ {data.timestamp}")

async def quote_handler(data):
    """Handle real-time quote data."""
    spread = data.ask_price - data.bid_price
    print(f"QUOTE {data.symbol}: ${data.bid_price} x {data.ask_price} (${spread:.2f})")

async def bar_handler(data):
    """Handle real-time bar data."""  
    print(f"BAR {data.symbol}: O:${data.open} H:${data.high} L:${data.low} C:${data.close} V:{data.volume}")

# Subscribe to data streams
symbols = ["AAPL", "TSLA", "SPY", "QQQ"]

stream.subscribe_trades(trade_handler, *symbols)
stream.subscribe_quotes(quote_handler, *symbols) 
stream.subscribe_updated_bars(bar_handler, *symbols)

# Start streaming (this will run indefinitely)
asyncio.run(stream.run())

Crypto Data Stream

from alpaca.data.live import CryptoDataStream

# Similar interface to stock stream but for crypto symbols
crypto_stream = CryptoDataStream(
    api_key="your-api-key",
    secret_key="your-secret-key"
)

# Subscribe to crypto data
crypto_symbols = ["BTC/USD", "ETH/USD", "DOGE/USD"]
crypto_stream.subscribe_trades(trade_handler, *crypto_symbols)
crypto_stream.subscribe_quotes(quote_handler, *crypto_symbols)
crypto_stream.subscribe_bars(bar_handler, *crypto_symbols)

# Crypto-specific: order book updates
async def orderbook_handler(data):
    print(f"ORDERBOOK {data.symbol}: {len(data.bids)} bids, {len(data.asks)} asks")

crypto_stream.subscribe_orderbooks(orderbook_handler, "BTC/USD")

Option Data Stream

from alpaca.data.live import OptionDataStream

# Stream real-time options data
option_stream = OptionDataStream(
    api_key="your-api-key",
    secret_key="your-secret-key"
)

async def option_trade_handler(data):
    """Handle real-time option trade data."""
    print(f"OPTION TRADE {data.symbol}: ${data.price} x {data.size}")

async def option_quote_handler(data):
    """Handle real-time option quote data."""
    print(f"OPTION QUOTE {data.symbol}: ${data.bid_price} x ${data.ask_price}")

# Subscribe to option data
option_symbols = ["AAPL230317C00150000", "TSLA230317P00200000"]
option_stream.subscribe_trades(option_trade_handler, *option_symbols)
option_stream.subscribe_quotes(option_quote_handler, *option_symbols)

News Data Stream

from alpaca.data.live import NewsDataStream

# Stream real-time news
news_stream = NewsDataStream(
    api_key="your-api-key", 
    secret_key="your-secret-key"
)

async def news_handler(data):
    """Handle real-time news."""
    print(f"NEWS: {data.headline}")
    print(f"Symbols: {', '.join(data.symbols)}")
    print(f"Time: {data.created_at}")

# Subscribe to news for specific symbols
news_stream.subscribe_news(news_handler, "AAPL", "TSLA", "MSFT")

# Or subscribe to all news (no symbol filter)
news_stream.subscribe_news(news_handler)

Advanced Streaming Example

import asyncio
from datetime import datetime
from collections import defaultdict
from alpaca.data.live import StockDataStream, NewsDataStream
from alpaca.data.enums import DataFeed

class RealTimeAnalyzer:
    def __init__(self, api_key: str, secret_key: str):
        # Initialize data streams
        self.data_stream = StockDataStream(
            api_key=api_key,
            secret_key=secret_key,
            feed=DataFeed.IEX
        )
        
        self.news_stream = NewsDataStream(
            api_key=api_key,
            secret_key=secret_key  
        )
        
        # Data storage
        self.latest_prices = {}
        self.trade_volumes = defaultdict(int)
        self.price_alerts = {
            "AAPL": {"above": 180.00, "below": 170.00},
            "TSLA": {"above": 250.00, "below": 200.00}
        }
        
        # Subscribe to streams
        self.setup_subscriptions()
    
    def setup_subscriptions(self):
        """Set up data stream subscriptions."""
        symbols = ["AAPL", "TSLA", "SPY", "QQQ", "MSFT"]
        
        # Market data subscriptions
        self.data_stream.subscribe_trades(self.handle_trade, *symbols)
        self.data_stream.subscribe_quotes(self.handle_quote, *symbols)
        self.data_stream.subscribe_updated_bars(self.handle_bar, *symbols)
        
        # News subscriptions
        self.news_stream.subscribe_news(self.handle_news, *symbols)
    
    async def handle_trade(self, trade):
        """Process real-time trade data."""
        symbol = trade.symbol
        self.latest_prices[symbol] = trade.price
        self.trade_volumes[symbol] += trade.size
        
        # Check price alerts
        if symbol in self.price_alerts:
            alerts = self.price_alerts[symbol]
            if trade.price > alerts["above"]:
                print(f"🚀 {symbol} ABOVE ${alerts['above']}: ${trade.price}")
            elif trade.price < alerts["below"]:
                print(f"📉 {symbol} BELOW ${alerts['below']}: ${trade.price}")
        
        # Print large trades
        if trade.size >= 10000:
            print(f"🐋 LARGE TRADE {symbol}: ${trade.price} x {trade.size:,}")
    
    async def handle_quote(self, quote):
        """Process real-time quote data."""
        spread = quote.ask_price - quote.bid_price
        spread_pct = (spread / quote.ask_price) * 100
        
        # Alert on wide spreads
        if spread_pct > 0.5:  # 0.5% spread
            print(f"⚠️  WIDE SPREAD {quote.symbol}: {spread_pct:.2f}%")
    
    async def handle_bar(self, bar):
        """Process real-time bar updates."""
        # Calculate intraday returns
        if bar.symbol in self.latest_prices:
            prev_price = self.latest_prices.get(f"{bar.symbol}_prev", bar.open)
            change_pct = ((bar.close - prev_price) / prev_price) * 100
            
            if abs(change_pct) > 2.0:  # 2% move
                direction = "UP" if change_pct > 0 else "DOWN"
                print(f"📊 {bar.symbol} {direction} {change_pct:+.2f}% - ${bar.close} (Vol: {bar.volume:,})")
        
        self.latest_prices[f"{bar.symbol}_prev"] = bar.close
    
    async def handle_news(self, news):
        """Process real-time news."""
        print(f"📰 NEWS: {news.headline}")
        print(f"   Symbols: {', '.join(news.symbols)}")
        print(f"   Time: {news.created_at}")
        
        # Check for important keywords
        headline_lower = news.headline.lower()
        important_keywords = ["earnings", "merger", "acquisition", "fda approval", "bankruptcy"]
        
        for keyword in important_keywords:
            if keyword in headline_lower:
                print(f"🔥 IMPORTANT: {keyword.upper()} mentioned in news!")
                break
    
    async def print_summary(self):
        """Print periodic summary."""
        while True:
            await asyncio.sleep(60)  # Every minute
            
            print(f"\n=== MARKET SUMMARY ({datetime.now().strftime('%H:%M:%S')}) ===")
            for symbol, price in self.latest_prices.items():
                if not symbol.endswith("_prev"):
                    volume = self.trade_volumes.get(symbol, 0)
                    print(f"{symbol}: ${price:.2f} (Vol: {volume:,})")
            print("=" * 50)
    
    async def run(self):
        """Start the real-time analyzer."""
        print("🚀 Starting real-time market analyzer...")
        
        # Start data streams
        data_task = asyncio.create_task(self.data_stream.run())
        news_task = asyncio.create_task(self.news_stream.run())
        summary_task = asyncio.create_task(self.print_summary())
        
        try:
            # Run all tasks concurrently
            await asyncio.gather(data_task, news_task, summary_task)
        except KeyboardInterrupt:
            print("\n👋 Shutting down analyzer...")
            data_task.cancel()
            news_task.cancel()
            summary_task.cancel()

# Run the analyzer
if __name__ == "__main__":
    analyzer = RealTimeAnalyzer(
        api_key="your-api-key",
        secret_key="your-secret-key"
    )
    
    asyncio.run(analyzer.run())

Data Processing Utilities

Converting to Pandas DataFrames

import pandas as pd

# BarSet, QuoteSet, TradeSet all have .df property
bars = data_client.get_stock_bars(bars_request)
df = bars.df

# DataFrame has MultiIndex: (timestamp, symbol)
print(df.index.names)  # ['timestamp', 'symbol']
print(df.columns)      # ['open', 'high', 'low', 'close', 'volume', ...]

# Access data for specific symbol
aapl_data = df.xs('AAPL', level='symbol')
print(aapl_data.tail())

# Pivot for time-series analysis
pivot_df = df.reset_index().pivot(index='timestamp', columns='symbol', values='close')
print(pivot_df.head())

# Calculate returns
returns = pivot_df.pct_change().dropna()
print(f"Correlations:\n{returns.corr()}")

Data Export and Storage

# Export to CSV
bars_df = bars.df
bars_df.to_csv('market_data.csv')

# Export to Parquet (more efficient for large datasets)
bars_df.to_parquet('market_data.parquet')

# Store in database (example with SQLite)
import sqlite3

conn = sqlite3.connect('market_data.db')
bars_df.to_sql('stock_bars', conn, if_exists='append', index=True)
conn.close()

# Export individual symbols
for symbol in bars.symbols:
    symbol_bars = bars[symbol]
    symbol_df = pd.DataFrame([{
        'timestamp': bar.timestamp,
        'open': bar.open,
        'high': bar.high, 
        'low': bar.low,
        'close': bar.close,
        'volume': bar.volume
    } for bar in symbol_bars])
    symbol_df.to_csv(f'{symbol}_bars.csv', index=False)

Common Types and Base Classes

Base Request Types

from alpaca.common.requests import NonEmptyRequest
from alpaca.common.enums import Sort, SupportedCurrencies

class NonEmptyRequest:
    """Base class for API request objects with validation."""

class Sort(str, Enum):
    ASC = "asc"      # Ascending order
    DESC = "desc"    # Descending order

class SupportedCurrencies(str, Enum):
    USD = "USD"      # US Dollar (default)
    GBP = "GBP"      # British Pound
    CHF = "CHF"      # Swiss Franc
    EUR = "EUR"      # Euro
    CAD = "CAD"      # Canadian Dollar

Error Handling and Rate Limits

from alpaca.common.exceptions import APIError
import time

def robust_data_fetch(client, request, max_retries=3):
    """Fetch data with error handling and retries."""
    
    for attempt in range(max_retries):
        try:
            return client.get_stock_bars(request)
            
        except APIError as e:
            if e.status_code == 429:  # Rate limit
                wait_time = 2 ** attempt  # Exponential backoff
                print(f"Rate limited, waiting {wait_time}s (attempt {attempt + 1})")
                time.sleep(wait_time)
                continue
            elif e.status_code == 422:  # Invalid parameters
                print(f"Invalid request parameters: {e.message}")
                break
            else:
                print(f"API error: {e.message} (status: {e.status_code})")
                if attempt == max_retries - 1:
                    raise
                time.sleep(1)
        except Exception as e:
            print(f"Unexpected error: {e}")
            if attempt == max_retries - 1:
                raise
            time.sleep(1)
    
    return None

# Usage
bars = robust_data_fetch(data_client, bars_request)
if bars:
    print(f"Successfully retrieved {len(bars.df)} bars")
else:
    print("Failed to retrieve data after all retries")

Complete Market Data Analysis Example

import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from alpaca.data.historical import StockHistoricalDataClient
from alpaca.data.requests import StockBarsRequest, StockLatestQuoteRequest
from alpaca.data.timeframe import TimeFrame
from alpaca.data.enums import Adjustment

class MarketAnalyzer:
    def __init__(self, api_key: str, secret_key: str):
        self.client = StockHistoricalDataClient(api_key, secret_key)
        
    def get_historical_data(self, symbols: list, days: int = 30) -> pd.DataFrame:
        """Get historical daily data for analysis."""
        
        request = StockBarsRequest(
            symbol_or_symbols=symbols,
            timeframe=TimeFrame.Day,
            start=datetime.now() - timedelta(days=days),
            adjustment=Adjustment.ALL
        )
        
        bars = self.client.get_stock_bars(request)
        df = bars.df.reset_index()
        
        # Pivot for analysis
        pivot_df = df.pivot(index='timestamp', columns='symbol', values='close')
        return pivot_df.fillna(method='forward')
    
    def calculate_metrics(self, df: pd.DataFrame) -> dict:
        """Calculate key financial metrics."""
        
        # Daily returns
        returns = df.pct_change().dropna()
        
        # Metrics calculation
        metrics = {}
        
        for symbol in df.columns:
            symbol_returns = returns[symbol].dropna()
            
            metrics[symbol] = {
                'total_return': (df[symbol].iloc[-1] / df[symbol].iloc[0] - 1) * 100,
                'volatility': symbol_returns.std() * np.sqrt(252) * 100,  # Annualized
                'sharpe_ratio': (symbol_returns.mean() / symbol_returns.std()) * np.sqrt(252),
                'max_drawdown': self.calculate_max_drawdown(df[symbol]),
                'current_price': df[symbol].iloc[-1]
            }
        
        return metrics
    
    def calculate_max_drawdown(self, price_series: pd.Series) -> float:
        """Calculate maximum drawdown."""
        peak = price_series.expanding().max()
        drawdown = (price_series - peak) / peak
        return drawdown.min() * 100
    
    def get_correlation_matrix(self, df: pd.DataFrame) -> pd.DataFrame:
        """Calculate correlation matrix."""
        returns = df.pct_change().dropna()
        return returns.corr()
    
    def screen_stocks(self, symbols: list) -> pd.DataFrame:
        """Screen stocks based on various criteria."""
        
        # Get current quotes
        quotes = self.client.get_stock_latest_quote(
            StockLatestQuoteRequest(symbol_or_symbols=symbols)
        )
        
        # Get historical data for calculations
        df = self.get_historical_data(symbols, days=90)
        metrics = self.calculate_metrics(df)
        
        # Build screening results
        screening_data = []
        
        for symbol in symbols:
            if symbol in quotes and symbol in metrics:
                quote = quotes[symbol]
                metric = metrics[symbol]
                
                screening_data.append({
                    'symbol': symbol,
                    'price': metric['current_price'],
                    'bid': quote.bid_price,
                    'ask': quote.ask_price,
                    'spread_pct': ((quote.ask_price - quote.bid_price) / quote.ask_price) * 100,
                    'total_return_pct': metric['total_return'],
                    'volatility_pct': metric['volatility'],
                    'sharpe_ratio': metric['sharpe_ratio'],
                    'max_drawdown_pct': metric['max_drawdown']
                })
        
        return pd.DataFrame(screening_data).set_index('symbol')
    
    def generate_report(self, symbols: list) -> None:
        """Generate comprehensive market report."""
        
        print("📊 MARKET ANALYSIS REPORT")
        print("=" * 50)
        
        # Get data and metrics
        df = self.get_historical_data(symbols, days=60)
        metrics = self.calculate_metrics(df)
        correlation = self.get_correlation_matrix(df)
        screening = self.screen_stocks(symbols)
        
        # Performance summary
        print("\n📈 PERFORMANCE SUMMARY (60 Days)")
        print("-" * 30)
        
        perf_df = pd.DataFrame({
            symbol: {
                'Return %': f"{metrics[symbol]['total_return']:.2f}%",
                'Volatility %': f"{metrics[symbol]['volatility']:.2f}%", 
                'Sharpe': f"{metrics[symbol]['sharpe_ratio']:.2f}",
                'Max DD %': f"{metrics[symbol]['max_drawdown']:.2f}%"
            } for symbol in symbols if symbol in metrics
        }).T
        
        print(perf_df)
        
        # Top performers
        print("\n🏆 TOP PERFORMERS")
        print("-" * 20)
        
        sorted_by_return = screening.sort_values('total_return_pct', ascending=False)
        print(sorted_by_return[['price', 'total_return_pct', 'volatility_pct']].head())
        
        # Risk analysis
        print("\n⚠️  RISK ANALYSIS")
        print("-" * 15)
        
        high_vol = screening[screening['volatility_pct'] > 30]
        if not high_vol.empty:
            print(f"High Volatility Stocks (>30%): {', '.join(high_vol.index)}")
        
        wide_spreads = screening[screening['spread_pct'] > 0.5]
        if not wide_spreads.empty:
            print(f"Wide Spreads (>0.5%): {', '.join(wide_spreads.index)}")
        
        # Correlation insights
        print("\n🔗 CORRELATION INSIGHTS") 
        print("-" * 22)
        
        # Find highest correlations (excluding self-correlation)
        corr_matrix = correlation.where(np.triu(np.ones(correlation.shape), k=1).astype(bool))
        high_corr = corr_matrix.stack().sort_values(ascending=False).head(3)
        
        print("Highest Correlations:")
        for (stock1, stock2), corr_val in high_corr.items():
            print(f"  {stock1} - {stock2}: {corr_val:.3f}")
        
        # Investment recommendations
        print("\n💡 INVESTMENT INSIGHTS")
        print("-" * 21)
        
        # Best risk-adjusted returns
        best_sharpe = screening.nlargest(3, 'sharpe_ratio')
        print("Best Risk-Adjusted Returns (Sharpe Ratio):")
        for symbol, data in best_sharpe.iterrows():
            print(f"  {symbol}: {data['sharpe_ratio']:.2f}")
        
        # Momentum stocks
        momentum = screening[(screening['total_return_pct'] > 10) & (screening['volatility_pct'] < 40)]
        if not momentum.empty:
            print(f"\nMomentum Candidates: {', '.join(momentum.index)}")
        
        print("\n" + "=" * 50)
        print(f"Report generated at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

# Run comprehensive analysis
if __name__ == "__main__":
    analyzer = MarketAnalyzer(
        api_key="your-api-key",
        secret_key="your-secret-key"
    )
    
    # Analyze major tech stocks
    tech_stocks = ["AAPL", "MSFT", "GOOGL", "AMZN", "TSLA", "META", "NVDA", "NFLX"]
    
    analyzer.generate_report(tech_stocks)

Install with Tessl CLI

npx tessl i tessl/pypi-alpaca-py

docs

broker-client.md

data-client.md

index.md

trading-client.md

tile.json