Download market data from Yahoo! Finance API
Global configuration options, debugging utilities, and cache management for yfinance. These utilities help tune performance and diagnose issues in production environments.
Configure global yfinance settings including proxy configuration and data source preferences.
def set_config(proxy=None):
    """
    Configure global yfinance settings.

    Parameters:
    - proxy: str or dict, proxy configuration for HTTP requests.
      Can be a string URL or a dictionary with protocol-specific proxies.

    Examples:
    - proxy="http://proxy.company.com:8080"
    - proxy={"http": "http://proxy.com:8080", "https": "https://proxy.com:8080"}
    """


import yfinance as yf

# Configure HTTP proxy
yf.set_config(proxy="http://proxy.company.com:8080")

# Configure protocol-specific proxies
yf.set_config(proxy={
    "http": "http://proxy.company.com:8080",
    "https": "https://secure-proxy.company.com:8443"
})

# Configure SOCKS proxy
yf.set_config(proxy="socks5://proxy.company.com:1080")

# Remove proxy configuration
yf.set_config(proxy=None)

# Corporate environment setup
def setup_corporate_environment():
    """Configure yfinance for corporate network environments.

    Reads the HTTP(S)_PROXY environment variables (upper- or lower-case)
    and, when present, forwards them to yf.set_config(). Also enables
    yfinance debug logging to ease troubleshooting behind restrictive
    networks.
    """
    import os

    # Check for corporate proxy environment variables (both spellings).
    http_proxy = os.environ.get('HTTP_PROXY') or os.environ.get('http_proxy')
    https_proxy = os.environ.get('HTTPS_PROXY') or os.environ.get('https_proxy')

    if http_proxy or https_proxy:
        proxy_config = {}
        if http_proxy:
            proxy_config['http'] = http_proxy
        if https_proxy:
            proxy_config['https'] = https_proxy
        yf.set_config(proxy=proxy_config)
        print(f"Configured proxy: {proxy_config}")

    # Enable debug mode for troubleshooting
    yf.enable_debug_mode()


# Usage
setup_corporate_environment()

# Enable comprehensive debug logging for troubleshooting network issues,
# API responses, and data processing problems.
def enable_debug_mode():
    """
    Enable debug logging for yfinance operations.

    This enables detailed logging of:
    - HTTP requests and responses
    - API endpoint calls
    - Data processing steps
    - Error conditions and stack traces
    - Performance timing information
    """


import yfinance as yf

# Enable debug mode
yf.enable_debug_mode()

# Now all yfinance operations will produce detailed logging
ticker = yf.Ticker("AAPL")
data = ticker.history(period="1mo")

# Debug output will show:
# - URL being called
# - HTTP status codes
# - Response headers
# - Data parsing steps
# - Any errors encountered


def troubleshoot_data_issues(symbol):
    """Troubleshoot data retrieval issues for one ticker with debug logging.

    Exercises the main data endpoints (info, history, financials, options)
    and prints what each returned; any exception is reported with a full
    traceback instead of propagating.
    """
    print(f"Troubleshooting data issues for {symbol}")

    # Enable debug mode so each request below is logged in detail.
    yf.enable_debug_mode()

    try:
        ticker = yf.Ticker(symbol)

        # Test different data sources
        print("\n=== Testing Basic Info ===")
        info = ticker.info
        print(f"Info keys available: {len(info.keys()) if info else 0}")

        print("\n=== Testing Historical Data ===")
        history = ticker.history(period="5d")
        print(f"History shape: {history.shape if not history.empty else 'Empty'}")

        print("\n=== Testing Financial Data ===")
        financials = ticker.income_stmt
        print(f"Financials available: {not financials.empty}")

        print("\n=== Testing Options Data ===")
        options = ticker.options
        print(f"Options expirations: {len(options) if options else 0}")
    except Exception as e:
        print(f"Error during troubleshooting: {e}")
        import traceback
        traceback.print_exc()


# Usage
troubleshoot_data_issues("AAPL")

# Configure and manage yfinance's internal caching system for improved
# performance and reduced API calls.
def set_tz_cache_location(cache_dir: str):
    """
    Set custom location for timezone cache data.

    Parameters:
    - cache_dir: str, directory path for storing cache files.
      Directory will be created if it doesn't exist.
    """


