# Real-time Streaming

MQTT-based real-time streaming of live price updates and order status changes, with subscription management and callback handling.

## Prerequisites

Real-time streaming requires:

1. A successful login with a valid session
2. The device ID (`did`) from the webull session
3. An access token for streaming authentication
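
The three prerequisites can be checked before a connection is attempted. The following is a minimal sketch, not part of the library: `check_streaming_prerequisites` is a hypothetical helper, and the caller is assumed to supply the login state, device ID, and token itself.

```python
def check_streaming_prerequisites(logged_in, did, access_token):
    """Return the names of missing prerequisites (empty list when ready).

    Hypothetical helper: it only inspects the values it is given and
    makes no calls to the webull client.
    """
    missing = []
    if not logged_in:
        missing.append("login")
    if not did:
        missing.append("did")
    if not access_token:
        missing.append("access_token")
    return missing


# Example: a session that has a device ID but no access token yet
print(check_streaming_prerequisites(True, "abc123", None))
```

An empty list means all three values are present and `connect` can be tried.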

## Capabilities

### StreamConn Class

Main streaming connection class for real-time data.

```python { .api }
class StreamConn:
    def __init__(self, debug_flg=False):
        """
        Initialize streaming connection.

        Parameters:
        - debug_flg (bool): Enable debug logging for streaming events
        """
```

### Connection Management

Establish and manage MQTT connections for real-time data streaming.

```python { .api }
def connect(self, did, access_token=None):
    """
    Connect to Webull streaming service.

    Parameters:
    - did (str): Device ID from webull session
    - access_token (str, optional): Access token for authentication

    Returns:
    bool: True if connection successful, False otherwise
    """

def run_blocking_loop(self):
    """
    Run the streaming event loop in blocking mode.

    This method blocks execution and continuously processes streaming messages.
    Use this for dedicated streaming applications.
    """

def run_loop_once(self):
    """
    Process streaming events once without blocking.

    Returns after processing available messages. Use this method when
    integrating streaming with other application logic.
    """
```

Usage examples:

```python
from webull import webull, StreamConn

# Initialize clients
wb = webull()
stream = StreamConn(debug_flg=True)

# Login to get session data
wb.login('your_email@example.com', 'your_password')

# Connect to streaming service
did = wb._get_did()  # Get device ID from session
access_token = wb._access_token  # Get access token

success = stream.connect(did, access_token)
if success:
    print("Connected to streaming service")
    # Start streaming (blocking); only worthwhile once connected
    stream.run_blocking_loop()
else:
    print("Failed to connect")
```
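
When streaming has to share a thread with other work, `run_loop_once` can be called from the application's own loop instead of `run_blocking_loop`. The sketch below illustrates that pattern under assumptions: `pump` and `FakeStream` are hypothetical names, and `FakeStream` merely stands in for a connected `StreamConn` so the snippet runs on its own.

```python
import time


def pump(stream, do_work, iterations, delay=0.0):
    """Alternate between processing stream events and application work."""
    results = []
    for _ in range(iterations):
        stream.run_loop_once()      # process whatever messages are available
        results.append(do_work())   # then give the application a turn
        if delay:
            time.sleep(delay)       # optional pause to limit CPU usage
    return results


class FakeStream:
    """Stand-in for a connected StreamConn, used only for illustration."""

    def __init__(self):
        self.calls = 0

    def run_loop_once(self):
        self.calls += 1


fake = FakeStream()
ticks = pump(fake, do_work=lambda: "tick", iterations=3)
print(fake.calls, ticks)
```

Any object exposing `run_loop_once()` can be passed as `stream`, so the same loop should work with a real connection.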

### Subscription Management

Subscribe to and unsubscribe from real-time data feeds.

```python { .api }
def subscribe(self, tId=None, level=105):
    """
    Subscribe to real-time price updates for a security.

    Parameters:
    - tId (int): Ticker ID to subscribe to
    - level (int): Subscription level (105 is most comprehensive)
      - 101: Basic price data
      - 102: Enhanced price data with volume
      - 103: Trade tick data
      - 104: Level 2 order book data
      - 105: Comprehensive data (recommended)
      - 106-108: Specialized data combinations

    Returns:
    bool: True if subscription successful
    """

def unsubscribe(self, tId=None, level=105):
    """
    Unsubscribe from real-time updates for a security.

    Parameters:
    - tId (int): Ticker ID to unsubscribe from
    - level (int): Subscription level to remove

    Returns:
    bool: True if unsubscription successful
    """
```

Usage examples:

```python
# Get ticker ID for subscription
ticker_id = wb.get_ticker('AAPL')

# Subscribe to real-time data
stream.subscribe(tId=ticker_id, level=105)

# Subscribe to multiple stocks
symbols = ['AAPL', 'TSLA', 'MSFT']
for symbol in symbols:
    tid = wb.get_ticker(symbol)
    stream.subscribe(tId=tid, level=105)

# Unsubscribe when done
stream.unsubscribe(tId=ticker_id, level=105)
```
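
When subscribing to several symbols, it can help to record which subscriptions succeeded so that one failed lookup does not abort the rest. The following is a sketch only: `subscribe_symbols` is a hypothetical helper, and `FakeClient` and `RecordingStream` are stand-ins (with made-up ticker IDs) that mimic just the `get_ticker` and `subscribe` calls used above.

```python
def subscribe_symbols(wb, stream, symbols, level=105):
    """Subscribe to each symbol and report which ones succeeded."""
    results = {}
    for symbol in symbols:
        try:
            ticker_id = wb.get_ticker(symbol)
            results[symbol] = bool(stream.subscribe(tId=ticker_id, level=level))
        except Exception:
            # A failed lookup or subscription should not stop the others
            results[symbol] = False
    return results


class FakeClient:
    """Stand-in for the webull client; the ticker IDs are made up."""

    _ids = {"AAPL": 1, "TSLA": 2}

    def get_ticker(self, symbol):
        return self._ids[symbol]  # raises KeyError for unknown symbols


class RecordingStream:
    """Stand-in for StreamConn that records subscription requests."""

    def __init__(self):
        self.subscribed = []

    def subscribe(self, tId=None, level=105):
        self.subscribed.append((tId, level))
        return True


recorder = RecordingStream()
outcome = subscribe_symbols(FakeClient(), recorder, ["AAPL", "NOPE", "TSLA"])
print(outcome)
```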

### Message Callbacks

Handle incoming real-time price and order messages.

```python { .api }
def on_price_message(self, topic, data):
    """
    Callback function for price update messages.

    This method should be overridden to handle price updates.

    Parameters:
    - topic (str): Message topic containing message type and ticker ID
    - data (dict): Price update data containing:
      - tickerId: Security ticker ID
      - price: Current price
      - change: Price change
      - changeRatio: Percentage change
      - volume: Trading volume
      - high: Session high
      - low: Session low
      - status: Market status (F=pre, T=regular, A=after)
    """

def on_order_message(self, topic, data):
    """
    Callback function for order status messages.

    This method should be overridden to handle order updates.

    Parameters:
    - topic (str): Message topic
    - data (dict): Order update data containing:
      - orderId: Order identifier
      - status: Order status
      - filledQuantity: Quantity filled
      - tickerId: Security ticker ID
      - action: Order action (BUY/SELL)
    """
```
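
A price handler often starts by pulling the documented fields out of `data` and coercing them to numbers. The sketch below assumes, without confirmation from the library, that fields may be missing or may arrive as strings; `to_float` and `normalize_price_message` are hypothetical helpers.

```python
def to_float(value, default=0.0):
    """Convert a message field to float, falling back to a default."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


def normalize_price_message(data):
    """Pick the documented price fields out of a message dict."""
    return {
        "tickerId": data.get("tickerId"),
        "price": to_float(data.get("price")),
        "change": to_float(data.get("change")),
        "changeRatio": to_float(data.get("changeRatio")),
        "volume": to_float(data.get("volume")),
        "status": data.get("status"),
    }


# Illustrative payload; the values are made up
sample = {"tickerId": 1, "price": "150.25", "volume": None, "status": "T"}
normalized = normalize_price_message(sample)
print(normalized)
```

Calling `normalize_price_message(data)` at the top of `on_price_message` keeps formatting code such as `f"{price:.2f}"` from failing on non-numeric input.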

## Custom Message Handlers

Implement custom message handlers by subclassing or setting callback functions:

```python
import time

class CustomStreamConn(StreamConn):
    def __init__(self, debug_flg=False):
        super().__init__(debug_flg)
        self.price_data = {}
        self.order_updates = []

    def on_price_message(self, topic, data):
        """Custom price message handler."""
        ticker_id = data.get('tickerId')
        price = data.get('price', 0)
        change_pct = data.get('changeRatio', 0)

        # Store latest price data
        self.price_data[ticker_id] = {
            'price': price,
            'change_pct': change_pct,
            'timestamp': time.time()
        }

        # Print price updates
        print(f"Price Update - Ticker {ticker_id}: ${price} ({change_pct:+.2f}%)")

        # Custom logic for price alerts
        if abs(change_pct) > 5.0:  # Alert on >5% moves
            print(f"🚨 Large move alert: {change_pct:+.2f}%")

    def on_order_message(self, topic, data):
        """Custom order message handler."""
        order_id = data.get('orderId')
        status = data.get('status')
        filled_qty = data.get('filledQuantity', 0)

        # Store order updates
        self.order_updates.append({
            'orderId': order_id,
            'status': status,
            'filledQuantity': filled_qty,
            'timestamp': time.time()
        })

        print(f"Order Update - {order_id}: {status} (Filled: {filled_qty})")

        # Custom logic for order notifications
        if status == 'FILLED':
            print(f"✅ Order {order_id} completely filled!")
        elif status == 'CANCELLED':
            print(f"❌ Order {order_id} cancelled")

# Usage
custom_stream = CustomStreamConn(debug_flg=True)
```

## Streaming Data Types

### Price Message Topics

Each topic (subscription level) carries a different set of market data:

- **Topic 101**: Basic close price, change, market value, change ratio
- **Topic 102**: High, low, open, close, and volume during market hours; pre-market and after-market price data during extended hours
- **Topic 103**: Individual trade tick data with price, volume, and trade time
- **Topic 104**: Level 2 order book data with bid/ask lists and depths
- **Topic 105**: Combination of 102 and 103 (most commonly used; the default `level`)
- **Topic 106**: Data from 102
- **Topic 107**: Combination of 103 and 104
- **Topic 108**: Combination of 103, 104, and additional depth data
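
The composition of the combined topics listed above can be captured in a small lookup table, which makes it easy to find the levels that deliver a given kind of data. This is an illustrative sketch built only from the list above; `TOPIC_COMPONENTS` and `levels_covering` are hypothetical names, not library constants.

```python
# Base topics that each subscription level is described as carrying
TOPIC_COMPONENTS = {
    101: (101,),
    102: (102,),
    103: (103,),
    104: (104,),
    105: (102, 103),
    106: (102,),
    107: (103, 104),
    108: (103, 104),  # plus additional depth data
}


def levels_covering(base_topic):
    """Return the subscription levels whose data includes a base topic."""
    return sorted(
        level
        for level, parts in TOPIC_COMPONENTS.items()
        if base_topic in parts
    )


# Levels that include trade tick data (topic 103)
print(levels_covering(103))
```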

### Market Status Indicators

Price messages include a `status` field indicating the market session:

- **F**: Pre-market hours
- **T**: Regular trading hours
- **A**: After-market hours
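
The status codes can be translated into readable session names inside a handler. A minimal sketch using only the three codes listed above; `session_name` and `is_extended_hours` are hypothetical helpers, and any other code is reported as unknown rather than guessed.

```python
SESSION_NAMES = {
    "F": "pre-market",
    "T": "regular",
    "A": "after-market",
}


def session_name(status):
    """Map a status code from a price message to a session name."""
    return SESSION_NAMES.get(status, "unknown")


def is_extended_hours(status):
    """True for pre-market and after-market sessions."""
    return status in ("F", "A")


print(session_name("F"), is_extended_hours("F"))
```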

## Complete Streaming Example

```python
import time
import threading
from webull import webull, StreamConn

class TradingStreamMonitor(StreamConn):
    def __init__(self, wb_client, debug_flg=False):
        super().__init__(debug_flg)
        self.wb = wb_client
        self.watchlist = {}
        self.alerts = {}
        self.running = False

    def add_stock_to_watch(self, symbol, alert_threshold=5.0):
        """Add stock to watchlist with price alert threshold."""
        try:
            ticker_id = self.wb.get_ticker(symbol)
            self.watchlist[ticker_id] = {
                'symbol': symbol,
                'alert_threshold': alert_threshold,
                'last_price': None,
                'last_update': None
            }

            # Subscribe to real-time data
            self.subscribe(tId=ticker_id, level=105)
            print(f"Added {symbol} (ID: {ticker_id}) to watchlist")

        except Exception as e:
            print(f"Error adding {symbol} to watchlist: {e}")

    def on_price_message(self, topic, data):
        """Handle price updates with custom alerts."""
        ticker_id = data.get('tickerId')
        if ticker_id not in self.watchlist:
            return

        stock_info = self.watchlist[ticker_id]
        symbol = stock_info['symbol']

        # Extract price data
        price = data.get('price', 0)
        change_pct = data.get('changeRatio', 0)
        volume = data.get('volume', 0)
        market_status = data.get('status', 'T')

        # Update watchlist
        stock_info['last_price'] = price
        stock_info['last_update'] = time.time()

        # Market status indicator
        status_emoji = {"F": "🌅", "T": "📈", "A": "🌙"}.get(market_status, "📊")

        print(f"{status_emoji} {symbol}: ${price:.2f} ({change_pct:+.2f}%) Vol: {volume:,}")

        # Price alerts
        threshold = stock_info['alert_threshold']
        if abs(change_pct) >= threshold:
            direction = "📈" if change_pct > 0 else "📉"
            print(f"🚨 ALERT: {symbol} {direction} {change_pct:+.2f}% - Threshold: {threshold}%")

    def on_order_message(self, topic, data):
        """Handle order status updates."""
        order_id = data.get('orderId', 'Unknown')
        status = data.get('status', 'Unknown')
        filled_qty = data.get('filledQuantity', 0)
        ticker_id = data.get('tickerId')

        # Find symbol for ticker ID (falls back to 'Unknown' if not watched)
        symbol = self.watchlist.get(ticker_id, {}).get('symbol', 'Unknown')

        status_emoji = {
            'FILLED': '✅',
            'PARTIAL': '🔄',
            'CANCELLED': '❌',
            'PENDING': '⏳'
        }.get(status, '📋')

        print(f"{status_emoji} Order {order_id} ({symbol}): {status} - Filled: {filled_qty}")

    def start_monitoring(self):
        """Start the streaming monitor."""
        self.running = True
        print("Starting real-time monitoring...")

        # Run streaming loop in separate thread
        def stream_loop():
            while self.running:
                self.run_loop_once()
                time.sleep(0.1)  # Small delay to prevent excessive CPU usage

        stream_thread = threading.Thread(target=stream_loop, daemon=True)
        stream_thread.start()
        return stream_thread

    def stop_monitoring(self):
        """Stop the streaming monitor."""
        self.running = False
        print("Stopping real-time monitoring...")

# Main usage example
def main():
    # Initialize clients
    wb = webull()
    stream_monitor = TradingStreamMonitor(wb, debug_flg=False)

    try:
        # Login
        wb.login('your_email@example.com', 'your_password')

        # Connect to streaming
        did = wb._get_did()
        access_token = wb._access_token

        if stream_monitor.connect(did, access_token):
            print("Connected to streaming service successfully")

            # Add stocks to monitor
            watchlist = [
                ('AAPL', 3.0),  # Alert on 3%+ moves
                ('TSLA', 5.0),  # Alert on 5%+ moves
                ('MSFT', 2.0),  # Alert on 2%+ moves
                ('NVDA', 4.0),  # Alert on 4%+ moves
            ]

            for symbol, threshold in watchlist:
                stream_monitor.add_stock_to_watch(symbol, threshold)

            # Start monitoring
            stream_thread = stream_monitor.start_monitoring()

            # Keep monitoring for specified time
            print("Monitoring for 60 seconds...")
            time.sleep(60)

            # Stop monitoring
            stream_monitor.stop_monitoring()
            stream_thread.join()

        else:
            print("Failed to connect to streaming service")

    except KeyboardInterrupt:
        print("\nStopping due to user interrupt...")
        stream_monitor.stop_monitoring()

    except Exception as e:
        print(f"Streaming error: {e}")

if __name__ == "__main__":
    main()
```
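
The monitor above stops its background thread by flipping a boolean that the loop checks between sleeps. One alternative, shown here as a sketch rather than as the library's approach, is a `threading.Event`, whose `wait` doubles as the delay and returns as soon as a stop is requested. `start_polling` is a hypothetical helper; with a real connection, `run_loop_once` would be passed as `step`.

```python
import threading
import time


def start_polling(step, interval=0.01):
    """Call step() repeatedly on a daemon thread until the event is set."""
    stop_event = threading.Event()

    def loop():
        while not stop_event.is_set():
            step()
            stop_event.wait(interval)  # pauses, but wakes at once on stop

    thread = threading.Thread(target=loop, daemon=True)
    thread.start()
    return thread, stop_event


# Illustration with a counter standing in for run_loop_once
calls = []
thread, stop_event = start_polling(lambda: calls.append(1))
time.sleep(0.05)
stop_event.set()
thread.join(timeout=1)
print(f"step ran {len(calls)} times; thread alive: {thread.is_alive()}")
```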

## Advanced Streaming Patterns

### Portfolio Monitoring

Monitor all positions in real-time:

```python
def monitor_portfolio_realtime(wb, stream):
    """Monitor all portfolio positions in real-time."""

    # Get current positions
    positions = wb.get_positions()

    print(f"Monitoring {len(positions)} positions in real-time...")

    for position in positions:
        symbol = position['ticker']['symbol']
        ticker_id = position['ticker']['tickerId']
        shares = position['position']

        print(f"Subscribing to {symbol} ({shares} shares)")
        stream.subscribe(tId=ticker_id, level=105)

# Usage
monitor_portfolio_realtime(wb, stream)
```
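
The function above indexes straight into each position, so a single entry without a `ticker` key would raise `KeyError`. A defensive variant might first collect the usable ticker IDs. This sketch assumes only the position shape used above; `position_ticker_ids` is a hypothetical helper and the sample data is made up.

```python
def position_ticker_ids(positions):
    """Collect (symbol, tickerId) pairs, skipping malformed entries."""
    pairs = []
    for position in positions or []:
        ticker = position.get("ticker") or {}
        ticker_id = ticker.get("tickerId")
        if ticker_id is not None:
            pairs.append((ticker.get("symbol", "?"), ticker_id))
    return pairs


# Made-up positions in the shape used by monitor_portfolio_realtime
sample_positions = [
    {"ticker": {"symbol": "AAPL", "tickerId": 1}, "position": "10"},
    {"position": "5"},                     # no ticker info: skipped
    {"ticker": {"tickerId": 2}, "position": "3"},
]
pairs = position_ticker_ids(sample_positions)
print(pairs)
```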

### Order Execution Monitoring

Track order fills in real-time:

```python
import time

class OrderTracker(StreamConn):
    def __init__(self, webull_client):
        super().__init__()
        self.wb = webull_client
        self.pending_orders = {}

    def track_order(self, order_id):
        """Start tracking a specific order."""
        self.pending_orders[order_id] = {
            'start_time': time.time(),
            'fills': []
        }

    def on_order_message(self, topic, data):
        """Track order execution."""
        order_id = data.get('orderId')

        if order_id in self.pending_orders:
            status = data.get('status')
            filled_qty = data.get('filledQuantity', 0)

            self.pending_orders[order_id]['fills'].append({
                'timestamp': time.time(),
                'status': status,
                'filled_qty': filled_qty
            })

            if status in ['FILLED', 'CANCELLED']:
                # Order completed, stop tracking
                order_info = self.pending_orders.pop(order_id)
                execution_time = time.time() - order_info['start_time']
                print(f"Order {order_id} completed in {execution_time:.1f}s: {status}")

# Usage
order_tracker = OrderTracker(wb)
# After placing order, track it
order_result = wb.place_order(stock='AAPL', price=150.0, action='BUY', quant=10)
order_tracker.track_order(order_result['orderId'])
```
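
The `fills` list recorded by the tracker can be post-processed to see how much was filled at each update. The sketch below rests on an unverified assumption, namely that `filledQuantity` is cumulative across updates; `fill_increments` is a hypothetical helper and the sample values are made up.

```python
def fill_increments(fills):
    """Turn cumulative filled quantities into per-update increments.

    Assumes each entry's 'filled_qty' is the running total for the order.
    """
    increments = []
    previous = 0.0
    for fill in fills:
        current = float(fill.get("filled_qty") or 0)
        increments.append(max(current - previous, 0.0))
        previous = max(previous, current)
    return increments


# Entries in the shape stored by OrderTracker.on_order_message
recorded = [{"filled_qty": 0}, {"filled_qty": "4"}, {"filled_qty": 10}]
steps = fill_increments(recorded)
print(steps)
```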

## Error Handling & Reconnection

Handle streaming connection issues:

```python
import time

class RobustStreamConn(StreamConn):
    def __init__(self, wb_client, max_retries=5):
        super().__init__(debug_flg=True)
        self.wb = wb_client
        self.max_retries = max_retries
        self.reconnect_count = 0
        self.subscriptions = set()  # Track active subscriptions

    def connect_with_retry(self):
        """Connect with automatic retry on failure."""
        for attempt in range(self.max_retries):
            try:
                did = self.wb._get_did()
                access_token = self.wb._access_token

                if self.connect(did, access_token):
                    print(f"Connected successfully (attempt {attempt + 1})")
                    self.reconnect_count = 0

                    # Restore subscriptions after reconnection
                    self.restore_subscriptions()
                    return True

                print(f"Connection attempt {attempt + 1} failed")

            except Exception as e:
                print(f"Connection attempt {attempt + 1} failed: {e}")

            # Exponential backoff after any failed attempt,
            # whether connect() returned False or raised
            time.sleep(2 ** attempt)

        print("Max connection retries exceeded")
        return False

    def subscribe(self, tId=None, level=105):
        """Subscribe and track subscription."""
        success = super().subscribe(tId, level)
        if success:
            self.subscriptions.add((tId, level))
        return success

    def restore_subscriptions(self):
        """Restore all subscriptions after reconnection."""
        print(f"Restoring {len(self.subscriptions)} subscriptions...")
        for tId, level in self.subscriptions:
            super().subscribe(tId, level)

# Usage
robust_stream = RobustStreamConn(wb)
robust_stream.connect_with_retry()
```
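
The `2 ** attempt` delay used above doubles on every retry without limit. A common refinement, offered here as a sketch and not as part of the library, is to cap the delay. `backoff_delays` is a hypothetical helper, and its `base`, `factor`, and `cap` parameters are illustrative choices.

```python
def backoff_delays(max_retries, base=1.0, factor=2.0, cap=30.0):
    """Yield the wait before each retry: base * factor**attempt, capped."""
    for attempt in range(max_retries):
        yield min(base * factor ** attempt, cap)


# Six retries: the last delay would be 32s uncapped, so it is held at 30s
delays = list(backoff_delays(6))
print(delays)
```

Inside `connect_with_retry`, iterating over `backoff_delays(self.max_retries)` and sleeping for each yielded value would replace the fixed `2 ** attempt` expression.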