CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-yfinance

Download market data from Yahoo! Finance API

Overview
Eval results
Files

docs/config-utils.md

Configuration and Utilities

Global configuration options, debugging utilities, and cache management for optimal performance and troubleshooting. These utilities help optimize yfinance behavior and diagnose issues in production environments.

Capabilities

Global Configuration

Configure global yfinance settings. The only option documented here is the proxy used for HTTP requests.

def set_config(proxy=None):
    """
    Configure global yfinance settings.
    
    Parameters:
    - proxy: str or dict, proxy configuration for HTTP requests
      Can be a string URL or dictionary with protocol-specific proxies.
      Passing None (the default) removes the proxy configuration.
      
    Examples:
    - proxy="http://proxy.company.com:8080"
    - proxy={"http": "http://proxy.com:8080", "https": "https://proxy.com:8080"}
    - proxy=None
    """

Usage Examples

import yfinance as yf

# Configure HTTP proxy (a single URL string)
yf.set_config(proxy="http://proxy.company.com:8080")

# Configure protocol-specific proxies (dictionary keyed by "http" / "https")
yf.set_config(proxy={
    "http": "http://proxy.company.com:8080",
    "https": "https://secure-proxy.company.com:8443"
})

# Configure SOCKS proxy
# NOTE(review): socks5:// URLs presumably need SOCKS support installed in the
# underlying HTTP client - confirm before relying on this form.
yf.set_config(proxy="socks5://proxy.company.com:1080")

# Remove proxy configuration
yf.set_config(proxy=None)

Configuration Best Practices

# Corporate environment setup
def _redact_proxy_credentials(proxy_url):
    """Return proxy_url with any embedded user:password replaced by '***'."""
    from urllib.parse import urlsplit

    try:
        parts = urlsplit(proxy_url)
    except ValueError:
        # Never echo a URL we could not inspect; it may still hold a password.
        return "<unparseable proxy URL>"

    if parts.username is None and parts.password is None:
        return proxy_url

    # Everything after the last '@' in the netloc is host[:port].
    host = parts.netloc.rpartition("@")[2]
    return parts._replace(netloc=f"***@{host}").geturl()


def setup_corporate_environment(enable_debug=True):
    """
    Configure yfinance for corporate network environments.

    Reads HTTP_PROXY / http_proxy and HTTPS_PROXY / https_proxy (upper-case
    name first) and, when at least one is set, passes them to yf.set_config
    as a protocol-specific proxy dictionary.

    Parameters:
    - enable_debug: bool, call yf.enable_debug_mode() after configuring the
      proxy (default True, matching the previous unconditional behavior)

    Returns:
    None
    """

    import os

    proxy_config = {}
    for scheme in ("http", "https"):
        variable = f"{scheme}_proxy"
        value = os.environ.get(variable.upper()) or os.environ.get(variable)
        if value:
            proxy_config[scheme] = value

    if proxy_config:
        yf.set_config(proxy=proxy_config)
        # Proxy URLs often embed user:password; keep those out of the output.
        printable = {
            scheme: _redact_proxy_credentials(url)
            for scheme, url in proxy_config.items()
        }
        print(f"Configured proxy: {printable}")

    # Enable debug mode for troubleshooting
    if enable_debug:
        yf.enable_debug_mode()

# Usage: apply any proxy found in the environment, then enable debug logging
setup_corporate_environment()

Debug Mode

Enable comprehensive debug logging for troubleshooting network issues, API responses, and data processing problems.

def enable_debug_mode():
    """
    Enable debug logging for yfinance operations.

    Takes no arguments. The usage examples call it before creating Ticker
    objects so that the subsequent operations are logged.
    
    This enables detailed logging of:
    - HTTP requests and responses
    - API endpoint calls
    - Data processing steps  
    - Error conditions and stack traces
    - Performance timing information
    """

Usage Examples

import yfinance as yf

# Enable debug mode (do this before the calls you want to inspect)
yf.enable_debug_mode()

# Now all yfinance operations will produce detailed logging
ticker = yf.Ticker("AAPL")
data = ticker.history(period="1mo")

# Debug output will show:
# - URL being called
# - HTTP status codes
# - Response headers
# - Data parsing steps
# - Any errors encountered

Debug Mode Applications

