Download market data from Yahoo! Finance API
Live financial data streaming using WebSocket connections with both synchronous and asynchronous support. Enable real-time price updates, volume changes, and market data streaming for active trading and monitoring applications.
Real-time data streaming using synchronous WebSocket connections for traditional applications.
class WebSocket:
    """
    Synchronous client for real-time data streaming.

    Instances support the context-manager protocol: leaving a
    ``with WebSocket(...) as ws:`` block calls ``close()``.
    """

    def __init__(self, url: str = "wss://streamer.finance.yahoo.com/?version=2",
                 verbose: bool = True):
        """
        Create a WebSocket connection for real-time data streaming.

        Parameters:
        - url: str, WebSocket server URL (default: Yahoo Finance streaming endpoint)
        - verbose: bool, enable verbose logging of connection events
        """

    def subscribe(self, symbols: Union[str, List[str]]):
        """
        Subscribe to real-time data for specified symbols.

        Parameters:
        - symbols: str or list, ticker symbols to subscribe to
        """

    def unsubscribe(self, symbols: Union[str, List[str]]):
        """
        Unsubscribe from real-time data for specified symbols.

        Parameters:
        - symbols: str or list, ticker symbols to unsubscribe from
        """

    def listen(self, message_handler: Optional[Callable] = None):
        """
        Start listening for real-time data messages.

        Parameters:
        - message_handler: function to handle incoming messages
          If None, messages are printed to console
        """

    def close(self):
        """
        Close the WebSocket connection.
        """

    def __enter__(self):
        """
        Enter the context manager.

        Returns:
        - the WebSocket instance, so it can be bound with ``as``
        """
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Leave the context manager and close the connection.

        Parameters:
        - exc_type, exc_val, exc_tb: exception details, or None on a clean exit
        """
        self.close()


# WebSocket can be used as a context manager
with WebSocket(verbose=True) as ws:
ws.subscribe(["AAPL", "GOOGL"])
ws.listen(message_handler=custom_handler)
# Connection automatically closed when exiting context

import yfinance as yf
# Basic real-time streaming
def handle_price_update(message):
    """Print one streaming message as ``<symbol>: $<price>``.

    Missing keys fall back to 'Unknown' for the symbol and 0 for the price.
    """
    price = message.get('price', 0)
    symbol = message.get('symbol', 'Unknown')
    print(f"{symbol}: ${price}")
ws = yf.WebSocket(verbose=True)
ws.subscribe(["AAPL", "GOOGL", "MSFT"])
ws.listen(message_handler=handle_price_update)
# Using context manager
with yf.WebSocket() as ws:
ws.subscribe("AAPL")
ws.listen() # Uses default console output
# Managing subscriptions
ws = yf.WebSocket()
ws.subscribe(["AAPL", "GOOGL"]) # Initial subscription
ws.subscribe("MSFT") # Add more symbols
ws.unsubscribe("GOOGL") # Remove specific symbol
ws.listen(handle_price_update)

Real-time data streaming using asynchronous WebSocket connections for modern async applications.
class AsyncWebSocket:
    """
    Asynchronous client for real-time data streaming.

    Instances support the async context-manager protocol: leaving an
    ``async with AsyncWebSocket(...) as ws:`` block awaits ``close()``.
    """

    def __init__(self, url: str = "wss://streamer.finance.yahoo.com/?version=2",
                 verbose: bool = True):
        """
        Create an async WebSocket connection for real-time data streaming.

        Parameters:
        - url: str, WebSocket server URL
        - verbose: bool, enable verbose logging
        """

    async def subscribe(self, symbols: Union[str, List[str]]):
        """
        Asynchronously subscribe to real-time data.

        Parameters:
        - symbols: str or list, ticker symbols to subscribe to
        """

    async def unsubscribe(self, symbols: Union[str, List[str]]):
        """
        Asynchronously unsubscribe from real-time data.

        Parameters:
        - symbols: str or list, ticker symbols to unsubscribe from
        """

    async def listen(self, message_handler: Optional[Callable] = None):
        """
        Asynchronously listen for real-time data messages.

        Parameters:
        - message_handler: async function to handle incoming messages
        """

    async def close(self):
        """
        Asynchronously close the WebSocket connection.
        """

    async def __aenter__(self):
        """
        Enter the async context manager.

        Returns:
        - the AsyncWebSocket instance, so it can be bound with ``as``
        """
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """
        Leave the async context manager and close the connection.

        Parameters:
        - exc_type, exc_val, exc_tb: exception details, or None on a clean exit
        """
        await self.close()


# AsyncWebSocket can be used as an async context manager
async with AsyncWebSocket(verbose=True) as ws:
await ws.subscribe(["AAPL", "GOOGL"])
await ws.listen(message_handler=async_handler)
# Connection automatically closed when exiting context

import asyncio
import yfinance as yf
# Basic async streaming
async def async_price_handler(message):
    """Print one streaming message as ``Async: <symbol> -> $<price>``.

    Missing keys fall back to 'Unknown' for the symbol and 0 for the price.
    """
    price = message.get('price', 0)
    symbol = message.get('symbol', 'Unknown')
    print(f"Async: {symbol} -> ${price}")
async def main():
    """Subscribe to AAPL, GOOGL and MSFT and hand each message to async_price_handler."""
    symbols = ["AAPL", "GOOGL", "MSFT"]
    stream = yf.AsyncWebSocket(verbose=True)
    await stream.subscribe(symbols)
    await stream.listen(message_handler=async_price_handler)
# Run the async function
asyncio.run(main())
# Using async context manager
async def stream_with_context():
    """Stream AAPL updates inside an ``async with`` block, with no custom handler."""
    stream = yf.AsyncWebSocket()
    async with stream as ws:
        await ws.subscribe("AAPL")
        await ws.listen()
asyncio.run(stream_with_context())

Start real-time streaming directly from Ticker and Tickers objects.
# Available on Ticker class
def live(self, message_handler: Optional[Callable] = None, verbose: bool = True):
    """
    Start real-time streaming for this ticker.

    Parameters:
    - message_handler: function to handle incoming messages (optional)
    - verbose: bool, enable verbose logging
    """
# Available on Tickers class
def live(self, message_handler: Optional[Callable] = None, verbose: bool = True):
    """
    Start real-time streaming for all tickers in the collection.

    Parameters:
    - message_handler: function to handle incoming messages (optional)
    - verbose: bool, enable verbose logging
    """


# Single ticker streaming
ticker = yf.Ticker("AAPL")
ticker.live(message_handler=handle_price_update)
# Multiple ticker streaming
portfolio = yf.Tickers(["AAPL", "GOOGL", "MSFT"])
portfolio.live(message_handler=handle_portfolio_update)

Real-time messages contain various fields depending on the data type:
# Typical message structure
{
    'symbol': 'AAPL',
    'price': 175.43,
    'change': 2.15,
    'changePercent': 1.24,
    'volume': 45672100,
    'timestamp': 1640995200,
    'marketHours': 'REGULAR_MARKET',
    'dayHigh': 176.12,
    'dayLow': 173.78,
    'bid': 175.42,
    'ask': 175.44,
    'bidSize': 100,
    'askSize': 200
}


def comprehensive_message_handler(message):
    """Print one colour-coded line summarising a streaming message.

    The line shows the local time of the message, its symbol, a direction
    arrow, the price, the signed change and the volume. Missing keys
    default to 'Unknown' (symbol) or 0 (everything else).
    """
    import datetime

    timestamp = message.get('timestamp', 0)
    volume = message.get('volume', 0)
    change = message.get('change', 0)
    price = message.get('price', 0)
    symbol = message.get('symbol', 'Unknown')

    # Arrow and ANSI colour follow the sign of the change:
    # green when rising, red when falling, yellow otherwise.
    direction, color_code = (
        ("▲", "\033[92m") if change > 0
        else ("▼", "\033[91m") if change < 0
        else ("→", "\033[93m")
    )

    # Render the epoch timestamp as a local wall-clock time.
    moment = datetime.datetime.fromtimestamp(timestamp)
    time_str = moment.strftime('%H:%M:%S')

    print(f"{color_code}{time_str} {symbol} {direction} ${price:.2f} "
          f"({change:+.2f}) Vol: {volume:,}\033[0m")
# Usage
# Stream three tickers through the detailed handler.
ws = yf.WebSocket()
ws.subscribe(["AAPL", "GOOGL", "MSFT"])
ws.listen(message_handler=comprehensive_message_handler)

import asyncio
from collections import deque
class AsyncMessageProcessor:
    """Queue incoming stream messages and drain them one at a time."""

    def __init__(self):
        self.message_queue = deque()  # pending messages, oldest first
        self.processing = False       # True while a drain loop is running

    async def handle_message(self, message):
        """Add message to queue for processing.

        Starts a drain loop unless one is already running; a running loop
        picks the new message up on its next iteration.
        """
        self.message_queue.append(message)
        if not self.processing:
            await self.process_messages()

    async def process_messages(self):
        """Process queued messages asynchronously.

        The ``processing`` flag is cleared in a ``finally`` so that an
        exception raised while handling one message cannot leave the flag
        stuck at True, which would stop every later message from being
        processed.
        """
        self.processing = True
        try:
            while self.message_queue:
                message = self.message_queue.popleft()
                # Simulate async processing (database write, API call, etc.)
                await asyncio.sleep(0.01)
                # Process message
                symbol = message.get('symbol', 'Unknown')
                price = message.get('price', 0)
                print(f"Processed: {symbol} @ ${price}")
        finally:
            self.processing = False
# Usage
async def main():
    """Route every AAPL/GOOGL stream message through an AsyncMessageProcessor."""
    processor = AsyncMessageProcessor()
    on_message = processor.handle_message
    async with yf.AsyncWebSocket() as ws:
        await ws.subscribe(["AAPL", "GOOGL"])
        await ws.listen(message_handler=on_message)
asyncio.run(main())


class PortfolioMonitor:
    """Track price and change per portfolio symbol and print alerts.

    Parameters:
    - portfolio_symbols: iterable of ticker symbols to track
    - alerts: optional dict mapping a symbol to a dict that may contain
      'stop_loss' and/or 'take_profit' price levels
    """

    def __init__(self, portfolio_symbols, alerts=None):
        self.portfolio = dict(
            (symbol, {'price': 0, 'change': 0}) for symbol in portfolio_symbols
        )
        self.alerts = alerts if alerts else {}
        self.total_value = 0

    def handle_update(self, message):
        """Store the latest price/change for a tracked symbol and report it."""
        symbol = message.get('symbol')
        price = message.get('price', 0)
        change = message.get('change', 0)

        # Messages for symbols outside the portfolio are ignored.
        if symbol not in self.portfolio:
            return

        entry = self.portfolio[symbol]
        entry['price'] = price
        entry['change'] = change

        # Check alerts
        if symbol in self.alerts:
            self.check_alerts(symbol, price)

        # Update portfolio summary
        self.update_portfolio_summary()

    def check_alerts(self, symbol, price):
        """Print an alert when price crosses a configured level for symbol."""
        alerts = self.alerts[symbol]
        if 'stop_loss' in alerts:
            if price <= alerts['stop_loss']:
                print(f"🚨 STOP LOSS ALERT: {symbol} @ ${price} (Stop: ${alerts['stop_loss']})")
        if 'take_profit' in alerts:
            if price >= alerts['take_profit']:
                print(f"🎯 TAKE PROFIT ALERT: {symbol} @ ${price} (Target: ${alerts['take_profit']})")

    def update_portfolio_summary(self):
        """Print the sum of the latest per-symbol changes."""
        total_change = 0
        for data in self.portfolio.values():
            total_change += data['change']
        print(f"Portfolio Update - Total Change: ${total_change:+.2f}")
# Usage
portfolio_symbols = ["AAPL", "GOOGL", "MSFT"]
alerts = {
"AAPL": {"stop_loss": 160.0, "take_profit": 180.0},
"GOOGL": {"stop_loss": 90.0, "take_profit": 110.0}
}
monitor = PortfolioMonitor(portfolio_symbols, alerts)
ws = yf.WebSocket()
ws.subscribe(portfolio_symbols)
ws.listen(message_handler=monitor.handle_update)


class MarketScanner:
    """Collect streaming messages that satisfy simple movement/volume criteria.

    Parameters:
    - scan_criteria: dict that may contain 'min_change_percent' and
      'min_volume' thresholds
    """

    def __init__(self, scan_criteria):
        self.criteria = scan_criteria
        self.matches = []

    def scan_message(self, message):
        """Record and announce the message if it meets the scan criteria."""
        symbol = message.get('symbol')
        price = message.get('price', 0)
        change_percent = message.get('changePercent', 0)
        volume = message.get('volume', 0)

        # Apply scanning criteria
        if not self.meets_criteria(message):
            return

        self.matches.append(dict(
            symbol=symbol,
            price=price,
            change_percent=change_percent,
            volume=volume,
            timestamp=message.get('timestamp', 0),
        ))
        print(f"🔍 SCANNER MATCH: {symbol} - {change_percent:+.2f}% @ ${price}")

    def meets_criteria(self, message):
        """Return True for a large price movement accompanied by high volume.

        Thresholds default to 5 (percent) and 1000000 (volume) when the
        criteria dict does not provide them.
        """
        change_percent = message.get('changePercent', 0)
        volume = message.get('volume', 0)

        # Example criteria: Large price movement with high volume
        if not abs(change_percent) > self.criteria.get('min_change_percent', 5):
            return False
        return volume > self.criteria.get('min_volume', 1000000)
# Usage
scan_criteria = {
    'min_change_percent': 3.0,  # 3% minimum price change
    'min_volume': 2000000       # 2M minimum volume
}
scanner = MarketScanner(scan_criteria)
watchlist = ["AAPL", "GOOGL", "MSFT", "AMZN", "TSLA", "META", "NVDA"]
ws = yf.WebSocket()
ws.subscribe(watchlist)
ws.listen(message_handler=scanner.scan_message)

import pandas as pd
from datetime import datetime
import sqlite3
class StreamingDataRecorder:
    """Persist streaming messages to a SQLite table and read them back.

    Each method opens its own connection to ``db_path`` and always closes
    it again, including when a statement raises.

    Parameters:
    - db_path: str, path of the SQLite database file
    """

    def __init__(self, db_path="streaming_data.db"):
        self.db_path = db_path
        self.setup_database()

    def setup_database(self):
        """Create database table for streaming data."""
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS streaming_data (
                    timestamp INTEGER,
                    symbol TEXT,
                    price REAL,
                    change_amount REAL,
                    change_percent REAL,
                    volume INTEGER,
                    bid REAL,
                    ask REAL
                )
            ''')
            conn.commit()
        finally:
            conn.close()

    def record_message(self, message):
        """Record streaming message to database.

        Missing fields are stored as 0 (or '' for the symbol); a missing
        timestamp is replaced by the current time in epoch seconds.
        """
        row = (
            message.get('timestamp', int(datetime.now().timestamp())),
            message.get('symbol', ''),
            message.get('price', 0),
            message.get('change', 0),
            message.get('changePercent', 0),
            message.get('volume', 0),
            message.get('bid', 0),
            message.get('ask', 0),
        )
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute('''
                INSERT INTO streaming_data
                (timestamp, symbol, price, change_amount, change_percent, volume, bid, ask)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            ''', row)
            conn.commit()
        finally:
            # Release the connection even if the insert or commit failed.
            conn.close()

        # Print confirmation
        symbol = message.get('symbol', 'Unknown')
        price = message.get('price', 0)
        print(f"📊 Recorded: {symbol} @ ${price}")

    def get_recorded_data(self, symbol=None, hours=1):
        """Retrieve recorded data for analysis.

        Parameters:
        - symbol: optional ticker symbol; when given, only its rows are returned
        - hours: look-back window, counted back from the current time

        Returns:
        - pandas DataFrame of matching rows, newest timestamp first
        """
        query = '''
            SELECT * FROM streaming_data
            WHERE timestamp > ?
        '''
        params = [int(datetime.now().timestamp()) - (hours * 3600)]
        if symbol:
            query += ' AND symbol = ?'
            params.append(symbol)
        query += ' ORDER BY timestamp DESC'

        conn = sqlite3.connect(self.db_path)
        try:
            return pd.read_sql_query(query, conn, params=params)
        finally:
            conn.close()
# Usage
recorder = StreamingDataRecorder()
ws = yf.WebSocket()
ws.subscribe(["AAPL", "GOOGL", "MSFT"])
ws.listen(message_handler=recorder.record_message)
# Later, analyze recorded data
recent_data = recorder.get_recorded_data(symbol="AAPL", hours=2)
print(recent_data.describe())

import time
import logging
class RobustWebSocketClient:
    """Streaming client that reconnects with exponential backoff.

    Parameters:
    - symbols: ticker symbols passed to ``subscribe`` on every (re)connect
    - message_handler: callable invoked with each incoming message
    - max_retries: consecutive failed attempts tolerated before giving up
    - ws_factory: optional zero-argument callable returning a new socket
      object exposing ``subscribe``, ``listen`` and ``close``; when omitted,
      ``yf.WebSocket(verbose=True)`` is used
    """

    def __init__(self, symbols, message_handler, max_retries=5, ws_factory=None):
        self.symbols = symbols
        self.message_handler = message_handler
        self.max_retries = max_retries
        self.ws_factory = ws_factory
        self.retry_count = 0
        self.ws = None

    def connect_with_retry(self):
        """Connect with automatic retry logic.

        Returns as soon as ``listen`` returns without raising: the stream
        ended without an error, so reconnecting in a tight loop would only
        spin. After a failure the broken socket is closed and, unless the
        retry budget is exhausted, the next attempt is delayed by an
        exponentially growing wait (capped at 60 seconds).
        """
        while self.retry_count < self.max_retries:
            try:
                self.ws = self._new_socket()
                self.ws.subscribe(self.symbols)
                print(f"✅ Connected successfully (attempt {self.retry_count + 1})")
                # Reset retry count on successful connection
                self.retry_count = 0
                # Start listening
                self.ws.listen(message_handler=self.handle_with_error_recovery)
            except Exception as e:
                self.retry_count += 1
                print(f"❌ Connection failed (attempt {self.retry_count}): {e}")
                self._discard_socket()
                if self.retry_count >= self.max_retries:
                    # No further attempt will be made, so do not wait.
                    break
                wait_time = min(2 ** self.retry_count, 60)  # Exponential backoff
                print(f"⏳ Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
            else:
                return
        print(f"🚫 Max retries ({self.max_retries}) exceeded. Giving up.")

    def _new_socket(self):
        """Build a fresh socket from the factory, or the default yfinance one."""
        if self.ws_factory is not None:
            return self.ws_factory()
        return yf.WebSocket(verbose=True)

    def _discard_socket(self):
        """Best-effort close of a socket that just failed, then forget it."""
        if self.ws is None:
            return
        try:
            self.ws.close()
        except Exception:
            # The socket is already broken; a failing close changes nothing.
            pass
        self.ws = None

    def handle_with_error_recovery(self, message):
        """Message handler with error recovery."""
        try:
            self.message_handler(message)
        except Exception as e:
            print(f"⚠️ Error processing message: {e}")
            # Continue processing other messages

    def close(self):
        """Safely close connection."""
        if self.ws:
            try:
                self.ws.close()
                print("🔌 Connection closed successfully")
            except Exception as e:
                print(f"⚠️ Error closing connection: {e}")
# Usage
def safe_message_handler(message):
    """Print one streaming message as ``<symbol>: $<price>``.

    Missing keys fall back to 'Unknown' for the symbol and 0 for the price.
    """
    price = message.get('price', 0)
    symbol = message.get('symbol', 'Unknown')
    print(f"{symbol}: ${price}")
client = RobustWebSocketClient(
    symbols=["AAPL", "GOOGL", "MSFT"],
    message_handler=safe_message_handler,
    max_retries=3
)
try:
    client.connect_with_retry()
except KeyboardInterrupt:
    # Ctrl-C: stop streaming and release the connection.
    print("\n🛑 Stopping...")
    client.close()

from collections import deque
import threading
class EfficientStreamProcessor:
    """Keep the most recent messages in a bounded, lock-protected buffer.

    Parameters:
    - buffer_size: maximum number of entries retained; older entries are
      discarded automatically once the buffer is full
    """

    def __init__(self, buffer_size=1000):
        self.buffer = deque(maxlen=buffer_size)  # Fixed-size buffer
        self.lock = threading.Lock()

    def process_message(self, message):
        """Process message with memory-efficient buffering."""
        with self.lock:
            # Add to buffer (automatically removes oldest when full)
            self.buffer.append({
                'symbol': message.get('symbol'),
                'price': message.get('price'),
                'timestamp': message.get('timestamp')
            })
        # Process latest message
        self.handle_latest_update(message)

    def handle_latest_update(self, message):
        """Handle the latest update efficiently."""
        symbol = message.get('symbol', 'Unknown')
        price = message.get('price', 0)
        print(f"{symbol}: ${price}")

    def get_recent_data(self, count=10):
        """Get recent data from buffer.

        Parameters:
        - count: maximum number of entries to return

        Returns:
        - list of the newest ``count`` buffered entries, oldest first;
          an empty list when ``count`` is zero or negative
        """
        # A plain [-count:] slice would return the whole buffer for
        # count == 0 (because -0 == 0) and odd slices for negative counts.
        if count <= 0:
            return []
        with self.lock:
            return list(self.buffer)[-count:]
# Usage
processor = EfficientStreamProcessor(buffer_size=500)
ws = yf.WebSocket()
ws.subscribe(["AAPL", "GOOGL"])
ws.listen(message_handler=processor.process_message)

import asyncio
from datetime import datetime, timedelta
class BatchStreamProcessor:
    """Accumulate streaming messages and handle them in batches.

    Parameters:
    - batch_size: number of buffered messages that triggers a flush
    - batch_timeout: seconds since the last flush after which the next
      incoming message triggers a flush
    """

    def __init__(self, batch_size=10, batch_timeout=5):
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout
        self.message_batch = []
        self.last_batch_time = datetime.now()

    async def handle_message(self, message):
        """Add message to batch and process when ready."""
        self.message_batch.append(message)

        # A flush is due once the batch is full ...
        due = len(self.message_batch) >= self.batch_size
        if not due:
            # ... or once the timeout since the previous flush has elapsed.
            waited = datetime.now() - self.last_batch_time
            due = waited >= timedelta(seconds=self.batch_timeout)

        if due:
            await self.process_batch()

    async def process_batch(self):
        """Process accumulated messages in batch."""
        if len(self.message_batch) == 0:
            return

        # Simulate batch processing (database write, API call, etc.)
        print(f"📦 Processing batch of {len(self.message_batch)} messages")

        # Process each message in the batch
        for message in self.message_batch:
            symbol = message.get('symbol', 'Unknown')
            price = message.get('price', 0)
            # Batch processing logic here

        # Clear batch and reset timer
        del self.message_batch[:]
        self.last_batch_time = datetime.now()
# Usage
async def main():
    """Feed AAPL, GOOGL and MSFT stream messages into a BatchStreamProcessor."""
    processor = BatchStreamProcessor(batch_size=5, batch_timeout=3)
    on_message = processor.handle_message
    async with yf.AsyncWebSocket() as ws:
        await ws.subscribe(["AAPL", "GOOGL", "MSFT"])
        await ws.listen(message_handler=on_message)
asyncio.run(main())

Install with Tessl CLI
npx tessl i tessl/pypi-yfinance