CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-yfinance

Download market data from Yahoo! Finance API

Overview
Eval results
Files

docs/live-streaming.md

Real-time Data Streaming

Live financial data streaming over WebSocket connections, with both synchronous and asynchronous clients. Use it to receive real-time price, volume, and other market-data updates for active trading and monitoring applications.

Capabilities

Synchronous WebSocket Streaming

Real-time data streaming using synchronous WebSocket connections for traditional applications.

from typing import Callable, List, Optional, Union


class WebSocket:
    """Synchronous client for Yahoo Finance real-time streaming.

    Supports the context-manager protocol: leaving a ``with`` block calls
    ``close()``; exceptions raised inside the block still propagate.
    """

    def __init__(self, url: str = "wss://streamer.finance.yahoo.com/?version=2",
                 verbose: bool = True):
        """
        Create a WebSocket connection for real-time data streaming.

        Parameters:
        - url: str, WebSocket server URL (default: Yahoo Finance streaming endpoint)
        - verbose: bool, enable verbose logging of connection events
        """

    def subscribe(self, symbols: Union[str, List[str]]):
        """
        Subscribe to real-time data for specified symbols.

        Parameters:
        - symbols: str or list, ticker symbols to subscribe to
        """

    def unsubscribe(self, symbols: Union[str, List[str]]):
        """
        Unsubscribe from real-time data for specified symbols.

        Parameters:
        - symbols: str or list, ticker symbols to unsubscribe from
        """

    def listen(self, message_handler: Optional[Callable] = None):
        """
        Start listening for real-time data messages.

        Parameters:
        - message_handler: function to handle incoming messages
          If None, messages are printed to console
        """

    def close(self):
        """
        Close the WebSocket connection.
        """

    def __enter__(self) -> "WebSocket":
        """Enter the ``with`` block and return this connection."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the connection on leaving the ``with`` block.

        Returns None, so any exception raised inside the block propagates.
        """
        self.close()

Context Manager Support

# WebSocket can be used as a context manager.
# `custom_handler` is a placeholder for your own callback; it is called
# with each incoming message.
with WebSocket(verbose=True) as ws:
    ws.subscribe(["AAPL", "GOOGL"])
    ws.listen(message_handler=custom_handler)
# Connection automatically closed when exiting context

Usage Examples

import yfinance as yf

# Basic real-time streaming
def handle_price_update(message):
    """Print a one-line ``SYMBOL: $PRICE`` summary of a streamed message.

    Missing fields fall back to ``'Unknown'`` for the symbol and ``0`` for
    the price.
    """
    template = "{}: ${}"
    print(template.format(message.get('symbol', 'Unknown'),
                          message.get('price', 0)))

# The examples below are independent alternatives. listen() presumably keeps
# running while messages arrive (TODO confirm whether it ever returns on its
# own), so run them one at a time rather than as a single script.
ws = yf.WebSocket(verbose=True)
ws.subscribe(["AAPL", "GOOGL", "MSFT"])
ws.listen(message_handler=handle_price_update)

# Using context manager: the connection is closed when the block exits
with yf.WebSocket() as ws:
    ws.subscribe("AAPL")
    ws.listen()  # Uses default console output

# Managing subscriptions
ws = yf.WebSocket()
ws.subscribe(["AAPL", "GOOGL"])  # Initial subscription
ws.subscribe("MSFT")             # Add more symbols
ws.unsubscribe("GOOGL")          # Remove specific symbol
ws.listen(handle_price_update)   # Now streaming AAPL and MSFT

Asynchronous WebSocket Streaming

Real-time data streaming using asynchronous WebSocket connections for modern async applications.

from typing import Callable, List, Optional, Union