def troubleshoot_data_issues(symbol):
    """
    Troubleshoot data retrieval issues with debug logging.

    Enables yfinance debug mode, then probes basic info, price history,
    the income statement and option expirations for one ticker. Each probe
    runs in its own try block so a failure in one data source is reported
    without hiding the results of the others.

    Parameters:
    - symbol: str, ticker symbol to diagnose

    Returns:
    None; all findings are printed
    """

    import traceback

    print(f"Troubleshooting data issues for {symbol}")

    # Enable debug mode
    yf.enable_debug_mode()

    def report_failure(error):
        # Called from inside an except block, so print_exc() sees the
        # exception that is currently being handled.
        print(f"Error during troubleshooting: {error}")
        traceback.print_exc()

    try:
        ticker = yf.Ticker(symbol)
    except Exception as e:
        # Without a Ticker object none of the probes can run.
        report_failure(e)
        return

    def probe_info():
        info = ticker.info
        print(f"Info keys available: {len(info.keys()) if info else 0}")

    def probe_history():
        history = ticker.history(period="5d")
        print(f"History shape: {history.shape if not history.empty else 'Empty'}")

    def probe_financials():
        financials = ticker.income_stmt
        print(f"Financials available: {not financials.empty}")

    def probe_options():
        options = ticker.options
        print(f"Options expirations: {len(options) if options else 0}")

    probes = (
        ("Basic Info", probe_info),
        ("Historical Data", probe_history),
        ("Financial Data", probe_financials),
        ("Options Data", probe_options),
    )

    # Test different data sources independently
    for title, probe in probes:
        print(f"\n=== Testing {title} ===")
        try:
            probe()
        except Exception as e:
            report_failure(e)

# Usage: run the diagnostic probes for one symbol; results are printed
troubleshoot_data_issues("AAPL")

Cache Management

Configure and manage yfinance's internal caching system for improved performance and reduced API calls.

def set_tz_cache_location(cache_dir: str):
    """
    Set custom location for timezone cache data.

    Signature stub for the documented yfinance function; the implementation
    is not shown here.
    
    Parameters:
    - cache_dir: str, directory path for storing cache files
      Directory will be created if it doesn't exist
    """

Usage Examples

import yfinance as yf
import os

# Set custom cache location ("~" is expanded to the current user's home directory)
custom_cache_dir = os.path.expanduser("~/yfinance_cache")
yf.set_tz_cache_location(custom_cache_dir)

# Now timezone data will be cached in the custom directory
# This is useful for:
# - Persistent caching across application restarts
# - Shared cache in multi-user environments
# - Custom cache management policies

Advanced Cache Management

import os
import shutil
from pathlib import Path