import yfinance as yf
import os

# Set custom cache location
custom_cache_dir = os.path.expanduser("~/yfinance_cache")
yf.set_tz_cache_location(custom_cache_dir)

# Now timezone data will be cached in the custom directory.
# This is useful for:
# - Persistent caching across application restarts
# - Shared cache in multi-user environments
# - Custom cache management policies

import os
import shutil
from pathlib import Path
class YFinanceCacheManager:
    """Advanced cache management for yfinance.

    Maintains an organized cache directory tree and points yfinance's
    timezone cache at a dedicated subdirectory.
    """

    # Subdirectories maintained under the base cache directory.
    _SUBDIRS = ("timezone", "session_data", "temp")

    def __init__(self, base_cache_dir=None):
        # base_cache_dir: root of the cache tree; defaults to ~/.yfinance_cache
        self.base_cache_dir = base_cache_dir or os.path.expanduser("~/.yfinance_cache")
        self.setup_cache_structure()

    def setup_cache_structure(self):
        """Create the cache directory structure and register it with yfinance."""
        for name in self._SUBDIRS:
            os.makedirs(os.path.join(self.base_cache_dir, name), exist_ok=True)
        # Configure yfinance to use our timezone cache subdirectory.
        yf.set_tz_cache_location(os.path.join(self.base_cache_dir, "timezone"))

    def clear_cache(self, cache_type="all"):
        """Clear cache data.

        cache_type: "all" wipes and recreates the whole tree;
        "timezone" clears only the timezone subdirectory.
        Any other value is silently ignored.
        """
        if cache_type == "all":
            if os.path.exists(self.base_cache_dir):
                shutil.rmtree(self.base_cache_dir)
            self.setup_cache_structure()
            print("All cache data cleared")
        elif cache_type == "timezone":
            tz_cache = os.path.join(self.base_cache_dir, "timezone")
            if os.path.exists(tz_cache):
                shutil.rmtree(tz_cache)
            os.makedirs(tz_cache, exist_ok=True)
            print("Timezone cache cleared")

    def get_cache_size(self):
        """Return total size of the cache tree in MB (0 if it doesn't exist)."""
        total_bytes = 0
        for dirpath, _dirnames, filenames in os.walk(self.base_cache_dir):
            total_bytes += sum(
                os.path.getsize(os.path.join(dirpath, name)) for name in filenames
            )
        return total_bytes / (1024 * 1024)  # bytes -> MB

    def cache_info(self):
        """Return a dict describing the cache: location, existence, size, file counts."""
        exists = os.path.exists(self.base_cache_dir)
        info = {
            'cache_dir': self.base_cache_dir,
            'exists': exists,
            'size_mb': self.get_cache_size() if exists else 0,
        }
        # Count regular files in each known subdirectory.
        for subdir in self._SUBDIRS:
            subdir_path = os.path.join(self.base_cache_dir, subdir)
            if os.path.exists(subdir_path):
                info[f'{subdir}_files'] = sum(
                    1 for entry in os.listdir(subdir_path)
                    if os.path.isfile(os.path.join(subdir_path, entry))
                )
        return info
# Usage
cache_manager = YFinanceCacheManager("/tmp/yfinance_cache")

# View cache information
cache_info = cache_manager.cache_info()
print(f"Cache location: {cache_info['cache_dir']}")
print(f"Cache size: {cache_info['size_mb']:.2f} MB")

# Clear specific cache type
cache_manager.clear_cache("timezone")

# Clear all cache
cache_manager.clear_cache("all")

# Optimize performance through proper session management and connection pooling.
import requests
import yfinance as yf
from requests.adapters import HTTPAdapter
# urllib3 is imported directly; the requests.packages.urllib3 shim is deprecated.
from urllib3.util.retry import Retry