class AsyncWebSocket:
    """Asynchronous client for Yahoo Finance real-time streaming.

    Supports ``async with``: leaving the block awaits ``close()``; exceptions
    raised inside the block still propagate.
    """

    def __init__(self, url: str = "wss://streamer.finance.yahoo.com/?version=2",
                 verbose: bool = True):
        """
        Create an async WebSocket connection for real-time data streaming.

        Parameters:
        - url: str, WebSocket server URL
        - verbose: bool, enable verbose logging
        """

    async def subscribe(self, symbols: Union[str, List[str]]):
        """
        Asynchronously subscribe to real-time data.

        Parameters:
        - symbols: str or list, ticker symbols to subscribe to
        """

    async def unsubscribe(self, symbols: Union[str, List[str]]):
        """
        Asynchronously unsubscribe from real-time data.

        Parameters:
        - symbols: str or list, ticker symbols to unsubscribe from
        """

    async def listen(self, message_handler: Optional[Callable] = None):
        """
        Asynchronously listen for real-time data messages.

        Parameters:
        - message_handler: async function to handle incoming messages
        """

    async def close(self):
        """
        Asynchronously close the WebSocket connection.
        """

    async def __aenter__(self) -> "AsyncWebSocket":
        """Enter the ``async with`` block and return this connection."""
        return self

    async def __aexit__(self, exc_type, exc_value, traceback) -> None:
        """Close the connection on leaving the ``async with`` block.

        Returns None, so any exception raised inside the block propagates.
        """
        await self.close()

Async Context Manager Support

# AsyncWebSocket can be used as an async context manager. `async with` is
# only valid inside a coroutine (at module level it is a SyntaxError), so the
# example is wrapped in one.
async def stream_with_async_context(async_handler):
    """Stream AAPL and GOOGL, passing every message to ``async_handler``."""
    async with AsyncWebSocket(verbose=True) as ws:
        await ws.subscribe(["AAPL", "GOOGL"])
        await ws.listen(message_handler=async_handler)
    # Connection automatically closed when exiting context

# Run with: asyncio.run(stream_with_async_context(your_async_handler))

Usage Examples

import asyncio
import yfinance as yf

# Basic async streaming
async def async_price_handler(message):
    """Print an ``Async: SYMBOL -> $PRICE`` line for one streamed message.

    Missing fields fall back to ``'Unknown'`` and ``0``.
    """
    label = message.get('symbol', 'Unknown')
    quote = message.get('price', 0)
    line = "Async: " + f"{label} -> ${quote}"
    print(line)

async def main():
    """Stream three symbols through async_price_handler, then close the socket."""
    ws = yf.AsyncWebSocket(verbose=True)
    try:
        await ws.subscribe(["AAPL", "GOOGL", "MSFT"])
        await ws.listen(message_handler=async_price_handler)
    finally:
        # Release the connection even if listen() raises or the task is cancelled
        await ws.close()

# Run the async function
asyncio.run(main())

# Using async context manager
async def stream_with_context():
    """Stream AAPL with no custom handler; ``async with`` closes the socket."""
    async with yf.AsyncWebSocket() as live_socket:
        await live_socket.subscribe("AAPL")
        await live_socket.listen()  # no handler passed


asyncio.run(stream_with_context())

Ticker-Level Streaming

Start real-time streaming directly from Ticker and Tickers objects.

# Available on Ticker class
def live(self, message_handler: "Optional[Callable]" = None, verbose: bool = True):
    """
    Start real-time streaming for this ticker.

    Parameters:
    - message_handler: function to handle incoming messages; optional
      (defaults to None)
    - verbose: bool, enable verbose logging
    """

# Available on Tickers class
def live(self, message_handler: "Optional[Callable]" = None, verbose: bool = True):
    """
    Start real-time streaming for all tickers in the collection.

    Parameters:
    - message_handler: function to handle incoming messages; optional
      (defaults to None)
    - verbose: bool, enable verbose logging
    """

Usage Examples

# Single ticker streaming (handle_price_update as defined in the earlier example)
ticker = yf.Ticker("AAPL")
ticker.live(message_handler=handle_price_update)

# Multiple ticker streaming; handle_portfolio_update is a placeholder for
# your own callback
portfolio = yf.Tickers(["AAPL", "GOOGL", "MSFT"])
portfolio.live(message_handler=handle_portfolio_update)

Message Structure and Handling

Message Format

Real-time messages contain various fields depending on the data type:

# Typical message structure (illustrative values).
# NOTE(review): confirm these key names against messages actually received.
# The handlers in this document read every field with .get() and a default,
# so a missing key falls back silently instead of raising.
{
    'symbol': 'AAPL',
    'price': 175.43,
    'change': 2.15,
    'changePercent': 1.24,
    'volume': 45672100,
    'timestamp': 1640995200,
    'marketHours': 'REGULAR_MARKET',
    'dayHigh': 176.12,
    'dayLow': 173.78,
    'bid': 175.42,
    'ask': 175.44,
    'bidSize': 100,
    'askSize': 200
}

Advanced Message Handlers