class YFinanceCacheManager:
    """
    Advanced cache management for yfinance.

    Owns a base directory containing one subdirectory per cache type and
    points yfinance's timezone cache at the "timezone" subdirectory.
    """

    # Subdirectories created under the base cache directory; these are also
    # the cache types accepted by clear_cache (in addition to "all").
    CACHE_SUBDIRS = ("timezone", "session_data", "temp")

    def __init__(self, base_cache_dir=None):
        """
        Parameters:
        - base_cache_dir: str, root directory for all caches
          Defaults to ~/.yfinance_cache when omitted or empty
        """
        self.base_cache_dir = base_cache_dir or os.path.expanduser("~/.yfinance_cache")
        self.setup_cache_structure()

    def setup_cache_structure(self):
        """Create the cache subdirectories and register the timezone cache."""
        for subdir in self.CACHE_SUBDIRS:
            os.makedirs(os.path.join(self.base_cache_dir, subdir), exist_ok=True)

        # Configure yfinance to use our timezone cache
        yf.set_tz_cache_location(os.path.join(self.base_cache_dir, "timezone"))

    def clear_cache(self, cache_type="all"):
        """
        Clear cache data.

        Parameters:
        - cache_type: str, "all" or one of CACHE_SUBDIRS
          "all" deletes the whole base directory tree and recreates the
          structure; any other value empties only that subdirectory

        Raises:
        ValueError if cache_type is not a known cache type
        """

        if cache_type == "all":
            if os.path.exists(self.base_cache_dir):
                shutil.rmtree(self.base_cache_dir)
                self.setup_cache_structure()
                print("All cache data cleared")
            return

        if cache_type not in self.CACHE_SUBDIRS:
            valid = ", ".join(("all",) + self.CACHE_SUBDIRS)
            raise ValueError(
                f"Unknown cache_type {cache_type!r}; expected one of: {valid}"
            )

        target = os.path.join(self.base_cache_dir, cache_type)
        if os.path.exists(target):
            shutil.rmtree(target)
            os.makedirs(target, exist_ok=True)
            print(f"{cache_type.capitalize()} cache cleared")

    def get_cache_size(self):
        """
        Get cache directory size in MB.

        Returns:
        float, total size of all files under the base directory divided by
        1024 * 1024
        """

        total_bytes = 0
        for dirpath, _dirnames, filenames in os.walk(self.base_cache_dir):
            for filename in filenames:
                try:
                    total_bytes += os.path.getsize(os.path.join(dirpath, filename))
                except OSError:
                    # File disappeared or is unreadable (e.g. a broken
                    # symlink); leave it out of the total.
                    continue

        return total_bytes / (1024 * 1024)  # Convert to MB

    def cache_info(self):
        """
        Get cache information.

        Returns:
        dict with 'cache_dir', 'exists', 'size_mb' and, for every
        subdirectory that exists, '<subdir>_files' holding the number of
        regular files directly inside it
        """

        exists = os.path.exists(self.base_cache_dir)
        info = {
            'cache_dir': self.base_cache_dir,
            'exists': exists,
            'size_mb': self.get_cache_size() if exists else 0
        }

        # Count files in each subdirectory
        for subdir in self.CACHE_SUBDIRS:
            subdir_path = os.path.join(self.base_cache_dir, subdir)
            if os.path.exists(subdir_path):
                info[f'{subdir}_files'] = sum(
                    1 for entry in os.listdir(subdir_path)
                    if os.path.isfile(os.path.join(subdir_path, entry))
                )

        return info

# Usage (constructing the manager creates the directory structure)
cache_manager = YFinanceCacheManager("/tmp/yfinance_cache")

# View cache information
cache_info = cache_manager.cache_info()
print(f"Cache location: {cache_info['cache_dir']}")
print(f"Cache size: {cache_info['size_mb']:.2f} MB")

# Clear specific cache type
cache_manager.clear_cache("timezone")

# Clear all cache (removes the whole base directory tree, then recreates it)
cache_manager.clear_cache("all")

Performance Optimization

Session Management

Optimize performance through proper session management and connection pooling.

import requests
import yfinance as yf
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry

def create_optimized_session():
    """
    Create an optimized requests session for yfinance.

    The session retries idempotent requests on transient HTTP errors, keeps
    a pool of reusable connections, applies a 30 second default timeout and
    sends a browser-like User-Agent header.

    Returns:
    requests.Session to pass as the `session` argument of yfinance calls

    NOTE(review): some recent yfinance releases appear to expect a curl_cffi
    session instead of requests.Session - confirm against the installed version.
    """

    import functools

    session = requests.Session()

    # Configure retry strategy
    retry_options = {
        'total': 3,                                      # Total number of retries
        'backoff_factor': 1,                             # Multiplier for the growing delay between retries
        'status_forcelist': [429, 500, 502, 503, 504],   # HTTP status codes to retry
    }
    retry_methods = ["HEAD", "GET", "OPTIONS"]
    try:
        retry_strategy = Retry(allowed_methods=retry_methods, **retry_options)
    except TypeError:
        # urllib3 older than 1.26 only understands the former keyword name.
        retry_strategy = Retry(method_whitelist=retry_methods, **retry_options)

    # Configure adapter with retry strategy
    adapter = HTTPAdapter(
        max_retries=retry_strategy,
        pool_connections=20,        # Number of per-host connection pools to cache
        pool_maxsize=20,            # Maximum connections kept in each pool
        pool_block=False            # Don't block when pool is full
    )

    # Mount adapter for both HTTP and HTTPS
    session.mount("http://", adapter)
    session.mount("https://", adapter)

    # Set timeout. requests never reads a `timeout` attribute on the session,
    # so supply it as the default keyword of every request instead; a timeout
    # passed explicitly by the caller still takes precedence.
    session.request = functools.partial(session.request, timeout=30)

    # Set user agent
    session.headers.update({
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    })

    return session