def create_optimized_session():
    """Create an optimized requests session for yfinance.

    Returns a requests.Session configured with:
    - automatic retries (3 attempts, exponential backoff) on transient
      HTTP errors (429/5xx) for idempotent methods only
    - connection pooling (20 pools x 20 connections, non-blocking)
    - a browser-like User-Agent header
    """
    session = requests.Session()

    # Configure retry strategy.
    # NOTE: urllib3 renamed `method_whitelist` to `allowed_methods` in
    # v1.26 and removed the old name in urllib3 2.x.
    retry_strategy = Retry(
        total=3,                                      # total number of retries
        backoff_factor=1,                             # wait time between retries
        status_forcelist=[429, 500, 502, 503, 504],   # HTTP status codes to retry
        allowed_methods=["HEAD", "GET", "OPTIONS"],
    )

    # Configure adapter with retry strategy and connection pooling.
    adapter = HTTPAdapter(
        max_retries=retry_strategy,
        pool_connections=20,   # number of connection pools
        pool_maxsize=20,       # connections per pool
        pool_block=False,      # don't block when pool is full
    )

    # Mount adapter for both HTTP and HTTPS.
    session.mount("http://", adapter)
    session.mount("https://", adapter)

    # NOTE(review): requests.Session has no built-in timeout attribute;
    # this value only takes effect if callers pass it per request.
    session.timeout = 30

    # Set a browser-like user agent.
    session.headers.update({
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    })

    return session
# Usage with yfinance
optimized_session = create_optimized_session()

# Use session with Ticker objects
ticker = yf.Ticker("AAPL", session=optimized_session)
data = ticker.history(period="1mo")

# Use session with download function
bulk_data = yf.download(["AAPL", "GOOGL", "MSFT"],
                        period="1mo",
                        session=optimized_session)

# Utilities for efficient batch processing of multiple tickers and operations.
def batch_process_tickers(ticker_symbols, operations, batch_size=10,
                          session=None, error_handling='continue'):
    """
    Process multiple tickers in batches with error handling.

    Parameters:
    - ticker_symbols: list, ticker symbols to process
    - operations: list, operations to perform on each ticker
      (supported: 'info', 'history', 'financials', 'recommendations')
    - batch_size: int, number of tickers to process per batch
    - session: requests.Session, optional session for connection reuse
    - error_handling: str, 'continue' (warn and keep going), 'stop'
      (raise on first error), or 'collect' (record errors silently)

    Returns:
    dict with 'results', 'errors', 'success_count' and 'error_count'.
    """
    results = {}
    errors = {}

    # Process in batches
    for i in range(0, len(ticker_symbols), batch_size):
        batch = ticker_symbols[i:i + batch_size]
        print(f"Processing batch {i//batch_size + 1}: {batch}")

        for symbol in batch:
            try:
                ticker = yf.Ticker(symbol, session=session)
                ticker_results = {}

                # Perform requested operations
                for operation in operations:
                    if operation == 'info':
                        ticker_results['info'] = ticker.info
                    elif operation == 'history':
                        ticker_results['history'] = ticker.history(period="1mo")
                    elif operation == 'financials':
                        ticker_results['financials'] = ticker.income_stmt
                    elif operation == 'recommendations':
                        ticker_results['recommendations'] = ticker.recommendations
                    # Add more operations as needed

                results[symbol] = ticker_results
            except Exception as e:
                error_msg = f"Error processing {symbol}: {str(e)}"
                errors[symbol] = error_msg

                if error_handling == 'stop':
                    # Chain the original exception so the root cause isn't lost.
                    raise Exception(error_msg) from e
                elif error_handling == 'continue':
                    print(f"Warning: {error_msg}")
                # 'collect' mode just collects errors silently

    return {
        'results': results,
        'errors': errors,
        'success_count': len(results),
        'error_count': len(errors)
    }
# Usage
tickers = ["AAPL", "GOOGL", "MSFT", "AMZN", "TSLA"]
operations = ['info', 'history', 'recommendations']

batch_results = batch_process_tickers(
    tickers,
    operations,
    batch_size=3,
    session=create_optimized_session(),
    error_handling='continue'
)

print(f"Successfully processed: {batch_results['success_count']} tickers")
print(f"Errors encountered: {batch_results['error_count']} tickers")