def comprehensive_message_handler(message):
    """Print a colour-coded, timestamped one-line summary of a message.

    Reads ``symbol``, ``price``, ``change``, ``volume`` and ``timestamp``
    (epoch seconds, as passed to ``datetime.fromtimestamp``). Missing numeric
    fields default to 0. A missing or None timestamp falls back to the
    current time instead of the Unix epoch, which would print a misleading
    clock time.
    """
    import datetime

    symbol = message.get('symbol', 'Unknown')
    price = message.get('price', 0)
    change = message.get('change', 0)
    volume = message.get('volume', 0)
    timestamp = message.get('timestamp')
    if timestamp is None:
        timestamp = datetime.datetime.now().timestamp()

    # Price movement analysis: arrow and ANSI colour chosen by sign of change
    if change > 0:
        direction, color_code = "▲", "\033[92m"  # Green
    elif change < 0:
        direction, color_code = "▼", "\033[91m"  # Red
    else:
        direction, color_code = "→", "\033[93m"  # Yellow

    time_str = datetime.datetime.fromtimestamp(timestamp).strftime('%H:%M:%S')

    # "\033[0m" resets the terminal colour at the end of the line
    print(f"{color_code}{time_str} {symbol} {direction} ${price:.2f} "
          f"({change:+.2f}) Vol: {volume:,}\033[0m")

# Usage: print a colour-coded summary line for every message on three symbols
ws = yf.WebSocket()
ws.subscribe(["AAPL", "GOOGL", "MSFT"])
ws.listen(message_handler=comprehensive_message_handler)

Async Message Processing

import asyncio
from collections import deque

class AsyncMessageProcessor:
    """Queue incoming messages and drain them with simulated async work.

    ``processing`` stops a second drain loop from starting while one is
    already running (relevant only if ``handle_message`` calls overlap).
    """

    def __init__(self):
        self.message_queue = deque()
        self.processing = False

    async def handle_message(self, message):
        """Queue ``message`` and drain the queue unless a drain is running."""
        self.message_queue.append(message)

        if not self.processing:
            await self.process_messages()

    async def process_messages(self):
        """Process queued messages asynchronously, oldest first.

        ``processing`` is reset in ``finally``. Otherwise an error on one
        message would leave it stuck at True, and no later message would
        ever be processed.
        """
        self.processing = True
        try:
            while self.message_queue:
                message = self.message_queue.popleft()

                # Simulate async processing (database write, API call, etc.)
                await asyncio.sleep(0.01)

                symbol = message.get('symbol', 'Unknown')
                price = message.get('price', 0)
                print(f"Processed: {symbol} @ ${price}")
        finally:
            self.processing = False

# Usage: each streamed message is queued, then printed by the processor
async def main():
    processor = AsyncMessageProcessor()

    async with yf.AsyncWebSocket() as ws:
        await ws.subscribe(["AAPL", "GOOGL"])
        await ws.listen(message_handler=processor.handle_message)

asyncio.run(main())

Advanced Streaming Patterns

Portfolio Monitoring

class PortfolioMonitor:
    """Track the latest price/change per symbol and print price alerts.

    Parameters:
    - portfolio_symbols: iterable of symbols to track; messages for other
      symbols are ignored
    - alerts: optional dict mapping symbol -> {'stop_loss': float,
      'take_profit': float}; either key may be omitted
    """

    def __init__(self, portfolio_symbols, alerts=None):
        self.portfolio = {symbol: {'price': 0, 'change': 0} for symbol in portfolio_symbols}
        self.alerts = alerts or {}
        self.total_value = 0  # not updated anywhere in this class

    def handle_update(self, message):
        """Apply one streamed message, check its alerts, and print a summary.

        Messages without a price are skipped. Defaulting the price to 0 would
        overwrite the stored price and trigger a false stop-loss alert.
        """
        symbol = message.get('symbol')
        price = message.get('price')
        if symbol not in self.portfolio or price is None:
            return

        change = message.get('change', 0)
        self.portfolio[symbol].update({'price': price, 'change': change})

        if symbol in self.alerts:
            self.check_alerts(symbol, price)

        self.update_portfolio_summary()

    def check_alerts(self, symbol, price):
        """Print an alert for each configured threshold that ``price`` has crossed."""
        alerts = self.alerts[symbol]

        if 'stop_loss' in alerts and price <= alerts['stop_loss']:
            print(f"🚨 STOP LOSS ALERT: {symbol} @ ${price} (Stop: ${alerts['stop_loss']})")

        if 'take_profit' in alerts and price >= alerts['take_profit']:
            print(f"🎯 TAKE PROFIT ALERT: {symbol} @ ${price} (Target: ${alerts['take_profit']})")

    def update_portfolio_summary(self):
        """Print the sum of the latest 'change' value of every tracked symbol."""
        total_change = sum(data['change'] for data in self.portfolio.values())
        print(f"Portfolio Update - Total Change: ${total_change:+.2f}")

