The unofficial python interface for the WeBull API
MQTT-based real-time data streaming for live price updates and order status changes with subscription management and callback handling.
Real-time streaming requires:
Main streaming connection class for real-time data.
class StreamConn:
def __init__(self, debug_flg=False):
"""
Initialize streaming connection.
Parameters:
- debug_flg (bool): Enable debug logging for streaming events
"""

Establish and manage MQTT connections for real-time data streaming.
def connect(self, did, access_token=None):
"""
Connect to Webull streaming service.
Parameters:
- did (str): Device ID from webull session
- access_token (str, optional): Access token for authentication
Returns:
bool: True if connection successful, False otherwise
"""
def run_blocking_loop(self):
"""
Run the streaming event loop in blocking mode.
This method blocks execution and continuously processes streaming messages.
Use this for dedicated streaming applications.
"""
def run_loop_once(self):
"""
Process streaming events once without blocking.
Returns after processing available messages. Use this method when
integrating streaming with other application logic.
"""

Usage examples:
from webull import webull, StreamConn
# Build the REST client and a streaming connection with debug output enabled.
wb = webull()
stream = StreamConn(debug_flg=True)

# Authenticate first; the device ID and token below are read from this client.
wb.login('your_email@example.com', 'your_password')

# Hand the session's device ID and access token to the streaming service.
did = wb._get_did()
access_token = wb._access_token
success = stream.connect(did, access_token)

# Report the outcome of the connection attempt.
print("Connected to streaming service" if success else "Failed to connect")
# Start streaming (blocking)
stream.run_blocking_loop()

Subscribe to and unsubscribe from real-time data feeds.
def subscribe(self, tId=None, level=105):
"""
Subscribe to real-time price updates for a security.
Parameters:
- tId (int): Ticker ID to subscribe to
- level (int): Subscription level (105 is most comprehensive)
- 101: Basic price data
- 102: Enhanced price data with volume
- 103: Trade tick data
- 104: Level 2 order book data
- 105: Comprehensive data (recommended)
- 106-108: Specialized data combinations
Returns:
bool: True if subscription successful
"""
def unsubscribe(self, tId=None, level=105):
"""
Unsubscribe from real-time updates for a security.
Parameters:
- tId (int): Ticker ID to unsubscribe from
- level (int): Subscription level to remove
Returns:
bool: True if unsubscription successful
"""

Usage examples:
# Resolve the symbol to the ticker ID that the streaming API expects.
ticker_id = wb.get_ticker('AAPL')

# Start receiving real-time data for that single ticker.
stream.subscribe(tId=ticker_id, level=105)

# Several stocks: resolve and subscribe each symbol in turn.
symbols = ['AAPL', 'TSLA', 'MSFT']
for symbol in symbols:
    stream.subscribe(tId=wb.get_ticker(symbol), level=105)
# Unsubscribe when done
stream.unsubscribe(tId=ticker_id, level=105)

Handle incoming real-time price and order messages.
def on_price_message(self, topic, data):
"""
Callback function for price update messages.
This method should be overridden to handle price updates.
Parameters:
- topic (str): Message topic containing message type and ticker ID
- data (dict): Price update data containing:
- tickerId: Security ticker ID
- price: Current price
- change: Price change
- changeRatio: Percentage change
- volume: Trading volume
- high: Session high
- low: Session low
- status: Market status (F=pre, T=regular, A=after)
"""
def on_order_message(self, topic, data):
"""
Callback function for order status messages.
This method should be overridden to handle order updates.
Parameters:
- topic (str): Message topic
- data (dict): Order update data containing:
- orderId: Order identifier
- status: Order status
- filledQuantity: Quantity filled
- tickerId: Security ticker ID
- action: Order action (BUY/SELL)
"""