# Implement rate limiting to avoid API restrictions and ensure reliable data access.
import time
from functools import wraps
from collections import deque
from threading import Lock
class RateLimiter:
    """Sliding-window rate limiter for yfinance API calls.

    Thread-safe: all bookkeeping happens under a single lock.
    """

    def __init__(self, max_calls=100, time_window=60):
        # max_calls: maximum calls allowed within any `time_window` seconds.
        self.max_calls = max_calls
        self.time_window = time_window
        self.calls = deque()  # timestamps of recent calls, oldest first
        self.lock = Lock()

    def wait_if_needed(self):
        """Block until a call is permitted, then record it."""
        with self.lock:
            now = time.time()

            # Drop timestamps that have fallen out of the window.
            while self.calls and self.calls[0] <= now - self.time_window:
                self.calls.popleft()

            # If the window is full, sleep until the oldest call expires.
            if len(self.calls) >= self.max_calls:
                sleep_time = self.calls[0] + self.time_window - now
                if sleep_time > 0:
                    print(f"Rate limit reached. Waiting {sleep_time:.2f} seconds...")
                    time.sleep(sleep_time)
                # Re-read the clock after sleeping and purge again.
                now = time.time()
                while self.calls and self.calls[0] <= now - self.time_window:
                    self.calls.popleft()

            # Record this call at the time it actually proceeds (post-sleep).
            # BUGFIX: recording the pre-sleep timestamp made the slot expire
            # early, letting the limiter admit more calls than allowed.
            self.calls.append(now)