# Usage
portfolio_symbols = ["AAPL", "GOOGL", "MSFT"]
# Per-symbol alert thresholds; MSFT has none, so it is tracked without alerts
alerts = {
    "AAPL": {"stop_loss": 160.0, "take_profit": 180.0},
    "GOOGL": {"stop_loss": 90.0, "take_profit": 110.0}
}

monitor = PortfolioMonitor(portfolio_symbols, alerts)
ws = yf.WebSocket()
ws.subscribe(portfolio_symbols)
ws.listen(message_handler=monitor.handle_update)

Market Scanner

class MarketScanner:
    """Record and announce messages that pass simple move/volume filters.

    ``scan_criteria`` may set ``min_change_percent`` (default 5) and
    ``min_volume`` (default 1000000). Both comparisons are strict.
    """

    def __init__(self, scan_criteria):
        self.criteria = scan_criteria
        self.matches = []

    def scan_message(self, message):
        """Append a match record and print it when ``message`` passes the filters."""
        if not self.meets_criteria(message):
            return

        pct = message.get('changePercent', 0)
        record = {
            'symbol': message.get('symbol'),
            'price': message.get('price', 0),
            'change_percent': pct,
            'volume': message.get('volume', 0),
            'timestamp': message.get('timestamp', 0),
        }
        self.matches.append(record)
        print(f"🔍 SCANNER MATCH: {record['symbol']} - {pct:+.2f}% @ ${record['price']}")

    def meets_criteria(self, message):
        """Return True when |changePercent| and volume both exceed their thresholds."""
        # Example criteria: large price movement with high volume. The volume
        # test only runs once the move test has passed.
        pct_floor = self.criteria.get('min_change_percent', 5)
        if not (abs(message.get('changePercent', 0)) > pct_floor):
            return False
        return message.get('volume', 0) > self.criteria.get('min_volume', 1000000)

# Usage: flag symbols moving more than 3% on more than 2M volume
scan_criteria = {
    'min_change_percent': 3.0,  # 3% minimum price change
    'min_volume': 2000000       # 2M minimum volume
}

scanner = MarketScanner(scan_criteria)
watchlist = ["AAPL", "GOOGL", "MSFT", "AMZN", "TSLA", "META", "NVDA"]

ws = yf.WebSocket()
ws.subscribe(watchlist)
ws.listen(message_handler=scanner.scan_message)

Data Recording and Analysis

import pandas as pd
from datetime import datetime
import sqlite3

class StreamingDataRecorder:
    """Persist streamed messages to SQLite and read them back as DataFrames.

    Every method opens its own connection to ``db_path`` and closes it in a
    ``finally`` block, so a failing statement cannot leak the connection.
    Time windows are computed in epoch seconds.
    """

    def __init__(self, db_path="streaming_data.db"):
        self.db_path = db_path
        self.setup_database()

    def setup_database(self):
        """Create the ``streaming_data`` table if it does not already exist."""
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            cursor.execute('''
            CREATE TABLE IF NOT EXISTS streaming_data (
                timestamp INTEGER,
                symbol TEXT,
                price REAL,
                change_amount REAL,
                change_percent REAL,
                volume INTEGER,
                bid REAL,
                ask REAL
            )
        ''')
            conn.commit()
        finally:
            conn.close()

    def record_message(self, message):
        """Insert one streamed message as a row and print a confirmation.

        A missing timestamp defaults to the current time in epoch seconds;
        other missing fields default to 0 (or '' for the symbol).
        """
        row = (
            message.get('timestamp', int(datetime.now().timestamp())),
            message.get('symbol', ''),
            message.get('price', 0),
            message.get('change', 0),
            message.get('changePercent', 0),
            message.get('volume', 0),
            message.get('bid', 0),
            message.get('ask', 0)
        )

        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            # Parameterized query: message values are never spliced into SQL
            cursor.execute('''
            INSERT INTO streaming_data 
            (timestamp, symbol, price, change_amount, change_percent, volume, bid, ask)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        ''', row)
            conn.commit()
        finally:
            conn.close()

        symbol = message.get('symbol', 'Unknown')
        price = message.get('price', 0)
        print(f"📊 Recorded: {symbol} @ ${price}")

    def get_recorded_data(self, symbol=None, hours=1):
        """Return rows from the last ``hours`` hours, newest first, as a DataFrame.

        Parameters:
        - symbol: optional symbol filter; None returns all symbols
        - hours: size of the look-back window
        """
        query = '''
            SELECT * FROM streaming_data 
            WHERE timestamp > ? 
        '''
        params = [int(datetime.now().timestamp()) - (hours * 3600)]

        if symbol:
            query += ' AND symbol = ?'
            params.append(symbol)

        query += ' ORDER BY timestamp DESC'

        conn = sqlite3.connect(self.db_path)
        try:
            return pd.read_sql_query(query, conn, params=params)
        finally:
            conn.close()

