or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

alerts-screening.mdauthentication.mdindex.mdmarket-data.mdoptions.mdpaper-trading.mdportfolio.mdstreaming.mdtrading.md

streaming.mddocs/

0

# Real-time Streaming

1

2

MQTT-based real-time data streaming for live price updates and order status changes with subscription management and callback handling.

3

4

## Prerequisites

5

6

Real-time streaming requires:

7

1. Successful login with valid session

8

2. Device ID (did) from webull session

9

3. Access token for streaming authentication

10

11

## Capabilities

12

13

### StreamConn Class

14

15

Main streaming connection class for real-time data.

16

17

```python { .api }

18

class StreamConn:

19

def __init__(self, debug_flg=False):

20

"""

21

Initialize streaming connection.

22

23

Parameters:

24

- debug_flg (bool): Enable debug logging for streaming events

25

"""

26

```

27

28

### Connection Management

29

30

Establish and manage MQTT connections for real-time data streaming.

31

32

```python { .api }

33

def connect(self, did, access_token=None):

34

"""

35

Connect to Webull streaming service.

36

37

Parameters:

38

- did (str): Device ID from webull session

39

- access_token (str, optional): Access token for authentication

40

41

Returns:

42

bool: True if connection successful, False otherwise

43

"""

44

45

def run_blocking_loop(self):

46

"""

47

Run the streaming event loop in blocking mode.

48

49

This method blocks execution and continuously processes streaming messages.

50

Use this for dedicated streaming applications.

51

"""

52

53

def run_loop_once(self):

54

"""

55

Process streaming events once without blocking.

56

57

Returns after processing available messages. Use this method when

58

integrating streaming with other application logic.

59

"""

60

```

61

62

Usage examples:

63

64

```python

65

from webull import webull, StreamConn

66

67

# Initialize clients

68

wb = webull()

69

stream = StreamConn(debug_flg=True)

70

71

# Login to get session data

72

wb.login('your_email@example.com', 'your_password')

73

74

# Connect to streaming service

75

did = wb._get_did() # Get device ID from session

76

access_token = wb._access_token # Get access token

77

78

success = stream.connect(did, access_token)

79

if success:

80

print("Connected to streaming service")

81

else:

82

print("Failed to connect")

83

84

# Start streaming (blocking)

85

stream.run_blocking_loop()

86

```

87

88

### Subscription Management

89

90

Subscribe to and unsubscribe from real-time data feeds.

91

92

```python { .api }

93

def subscribe(self, tId=None, level=105):

94

"""

95

Subscribe to real-time price updates for a security.

96

97

Parameters:

98

- tId (int): Ticker ID to subscribe to

99

- level (int): Subscription level (105 is most comprehensive)

100

- 101: Basic price data

101

- 102: Enhanced price data with volume

102

- 103: Trade tick data

103

- 104: Level 2 order book data

104

- 105: Comprehensive data (recommended)

105

- 106-108: Specialized data combinations

106

107

Returns:

108

bool: True if subscription successful

109

"""

110

111

def unsubscribe(self, tId=None, level=105):

112

"""

113

Unsubscribe from real-time updates for a security.

114

115

Parameters:

116

- tId (int): Ticker ID to unsubscribe from

117

- level (int): Subscription level to remove

118

119

Returns:

120

bool: True if unsubscription successful

121

"""

122

```

123

124

Usage examples:

125

126

```python

127

# Get ticker ID for subscription

128

ticker_id = wb.get_ticker('AAPL')

129

130

# Subscribe to real-time data

131

stream.subscribe(tId=ticker_id, level=105)

132

133

# Subscribe to multiple stocks

134

symbols = ['AAPL', 'TSLA', 'MSFT']

135

for symbol in symbols:

136

tid = wb.get_ticker(symbol)

137

stream.subscribe(tId=tid, level=105)

138

139

# Unsubscribe when done

140

stream.unsubscribe(tId=ticker_id, level=105)

141

```

142

143

### Message Callbacks

144

145

Handle incoming real-time price and order messages.

146

147