def rate_limited_ticker_operation(rate_limiter):
    """Decorator factory that applies rate limiting to ticker operations.

    Parameters:
    - rate_limiter: object exposing wait_if_needed(); invoked before the
      wrapped function on every call.

    Returns a decorator preserving the wrapped function's metadata.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Block (if necessary) before letting the call through.
            rate_limiter.wait_if_needed()
            return func(*args, **kwargs)
        return wrapper
    return decorator
# Usage
api_limiter = RateLimiter(max_calls=50, time_window=60)  # 50 calls per minute


@rate_limited_ticker_operation(api_limiter)
def get_ticker_data(symbol, data_type='history'):
    """Get ticker data with rate limiting applied via the decorator.

    data_type: 'history', 'info', or 'financials'; anything else
    returns None.
    """
    ticker = yf.Ticker(symbol)
    if data_type == 'history':
        return ticker.history(period="1mo")
    elif data_type == 'info':
        return ticker.info
    elif data_type == 'financials':
        return ticker.income_stmt
    return None


# Process many tickers with automatic rate limiting
tickers = ["AAPL", "GOOGL", "MSFT", "AMZN", "TSLA", "META", "NVDA", "ORCL"]
results = {}
for symbol in tickers:
    print(f"Processing {symbol}...")
    results[symbol] = get_ticker_data(symbol, 'history')
    print(f"Completed {symbol}")

print(f"Processed {len(results)} tickers with rate limiting")

# Set up comprehensive logging for yfinance operations in production environments.
import logging
import sys
from datetime import datetime
def setup_yfinance_logging(log_level=logging.INFO, log_file=None):
    """Set up comprehensive logging for yfinance operations.

    Parameters:
    - log_level: logging level applied to the logger and all handlers
    - log_file: optional path; when given, logs are also written there

    Returns the configured 'yfinance_app' logger. Safe to call repeatedly:
    existing handlers are removed first, so output is never duplicated.
    """
    # Create (or fetch) the application logger.
    logger = logging.getLogger('yfinance_app')
    logger.setLevel(log_level)

    # Remove existing handlers so repeated calls don't duplicate output.
    for handler in logger.handlers[:]:
        logger.removeHandler(handler)

    # Shared formatter for console and file output.
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )

    # Console handler
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(log_level)
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)

    # File handler (optional)
    if log_file:
        file_handler = logging.FileHandler(log_file)
        file_handler.setLevel(log_level)
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)

    return logger
# Usage
logger = setup_yfinance_logging(
    log_level=logging.INFO,
    log_file=f"yfinance_{datetime.now().strftime('%Y%m%d')}.log"
)


# Log yfinance operations
def logged_ticker_operation(symbol, operation):
    """Perform a ticker operation with logging.

    Logs the start, a result summary, and elapsed time; errors are logged
    and re-raised. Assumes operation is 'history' or 'info' — any other
    value leaves `result` unbound and raises NameError.
    """
    logger.info(f"Starting {operation} for {symbol}")
    start_time = time.time()

    try:
        ticker = yf.Ticker(symbol)
        if operation == 'history':
            result = ticker.history(period="1mo")
            logger.info(f"Retrieved {len(result)} days of history for {symbol}")
        elif operation == 'info':
            result = ticker.info
            logger.info(f"Retrieved info for {symbol}: {result.get('shortName', 'N/A')}")

        elapsed_time = time.time() - start_time
        logger.info(f"Completed {operation} for {symbol} in {elapsed_time:.2f}s")
        return result
    except Exception as e:
        logger.error(f"Error in {operation} for {symbol}: {str(e)}")
        raise


# Usage
data = logged_ticker_operation("AAPL", "history")
info = logged_ticker_operation("GOOGL", "info")

# Monitor and optimize yfinance performance in production applications.
import time
import psutil
import threading
from collections import defaultdict
class YFinancePerformanceMonitor:
    """Monitor performance metrics (latency, memory delta) for yfinance operations."""

    def __init__(self):
        self.metrics = defaultdict(list)  # operation name -> list of metric dicts
        self.active_operations = {}       # operation id -> in-flight bookkeeping
        self.lock = threading.Lock()

    def start_operation(self, operation_name, symbol=None):
        """Start monitoring an operation; returns an opaque operation id."""
        operation_id = f"{operation_name}_{symbol}_{int(time.time() * 1000)}"
        with self.lock:
            self.active_operations[operation_id] = {
                'name': operation_name,
                'symbol': symbol,
                'start_time': time.time(),
                # Resident set size in MB at start, for memory-delta tracking.
                'start_memory': psutil.Process().memory_info().rss / 1024 / 1024,
            }
        return operation_id

    def end_operation(self, operation_id, success=True, error_msg=None):
        """End monitoring an operation and record its metrics."""
        with self.lock:
            if operation_id not in self.active_operations:
                return  # unknown or already-finished id: ignore silently
            operation = self.active_operations.pop(operation_id)
            end_time = time.time()
            end_memory = psutil.Process().memory_info().rss / 1024 / 1024  # MB
            self.metrics[operation['name']].append({
                'name': operation['name'],
                'symbol': operation['symbol'],
                'duration': end_time - operation['start_time'],
                'memory_delta': end_memory - operation['start_memory'],
                'success': success,
                'error': error_msg,
                'timestamp': end_time,
            })

    def get_performance_summary(self, operation_name=None):
        """Summarize recorded metrics, optionally for one operation name.

        Returns {} when no matching metrics have been recorded; when there
        are no successful operations, duration/memory aggregates are omitted.
        """
        # FIX: snapshot under the lock — every other access to self.metrics
        # is lock-protected, so reading it unguarded here was inconsistent
        # and could race with a concurrent end_operation().
        with self.lock:
            if operation_name:
                data = list(self.metrics.get(operation_name, []))
            else:
                data = [m for op_metrics in self.metrics.values() for m in op_metrics]

        if not data:
            return {}

        successful_ops = [m for m in data if m['success']]
        failed_ops = [m for m in data if not m['success']]

        if successful_ops:
            durations = [m['duration'] for m in successful_ops]
            memory_deltas = [m['memory_delta'] for m in successful_ops]
            summary = {
                'total_operations': len(data),
                'successful_operations': len(successful_ops),
                'failed_operations': len(failed_ops),
                'success_rate': len(successful_ops) / len(data) * 100,
                'avg_duration': sum(durations) / len(durations),
                'max_duration': max(durations),
                'min_duration': min(durations),
                'avg_memory_delta': sum(memory_deltas) / len(memory_deltas),
                'max_memory_delta': max(memory_deltas),
            }
        else:
            summary = {
                'total_operations': len(data),
                'successful_operations': 0,
                'failed_operations': len(failed_ops),
                'success_rate': 0,
            }
        return summary
# Usage
monitor = YFinancePerformanceMonitor()


def monitored_ticker_operation(symbol, operation):
    """Perform a ticker operation ('history' or 'info') with monitoring.

    Records success/failure and timing in the module-level monitor;
    exceptions are re-raised after being recorded.
    """
    op_id = monitor.start_operation(operation, symbol)
    try:
        ticker = yf.Ticker(symbol)
        if operation == 'history':
            result = ticker.history(period="1mo")
        elif operation == 'info':
            result = ticker.info
        monitor.end_operation(op_id, success=True)
        return result
    except Exception as e:
        monitor.end_operation(op_id, success=False, error_msg=str(e))
        raise


# Run monitored operations
symbols = ["AAPL", "GOOGL", "MSFT"]
for symbol in symbols:
    try:
        data = monitored_ticker_operation(symbol, "history")
        info = monitored_ticker_operation(symbol, "info")
    except Exception as e:
        print(f"Error processing {symbol}: {e}")

# Get performance summary.
# FIX: guard the prints — get_performance_summary returns {} when nothing
# was recorded, and omits avg_duration when no operation succeeded, so the
# unguarded lookups could raise KeyError.
history_perf = monitor.get_performance_summary("history")
if history_perf.get('successful_operations'):
    print(f"History operations: {history_perf['success_rate']:.1f}% success rate, "
          f"avg duration: {history_perf['avg_duration']:.2f}s")

all_perf = monitor.get_performance_summary()
if all_perf:
    print(f"All operations: {all_perf['total_operations']} total, "
          f"{all_perf['success_rate']:.1f}% success rate")

import os
from dataclasses import dataclass
from typing import Optional
@dataclass
class YFinanceConfig:
    """Configuration class for yfinance deployment.

    Build directly, or from YFINANCE_* environment variables via
    from_environment(); call apply_configuration() to push the settings
    into yfinance.
    """

    # Network settings
    proxy_http: Optional[str] = None
    proxy_https: Optional[str] = None
    timeout: int = 30
    max_retries: int = 3

    # Cache settings
    cache_dir: Optional[str] = None
    enable_cache: bool = True

    # Performance settings
    max_concurrent_requests: int = 10
    rate_limit_calls: int = 100
    rate_limit_window: int = 60

    # Logging settings
    log_level: str = "INFO"
    log_file: Optional[str] = None
    enable_debug: bool = False

    @classmethod
    def from_environment(cls):
        """Create a configuration from YFINANCE_* environment variables."""
        return cls(
            proxy_http=os.getenv('YFINANCE_PROXY_HTTP'),
            proxy_https=os.getenv('YFINANCE_PROXY_HTTPS'),
            timeout=int(os.getenv('YFINANCE_TIMEOUT', '30')),
            max_retries=int(os.getenv('YFINANCE_MAX_RETRIES', '3')),
            cache_dir=os.getenv('YFINANCE_CACHE_DIR'),
            enable_cache=os.getenv('YFINANCE_ENABLE_CACHE', 'true').lower() == 'true',
            max_concurrent_requests=int(os.getenv('YFINANCE_MAX_CONCURRENT', '10')),
            rate_limit_calls=int(os.getenv('YFINANCE_RATE_LIMIT_CALLS', '100')),
            rate_limit_window=int(os.getenv('YFINANCE_RATE_LIMIT_WINDOW', '60')),
            log_level=os.getenv('YFINANCE_LOG_LEVEL', 'INFO'),
            log_file=os.getenv('YFINANCE_LOG_FILE'),
            enable_debug=os.getenv('YFINANCE_DEBUG', 'false').lower() == 'true',
        )

    def apply_configuration(self):
        """Apply this configuration to yfinance (proxy, cache, debug mode)."""
        # Configure proxy
        if self.proxy_http or self.proxy_https:
            proxy_config = {}
            if self.proxy_http:
                proxy_config['http'] = self.proxy_http
            if self.proxy_https:
                proxy_config['https'] = self.proxy_https
            yf.set_config(proxy=proxy_config)

        # Configure cache
        if self.enable_cache and self.cache_dir:
            yf.set_tz_cache_location(self.cache_dir)

        # Configure debug mode
        if self.enable_debug:
            yf.enable_debug_mode()
# Usage
config = YFinanceConfig.from_environment()
config.apply_configuration()

# FIX: dropped the extraneous f-prefix on the placeholder-free literal.
print("yfinance configured with:")
print(f"  Cache: {config.cache_dir if config.enable_cache else 'Disabled'}")
print(f"  Proxy: {config.proxy_http or config.proxy_https or 'None'}")
print(f"  Debug: {config.enable_debug}")

# Install with Tessl CLI
npx tessl i tessl/pypi-yfinance