# Usage with yfinance: build the session once and reuse it for every call
optimized_session = create_optimized_session()

# Use session with Ticker objects
ticker = yf.Ticker("AAPL", session=optimized_session)
data = ticker.history(period="1mo")

# Use session with download function
bulk_data = yf.download(["AAPL", "GOOGL", "MSFT"], 
                       period="1mo", 
                       session=optimized_session)

Batch Processing Utilities

Utilities for efficient batch processing of multiple tickers and operations.

def batch_process_tickers(ticker_symbols, operations, batch_size=10, 
                         session=None, error_handling='continue'):
    """
    Process multiple tickers in batches with error handling.

    Tickers are handled one after another; batching only groups them for
    progress reporting.
    
    Parameters:
    - ticker_symbols: list, ticker symbols to process
    - operations: list, operations to perform on each ticker; supported
      values are 'info', 'history', 'financials' and 'recommendations'
    - batch_size: int, number of tickers per batch (must be >= 1)
    - session: requests.Session, optional session for efficiency
    - error_handling: str, 'continue' (print a warning and go on),
      'stop' (re-raise on the first failure) or 'collect' (record silently)
    
    Returns:
    dict with 'results' (symbol -> {operation: data}), 'errors'
    (symbol -> message), 'success_count' and 'error_count'

    Raises:
    ValueError for a non-positive batch_size, an unknown error_handling
    mode or an unsupported operation name.
    Exception (chained to the original error) in 'stop' mode.
    """

    if batch_size < 1:
        raise ValueError(f"batch_size must be at least 1, got {batch_size!r}")

    if error_handling not in ('continue', 'stop', 'collect'):
        raise ValueError(
            f"error_handling must be 'continue', 'stop' or 'collect', "
            f"got {error_handling!r}"
        )

    # One fetcher per supported operation; add more operations here as needed
    fetchers = {
        'info': lambda ticker: ticker.info,
        'history': lambda ticker: ticker.history(period="1mo"),
        'financials': lambda ticker: ticker.income_stmt,
        'recommendations': lambda ticker: ticker.recommendations,
    }

    unsupported = [op for op in operations if op not in fetchers]
    if unsupported:
        # Fail before any network work rather than silently returning less
        # data than the caller asked for.
        raise ValueError(f"Unsupported operations: {unsupported!r}")

    results = {}
    errors = {}

    # Process in batches
    batch_starts = range(0, len(ticker_symbols), batch_size)
    for batch_number, start in enumerate(batch_starts, start=1):
        batch = ticker_symbols[start:start + batch_size]
        print(f"Processing batch {batch_number}: {batch}")

        for symbol in batch:
            try:
                ticker = yf.Ticker(symbol, session=session)
                # A symbol counts as successful only if every operation worked
                results[symbol] = {op: fetchers[op](ticker) for op in operations}

            except Exception as e:
                error_msg = f"Error processing {symbol}: {e}"
                errors[symbol] = error_msg

                if error_handling == 'stop':
                    # Keep the original exception as the cause for debugging
                    raise Exception(error_msg) from e
                if error_handling == 'continue':
                    print(f"Warning: {error_msg}")
                # 'collect' mode just collects errors silently

    return {
        'results': results,
        'errors': errors,
        'success_count': len(results),
        'error_count': len(errors)
    }

# Usage
tickers = ["AAPL", "GOOGL", "MSFT", "AMZN", "TSLA"]
operations = ['info', 'history', 'recommendations']

# With five tickers and batch_size=3 the work is reported as two batches
batch_results = batch_process_tickers(
    tickers, 
    operations, 
    batch_size=3,
    session=create_optimized_session(),
    error_handling='continue'
)

# The counts are the sizes of the 'results' and 'errors' dictionaries
print(f"Successfully processed: {batch_results['success_count']} tickers")
print(f"Errors encountered: {batch_results['error_count']} tickers")

Rate Limiting and Throttling

Implement rate limiting to avoid API restrictions and ensure reliable data access.

import time
from functools import wraps
from collections import deque
from threading import Lock