Implement custom message handlers by subclassing or setting callback functions:
class CustomStreamConn(StreamConn):
    """Streaming connection that records and prints price and order updates.

    ``price_data`` maps a ticker ID to its most recent price snapshot, and
    ``order_updates`` accumulates every order event in arrival order.
    """

    def __init__(self, debug_flg=False):
        """
        Set up empty storage for incoming updates.

        Parameters:
        - debug_flg (bool): Forwarded unchanged to ``StreamConn.__init__``
        """
        super().__init__(debug_flg)
        self.price_data = {}
        self.order_updates = []

    def on_price_message(self, topic, data):
        """
        Store the latest quote for a ticker, print it, and flag large moves.

        Parameters:
        - topic (str): Message topic (not used by this handler)
        - data (dict): Price update; ``tickerId``, ``price`` and
          ``changeRatio`` are read, the latter two defaulting to 0
        """
        tid = data.get('tickerId')
        last_price = data.get('price', 0)
        pct = data.get('changeRatio', 0)

        # Keep only the newest snapshot per ticker; older ones are overwritten.
        snapshot = {
            'price': last_price,
            'change_pct': pct,
            'timestamp': time.time()
        }
        self.price_data[tid] = snapshot

        # NOTE(review): the ``+.2f`` format assumes changeRatio is numeric.
        print(f"Price Update - Ticker {tid}: ${last_price} ({pct:+.2f}%)")

        # Moves beyond 5% in either direction get an extra alert line.
        if abs(pct) > 5.0:
            print(f"🚨 Large move alert: {pct:+.2f}%")

    def on_order_message(self, topic, data):
        """
        Append the order event to the history and print a status line.

        Parameters:
        - topic (str): Message topic (not used by this handler)
        - data (dict): Order update; ``orderId``, ``status`` and
          ``filledQuantity`` (default 0) are read
        """
        oid = data.get('orderId')
        state = data.get('status')
        filled = data.get('filledQuantity', 0)

        event = {
            'orderId': oid,
            'status': state,
            'filledQuantity': filled,
            'timestamp': time.time()
        }
        self.order_updates.append(event)

        print(f"Order Update - {oid}: {state} (Filled: {filled})")

        # Only the two terminal states produce an additional notification.
        if state == 'FILLED':
            print(f"✅ Order {oid} completely filled!")
        elif state == 'CANCELLED':
            print(f"❌ Order {oid} cancelled")
# Usage
custom_stream = CustomStreamConn(debug_flg=True)

Different message types provide various levels of market data:
Price messages include status field indicating market session:
import time
import threading
from webull import webull, StreamConn
class TradingStreamMonitor(StreamConn):
    """Streaming connection that tracks a watchlist and prints price alerts.

    ``watchlist`` maps each watched ticker ID to a dict holding its symbol,
    alert threshold, last price and last update time. Streaming events are
    pumped on a background thread started by ``start_monitoring``.
    """

    def __init__(self, wb_client, debug_flg=False):
        """
        Create an idle monitor with an empty watchlist.

        Parameters:
        - wb_client: Client used to resolve symbols via ``get_ticker``
        - debug_flg (bool): Forwarded unchanged to ``StreamConn.__init__``
        """
        super().__init__(debug_flg)
        self.wb = wb_client
        self.watchlist = {}
        self.alerts = {}
        self.running = False

    def add_stock_to_watch(self, symbol, alert_threshold=5.0):
        """
        Add a stock to the watchlist and subscribe to its real-time data.

        Any error while resolving or subscribing is printed rather than
        raised, so one bad symbol does not abort setting up the others.

        Parameters:
        - symbol (str): Symbol to resolve with ``wb.get_ticker``
        - alert_threshold (float): Absolute percent change that triggers
          an alert in ``on_price_message``
        """
        try:
            ticker_id = self.wb.get_ticker(symbol)
            # Register before subscribing: on_price_message ignores tickers
            # that are not yet in the watchlist.
            self.watchlist[ticker_id] = {
                'symbol': symbol,
                'alert_threshold': alert_threshold,
                'last_price': None,
                'last_update': None
            }
            # Subscribe to real-time data
            self.subscribe(tId=ticker_id, level=105)
            print(f"Added {symbol} (ID: {ticker_id}) to watchlist")
        except Exception as e:
            print(f"Error adding {symbol} to watchlist: {e}")

    def on_price_message(self, topic, data):
        """
        Print a price line for a watched ticker and alert on large moves.

        Messages for tickers that are not in the watchlist are ignored.

        Parameters:
        - topic (str): Message topic (not used by this handler)
        - data (dict): Price update; ``tickerId``, ``price``,
          ``changeRatio``, ``volume`` and ``status`` are read
        """
        ticker_id = data.get('tickerId')
        if ticker_id not in self.watchlist:
            return
        stock_info = self.watchlist[ticker_id]
        symbol = stock_info['symbol']

        # Extract price data
        price = data.get('price', 0)
        change_pct = data.get('changeRatio', 0)
        volume = data.get('volume', 0)
        market_status = data.get('status', 'T')

        # Update watchlist
        stock_info['last_price'] = price
        stock_info['last_update'] = time.time()

        # Market status indicator; unrecognised codes fall back to 📊.
        status_emoji = {"F": "🌅", "T": "📈", "A": "🌃"}.get(market_status, "📊")
        # NOTE(review): these format specs assume price, changeRatio and
        # volume arrive as numbers — confirm against real payloads.
        print(f"{status_emoji} {symbol}: ${price:.2f} ({change_pct:+.2f}%) Vol: {volume:,}")

        # Price alerts
        threshold = stock_info['alert_threshold']
        if abs(change_pct) >= threshold:
            direction = "🚀" if change_pct > 0 else "📉"
            print(f"🚨 ALERT: {symbol} {direction} {change_pct:+.2f}% - Threshold: {threshold}%")

    def on_order_message(self, topic, data):
        """
        Print a one-line status for an order update.

        The symbol is taken from the watchlist when the order's ticker is
        being watched; otherwise it is shown as 'Unknown'.

        Parameters:
        - topic (str): Message topic (not used by this handler)
        - data (dict): Order update; ``orderId``, ``status``,
          ``filledQuantity`` and ``tickerId`` are read
        """
        order_id = data.get('orderId', 'Unknown')
        status = data.get('status', 'Unknown')
        filled_qty = data.get('filledQuantity', 0)
        ticker_id = data.get('tickerId')

        # The watchlist is keyed by ticker ID, so look it up directly
        # instead of scanning every entry.
        watched = self.watchlist.get(ticker_id)
        symbol = watched['symbol'] if watched is not None else 'Unknown'

        status_emoji = {
            'FILLED': '✅',
            'PARTIAL': '🔄',
            'CANCELLED': '❌',
            'PENDING': '⏳'
        }.get(status, '📋')
        print(f"{status_emoji} Order {order_id} ({symbol}): {status} - Filled: {filled_qty}")

    def start_monitoring(self):
        """
        Start pumping streaming events on a background daemon thread.

        Returns:
        threading.Thread: The started worker; join it after calling
        ``stop_monitoring`` to wait for the loop to exit
        """
        self.running = True
        print("Starting real-time monitoring...")

        # Run streaming loop in separate thread
        def stream_loop():
            while self.running:
                self.run_loop_once()
                time.sleep(0.1)  # Small delay to prevent excessive CPU usage

        stream_thread = threading.Thread(target=stream_loop, daemon=True)
        stream_thread.start()
        return stream_thread

    def stop_monitoring(self):
        """Ask the background loop to exit after its current iteration."""
        self.running = False
        print("Stopping real-time monitoring...")