# Usage
recorder = StreamingDataRecorder()

ws = yf.WebSocket()
ws.subscribe(["AAPL", "GOOGL", "MSFT"])
# listen() presumably runs until the stream ends or is interrupted
# (TODO confirm), so the analysis below only runs after streaming stops.
ws.listen(message_handler=recorder.record_message)

# Later, analyze recorded data
recent_data = recorder.get_recorded_data(symbol="AAPL", hours=2)
print(recent_data.describe())

Error Handling and Reconnection

Robust Connection Management

import time
import logging

class RobustWebSocketClient:
    """Keep a streaming connection alive with exponential-backoff retries.

    Parameters:
    - symbols: symbols to subscribe to on every (re)connection
    - message_handler: callable invoked with each message; any exception it
      raises is printed and the stream continues
    - max_retries: consecutive failures tolerated before giving up
    """

    def __init__(self, symbols, message_handler, max_retries=5):
        self.symbols = symbols
        self.message_handler = message_handler
        self.max_retries = max_retries
        self.retry_count = 0
        self.ws = None

    def connect_with_retry(self):
        """Connect, listen, and reconnect on failure until retries run out.

        Before each new attempt, the previous connection is closed so failed
        or finished connections are not leaked. After the final failed
        attempt, the method gives up immediately instead of sleeping first.
        """
        while self.retry_count < self.max_retries:
            self._discard_connection()
            try:
                self.ws = yf.WebSocket(verbose=True)
                self.ws.subscribe(self.symbols)
                print(f"✅ Connected successfully (attempt {self.retry_count + 1})")

                # Reset retry count on successful connection
                self.retry_count = 0

                # Start listening; if listen() returns, the loop reconnects
                self.ws.listen(message_handler=self.handle_with_error_recovery)

            except Exception as e:
                self.retry_count += 1
                print(f"❌ Connection failed (attempt {self.retry_count}): {e}")

                if self.retry_count >= self.max_retries:
                    break  # out of retries: no point waiting before giving up

                wait_time = min(2 ** self.retry_count, 60)  # Exponential backoff, capped at 60s
                print(f"⏳ Retrying in {wait_time} seconds...")
                time.sleep(wait_time)

        print(f"🚫 Max retries ({self.max_retries}) exceeded. Giving up.")

    def _discard_connection(self):
        """Close and forget the current connection, if any, before reconnecting."""
        if self.ws is None:
            return
        try:
            self.ws.close()
        except Exception as e:
            # A stale connection that fails to close must not block reconnecting
            print(f"⚠️  Error closing stale connection: {e}")
        self.ws = None

    def handle_with_error_recovery(self, message):
        """Forward ``message`` to the handler; print its errors instead of raising."""
        try:
            self.message_handler(message)
        except Exception as e:
            print(f"⚠️  Error processing message: {e}")
            # Continue processing other messages

    def close(self):
        """Close the current connection, if any, printing the outcome."""
        if self.ws:
            try:
                self.ws.close()
                print("🔌 Connection closed successfully")
            except Exception as e:
                print(f"⚠️  Error closing connection: {e}")

# Usage
def safe_message_handler(message):
    """Print ``SYMBOL: $PRICE`` for one message, using defaults for missing fields."""
    ticker_symbol = message.get('symbol', 'Unknown')
    last_price = message.get('price', 0)
    print("{}: ${}".format(ticker_symbol, last_price))