```python { .api }

148

def on_price_message(self, topic, data):

149

"""

150

Callback function for price update messages.

151

152

This method should be overridden to handle price updates.

153

154

Parameters:

155

- topic (str): Message topic containing message type and ticker ID

156

- data (dict): Price update data containing:

157

- tickerId: Security ticker ID

158

- price: Current price

159

- change: Price change

160

- changeRatio: Percentage change

161

- volume: Trading volume

162

- high: Session high

163

- low: Session low

164

- status: Market status (F=pre, T=regular, A=after)

165

"""

166

167

def on_order_message(self, topic, data):

168

"""

169

Callback function for order status messages.

170

171

This method should be overridden to handle order updates.

172

173

Parameters:

174

- topic (str): Message topic

175

- data (dict): Order update data containing:

176

- orderId: Order identifier

177

- status: Order status

178

- filledQuantity: Quantity filled

179

- tickerId: Security ticker ID

180

- action: Order action (BUY/SELL)

181

"""

182

```

183

184

## Custom Message Handlers

185

186

Implement custom message handlers by subclassing or setting callback functions:

187

188

```python

189

class CustomStreamConn(StreamConn):

190

def __init__(self, debug_flg=False):

191

super().__init__(debug_flg)

192

self.price_data = {}

193

self.order_updates = []

194

195

def on_price_message(self, topic, data):

196

"""Custom price message handler."""

197

ticker_id = data.get('tickerId')

198

price = data.get('price', 0)

199

change_pct = data.get('changeRatio', 0)

200

201

# Store latest price data

202

self.price_data[ticker_id] = {

203

'price': price,

204

'change_pct': change_pct,

205

'timestamp': time.time()

206

}

207

208

# Print price updates

209

print(f"Price Update - Ticker {ticker_id}: ${price} ({change_pct:+.2f}%)")

210

211

# Custom logic for price alerts

212

if abs(change_pct) > 5.0: # Alert on >5% moves

213

print(f"🚨 Large move alert: {change_pct:+.2f}%")

214

215

def on_order_message(self, topic, data):

216

"""Custom order message handler."""

217

order_id = data.get('orderId')

218

status = data.get('status')

219

filled_qty = data.get('filledQuantity', 0)

220

221

# Store order updates

222

self.order_updates.append({

223

'orderId': order_id,

224

'status': status,

225

'filledQuantity': filled_qty,

226

'timestamp': time.time()

227

})

228

229

print(f"Order Update - {order_id}: {status} (Filled: {filled_qty})")

230

231

# Custom logic for order notifications

232

if status == 'FILLED':

233

print(f"βœ… Order {order_id} completely filled!")

234

elif status == 'CANCELLED':

235

print(f"❌ Order {order_id} cancelled")

236

237

# Usage

238

custom_stream = CustomStreamConn(debug_flg=True)

239

```

240

241

## Streaming Data Types

242

243

### Price Message Topics

244

245

Different message types provide various levels of market data:

246

247

- **Topic 101**: Basic close price, change, market value, change ratio

248

- **Topic 102**: Enhanced data with high, low, open, close, volume during market hours; pre/after market price data during extended hours

249

- **Topic 103**: Individual trade tick data with price, volume, and trade time

250

- **Topic 104**: Level 2 order book data with bid/ask lists and depths

251

- **Topic 105**: Combination of 102 and 103 (most commonly used)

252

- **Topic 106**: Combination of 102 (recommended for most applications)

253

- **Topic 107**: Combination of 103 and 104

254

- **Topic 108**: Combination of 103, 104, and additional depth data

255

256

### Market Status Indicators

257

258

Price messages include status field indicating market session:

259

- **F**: Pre-market hours

260

- **T**: Regular trading hours

261

- **A**: After-market hours

262

263

## Complete Streaming Example

264

265

