or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

bulk-data.md, config-utils.md, index.md, live-streaming.md, market-sector.md, screening.md, search-lookup.md, ticker-data.md

live-streaming.md (docs/)

0

# Real-time Data Streaming

1

2

Live financial data streaming using WebSocket connections, with both synchronous and asynchronous support. It enables real-time price updates, volume changes, and market data streaming for active trading and monitoring applications.

3

4

## Capabilities

5

6

### Synchronous WebSocket Streaming

7

8

Real-time data streaming using synchronous WebSocket connections for traditional applications.

9

10

```python { .api }

11

class WebSocket:
    """Synchronous WebSocket client interface for real-time data streaming."""

    def __init__(self, url: str = "wss://streamer.finance.yahoo.com/?version=2",
                 verbose: bool = True):
        """
        Create a WebSocket connection for real-time data streaming.

        Parameters:
        - url: str, WebSocket server URL (default: Yahoo Finance streaming endpoint)
        - verbose: bool, enable verbose logging of connection events
        """

    def subscribe(self, symbols: Union[str, List[str]]):
        """
        Subscribe to real-time data for specified symbols.

        Parameters:
        - symbols: str or list, ticker symbols to subscribe to
        """

    def unsubscribe(self, symbols: Union[str, List[str]]):
        """
        Unsubscribe from real-time data for specified symbols.

        Parameters:
        - symbols: str or list, ticker symbols to unsubscribe from
        """

    def listen(self, message_handler: Optional[Callable] = None):
        """
        Start listening for real-time data messages.

        Parameters:
        - message_handler: function to handle incoming messages
          If None, messages are printed to console
        """

    def close(self):
        """
        Close the WebSocket connection.
        """

51

```

52

53

#### Context Manager Support

54

55

```python { .api }

56

# WebSocket can be used as a context manager

57

with WebSocket(verbose=True) as ws:

58

ws.subscribe(["AAPL", "GOOGL"])

59

ws.listen(message_handler=custom_handler)

60

# Connection automatically closed when exiting context

61

```

62

63

#### Usage Examples

64

65

```python

66

import yfinance as yf

67

68

# Basic real-time streaming

69

def handle_price_update(message):
    """
    Print the symbol and price carried by one streaming message.

    Parameters:
    - message: dict-like object; 'symbol' falls back to 'Unknown' and
      'price' falls back to 0 when the key is absent
    """
    symbol = message.get('symbol', 'Unknown')
    price = message.get('price', 0)
    print(f"{symbol}: ${price}")

73

74

ws = yf.WebSocket(verbose=True)

75

ws.subscribe(["AAPL", "GOOGL", "MSFT"])

76

ws.listen(message_handler=handle_price_update)

77

78

# Using context manager

79

with yf.WebSocket() as ws:

80

ws.subscribe("AAPL")

81

ws.listen() # Uses default console output

82

83

# Managing subscriptions

84

ws = yf.WebSocket()

85

ws.subscribe(["AAPL", "GOOGL"]) # Initial subscription

86

ws.subscribe("MSFT") # Add more symbols

87

ws.unsubscribe("GOOGL") # Remove specific symbol

88

ws.listen(handle_price_update)

89

```

90

91

### Asynchronous WebSocket Streaming

92

93

Real-time data streaming using asynchronous WebSocket connections for modern async applications.

94

95

```python { .api }

96

class AsyncWebSocket:
    """Asynchronous WebSocket client interface for real-time data streaming."""

    def __init__(self, url: str = "wss://streamer.finance.yahoo.com/?version=2",
                 verbose: bool = True):
        """
        Create an async WebSocket connection for real-time data streaming.

        Parameters:
        - url: str, WebSocket server URL
        - verbose: bool, enable verbose logging
        """

    async def subscribe(self, symbols: Union[str, List[str]]):
        """
        Asynchronously subscribe to real-time data.

        Parameters:
        - symbols: str or list, ticker symbols to subscribe to
        """

    async def unsubscribe(self, symbols: Union[str, List[str]]):
        """
        Asynchronously unsubscribe from real-time data.

        Parameters:
        - symbols: str or list, ticker symbols to unsubscribe from
        """

    async def listen(self, message_handler: Optional[Callable] = None):
        """
        Asynchronously listen for real-time data messages.

        Parameters:
        - message_handler: async function to handle incoming messages
        """

    async def close(self):
        """
        Asynchronously close the WebSocket connection.
        """

135

```

136

137

#### Async Context Manager Support

138

139

```python { .api }

140

# AsyncWebSocket can be used as an async context manager

141

async with AsyncWebSocket(verbose=True) as ws:

142

await ws.subscribe(["AAPL", "GOOGL"])

143

await ws.listen(message_handler=async_handler)

144

# Connection automatically closed when exiting context

145

```

146

147

#### Usage Examples

148

149

```python

150

import asyncio

151

import yfinance as yf

152

153

# Basic async streaming

154

async def async_price_handler(message):
    """
    Print the symbol and price carried by one streaming message.

    Parameters:
    - message: dict-like object; 'symbol' falls back to 'Unknown' and
      'price' falls back to 0 when the key is absent
    """
    symbol = message.get('symbol', 'Unknown')
    price = message.get('price', 0)
    print(f"Async: {symbol} -> ${price}")

158

159

async def main():
    """Subscribe to three symbols and stream updates to async_price_handler."""
    ws = yf.AsyncWebSocket(verbose=True)
    try:
        await ws.subscribe(["AAPL", "GOOGL", "MSFT"])
        await ws.listen(message_handler=async_price_handler)
    finally:
        # Without a context manager nothing else releases the connection, so
        # close it explicitly even when listen() fails or is interrupted.
        await ws.close()

163

164

# Run the async function

165

asyncio.run(main())

166

167

# Using async context manager

168

async def stream_with_context():
    """Stream AAPL updates with the default handler inside an async context manager."""
    async with yf.AsyncWebSocket() as ws:
        await ws.subscribe("AAPL")
        await ws.listen()

172

173

asyncio.run(stream_with_context())

174

```

175

176

### Ticker-Level Streaming

177

178

Start real-time streaming directly from Ticker and Tickers objects.

179

180

```python { .api }

181

# Available on Ticker class

182

def live(self, message_handler: Callable = None, verbose: bool = True):
    """
    Start real-time streaming for this ticker.

    Parameters:
    - message_handler: function to handle incoming messages
    - verbose: bool, enable verbose logging
    """

190

191

# Available on Tickers class

192

def live(self, message_handler: Callable = None, verbose: bool = True):
    """
    Start real-time streaming for all tickers in the collection.

    Parameters:
    - message_handler: function to handle incoming messages
    - verbose: bool, enable verbose logging
    """

200

```

201

202

#### Usage Examples

203

204

```python

205

# Single ticker streaming

206

ticker = yf.Ticker("AAPL")

207

ticker.live(message_handler=handle_price_update)

208

209

# Multiple ticker streaming

210

portfolio = yf.Tickers(["AAPL", "GOOGL", "MSFT"])

211

portfolio.live(message_handler=handle_portfolio_update)

212

```

213

214

## Message Structure and Handling

215

216

### Message Format

217

218

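Real-time messages contain various fields depending on the data type. The structure below is a typical example; not every message carries every field, so read values with `message.get(...)` and a default, as the handlers in this document do: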

219

220

```python

221

# Typical message structure

222

{

223

'symbol': 'AAPL',

224

'price': 175.43,

225

'change': 2.15,

226

'changePercent': 1.24,

227

'volume': 45672100,

228

'timestamp': 1640995200,

229

'marketHours': 'REGULAR_MARKET',

230

'dayHigh': 176.12,

231

'dayLow': 173.78,

232

'bid': 175.42,

233

'ask': 175.44,

234

'bidSize': 100,

235

'askSize': 200

236

}

237

```

238

239

### Advanced Message Handlers

240

241

```python

242

def comprehensive_message_handler(message):
    """
    Advanced message handler with detailed processing.

    Prints one ANSI-colored line per message: local time, symbol, a direction
    arrow, price, signed change and volume.

    Parameters:
    - message: dict-like object; missing 'symbol' falls back to 'Unknown',
      missing numeric fields fall back to 0
    """
    import datetime

    symbol = message.get('symbol', 'Unknown')
    price = message.get('price', 0)
    change = message.get('change', 0)
    volume = message.get('volume', 0)
    timestamp = message.get('timestamp', 0)

    # Price movement analysis
    if change > 0:
        direction = "▲"
        color_code = "\033[92m"  # Green
    elif change < 0:
        direction = "▼"
        color_code = "\033[91m"  # Red
    else:
        direction = "→"
        color_code = "\033[93m"  # Yellow

    # Format timestamp (interpreted as seconds since the epoch, local time)
    time_str = datetime.datetime.fromtimestamp(timestamp).strftime('%H:%M:%S')

    # Display formatted message; "\033[0m" resets the terminal color
    print(f"{color_code}{time_str} {symbol} {direction} ${price:.2f} "
          f"({change:+.2f}) Vol: {volume:,}\033[0m")

269

270

# Usage

271

ws = yf.WebSocket()

272

ws.subscribe(["AAPL", "GOOGL", "MSFT"])

273

ws.listen(message_handler=comprehensive_message_handler)

274

```

275

276

### Async Message Processing

277

278

```python

279

import asyncio

280

from collections import deque

281

282

class AsyncMessageProcessor:
    """Queue incoming messages and drain the queue with a single async worker."""

    def __init__(self):
        self.message_queue = deque()
        # True while process_messages() is draining the queue.
        self.processing = False

    async def handle_message(self, message):
        """Add message to queue for processing."""
        self.message_queue.append(message)

        # Only one drain loop runs at a time; a running loop picks up the
        # message that was just appended.
        if not self.processing:
            await self.process_messages()

    async def process_messages(self):
        """Process queued messages asynchronously."""
        self.processing = True
        try:
            while self.message_queue:
                message = self.message_queue.popleft()

                # Simulate async processing (database write, API call, etc.)
                await asyncio.sleep(0.01)

                # Process message
                symbol = message.get('symbol', 'Unknown')
                price = message.get('price', 0)
                print(f"Processed: {symbol} @ ${price}")
        finally:
            # Always release the flag: if a message raises, leaving it set
            # would stop every later handle_message() call from processing.
            self.processing = False

310

311

# Usage

312

async def main():
    """Stream AAPL and GOOGL updates into an AsyncMessageProcessor."""
    processor = AsyncMessageProcessor()

    async with yf.AsyncWebSocket() as ws:
        await ws.subscribe(["AAPL", "GOOGL"])
        await ws.listen(message_handler=processor.handle_message)

318

319

asyncio.run(main())

320

```

321

322

## Advanced Streaming Patterns

323

324

### Portfolio Monitoring

325

326

```python

327

class PortfolioMonitor:
    """Track the latest price/change per symbol and print threshold alerts."""

    def __init__(self, portfolio_symbols, alerts=None):
        """
        Parameters:
        - portfolio_symbols: iterable of ticker symbols to track
        - alerts: optional dict mapping symbol -> dict with optional
          'stop_loss' and 'take_profit' price thresholds
        """
        self.portfolio = {symbol: {'price': 0, 'change': 0} for symbol in portfolio_symbols}
        self.alerts = alerts or {}
        self.total_value = 0

    def handle_update(self, message):
        """
        Apply one streaming message to the tracked portfolio.

        Messages for symbols outside the portfolio are ignored. Fields absent
        from the message keep their last known value.
        """
        symbol = message.get('symbol')
        if symbol not in self.portfolio:
            return

        price = message.get('price')
        change = message.get('change')

        position = self.portfolio[symbol]
        if price is not None:
            position['price'] = price
        if change is not None:
            position['change'] = change

        # Check alerts only against a real price: defaulting a missing price
        # to 0 would compare below every stop-loss and raise a false alert.
        if price is not None and symbol in self.alerts:
            self.check_alerts(symbol, price)

        # Update portfolio summary
        self.update_portfolio_summary()

    def check_alerts(self, symbol, price):
        """Print an alert when price crosses a configured threshold for symbol."""
        alerts = self.alerts[symbol]

        if 'stop_loss' in alerts and price <= alerts['stop_loss']:
            print(f"🚨 STOP LOSS ALERT: {symbol} @ ${price} (Stop: ${alerts['stop_loss']})")

        if 'take_profit' in alerts and price >= alerts['take_profit']:
            print(f"🎯 TAKE PROFIT ALERT: {symbol} @ ${price} (Target: ${alerts['take_profit']})")

    def update_portfolio_summary(self):
        """Print the sum of the per-symbol 'change' values."""
        total_change = sum(data['change'] for data in self.portfolio.values())
        print(f"Portfolio Update - Total Change: ${total_change:+.2f}")

360

361

# Usage

362

portfolio_symbols = ["AAPL", "GOOGL", "MSFT"]

363

alerts = {

364

"AAPL": {"stop_loss": 160.0, "take_profit": 180.0},

365

"GOOGL": {"stop_loss": 90.0, "take_profit": 110.0}

366

}

367

368

monitor = PortfolioMonitor(portfolio_symbols, alerts)

369

ws = yf.WebSocket()

370

ws.subscribe(portfolio_symbols)

371

ws.listen(message_handler=monitor.handle_update)

372

```

373

374

### Market Scanner

375

376

```python

377

class MarketScanner:
    """Collect streaming messages whose move and volume exceed set thresholds."""

    def __init__(self, scan_criteria):
        """
        Parameters:
        - scan_criteria: dict with optional 'min_change_percent' (default 5)
          and 'min_volume' (default 1000000) thresholds
        """
        self.criteria = scan_criteria
        self.matches = []

    def scan_message(self, message):
        """Record and print the message when it satisfies the scan criteria."""
        # Apply scanning criteria
        if not self.meets_criteria(message):
            return

        symbol = message.get('symbol')
        price = message.get('price', 0)
        change_percent = message.get('changePercent', 0)

        self.matches.append({
            'symbol': symbol,
            'price': price,
            'change_percent': change_percent,
            'volume': message.get('volume', 0),
            'timestamp': message.get('timestamp', 0),
        })
        print(f"🔍 SCANNER MATCH: {symbol} - {change_percent:+.2f}% @ ${price}")

    def meets_criteria(self, message):
        """Return True when both the move and the volume exceed their thresholds."""
        change_percent = message.get('changePercent', 0)
        volume = message.get('volume', 0)

        # Example criteria: Large price movement with high volume.
        # Both comparisons are strict, so a value equal to its threshold fails.
        return (abs(change_percent) > self.criteria.get('min_change_percent', 5) and
                volume > self.criteria.get('min_volume', 1000000))

408

409

# Usage

410

scan_criteria = {

411

'min_change_percent': 3.0, # 3% minimum price change

412

'min_volume': 2000000 # 2M minimum volume

413

}

414

415

scanner = MarketScanner(scan_criteria)

416

watchlist = ["AAPL", "GOOGL", "MSFT", "AMZN", "TSLA", "META", "NVDA"]

417

418

ws = yf.WebSocket()

419

ws.subscribe(watchlist)

420

ws.listen(message_handler=scanner.scan_message)

421

```

422

423

### Data Recording and Analysis

424

425

```python

426

import pandas as pd

427

from datetime import datetime

428

import sqlite3

429

430

class StreamingDataRecorder:
    """Persist streaming messages to a SQLite table and read them back."""

    def __init__(self, db_path="streaming_data.db"):
        """
        Parameters:
        - db_path: path of the SQLite database file; the table is created
          on construction if it does not exist
        """
        self.db_path = db_path
        self.setup_database()

    def setup_database(self):
        """Create database table for streaming data."""
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS streaming_data (
                    timestamp INTEGER,
                    symbol TEXT,
                    price REAL,
                    change_amount REAL,
                    change_percent REAL,
                    volume INTEGER,
                    bid REAL,
                    ask REAL
                )
            ''')
            conn.commit()
        finally:
            # Close even when the statement fails so the handle is not leaked.
            conn.close()

    def record_message(self, message):
        """
        Record streaming message to database.

        Missing fields fall back to 0 (or '' for the symbol); a missing
        timestamp falls back to the current time in whole seconds.
        """
        row = (
            message.get('timestamp', int(datetime.now().timestamp())),
            message.get('symbol', ''),
            message.get('price', 0),
            message.get('change', 0),
            message.get('changePercent', 0),
            message.get('volume', 0),
            message.get('bid', 0),
            message.get('ask', 0),
        )

        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute('''
                INSERT INTO streaming_data
                (timestamp, symbol, price, change_amount, change_percent, volume, bid, ask)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            ''', row)
            conn.commit()
        finally:
            conn.close()

        # Print confirmation
        symbol = message.get('symbol', 'Unknown')
        price = message.get('price', 0)
        print(f"📊 Recorded: {symbol} @ ${price}")

    def get_recorded_data(self, symbol=None, hours=1):
        """
        Retrieve recorded data for analysis.

        Parameters:
        - symbol: optional ticker symbol to filter on
        - hours: look-back window counted back from now

        Returns a DataFrame ordered by timestamp, newest first.
        """
        query = '''
            SELECT * FROM streaming_data
            WHERE timestamp > ?
        '''
        params = [int(datetime.now().timestamp()) - (hours * 3600)]

        if symbol:
            query += ' AND symbol = ?'
            params.append(symbol)

        query += ' ORDER BY timestamp DESC'

        conn = sqlite3.connect(self.db_path)
        try:
            return pd.read_sql_query(query, conn, params=params)
        finally:
            conn.close()

504

505

# Usage

506

recorder = StreamingDataRecorder()

507

508

ws = yf.WebSocket()

509

ws.subscribe(["AAPL", "GOOGL", "MSFT"])

510

ws.listen(message_handler=recorder.record_message)

511

512

# Later, analyze recorded data

513

recent_data = recorder.get_recorded_data(symbol="AAPL", hours=2)

514

print(recent_data.describe())

515

```

516

517

## Error Handling and Reconnection

518

519

### Robust Connection Management

520

521

```python

522

import time

523

import logging

524

525

class RobustWebSocketClient:
    """WebSocket wrapper that reconnects with exponential backoff on failure."""

    def __init__(self, symbols, message_handler, max_retries=5):
        """
        Parameters:
        - symbols: ticker symbols passed to subscribe() on every connection
        - message_handler: function called with each incoming message
        - max_retries: consecutive failures tolerated before giving up
        """
        self.symbols = symbols
        self.message_handler = message_handler
        self.max_retries = max_retries
        self.retry_count = 0
        self.ws = None

    def connect_with_retry(self):
        """Connect with automatic retry logic."""
        while self.retry_count < self.max_retries:
            # Drop any connection left over from the previous iteration so
            # reconnecting does not leak sockets.
            self._discard_connection()
            try:
                self.ws = yf.WebSocket(verbose=True)
                self.ws.subscribe(self.symbols)
                print(f"✅ Connected successfully (attempt {self.retry_count + 1})")

                # Reset retry count on successful connection
                self.retry_count = 0

                # Start listening
                # NOTE(review): when listen() returns without raising, the
                # loop reconnects immediately with no backoff - confirm this
                # is the intended behavior for a cleanly closed stream.
                self.ws.listen(message_handler=self.handle_with_error_recovery)

            except Exception as e:
                self.retry_count += 1
                print(f"❌ Connection failed (attempt {self.retry_count}): {e}")
                self._discard_connection()

                # Announce and wait only when another attempt will follow.
                if self.retry_count < self.max_retries:
                    wait_time = min(2 ** self.retry_count, 60)  # Exponential backoff
                    print(f"⏳ Retrying in {wait_time} seconds...")
                    time.sleep(wait_time)

        print(f"🚫 Max retries ({self.max_retries}) exceeded. Giving up.")

    def _discard_connection(self):
        """Close and forget the current connection, if any, before replacing it."""
        if self.ws is None:
            return
        try:
            self.ws.close()
        except Exception as e:
            # The connection is being thrown away; report and carry on.
            print(f"⚠️ Error closing connection: {e}")
        finally:
            self.ws = None

    def handle_with_error_recovery(self, message):
        """Message handler with error recovery."""
        try:
            self.message_handler(message)
        except Exception as e:
            # Continue processing other messages
            print(f"⚠️ Error processing message: {e}")

    def close(self):
        """Safely close connection."""
        if self.ws:
            try:
                self.ws.close()
                print("🔌 Connection closed successfully")
            except Exception as e:
                print(f"⚠️ Error closing connection: {e}")

574

575

# Usage

576

def safe_message_handler(message):
    """
    Print the symbol and price carried by one streaming message.

    Parameters:
    - message: dict-like object; 'symbol' falls back to 'Unknown' and
      'price' falls back to 0 when the key is absent
    """
    symbol = message.get('symbol', 'Unknown')
    price = message.get('price', 0)
    print(f"{symbol}: ${price}")

580

581

# Build the client, then stream until retries are exhausted or Ctrl+C is pressed.
client = RobustWebSocketClient(
    symbols=["AAPL", "GOOGL", "MSFT"],
    message_handler=safe_message_handler,
    max_retries=3
)

try:
    client.connect_with_retry()
except KeyboardInterrupt:
    # KeyboardInterrupt is not an Exception subclass, so the retry loop does
    # not swallow it and it reaches this handler.
    print("\n🛑 Stopping...")
    client.close()

592

```

593

594

## Performance Considerations

595

596

### Memory Management

597

598

```python

599

from collections import deque

600

import threading

601

602

class EfficientStreamProcessor:
    """Keep the most recent messages in a fixed-size, lock-protected buffer."""

    def __init__(self, buffer_size=1000):
        """
        Parameters:
        - buffer_size: maximum number of entries retained in the buffer
        """
        self.buffer = deque(maxlen=buffer_size)  # Fixed-size buffer
        self.lock = threading.Lock()

    def process_message(self, message):
        """Process message with memory-efficient buffering."""
        entry = {
            'symbol': message.get('symbol'),
            'price': message.get('price'),
            'timestamp': message.get('timestamp'),
        }
        with self.lock:
            # Add to buffer (automatically removes oldest when full)
            self.buffer.append(entry)

        # Process latest message outside the lock so console I/O does not
        # block readers calling get_recent_data().
        self.handle_latest_update(message)

    def handle_latest_update(self, message):
        """Handle the latest update efficiently."""
        symbol = message.get('symbol', 'Unknown')
        price = message.get('price', 0)
        print(f"{symbol}: ${price}")

    def get_recent_data(self, count=10):
        """
        Get recent data from buffer.

        Parameters:
        - count: maximum number of most recent entries to return

        Returns a list, oldest first; empty when count is zero or negative.
        """
        # Guard explicitly: seq[-0:] is seq[0:], which would return the whole
        # buffer instead of nothing.
        if count <= 0:
            return []
        with self.lock:
            return list(self.buffer)[-count:]

630

631

# Usage

632

processor = EfficientStreamProcessor(buffer_size=500)

633

ws = yf.WebSocket()

634

ws.subscribe(["AAPL", "GOOGL"])

635

ws.listen(message_handler=processor.process_message)

636

```

637

638

### Batch Processing

639

640

```python

641

import asyncio

642

from datetime import datetime, timedelta

643

644

class BatchStreamProcessor:
    """Accumulate messages and process them in batches by size or age."""

    def __init__(self, batch_size=10, batch_timeout=5):
        """
        Parameters:
        - batch_size: number of buffered messages that triggers processing
        - batch_timeout: seconds since the last batch after which the next
          incoming message triggers processing
        """
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout
        self.message_batch = []
        self.last_batch_time = datetime.now()

    async def handle_message(self, message):
        """Add message to batch and process when ready."""
        self.message_batch.append(message)

        # Process batch if size limit reached or timeout exceeded.
        # The timeout is only evaluated when a message arrives.
        batch_is_full = len(self.message_batch) >= self.batch_size
        batch_is_stale = (datetime.now() - self.last_batch_time
                          >= timedelta(seconds=self.batch_timeout))
        if batch_is_full or batch_is_stale:
            await self.process_batch()

    async def process_batch(self):
        """Process accumulated messages in batch."""
        if not self.message_batch:
            return

        # Take ownership of the current batch before processing: messages
        # appended while this batch is handled go to the new list instead of
        # being wiped by a clear() afterwards.
        batch, self.message_batch = self.message_batch, []

        # Simulate batch processing (database write, API call, etc.)
        print(f"📦 Processing batch of {len(batch)} messages")

        # Process each message in the batch
        for message in batch:
            symbol = message.get('symbol', 'Unknown')
            price = message.get('price', 0)
            # Batch processing logic here

        # Reset timer
        self.last_batch_time = datetime.now()

677

678

# Usage

679

async def main():
    """Stream three symbols into a BatchStreamProcessor."""
    processor = BatchStreamProcessor(batch_size=5, batch_timeout=3)

    try:
        async with yf.AsyncWebSocket() as ws:
            await ws.subscribe(["AAPL", "GOOGL", "MSFT"])
            await ws.listen(message_handler=processor.handle_message)
    finally:
        # Batches are only flushed when a message arrives, so a partial batch
        # would be lost when the stream ends; flush it here.
        await processor.process_batch()

685

686

asyncio.run(main())

687

```