class RateLimiter:
    """
    Rate limiter for yfinance API calls.

    Allows at most `max_calls` calls within any sliding window of
    `time_window` seconds. Timestamps are taken from time.monotonic(), so
    system clock adjustments cannot shorten or lengthen the window.
    """
    
    def __init__(self, max_calls=100, time_window=60):
        """
        Parameters:
        - max_calls: int, calls allowed per window (must be >= 1)
        - time_window: int or float, window length in seconds (must be >= 0)

        Raises:
        ValueError if either argument is out of range
        """
        if max_calls < 1:
            raise ValueError(f"max_calls must be at least 1, got {max_calls!r}")
        if time_window < 0:
            raise ValueError(f"time_window must not be negative, got {time_window!r}")

        self.max_calls = max_calls
        self.time_window = time_window
        self.calls = deque()   # timestamps of recent calls, oldest first
        self.lock = Lock()
    
    def _drop_expired(self, now):
        """Remove recorded calls that have left the window ending at `now`."""
        while self.calls and now - self.calls[0] >= self.time_window:
            self.calls.popleft()

    def wait_if_needed(self):
        """
        Wait if rate limit would be exceeded, then record this call.

        The lock is held while sleeping, so concurrent callers queue up
        behind the one that is waiting.
        """
        
        with self.lock:
            now = time.monotonic()
            self._drop_expired(now)
            
            # Loop rather than sleep once so a short sleep can never let the
            # window overflow.
            while len(self.calls) >= self.max_calls:
                sleep_time = self.time_window - (now - self.calls[0])
                if sleep_time > 0:
                    print(f"Rate limit reached. Waiting {sleep_time:.2f} seconds...")
                    time.sleep(sleep_time)
                now = time.monotonic()
                self._drop_expired(now)
            
            # Record this call with the time it is actually released, not the
            # time it arrived; a pre-sleep timestamp would expire too early.
            self.calls.append(now)

def rate_limited_ticker_operation(rate_limiter):
    """
    Build a decorator that throttles calls through `rate_limiter`.

    Parameters:
    - rate_limiter: object exposing wait_if_needed(), invoked before every
      call to the decorated function

    Returns:
    decorator that preserves the wrapped function's metadata
    """

    def apply_limit(target):
        def throttled(*call_args, **call_kwargs):
            # Block first (if the limiter says so), then delegate unchanged.
            rate_limiter.wait_if_needed()
            outcome = target(*call_args, **call_kwargs)
            return outcome

        return wraps(target)(throttled)

    return apply_limit

# Usage
api_limiter = RateLimiter(max_calls=50, time_window=60)  # 50 calls per minute

@rate_limited_ticker_operation(api_limiter)
def get_ticker_data(symbol, data_type='history'):
    """
    Fetch one kind of data for a ticker, throttled by api_limiter.

    Parameters:
    - symbol: str, ticker symbol
    - data_type: str, 'history' (one month of prices), 'info' or 'financials'

    Returns:
    the requested data, or None when data_type is not recognised
    """

    instrument = yf.Ticker(symbol)

    # Checked in order; each fetch is deferred so only the match is evaluated.
    fetchers = (
        ('history', lambda: instrument.history(period="1mo")),
        ('info', lambda: instrument.info),
        ('financials', lambda: instrument.income_stmt),
    )

    for kind, fetch in fetchers:
        if data_type == kind:
            return fetch()

    return None

# Process many tickers with automatic rate limiting
tickers = ["AAPL", "GOOGL", "MSFT", "AMZN", "TSLA", "META", "NVDA", "ORCL"]
results = {}  # symbol -> value returned by get_ticker_data

for symbol in tickers:
    print(f"Processing {symbol}...")
    # The decorator on get_ticker_data consults the limiter before each call
    results[symbol] = get_ticker_data(symbol, 'history')
    print(f"Completed {symbol}")

print(f"Processed {len(results)} tickers with rate limiting")

Logging and Monitoring

Custom Logging Setup

Set up comprehensive logging for yfinance operations in production environments.

import logging
import sys
import time
from datetime import datetime