```python

266

import time

267

import threading

268

from webull import webull, StreamConn

269

270

class TradingStreamMonitor(StreamConn):

271

def __init__(self, wb_client, debug_flg=False):

272

super().__init__(debug_flg)

273

self.wb = wb_client

274

self.watchlist = {}

275

self.alerts = {}

276

self.running = False

277

278

def add_stock_to_watch(self, symbol, alert_threshold=5.0):

279

"""Add stock to watchlist with price alert threshold."""

280

try:

281

ticker_id = self.wb.get_ticker(symbol)

282

self.watchlist[ticker_id] = {

283

'symbol': symbol,

284

'alert_threshold': alert_threshold,

285

'last_price': None,

286

'last_update': None

287

}

288

289

# Subscribe to real-time data

290

self.subscribe(tId=ticker_id, level=105)

291

print(f"Added {symbol} (ID: {ticker_id}) to watchlist")

292

293

except Exception as e:

294

print(f"Error adding {symbol} to watchlist: {e}")

295

296

def on_price_message(self, topic, data):

297

"""Handle price updates with custom alerts."""

298

ticker_id = data.get('tickerId')

299

if ticker_id not in self.watchlist:

300

return

301

302

stock_info = self.watchlist[ticker_id]

303

symbol = stock_info['symbol']

304

305

# Extract price data

306

price = data.get('price', 0)

307

change_pct = data.get('changeRatio', 0)

308

volume = data.get('volume', 0)

309

market_status = data.get('status', 'T')

310

311

# Update watchlist

312

stock_info['last_price'] = price

313

stock_info['last_update'] = time.time()

314

315

# Market status indicator

316

status_emoji = {"F": "πŸŒ…", "T": "πŸ“ˆ", "A": "πŸŒƒ"}.get(market_status, "πŸ“Š")

317

318

print(f"{status_emoji} {symbol}: ${price:.2f} ({change_pct:+.2f}%) Vol: {volume:,}")

319

320

# Price alerts

321

threshold = stock_info['alert_threshold']

322

if abs(change_pct) >= threshold:

323

direction = "πŸš€" if change_pct > 0 else "πŸ“‰"

324

print(f"🚨 ALERT: {symbol} {direction} {change_pct:+.2f}% - Threshold: {threshold}%")

325

326

def on_order_message(self, topic, data):

327

"""Handle order status updates."""

328

order_id = data.get('orderId', 'Unknown')

329

status = data.get('status', 'Unknown')

330

filled_qty = data.get('filledQuantity', 0)

331

ticker_id = data.get('tickerId')

332

333

# Find symbol for ticker ID

334

symbol = 'Unknown'

335

for tid, info in self.watchlist.items():

336

if tid == ticker_id:

337

symbol = info['symbol']

338

break

339

340

status_emoji = {

341

'FILLED': 'βœ…',

342

'PARTIAL': 'πŸ”„',

343

'CANCELLED': '❌',

344

'PENDING': '⏳'

345

}.get(status, 'πŸ“‹')

346

347

print(f"{status_emoji} Order {order_id} ({symbol}): {status} - Filled: {filled_qty}")

348

349

def start_monitoring(self):

350

"""Start the streaming monitor."""

351

self.running = True

352

print("Starting real-time monitoring...")

353

354

# Run streaming loop in separate thread

355

def stream_loop():

356

while self.running:

357

self.run_loop_once()

358

time.sleep(0.1) # Small delay to prevent excessive CPU usage

359

360

stream_thread = threading.Thread(target=stream_loop, daemon=True)

361

stream_thread.start()

362

return stream_thread

363

364

def stop_monitoring(self):

365

"""Stop the streaming monitor."""

366

self.running = False

367

print("Stopping real-time monitoring...")

368

369

# Main usage example

370

def main():

371

# Initialize clients

372

wb = webull()

373

stream_monitor = TradingStreamMonitor(wb, debug_flg=False)

374

375

try:

376

# Login

377

wb.login('your_email@example.com', 'your_password')

378

379

# Connect to streaming

380

did = wb._get_did()

381

access_token = wb._access_token

382

383

if stream_monitor.connect(did, access_token):

384

print("Connected to streaming service successfully")

385

386

# Add stocks to monitor

387

watchlist = [

388

('AAPL', 3.0), # Alert on 3%+ moves

389

('TSLA', 5.0), # Alert on 5%+ moves

390

('MSFT', 2.0), # Alert on 2%+ moves

391

('NVDA', 4.0), # Alert on 4%+ moves

392

]

393

394

for symbol, threshold in watchlist:

395

stream_monitor.add_stock_to_watch(symbol, threshold)

396

397

# Start monitoring

398

stream_thread = stream_monitor.start_monitoring()

399

400

# Keep monitoring for specified time

401

print("Monitoring for 60 seconds...")

402

time.sleep(60)

403

404

# Stop monitoring

405

stream_monitor.stop_monitoring()

406

stream_thread.join()

407

408

else:

409

print("Failed to connect to streaming service")

410

411

except KeyboardInterrupt:

412

print("\nStopping due to user interrupt...")

413

stream_monitor.stop_monitoring()

414

415

except Exception as e:

416

print(f"Streaming error: {e}")

417

418

if __name__ == "__main__":

419

main()

420

```