client = RobustWebSocketClient(
    symbols=["AAPL", "GOOGL", "MSFT"],
    message_handler=safe_message_handler,
    max_retries=3
)

try:
    client.connect_with_retry()
except KeyboardInterrupt:
    print("\n🛑 Stopping...")
finally:
    # Close on every exit path, including when the retries are exhausted
    client.close()

Performance Considerations

Memory Management

from collections import deque
import threading

class EfficientStreamProcessor:
    """Keep a bounded history of recent messages and print each update.

    Parameters:
    - buffer_size: maximum number of entries kept; once the buffer is full,
      each append drops the oldest entry (``deque(maxlen=...)``)
    """

    def __init__(self, buffer_size=1000):
        self.buffer = deque(maxlen=buffer_size)  # Fixed-size buffer
        # Guards every read and write of ``buffer``, presumably in case the
        # handler and readers run on different threads
        self.lock = threading.Lock()

    def process_message(self, message):
        """Store a compact copy of ``message`` in the buffer, then print it."""
        with self.lock:
            self.buffer.append({
                'symbol': message.get('symbol'),
                'price': message.get('price'),
                'timestamp': message.get('timestamp')
            })

        self.handle_latest_update(message)

    def handle_latest_update(self, message):
        """Print ``SYMBOL: $PRICE`` for the latest message."""
        symbol = message.get('symbol', 'Unknown')
        price = message.get('price', 0)
        print(f"{symbol}: ${price}")

    def get_recent_data(self, count=10):
        """Return up to ``count`` most recent buffered entries, oldest first.

        A ``count`` of 0 or less returns an empty list. A plain ``[-count:]``
        slice would return the whole buffer for ``count == 0``.
        """
        if count <= 0:
            return []
        with self.lock:
            return list(self.buffer)[-count:]

# Usage: keep at most the 500 most recent entries in memory
processor = EfficientStreamProcessor(buffer_size=500)
ws = yf.WebSocket()
ws.subscribe(["AAPL", "GOOGL"])
ws.listen(message_handler=processor.process_message)

Batch Processing

import asyncio
from datetime import datetime, timedelta

class BatchStreamProcessor:
    """Accumulate messages and process them in batches.

    The batch is flushed when it reaches ``batch_size`` messages, or when a
    message arrives at least ``batch_timeout`` seconds after the previous
    flush. The timeout is only checked when a message arrives.
    """

    def __init__(self, batch_size=10, batch_timeout=5):
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout
        self.message_batch = []
        self.last_batch_time = datetime.now()

    async def handle_message(self, message):
        """Add ``message`` to the batch and flush if the batch is full or stale."""
        self.message_batch.append(message)

        if len(self.message_batch) >= self.batch_size:
            await self.process_batch()
            return

        elapsed = datetime.now() - self.last_batch_time
        if elapsed >= timedelta(seconds=self.batch_timeout):
            await self.process_batch()

    async def process_batch(self):
        """Process and clear the current batch; do nothing when it is empty."""
        pending = self.message_batch
        if not pending:
            return

        # Simulate batch processing (database write, API call, etc.)
        print(f"📦 Processing batch of {len(pending)} messages")

        for msg in pending:
            sym, px = msg.get('symbol', 'Unknown'), msg.get('price', 0)
            # Batch processing logic for (sym, px) goes here

        # Empty the batch in place and restart the timeout clock
        pending.clear()
        self.last_batch_time = datetime.now()

# Usage
async def main():
    """Stream three symbols through a BatchStreamProcessor, flushing leftovers on exit."""
    processor = BatchStreamProcessor(batch_size=5, batch_timeout=3)

    try:
        async with yf.AsyncWebSocket() as ws:
            await ws.subscribe(["AAPL", "GOOGL", "MSFT"])
            await ws.listen(message_handler=processor.handle_message)
    finally:
        # The timeout is only checked when a message arrives, so a partial
        # batch still pending when the stream stops would otherwise be lost.
        await processor.process_batch()

asyncio.run(main())

Install with Tessl CLI

npx tessl i tessl/pypi-yfinance

docs

bulk-data.md

config-utils.md

index.md

live-streaming.md

market-sector.md

screening.md

search-lookup.md

ticker-data.md

tile.json