def setup_yfinance_logging(log_level=logging.INFO, log_file=None):
    """
    Set up comprehensive logging for yfinance operations.

    Configures the 'yfinance_app' logger with a console handler writing to
    stdout and, optionally, a file handler. Handlers left over from an
    earlier call are removed and closed, so the function can be called
    again to reconfigure logging without leaking open log files.

    Parameters:
    - log_level: int, logging level applied to the logger and its handlers
    - log_file: str, optional path of a log file to write to as well

    Returns:
    logging.Logger, the configured 'yfinance_app' logger
    """
    
    # Create logger
    logger = logging.getLogger('yfinance_app')
    logger.setLevel(log_level)
    
    # Remove existing handlers and release the resources they hold
    for handler in logger.handlers[:]:
        logger.removeHandler(handler)
        handler.close()
    
    # Create formatter
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    
    # Console handler
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(log_level)
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)
    
    # File handler (optional)
    if log_file:
        file_handler = logging.FileHandler(log_file)
        file_handler.setLevel(log_level)
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)
    
    return logger

# Usage: log to the console and to a file named after today's date (YYYYMMDD)
logger = setup_yfinance_logging(
    log_level=logging.INFO,
    log_file=f"yfinance_{datetime.now().strftime('%Y%m%d')}.log"
)

# Log yfinance operations
def logged_ticker_operation(symbol, operation):
    """
    Perform ticker operation with logging.

    Parameters:
    - symbol: str, ticker symbol
    - operation: str, 'history' (one month of prices) or 'info'

    Returns:
    the data retrieved for the requested operation

    Raises:
    ValueError for an unsupported operation; any exception raised while
    fetching is logged and re-raised unchanged
    """
    
    logger.info(f"Starting {operation} for {symbol}")
    start_time = time.time()
    
    try:
        ticker = yf.Ticker(symbol)
        
        if operation == 'history':
            result = ticker.history(period="1mo")
            logger.info(f"Retrieved {len(result)} days of history for {symbol}")
        elif operation == 'info':
            result = ticker.info
            logger.info(f"Retrieved info for {symbol}: {result.get('shortName', 'N/A')}")
        else:
            # Without this branch `result` would be unbound and the function
            # would log "Completed" before failing with UnboundLocalError.
            raise ValueError(f"Unsupported operation: {operation!r}")
        
        elapsed_time = time.time() - start_time
        logger.info(f"Completed {operation} for {symbol} in {elapsed_time:.2f}s")
        
        return result
        
    except Exception as e:
        logger.error(f"Error in {operation} for {symbol}: {str(e)}")
        raise

# Usage: each call logs its start, what was retrieved, and the elapsed time
data = logged_ticker_operation("AAPL", "history")
info = logged_ticker_operation("GOOGL", "info")

Performance Monitoring

Monitor and optimize yfinance performance in production applications.

import time
import psutil
import threading
from collections import defaultdict

