# Real-time Data Streaming

Stream live financial data over WebSocket connections, with both synchronous and asynchronous clients. Receive real-time price updates, volume changes, and other market data for active trading and monitoring applications.

## Capabilities

### Synchronous WebSocket Streaming

Real-time data streaming over a synchronous WebSocket connection, for traditional (non-async) applications.

```python { .api }
from typing import Callable, List, Optional, Union


class WebSocket:
    """Synchronous client for real-time market data streaming (API outline)."""

    def __init__(self, url: str = "wss://streamer.finance.yahoo.com/?version=2",
                 verbose: bool = True):
        """
        Create a WebSocket connection for real-time data streaming.

        Parameters:
        - url: str, WebSocket server URL (default: Yahoo Finance streaming endpoint)
        - verbose: bool, enable verbose logging of connection events
        """

    def subscribe(self, symbols: Union[str, List[str]]):
        """
        Subscribe to real-time data for specified symbols.

        Parameters:
        - symbols: str or list, ticker symbols to subscribe to
        """

    def unsubscribe(self, symbols: Union[str, List[str]]):
        """
        Unsubscribe from real-time data for specified symbols.

        Parameters:
        - symbols: str or list, ticker symbols to unsubscribe from
        """

    def listen(self, message_handler: Optional[Callable] = None):
        """
        Start listening for real-time data messages.

        Parameters:
        - message_handler: function called with each incoming message.
          If None, messages are printed to console
        """

    def close(self):
        """
        Close the WebSocket connection.
        """

    def __enter__(self) -> "WebSocket":
        """
        Enter a `with` block and return this WebSocket.
        """

    def __exit__(self, exc_type, exc_value, traceback):
        """
        Leave a `with` block; the connection is closed automatically.
        """
```

#### Context Manager Support

`WebSocket` can be used in a `with` statement; the connection is closed automatically when the block exits.

```python { .api }
# WebSocket can be used as a context manager.
# `custom_handler` is any callable that accepts a single message argument.
with WebSocket(verbose=True) as ws:
    ws.subscribe(["AAPL", "GOOGL"])
    ws.listen(message_handler=custom_handler)
# Connection automatically closed when exiting context
```

#### Usage Examples

```python
import yfinance as yf


# Basic real-time streaming
def handle_price_update(message):
    """Print the symbol and latest price carried by one streaming message."""
    symbol = message.get('symbol', 'Unknown')
    price = message.get('price', 0)
    print(f"{symbol}: ${price}")


# listen() typically keeps running while the stream is active, so run each
# example below on its own. try/finally releases the connection on exit.
ws = yf.WebSocket(verbose=True)
try:
    ws.subscribe(["AAPL", "GOOGL", "MSFT"])
    ws.listen(message_handler=handle_price_update)
finally:
    ws.close()

# Using context manager (closes the connection automatically)
with yf.WebSocket() as ws:
    ws.subscribe("AAPL")
    ws.listen()  # Uses default console output

# Managing subscriptions
ws = yf.WebSocket()
try:
    ws.subscribe(["AAPL", "GOOGL"])  # Initial subscription
    ws.subscribe("MSFT")             # Add more symbols
    ws.unsubscribe("GOOGL")          # Remove specific symbol
    ws.listen(handle_price_update)
finally:
    ws.close()
```

### Asynchronous WebSocket Streaming

Real-time data streaming over an asynchronous WebSocket connection, for `asyncio`-based applications. The methods mirror `WebSocket`, but they are coroutines and must be awaited.

```python { .api }
from typing import Callable, List, Optional, Union


class AsyncWebSocket:
    """Asynchronous client for real-time market data streaming (API outline)."""

    def __init__(self, url: str = "wss://streamer.finance.yahoo.com/?version=2",
                 verbose: bool = True):
        """
        Create an async WebSocket connection for real-time data streaming.

        Parameters:
        - url: str, WebSocket server URL
        - verbose: bool, enable verbose logging
        """

    async def subscribe(self, symbols: Union[str, List[str]]):
        """
        Asynchronously subscribe to real-time data.

        Parameters:
        - symbols: str or list, ticker symbols to subscribe to
        """

    async def unsubscribe(self, symbols: Union[str, List[str]]):
        """
        Asynchronously unsubscribe from real-time data.

        Parameters:
        - symbols: str or list, ticker symbols to unsubscribe from
        """

    async def listen(self, message_handler: Optional[Callable] = None):
        """
        Asynchronously listen for real-time data messages.

        Parameters:
        - message_handler: async function to handle incoming messages
          (optional; defaults to None)
        """

    async def close(self):
        """
        Asynchronously close the WebSocket connection.
        """

    async def __aenter__(self) -> "AsyncWebSocket":
        """
        Enter an `async with` block and return this AsyncWebSocket.
        """

    async def __aexit__(self, exc_type, exc_value, traceback):
        """
        Leave an `async with` block; the connection is closed automatically.
        """
```

#### Async Context Manager Support

`AsyncWebSocket` supports `async with`, which is only valid inside a coroutine (`async def`).

```python { .api }
# AsyncWebSocket can be used as an async context manager.
# `async with` and `await` are only valid inside a coroutine, so wrap the
# session in an `async def` and run it with asyncio.run(stream()).
async def stream():
    async with AsyncWebSocket(verbose=True) as ws:
        await ws.subscribe(["AAPL", "GOOGL"])
        await ws.listen(message_handler=async_handler)
    # Connection automatically closed when exiting context
```

#### Usage Examples

```python
import asyncio
import yfinance as yf


# Basic async streaming
async def async_price_handler(message):
    """Print the symbol and latest price from one streaming message."""
    symbol = message.get('symbol', 'Unknown')
    price = message.get('price', 0)
    print(f"Async: {symbol} -> ${price}")


async def main():
    ws = yf.AsyncWebSocket(verbose=True)
    try:
        await ws.subscribe(["AAPL", "GOOGL", "MSFT"])
        await ws.listen(message_handler=async_price_handler)
    finally:
        # Release the connection even if listen() is cancelled or fails.
        await ws.close()

# Run the async function
asyncio.run(main())


# Using async context manager (closes the connection automatically)
async def stream_with_context():
    async with yf.AsyncWebSocket() as ws:
        await ws.subscribe("AAPL")
        await ws.listen()

# Only starts after main() finishes; normally you would pick one entry point.
asyncio.run(stream_with_context())
```

### Ticker-Level Streaming

Start real-time streaming directly from `Ticker` and `Tickers` objects via their `live()` method.

```python { .api }
from typing import Callable, Optional


class Ticker:
    # Excerpt: only the streaming method is shown.
    def live(self, message_handler: Optional[Callable] = None, verbose: bool = True):
        """
        Start real-time streaming for this ticker.

        Parameters:
        - message_handler: function to handle incoming messages
        - verbose: bool, enable verbose logging
        """


class Tickers:
    # Excerpt: only the streaming method is shown.
    def live(self, message_handler: Optional[Callable] = None, verbose: bool = True):
        """
        Start real-time streaming for all tickers in the collection.

        Parameters:
        - message_handler: function to handle incoming messages
        - verbose: bool, enable verbose logging
        """
```

#### Usage Examples

```python
import yfinance as yf


def handle_price_update(message):
    """Print the symbol and latest price from one streaming message."""
    print(f"{message.get('symbol', 'Unknown')}: ${message.get('price', 0)}")


def handle_portfolio_update(message):
    """Print one portfolio update with its percentage change."""
    symbol = message.get('symbol', 'Unknown')
    price = message.get('price', 0)
    change_percent = message.get('changePercent') or 0
    print(f"[portfolio] {symbol}: ${price} ({change_percent:+.2f}%)")


# Single ticker streaming (run the two examples separately; each keeps
# streaming until interrupted)
ticker = yf.Ticker("AAPL")
ticker.live(message_handler=handle_price_update)

# Multiple ticker streaming
portfolio = yf.Tickers(["AAPL", "GOOGL", "MSFT"])
portfolio.live(message_handler=handle_portfolio_update)
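```

## Message Structure and Handling

### Message Format

Real-time messages are dictionaries whose fields vary with the instrument and the kind of update. The structure below is illustrative. Not every field is present in every message, which is why the handlers in this document read fields with `dict.get()` and a default.

```python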
# Typical message structure (illustrative).
# NOTE(review): confirm key names and units against messages you actually
# receive. yfinance decodes a protobuf payload, so names may differ (possibly
# 'id' for the symbol). The handlers in this document assume the names below
# and treat 'timestamp' as epoch seconds.
example_message = {
    'symbol': 'AAPL',
    'price': 175.43,
    'change': 2.15,
    'changePercent': 1.24,
    'volume': 45672100,
    'timestamp': 1640995200,
    'marketHours': 'REGULAR_MARKET',
    'dayHigh': 176.12,
    'dayLow': 173.78,
    'bid': 175.42,
    'ask': 175.44,
    'bidSize': 100,
    'askSize': 200,
}
```

### Advanced Message Handlers

```python
def comprehensive_message_handler(message):
    """Advanced message handler with detailed processing.

    Prints one colour-coded line per message containing: local time, symbol,
    a direction marker (green ▲ up, red ▼ down, yellow → unchanged), price,
    absolute change and volume.

    Missing or None fields fall back to neutral defaults instead of raising,
    so one partial message does not break the handler.
    """
    import datetime

    symbol = message.get('symbol') or 'Unknown'
    price = float(message.get('price') or 0)
    change = float(message.get('change') or 0)
    volume = int(message.get('volume') or 0)
    timestamp = message.get('timestamp')

    # Price movement analysis
    if change > 0:
        direction = "▲"
        color_code = "\033[92m"  # Green
    elif change < 0:
        direction = "▼"
        color_code = "\033[91m"  # Red
    else:
        direction = "→"
        color_code = "\033[93m"  # Yellow

    # Format timestamp
    if timestamp:
        seconds = float(timestamp)
        # Values this large cannot be epoch seconds (year > 5000). Treat them
        # as epoch milliseconds, which fromtimestamp() would otherwise reject.
        if seconds > 1e11:
            seconds /= 1000
        moment = datetime.datetime.fromtimestamp(seconds)
    else:
        # No timestamp: show the receive time rather than the 1970 epoch.
        moment = datetime.datetime.now()
    time_str = moment.strftime('%H:%M:%S')

    # Display formatted message
    print(f"{color_code}{time_str} {symbol} {direction} ${price:.2f} "
          f"({change:+.2f}) Vol: {volume:,}\033[0m")
# Usage
import yfinance as yf

ws = yf.WebSocket()
try:
    ws.subscribe(["AAPL", "GOOGL", "MSFT"])
    ws.listen(message_handler=comprehensive_message_handler)
finally:
    ws.close()
```

### Async Message Processing

```python
import asyncio
from collections import deque


class AsyncMessageProcessor:
    """Queue streaming messages and drain them one at a time asynchronously."""

    def __init__(self):
        self.message_queue = deque()
        # True while process_messages() is draining the queue. It keeps a
        # second drain loop from starting when handle_message calls overlap.
        self.processing = False

    async def handle_message(self, message):
        """Add message to queue for processing."""
        self.message_queue.append(message)

        if not self.processing:
            await self.process_messages()

    async def process_messages(self):
        """Process queued messages asynchronously.

        The busy flag is reset in ``finally``. Without that, a failure while
        processing one message would leave ``processing`` True and every later
        message would sit in the queue forever.
        """
        self.processing = True
        try:
            while self.message_queue:
                message = self.message_queue.popleft()

                # Simulate async processing (database write, API call, etc.)
                await asyncio.sleep(0.01)

                # Process message
                symbol = message.get('symbol', 'Unknown')
                price = message.get('price', 0)
                print(f"Processed: {symbol} @ ${price}")
        finally:
            self.processing = False
# Usage
import yfinance as yf


async def main():
    processor = AsyncMessageProcessor()

    async with yf.AsyncWebSocket() as ws:
        await ws.subscribe(["AAPL", "GOOGL"])
        await ws.listen(message_handler=processor.handle_message)

asyncio.run(main())
```

## Advanced Streaming Patterns

### Portfolio Monitoring

```python
class PortfolioMonitor:
    """Track the latest price/change per symbol and print threshold alerts.

    Parameters:
    - portfolio_symbols: iterable of symbols to track; updates for any other
      symbol are ignored
    - alerts: optional {symbol: {'stop_loss': float, 'take_profit': float}};
      either key may be omitted
    """

    def __init__(self, portfolio_symbols, alerts=None):
        self.portfolio = {symbol: {'price': 0, 'change': 0} for symbol in portfolio_symbols}
        self.alerts = alerts or {}
        self.total_value = 0  # NOTE(review): never updated by this class

    def handle_update(self, message):
        """Apply one streaming message to the tracked portfolio.

        Messages without a price are skipped. Defaulting the price to 0 would
        look like a crash to zero and fire every stop-loss alert.
        """
        symbol = message.get('symbol')
        price = message.get('price')
        if symbol not in self.portfolio or price is None:
            return

        change = message.get('change') or 0
        self.portfolio[symbol].update({'price': price, 'change': change})

        # Check alerts
        if symbol in self.alerts:
            self.check_alerts(symbol, price)

        # Update portfolio summary
        self.update_portfolio_summary()

    def check_alerts(self, symbol, price):
        """Print an alert for each configured threshold that *price* has reached.

        Alerts are evaluated on every update, so they repeat for as long as
        the price stays beyond the threshold.
        """
        alerts = self.alerts[symbol]

        if 'stop_loss' in alerts and price <= alerts['stop_loss']:
            print(f"🚨 STOP LOSS ALERT: {symbol} @ ${price} (Stop: ${alerts['stop_loss']})")

        if 'take_profit' in alerts and price >= alerts['take_profit']:
            print(f"🎯 TAKE PROFIT ALERT: {symbol} @ ${price} (Target: ${alerts['take_profit']})")

    def update_portfolio_summary(self):
        """Print the sum of the latest per-share changes across tracked symbols.

        Position sizes are not tracked, so this is not a dollar P&L.
        """
        total_change = sum(data['change'] for data in self.portfolio.values())
        print(f"Portfolio Update - Total Change: ${total_change:+.2f}")
# Usage
import yfinance as yf

portfolio_symbols = ["AAPL", "GOOGL", "MSFT"]
alerts = {
    "AAPL": {"stop_loss": 160.0, "take_profit": 180.0},
    "GOOGL": {"stop_loss": 90.0, "take_profit": 110.0},
}

monitor = PortfolioMonitor(portfolio_symbols, alerts)
ws = yf.WebSocket()
try:
    ws.subscribe(portfolio_symbols)
    ws.listen(message_handler=monitor.handle_update)
finally:
    ws.close()
```

### Market Scanner

```python
class MarketScanner:
    """Flag streaming messages that show a large percentage move on high volume.

    scan_criteria keys (both optional):
    - 'min_change_percent': minimum absolute % change (default 5)
    - 'min_volume': minimum volume (default 1,000,000)
    Both thresholds are inclusive, as their "min_" names say.
    """

    def __init__(self, scan_criteria):
        self.criteria = scan_criteria
        self.matches = []

    def scan_message(self, message):
        """Record and print *message* when it meets the scan criteria."""
        # Apply scanning criteria
        if not self.meets_criteria(message):
            return

        symbol = message.get('symbol')
        price = message.get('price', 0)
        change_percent = message.get('changePercent') or 0
        volume = message.get('volume') or 0

        match = {
            'symbol': symbol,
            'price': price,
            'change_percent': change_percent,
            'volume': volume,
            'timestamp': message.get('timestamp', 0),
        }

        self.matches.append(match)
        print(f"🔍 SCANNER MATCH: {symbol} - {change_percent:+.2f}% @ ${price}")

    def meets_criteria(self, message):
        """Return True when both the % move and the volume reach their minimums.

        Missing or None fields count as 0.
        """
        change_percent = message.get('changePercent') or 0
        volume = message.get('volume') or 0

        # Example criteria: Large price movement with high volume
        return (abs(change_percent) >= self.criteria.get('min_change_percent', 5) and
                volume >= self.criteria.get('min_volume', 1000000))
# Usage
import yfinance as yf

scan_criteria = {
    'min_change_percent': 3.0,  # 3% minimum price change
    'min_volume': 2000000,      # 2M minimum volume
}

scanner = MarketScanner(scan_criteria)
watchlist = ["AAPL", "GOOGL", "MSFT", "AMZN", "TSLA", "META", "NVDA"]

ws = yf.WebSocket()
try:
    ws.subscribe(watchlist)
    ws.listen(message_handler=scanner.scan_message)
finally:
    ws.close()
```

### Data Recording and Analysis

```python
import sqlite3
from contextlib import closing
from datetime import datetime

import pandas as pd


class StreamingDataRecorder:
    """Persist streaming messages to SQLite and read them back as DataFrames.

    Every call opens its own short-lived connection. ``closing()`` guarantees
    that connection is released even when a statement fails, and the inner
    ``with conn`` commits on success or rolls back on error.
    """

    def __init__(self, db_path="streaming_data.db"):
        self.db_path = db_path
        self.setup_database()

    def setup_database(self):
        """Create database table for streaming data."""
        with closing(sqlite3.connect(self.db_path)) as conn, conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS streaming_data (
                    timestamp INTEGER,
                    symbol TEXT,
                    price REAL,
                    change_amount REAL,
                    change_percent REAL,
                    volume INTEGER,
                    bid REAL,
                    ask REAL
                )
            ''')

    def record_message(self, message):
        """Record streaming message to database.

        A message without a 'timestamp' is stamped with the current epoch
        seconds. Other missing fields are stored as '' or 0.
        """
        row = (
            message.get('timestamp', int(datetime.now().timestamp())),
            message.get('symbol', ''),
            message.get('price', 0),
            message.get('change', 0),
            message.get('changePercent', 0),
            message.get('volume', 0),
            message.get('bid', 0),
            message.get('ask', 0),
        )
        with closing(sqlite3.connect(self.db_path)) as conn, conn:
            conn.execute('''
                INSERT INTO streaming_data
                (timestamp, symbol, price, change_amount, change_percent, volume, bid, ask)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            ''', row)

        # Print confirmation
        symbol = message.get('symbol', 'Unknown')
        price = message.get('price', 0)
        print(f"📊 Recorded: {symbol} @ ${price}")

    def get_recorded_data(self, symbol=None, hours=1):
        """Retrieve recorded data for analysis, newest first.

        Parameters:
        - symbol: restrict to this symbol (None or '' = all symbols)
        - hours: look-back window. It is compared against the stored
          'timestamp' column, which this class assumes holds epoch seconds.

        Returns: pandas.DataFrame with the streaming_data columns.
        """
        query = '''
            SELECT * FROM streaming_data
            WHERE timestamp > ?
        '''
        params = [int(datetime.now().timestamp()) - (hours * 3600)]

        if symbol:
            query += ' AND symbol = ?'
            params.append(symbol)

        query += ' ORDER BY timestamp DESC'

        with closing(sqlite3.connect(self.db_path)) as conn:
            return pd.read_sql_query(query, conn, params=params)
# Usage
import yfinance as yf

recorder = StreamingDataRecorder()

ws = yf.WebSocket()
ws.subscribe(["AAPL", "GOOGL", "MSFT"])
try:
    ws.listen(message_handler=recorder.record_message)
except KeyboardInterrupt:
    pass  # Ctrl+C ends the recording session; continue to the analysis below
finally:
    ws.close()

# Later, analyze recorded data
recent_data = recorder.get_recorded_data(symbol="AAPL", hours=2)
print(recent_data.describe())
```

## Error Handling and Reconnection

### Robust Connection Management

```python
import logging
import time


class RobustWebSocketClient:
    """Stream symbols through yf.WebSocket, reconnecting with exponential backoff.

    Parameters:
    - symbols: symbols subscribed on every (re)connection
    - message_handler: callable invoked with each message. Exceptions it
      raises are printed and swallowed so one bad message does not end the
      stream.
    - max_retries: consecutive failed attempts allowed before giving up

    Uses a module-level ``yf`` (``import yfinance as yf``).
    """

    def __init__(self, symbols, message_handler, max_retries=5):
        self.symbols = symbols
        self.message_handler = message_handler
        self.max_retries = max_retries
        self.retry_count = 0
        self.ws = None
        # Set by close(); stops connect_with_retry() from reconnecting.
        self._closed = False

    def connect_with_retry(self):
        """Connect with automatic retry logic.

        Opens a fresh connection whenever listen() returns or raises. Stops
        after ``max_retries`` consecutive failures, or once close() is called.
        """
        while self.retry_count < self.max_retries and not self._closed:
            # Never leak the previous connection when reconnecting.
            self._discard_connection()
            try:
                self.ws = yf.WebSocket(verbose=True)
                self.ws.subscribe(self.symbols)
                print(f"✅ Connected successfully (attempt {self.retry_count + 1})")

                # Reset retry count on successful connection
                self.retry_count = 0

                # Start listening
                self.ws.listen(message_handler=self.handle_with_error_recovery)

            except Exception as e:
                if self._closed:
                    break  # close() was called; this failure is expected

                self.retry_count += 1
                print(f"❌ Connection failed (attempt {self.retry_count}): {e}")
                if self.retry_count >= self.max_retries:
                    break  # don't sleep just to give up afterwards

                wait_time = min(2 ** self.retry_count, 60)  # Exponential backoff
                print(f"⏳ Retrying in {wait_time} seconds...")
                time.sleep(wait_time)

        if not self._closed:
            print(f"🚫 Max retries ({self.max_retries}) exceeded. Giving up.")

    def handle_with_error_recovery(self, message):
        """Message handler with error recovery."""
        try:
            self.message_handler(message)
        except Exception as e:
            print(f"⚠️ Error processing message: {e}")
            # Continue processing other messages

    def _discard_connection(self):
        """Best-effort close of the current connection before reconnecting."""
        if self.ws is None:
            return
        try:
            self.ws.close()
        except Exception:
            # The old connection has typically failed already, so an error
            # while closing it is expected; record it and carry on.
            logging.getLogger(__name__).debug(
                "Ignoring error while closing stale connection", exc_info=True)
        self.ws = None

    def close(self):
        """Safely close connection and stop any further reconnection attempts."""
        self._closed = True
        if self.ws:
            try:
                self.ws.close()
                print("🔌 Connection closed successfully")
            except Exception as e:
                print(f"⚠️ Error closing connection: {e}")
            self.ws = None
# Usage
import yfinance as yf


def safe_message_handler(message):
    symbol = message.get('symbol', 'Unknown')
    price = message.get('price', 0)
    print(f"{symbol}: ${price}")


client = RobustWebSocketClient(
    symbols=["AAPL", "GOOGL", "MSFT"],
    message_handler=safe_message_handler,
    max_retries=3,
)

try:
    client.connect_with_retry()
except KeyboardInterrupt:
    print("\n🛑 Stopping...")
finally:
    # Close on every exit path, not only on Ctrl+C.
    client.close()
```

## Performance Considerations

### Memory Management

```python
from collections import deque
import threading


class EfficientStreamProcessor:
    """Keep a bounded, lock-protected history of recent messages."""

    def __init__(self, buffer_size=1000):
        self.buffer = deque(maxlen=buffer_size)  # Fixed-size buffer
        self.lock = threading.Lock()

    def process_message(self, message):
        """Process message with memory-efficient buffering.

        Only symbol, price and timestamp are kept in the buffer.
        """
        with self.lock:
            # Add to buffer (automatically removes oldest when full)
            self.buffer.append({
                'symbol': message.get('symbol'),
                'price': message.get('price'),
                'timestamp': message.get('timestamp'),
            })

            # Process latest message
            self.handle_latest_update(message)

    def handle_latest_update(self, message):
        """Handle the latest update efficiently."""
        symbol = message.get('symbol', 'Unknown')
        price = message.get('price', 0)
        print(f"{symbol}: ${price}")

    def get_recent_data(self, count=10):
        """Return up to *count* most recent buffered entries, oldest first.

        count <= 0 returns []; a bare ``[-0:]`` slice would return everything.
        """
        if count <= 0:
            return []
        with self.lock:
            return list(self.buffer)[-count:]
# Usage
import yfinance as yf

processor = EfficientStreamProcessor(buffer_size=500)
ws = yf.WebSocket()
try:
    ws.subscribe(["AAPL", "GOOGL"])
    ws.listen(message_handler=processor.process_message)
finally:
    ws.close()
```

### Batch Processing

```python
import asyncio
from datetime import datetime, timedelta


class BatchStreamProcessor:
    """Group streaming messages into batches by size or elapsed time.

    A batch is processed when it reaches ``batch_size`` messages, or when a
    message arrives at least ``batch_timeout`` seconds after the previous
    batch. The timeout is only checked when a message arrives, so call
    process_batch() at shutdown to flush a partial batch.
    """

    def __init__(self, batch_size=10, batch_timeout=5):
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout
        self.message_batch = []
        self.last_batch_time = datetime.now()

    async def handle_message(self, message):
        """Add message to batch and process when ready."""
        self.message_batch.append(message)

        # Process batch if size limit reached or timeout exceeded
        if (len(self.message_batch) >= self.batch_size or
                datetime.now() - self.last_batch_time >= timedelta(seconds=self.batch_timeout)):
            await self.process_batch()

    async def process_batch(self):
        """Process accumulated messages in batch."""
        if not self.message_batch:
            return

        # Detach the current batch and restart the timer before doing any
        # work. If real processing awaits (DB write, API call), messages that
        # arrive meanwhile land in a fresh list instead of being wiped by a
        # clear() afterwards.
        batch, self.message_batch = self.message_batch, []
        self.last_batch_time = datetime.now()

        # Simulate batch processing (database write, API call, etc.)
        print(f"📦 Processing batch of {len(batch)} messages")

        for message in batch:
            # Batch processing logic here, e.g. using message.get('symbol')
            # and message.get('price')
            pass
# Usage
import yfinance as yf


async def main():
    processor = BatchStreamProcessor(batch_size=5, batch_timeout=3)

    try:
        async with yf.AsyncWebSocket() as ws:
            await ws.subscribe(["AAPL", "GOOGL", "MSFT"])
            await ws.listen(message_handler=processor.handle_message)
    finally:
        # The timeout is only checked when a message arrives, so flush
        # whatever remains in the last partial batch.
        await processor.process_batch()

asyncio.run(main())
```