# Main usage example
def main():
    """Run a 60-second watchlist monitoring session.

    Logs in, connects the stream, subscribes a fixed watchlist with
    per-symbol alert thresholds, monitors for one minute and then stops the
    worker thread. A keyboard interrupt stops monitoring early; any other
    error is printed instead of being raised.
    """
    wb = webull()
    stream_monitor = TradingStreamMonitor(wb, debug_flg=False)

    # (symbol, percent move that triggers an alert)
    alert_thresholds = (
        ('AAPL', 3.0),
        ('TSLA', 5.0),
        ('MSFT', 2.0),
        ('NVDA', 4.0),
    )

    try:
        wb.login('your_email@example.com', 'your_password')

        # Bail out early when the streaming service cannot be reached.
        if not stream_monitor.connect(wb._get_did(), wb._access_token):
            print("Failed to connect to streaming service")
            return
        print("Connected to streaming service successfully")

        for symbol, threshold in alert_thresholds:
            stream_monitor.add_stock_to_watch(symbol, threshold)

        worker = stream_monitor.start_monitoring()

        print("Monitoring for 60 seconds...")
        time.sleep(60)

        # Signal the loop to stop, then wait for the worker to finish.
        stream_monitor.stop_monitoring()
        worker.join()
    except KeyboardInterrupt:
        print("\nStopping due to user interrupt...")
        stream_monitor.stop_monitoring()
    except Exception as e:
        print(f"Streaming error: {e}")
if __name__ == "__main__":
    main()

Monitor all positions in real-time:
def monitor_portfolio_realtime(wb, stream, level=105):
    """
    Subscribe the stream to real-time updates for every portfolio position.

    Parameters:
    - wb: Client whose ``get_positions()`` returns the positions; each one
      is read as ``position['ticker']['symbol']``,
      ``position['ticker']['tickerId']`` and ``position['position']``
    - stream: Streaming connection providing ``subscribe(tId=..., level=...)``
    - level (int): Subscription level used for every position; defaults to
      105, so existing two-argument calls behave as before
    """
    # Get current positions
    positions = wb.get_positions()
    print(f"Monitoring {len(positions)} positions in real-time...")

    for position in positions:
        ticker = position['ticker']
        symbol = ticker['symbol']
        ticker_id = ticker['tickerId']
        shares = position['position']
        print(f"Subscribing to {symbol} ({shares} shares)")
        stream.subscribe(tId=ticker_id, level=level)
# Usage
monitor_portfolio_realtime(wb, stream)