class YFinancePerformanceMonitor:
    """
    Monitor performance metrics for yfinance operations.

    Records duration and resident-memory change for each operation between
    start_operation() and end_operation(). All shared state is accessed
    under one lock.
    """
    
    def __init__(self):
        self.metrics = defaultdict(list)   # operation name -> list of metric dicts
        self.active_operations = {}        # operation id -> start snapshot
        self.lock = threading.Lock()
        self._sequence = 0                 # makes operation ids unique
    
    def start_operation(self, operation_name, symbol=None):
        """
        Start monitoring an operation.

        Parameters:
        - operation_name: str, name under which the metrics are grouped
        - symbol: str, optional ticker symbol the operation refers to

        Returns:
        str, opaque operation id to hand to end_operation()
        """
        
        with self.lock:
            # The millisecond timestamp alone collides when the same
            # operation/symbol pair starts twice within one millisecond, so
            # a per-monitor counter is appended.
            self._sequence += 1
            operation_id = (
                f"{operation_name}_{symbol}_{int(time.time() * 1000)}_{self._sequence}"
            )
            self.active_operations[operation_id] = {
                'name': operation_name,
                'symbol': symbol,
                'start_time': time.time(),
                'start_memory': psutil.Process().memory_info().rss / 1024 / 1024  # MB
            }
        
        return operation_id
    
    def end_operation(self, operation_id, success=True, error_msg=None):
        """
        End monitoring an operation and record metrics.

        Unknown or already finished operation ids are ignored.

        Parameters:
        - operation_id: str, id returned by start_operation()
        - success: bool, whether the operation succeeded
        - error_msg: str, optional error description for failed operations
        """
        
        with self.lock:
            operation = self.active_operations.pop(operation_id, None)
            if operation is None:
                return
            
            end_time = time.time()
            end_memory = psutil.Process().memory_info().rss / 1024 / 1024  # MB
            
            self.metrics[operation['name']].append({
                'name': operation['name'],
                'symbol': operation['symbol'],
                'duration': end_time - operation['start_time'],
                'memory_delta': end_memory - operation['start_memory'],
                'success': success,
                'error': error_msg,
                'timestamp': end_time
            })
    
    def get_performance_summary(self, operation_name=None):
        """
        Get performance summary for operations.

        Parameters:
        - operation_name: str, optional; restrict the summary to one
          operation name, otherwise all recorded operations are included

        Returns:
        {} when nothing was recorded. Otherwise a dict with
        'total_operations', 'successful_operations', 'failed_operations'
        and 'success_rate' (percent). The duration and memory statistics
        ('avg_duration', 'max_duration', 'min_duration', 'avg_memory_delta',
        'max_memory_delta') are present only when at least one operation
        succeeded.
        """
        
        # Copy under the lock so a concurrent end_operation() cannot mutate
        # the containers while they are being read.
        with self.lock:
            if operation_name:
                data = list(self.metrics.get(operation_name, []))
            else:
                data = [entry for entries in self.metrics.values() for entry in entries]
        
        if not data:
            return {}
        
        successful_ops = [m for m in data if m['success']]
        failed_ops = [m for m in data if not m['success']]
        
        if not successful_ops:
            return {
                'total_operations': len(data),
                'successful_operations': 0,
                'failed_operations': len(failed_ops),
                'success_rate': 0
            }
        
        durations = [m['duration'] for m in successful_ops]
        memory_deltas = [m['memory_delta'] for m in successful_ops]
        
        return {
            'total_operations': len(data),
            'successful_operations': len(successful_ops),
            'failed_operations': len(failed_ops),
            'success_rate': len(successful_ops) / len(data) * 100,
            'avg_duration': sum(durations) / len(durations),
            'max_duration': max(durations),
            'min_duration': min(durations),
            'avg_memory_delta': sum(memory_deltas) / len(memory_deltas),
            'max_memory_delta': max(memory_deltas)
        }

# Usage
monitor = YFinancePerformanceMonitor()

def monitored_ticker_operation(symbol, operation):
    """
    Perform ticker operation with performance monitoring.

    Parameters:
    - symbol: str, ticker symbol
    - operation: str, 'history' (one month of prices) or 'info'

    Returns:
    the data retrieved for the requested operation

    Raises:
    ValueError for an unsupported operation; any exception raised while
    fetching is recorded as a failed operation and re-raised
    """
    
    op_id = monitor.start_operation(operation, symbol)
    
    try:
        ticker = yf.Ticker(symbol)
        
        if operation == 'history':
            result = ticker.history(period="1mo")
        elif operation == 'info':
            result = ticker.info
        else:
            # Raise before anything is recorded so an unsupported operation
            # is counted as a failure instead of a success.
            raise ValueError(f"Unsupported operation: {operation!r}")
        
    except Exception as e:
        monitor.end_operation(op_id, success=False, error_msg=str(e))
        raise
    
    monitor.end_operation(op_id, success=True)
    return result

# Run monitored operations
symbols = ["AAPL", "GOOGL", "MSFT"]
for symbol in symbols:
    try:
        data = monitored_ticker_operation(symbol, "history")
        info = monitored_ticker_operation(symbol, "info")
    except Exception as e:
        print(f"Error processing {symbol}: {e}")

# Get performance summary
# The summary is {} when nothing was recorded and has no duration statistics
# when every operation failed, so check before formatting those keys.
history_perf = monitor.get_performance_summary("history")
if 'avg_duration' in history_perf:
    print(f"History operations: {history_perf['success_rate']:.1f}% success rate, "
          f"avg duration: {history_perf['avg_duration']:.2f}s")
elif history_perf:
    print(f"History operations: {history_perf['success_rate']:.1f}% success rate, "
          f"no successful operations to time")
else:
    print("History operations: no data recorded")

all_perf = monitor.get_performance_summary()
if all_perf:
    print(f"All operations: {all_perf['total_operations']} total, "
          f"{all_perf['success_rate']:.1f}% success rate")
else:
    print("All operations: no data recorded")

Production Deployment Considerations

Environment Configuration

import os
from dataclasses import dataclass
from typing import Optional