421

422

## Advanced Streaming Patterns

423

424

### Portfolio Monitoring

425

426

Monitor all positions in real-time:

427

428

```python

429

def monitor_portfolio_realtime(wb, stream):

430

"""Monitor all portfolio positions in real-time."""

431

432

# Get current positions

433

positions = wb.get_positions()

434

435

print(f"Monitoring {len(positions)} positions in real-time...")

436

437

for position in positions:

438

symbol = position['ticker']['symbol']

439

ticker_id = position['ticker']['tickerId']

440

shares = position['position']

441

442

print(f"Subscribing to {symbol} ({shares} shares)")

443

stream.subscribe(tId=ticker_id, level=105)

444

445

# Usage

446

monitor_portfolio_realtime(wb, stream)

447

```

448

449

### Order Execution Monitoring

450

451

Track order fills in real-time:

452

453

```python

454

class OrderTracker(StreamConn):

455

def __init__(self, webull_client):

456

super().__init__()

457

self.wb = webull_client

458

self.pending_orders = {}

459

460

def track_order(self, order_id):

461

"""Start tracking a specific order."""

462

self.pending_orders[order_id] = {

463

'start_time': time.time(),

464

'fills': []

465

}

466

467

def on_order_message(self, topic, data):

468

"""Track order execution."""

469

order_id = data.get('orderId')

470

471

if order_id in self.pending_orders:

472

status = data.get('status')

473

filled_qty = data.get('filledQuantity', 0)

474

475

self.pending_orders[order_id]['fills'].append({

476

'timestamp': time.time(),

477

'status': status,

478

'filled_qty': filled_qty

479

})

480

481

if status in ['FILLED', 'CANCELLED']:

482

# Order completed, stop tracking

483

order_info = self.pending_orders.pop(order_id)

484

execution_time = time.time() - order_info['start_time']

485

print(f"Order {order_id} completed in {execution_time:.1f}s: {status}")

486

487

# Usage

488

order_tracker = OrderTracker(wb)

489

# After placing order, track it

490

order_result = wb.place_order(stock='AAPL', price=150.0, action='BUY', quant=10)

491

order_tracker.track_order(order_result['orderId'])

492

```

493

494

## Error Handling & Reconnection

495

496

Handle streaming connection issues:

497

498

```python

499

class RobustStreamConn(StreamConn):

500

def __init__(self, wb_client, max_retries=5):

501

super().__init__(debug_flg=True)

502

self.wb = wb_client

503

self.max_retries = max_retries

504

self.reconnect_count = 0

505

self.subscriptions = set() # Track active subscriptions

506

507

def connect_with_retry(self):

508

"""Connect with automatic retry on failure."""

509

for attempt in range(self.max_retries):

510

try:

511

did = self.wb._get_did()

512

access_token = self.wb._access_token

513

514

if self.connect(did, access_token):

515

print(f"Connected successfully (attempt {attempt + 1})")

516

self.reconnect_count = 0

517

518

# Restore subscriptions after reconnection

519

self.restore_subscriptions()

520

return True

521

522

except Exception as e:

523

print(f"Connection attempt {attempt + 1} failed: {e}")

524

time.sleep(2 ** attempt) # Exponential backoff

525

526

print("Max connection retries exceeded")

527

return False

528

529

def subscribe(self, tId=None, level=105):

530

"""Subscribe and track subscription."""

531

success = super().subscribe(tId, level)

532

if success:

533

self.subscriptions.add((tId, level))

534

return success

535

536

def restore_subscriptions(self):

537

"""Restore all subscriptions after reconnection."""

538

print(f"Restoring {len(self.subscriptions)} subscriptions...")

539

for tId, level in self.subscriptions:

540

super().subscribe(tId, level)

541

542

# Usage

543

robust_stream = RobustStreamConn(wb)

544

robust_stream.connect_with_retry()

545

```