Track order fills in real-time:
class OrderTracker(StreamConn):
    """Streaming connection that follows selected orders until they complete.

    ``pending_orders`` maps each tracked order ID to its tracking start time
    and the list of status events received so far.
    """

    def __init__(self, webull_client):
        """
        Create a tracker with no orders registered.

        Parameters:
        - webull_client: Client object stored on the tracker as ``wb``
        """
        super().__init__()
        self.wb = webull_client
        self.pending_orders = {}

    def track_order(self, order_id):
        """
        Begin tracking an order, timing it from this call.

        Parameters:
        - order_id: Identifier matched against ``orderId`` in order messages
        """
        entry = {
            'start_time': time.time(),
            'fills': []
        }
        self.pending_orders[order_id] = entry

    def on_order_message(self, topic, data):
        """
        Record a status event for a tracked order and retire finished ones.

        Updates for orders that are not being tracked are ignored. Once the
        status is FILLED or CANCELLED the order is removed from tracking and
        the time since ``track_order`` is printed.

        Parameters:
        - topic (str): Message topic (not used by this handler)
        - data (dict): Order update; ``orderId``, ``status`` and
          ``filledQuantity`` (default 0) are read
        """
        oid = data.get('orderId')
        if oid not in self.pending_orders:
            return

        state = data.get('status')
        filled = data.get('filledQuantity', 0)
        event = {
            'timestamp': time.time(),
            'status': state,
            'filled_qty': filled
        }
        self.pending_orders[oid]['fills'].append(event)

        if state not in ('FILLED', 'CANCELLED'):
            return

        # Terminal state reached: stop tracking and report the elapsed time.
        finished = self.pending_orders.pop(oid)
        elapsed = time.time() - finished['start_time']
        print(f"Order {oid} completed in {elapsed:.1f}s: {state}")
# Usage
order_tracker = OrderTracker(wb)
# After placing order, track it
order_result = wb.place_order(stock='AAPL', price=150.0, action='BUY', quant=10)
order_tracker.track_order(order_result['orderId'])

Handle streaming connection issues:
class RobustStreamConn(StreamConn):
    """Streaming connection with connect retries and subscription restore.

    Successful subscriptions are remembered as ``(tId, level)`` pairs so
    they can be re-issued after a reconnect; successful unsubscriptions
    remove the pair again.
    """

    def __init__(self, wb_client, max_retries=5):
        """
        Create the connection with debug logging enabled.

        Parameters:
        - wb_client: Client supplying ``_get_did()`` and ``_access_token``
        - max_retries (int): Maximum number of connection attempts made by
          ``connect_with_retry``
        """
        super().__init__(debug_flg=True)
        self.wb = wb_client
        self.max_retries = max_retries
        self.reconnect_count = 0
        self.subscriptions = set()  # Track active subscriptions

    def connect_with_retry(self):
        """
        Connect, retrying with exponential backoff on failure.

        A failed attempt is either ``connect`` returning a falsy value or
        any exception raised while connecting or restoring subscriptions.
        The wait before the next attempt doubles each time (1s, 2s, 4s, ...);
        no wait follows the final attempt.

        Returns:
        bool: True once connected, False if every attempt failed
        """
        for attempt in range(self.max_retries):
            try:
                did = self.wb._get_did()
                access_token = self.wb._access_token
                if self.connect(did, access_token):
                    print(f"Connected successfully (attempt {attempt + 1})")
                    self.reconnect_count = 0
                    # Restore subscriptions after reconnection
                    self.restore_subscriptions()
                    return True
            except Exception as e:
                print(f"Connection attempt {attempt + 1} failed: {e}")

            # Back off after any failed attempt, whether it raised or just
            # returned False, but do not delay the final failure report.
            if attempt < self.max_retries - 1:
                time.sleep(2 ** attempt)  # Exponential backoff

        print("Max connection retries exceeded")
        return False

    def subscribe(self, tId=None, level=105):
        """
        Subscribe and remember the subscription if it succeeded.

        Parameters:
        - tId (int): Ticker ID to subscribe to
        - level (int): Subscription level

        Returns:
        The result of ``StreamConn.subscribe``
        """
        success = super().subscribe(tId, level)
        if success:
            self.subscriptions.add((tId, level))
        return success

    def unsubscribe(self, tId=None, level=105):
        """
        Unsubscribe and forget the subscription if it succeeded.

        Without this, a feed the caller had unsubscribed from would be
        re-subscribed by ``restore_subscriptions`` on the next reconnect.

        Parameters:
        - tId (int): Ticker ID to unsubscribe from
        - level (int): Subscription level to remove

        Returns:
        The result of ``StreamConn.unsubscribe``
        """
        success = super().unsubscribe(tId, level)
        if success:
            # discard() is a no-op when the pair was never tracked.
            self.subscriptions.discard((tId, level))
        return success

    def restore_subscriptions(self):
        """Re-issue every remembered subscription after a reconnect."""
        print(f"Restoring {len(self.subscriptions)} subscriptions...")
        for tId, level in self.subscriptions:
            # Call the base implementation directly; the pair is already
            # tracked, so the bookkeeping in subscribe() is unnecessary.
            super().subscribe(tId, level)
# Usage
robust_stream = RobustStreamConn(wb)
robust_stream.connect_with_retry()

Install with Tessl CLI
npx tessl i tessl/pypi-webull