# Spellings (compared case-insensitively) that switch a boolean setting on.
_TRUE_STRINGS = frozenset({"1", "true", "yes", "on"})


def _env_flag(name, default):
    """Read a boolean environment variable; any unrecognised value is False."""
    raw = os.getenv(name)
    if raw is None:
        return default
    return raw.strip().lower() in _TRUE_STRINGS


def _env_int(name, default):
    """Read an integer environment variable, naming it in the error if malformed."""
    raw = os.getenv(name)
    if raw is None:
        return default
    try:
        return int(raw)
    except ValueError as err:
        raise ValueError(
            f"Environment variable {name} must be an integer, got {raw!r}"
        ) from err


@dataclass
class YFinanceConfig:
    """
    Configuration class for yfinance deployment.

    Only the proxy, cache directory and debug flag are pushed into yfinance
    by apply_configuration(); the remaining fields are carried for the
    application's own use.
    """
    
    # Network settings
    proxy_http: Optional[str] = None
    proxy_https: Optional[str] = None
    timeout: int = 30
    max_retries: int = 3
    
    # Cache settings
    cache_dir: Optional[str] = None
    enable_cache: bool = True
    
    # Performance settings
    max_concurrent_requests: int = 10
    rate_limit_calls: int = 100
    rate_limit_window: int = 60
    
    # Logging settings
    log_level: str = "INFO"
    log_file: Optional[str] = None
    enable_debug: bool = False
    
    @classmethod
    def from_environment(cls):
        """
        Create configuration from environment variables.

        Every field is read from a YFINANCE_* variable and falls back to the
        field default when the variable is unset. Boolean variables accept
        1/true/yes/on in any letter case; every other value means False.

        Returns:
        YFinanceConfig

        Raises:
        ValueError if an integer variable does not hold an integer
        """
        
        return cls(
            proxy_http=os.getenv('YFINANCE_PROXY_HTTP'),
            proxy_https=os.getenv('YFINANCE_PROXY_HTTPS'),
            timeout=_env_int('YFINANCE_TIMEOUT', 30),
            max_retries=_env_int('YFINANCE_MAX_RETRIES', 3),
            cache_dir=os.getenv('YFINANCE_CACHE_DIR'),
            enable_cache=_env_flag('YFINANCE_ENABLE_CACHE', True),
            max_concurrent_requests=_env_int('YFINANCE_MAX_CONCURRENT', 10),
            rate_limit_calls=_env_int('YFINANCE_RATE_LIMIT_CALLS', 100),
            rate_limit_window=_env_int('YFINANCE_RATE_LIMIT_WINDOW', 60),
            log_level=os.getenv('YFINANCE_LOG_LEVEL', 'INFO'),
            log_file=os.getenv('YFINANCE_LOG_FILE'),
            enable_debug=_env_flag('YFINANCE_DEBUG', False)
        )
    
    def apply_configuration(self):
        """
        Apply configuration to yfinance.

        Sets the proxy when one is configured, the timezone cache location
        when caching is enabled and a cache directory is given, and debug
        mode when enable_debug is true.
        """
        
        # Configure proxy
        if self.proxy_http or self.proxy_https:
            proxy_config = {}
            if self.proxy_http:
                proxy_config['http'] = self.proxy_http
            if self.proxy_https:
                proxy_config['https'] = self.proxy_https
            yf.set_config(proxy=proxy_config)
        
        # Configure cache
        if self.enable_cache and self.cache_dir:
            yf.set_tz_cache_location(self.cache_dir)
        
        # Configure debug mode
        if self.enable_debug:
            yf.enable_debug_mode()

# Usage
config = YFinanceConfig.from_environment()
config.apply_configuration()

# Assemble the report first, then emit it with a single print call
report_lines = [
    "yfinance configured with:",
    f"  Cache: {config.cache_dir if config.enable_cache else 'Disabled'}",
    f"  Proxy: {config.proxy_http or config.proxy_https or 'None'}",
    f"  Debug: {config.enable_debug}",
]
print("\n".join(report_lines))

Install with Tessl CLI

npx tessl i tessl/pypi-yfinance

docs

bulk-data.md

config-utils.md

index.md

live-streaming.md

market-sector.md

screening.md

search-lookup.md

ticker-data.md

tile.json