CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-webull

The unofficial python interface for the WeBull API

Overview
Eval results
Files

streaming.mddocs/

Real-time Streaming

MQTT-based real-time data streaming for live price updates and order status changes with subscription management and callback handling.

Prerequisites

Real-time streaming requires:

  1. Successful login with valid session
  2. Device ID (did) from webull session
  3. Access token for streaming authentication

Capabilities

StreamConn Class

Main streaming connection class for real-time data.

class StreamConn:
    def __init__(self, debug_flg=False):
        """
        Initialize streaming connection.
        
        Parameters:
        - debug_flg (bool): Enable debug logging for streaming events
        """

Connection Management

Establish and manage MQTT connections for real-time data streaming.

def connect(self, did, access_token=None):
    """
    Connect to Webull streaming service.
    
    Parameters:
    - did (str): Device ID from webull session
    - access_token (str, optional): Access token for authentication
    
    Returns:
    bool: True if connection successful, False otherwise
    """

def run_blocking_loop(self):
    """
    Run the streaming event loop in blocking mode.
    
    This method blocks execution and continuously processes streaming messages.
    Use this for dedicated streaming applications.
    """

def run_loop_once(self):
    """
    Process streaming events once without blocking.
    
    Returns after processing available messages. Use this method when
    integrating streaming with other application logic.
    """

Usage examples:

from webull import webull, StreamConn

# Initialize clients
wb = webull()
stream = StreamConn(debug_flg=True)

# Login to get session data
wb.login('your_email@example.com', 'your_password')

# Connect to streaming service
did = wb._get_did()  # Get device ID from session
access_token = wb._access_token  # Get access token

success = stream.connect(did, access_token)
if success:
    print("Connected to streaming service")
else:
    print("Failed to connect")

# Start streaming (blocking)
stream.run_blocking_loop()

Subscription Management

Subscribe to and unsubscribe from real-time data feeds.

def subscribe(self, tId=None, level=105):
    """
    Subscribe to real-time price updates for a security.
    
    Parameters:
    - tId (int): Ticker ID to subscribe to
    - level (int): Subscription level (105 is most comprehensive)
        - 101: Basic price data
        - 102: Enhanced price data with volume
        - 103: Trade tick data
        - 104: Level 2 order book data  
        - 105: Comprehensive data (recommended)
        - 106-108: Specialized data combinations
    
    Returns:
    bool: True if subscription successful
    """

def unsubscribe(self, tId=None, level=105):
    """
    Unsubscribe from real-time updates for a security.
    
    Parameters:
    - tId (int): Ticker ID to unsubscribe from
    - level (int): Subscription level to remove
    
    Returns:
    bool: True if unsubscription successful
    """

Usage examples:

# Get ticker ID for subscription
ticker_id = wb.get_ticker('AAPL')

# Subscribe to real-time data
stream.subscribe(tId=ticker_id, level=105)

# Subscribe to multiple stocks
symbols = ['AAPL', 'TSLA', 'MSFT']
for symbol in symbols:
    tid = wb.get_ticker(symbol)
    stream.subscribe(tId=tid, level=105)

# Unsubscribe when done
stream.unsubscribe(tId=ticker_id, level=105)

Message Callbacks

Handle incoming real-time price and order messages.

def on_price_message(self, topic, data):
    """
    Callback function for price update messages.
    
    This method should be overridden to handle price updates.
    
    Parameters:
    - topic (str): Message topic containing message type and ticker ID
    - data (dict): Price update data containing:
        - tickerId: Security ticker ID
        - price: Current price
        - change: Price change
        - changeRatio: Percentage change
        - volume: Trading volume
        - high: Session high
        - low: Session low
        - status: Market status (F=pre, T=regular, A=after)
    """

def on_order_message(self, topic, data):
    """
    Callback function for order status messages.
    
    This method should be overridden to handle order updates.
    
    Parameters:
    - topic (str): Message topic
    - data (dict): Order update data containing:
        - orderId: Order identifier
        - status: Order status
        - filledQuantity: Quantity filled
        - tickerId: Security ticker ID
        - action: Order action (BUY/SELL)
    """

Custom Message Handlers

Implement custom message handlers by subclassing or setting callback functions:

class CustomStreamConn(StreamConn):
    def __init__(self, debug_flg=False):
        super().__init__(debug_flg)
        self.price_data = {}
        self.order_updates = []
    
    def on_price_message(self, topic, data):
        """Custom price message handler."""
        ticker_id = data.get('tickerId')
        price = data.get('price', 0)
        change_pct = data.get('changeRatio', 0)
        
        # Store latest price data
        self.price_data[ticker_id] = {
            'price': price,
            'change_pct': change_pct,
            'timestamp': time.time()
        }
        
        # Print price updates
        print(f"Price Update - Ticker {ticker_id}: ${price} ({change_pct:+.2f}%)")
        
        # Custom logic for price alerts
        if abs(change_pct) > 5.0:  # Alert on >5% moves
            print(f"🚨 Large move alert: {change_pct:+.2f}%")
    
    def on_order_message(self, topic, data):
        """Custom order message handler."""
        order_id = data.get('orderId')
        status = data.get('status')
        filled_qty = data.get('filledQuantity', 0)
        
        # Store order updates
        self.order_updates.append({
            'orderId': order_id,
            'status': status,
            'filledQuantity': filled_qty,
            'timestamp': time.time()
        })
        
        print(f"Order Update - {order_id}: {status} (Filled: {filled_qty})")
        
        # Custom logic for order notifications
        if status == 'FILLED':
            print(f"✅ Order {order_id} completely filled!")
        elif status == 'CANCELLED':
            print(f"❌ Order {order_id} cancelled")

# Usage
custom_stream = CustomStreamConn(debug_flg=True)

Streaming Data Types

Price Message Topics

Different message types provide various levels of market data:

  • Topic 101: Basic close price, change, market value, change ratio
  • Topic 102: Enhanced data with high, low, open, close, volume during market hours; pre/after market price data during extended hours
  • Topic 103: Individual trade tick data with price, volume, and trade time
  • Topic 104: Level 2 order book data with bid/ask lists and depths
  • Topic 105: Combination of 102 and 103 (most commonly used)
  • Topic 106: Combination of 102 (recommended for most applications)
  • Topic 107: Combination of 103 and 104
  • Topic 108: Combination of 103, 104, and additional depth data

Market Status Indicators

Price messages include status field indicating market session:

  • F: Pre-market hours
  • T: Regular trading hours
  • A: After-market hours

Complete Streaming Example

import time
import threading
from webull import webull, StreamConn

class TradingStreamMonitor(StreamConn):
    def __init__(self, wb_client, debug_flg=False):
        super().__init__(debug_flg)
        self.wb = wb_client
        self.watchlist = {}
        self.alerts = {}
        self.running = False
    
    def add_stock_to_watch(self, symbol, alert_threshold=5.0):
        """Add stock to watchlist with price alert threshold."""
        try:
            ticker_id = self.wb.get_ticker(symbol)
            self.watchlist[ticker_id] = {
                'symbol': symbol,
                'alert_threshold': alert_threshold,
                'last_price': None,
                'last_update': None
            }
            
            # Subscribe to real-time data
            self.subscribe(tId=ticker_id, level=105)
            print(f"Added {symbol} (ID: {ticker_id}) to watchlist")
            
        except Exception as e:
            print(f"Error adding {symbol} to watchlist: {e}")
    
    def on_price_message(self, topic, data):
        """Handle price updates with custom alerts."""
        ticker_id = data.get('tickerId')
        if ticker_id not in self.watchlist:
            return
        
        stock_info = self.watchlist[ticker_id]
        symbol = stock_info['symbol']
        
        # Extract price data
        price = data.get('price', 0)
        change_pct = data.get('changeRatio', 0)
        volume = data.get('volume', 0)
        market_status = data.get('status', 'T')
        
        # Update watchlist
        stock_info['last_price'] = price
        stock_info['last_update'] = time.time()
        
        # Market status indicator
        status_emoji = {"F": "🌅", "T": "📈", "A": "🌃"}.get(market_status, "📊")
        
        print(f"{status_emoji} {symbol}: ${price:.2f} ({change_pct:+.2f}%) Vol: {volume:,}")
        
        # Price alerts
        threshold = stock_info['alert_threshold']
        if abs(change_pct) >= threshold:
            direction = "🚀" if change_pct > 0 else "📉"
            print(f"🚨 ALERT: {symbol} {direction} {change_pct:+.2f}% - Threshold: {threshold}%")
    
    def on_order_message(self, topic, data):
        """Handle order status updates."""
        order_id = data.get('orderId', 'Unknown')
        status = data.get('status', 'Unknown')
        filled_qty = data.get('filledQuantity', 0)
        ticker_id = data.get('tickerId')
        
        # Find symbol for ticker ID
        symbol = 'Unknown'
        for tid, info in self.watchlist.items():
            if tid == ticker_id:
                symbol = info['symbol']
                break
        
        status_emoji = {
            'FILLED': '✅',
            'PARTIAL': '🔄', 
            'CANCELLED': '❌',
            'PENDING': '⏳'
        }.get(status, '📋')
        
        print(f"{status_emoji} Order {order_id} ({symbol}): {status} - Filled: {filled_qty}")
    
    def start_monitoring(self):
        """Start the streaming monitor."""
        self.running = True
        print("Starting real-time monitoring...")
        
        # Run streaming loop in separate thread
        def stream_loop():
            while self.running:
                self.run_loop_once()
                time.sleep(0.1)  # Small delay to prevent excessive CPU usage
        
        stream_thread = threading.Thread(target=stream_loop, daemon=True)
        stream_thread.start()
        return stream_thread
    
    def stop_monitoring(self):
        """Stop the streaming monitor."""
        self.running = False
        print("Stopping real-time monitoring...")

# Main usage example
def main():
    # Initialize clients
    wb = webull()
    stream_monitor = TradingStreamMonitor(wb, debug_flg=False)
    
    try:
        # Login
        wb.login('your_email@example.com', 'your_password')
        
        # Connect to streaming
        did = wb._get_did()
        access_token = wb._access_token
        
        if stream_monitor.connect(did, access_token):
            print("Connected to streaming service successfully")
            
            # Add stocks to monitor
            watchlist = [
                ('AAPL', 3.0),   # Alert on 3%+ moves
                ('TSLA', 5.0),   # Alert on 5%+ moves  
                ('MSFT', 2.0),   # Alert on 2%+ moves
                ('NVDA', 4.0),   # Alert on 4%+ moves
            ]
            
            for symbol, threshold in watchlist:
                stream_monitor.add_stock_to_watch(symbol, threshold)
            
            # Start monitoring
            stream_thread = stream_monitor.start_monitoring()
            
            # Keep monitoring for specified time
            print("Monitoring for 60 seconds...")
            time.sleep(60)
            
            # Stop monitoring
            stream_monitor.stop_monitoring()
            stream_thread.join()
            
        else:
            print("Failed to connect to streaming service")
    
    except KeyboardInterrupt:
        print("\nStopping due to user interrupt...")
        stream_monitor.stop_monitoring()
    
    except Exception as e:
        print(f"Streaming error: {e}")

if __name__ == "__main__":
    main()

Advanced Streaming Patterns

Portfolio Monitoring

Monitor all positions in real-time:

def monitor_portfolio_realtime(wb, stream):
    """Monitor all portfolio positions in real-time."""
    
    # Get current positions
    positions = wb.get_positions()
    
    print(f"Monitoring {len(positions)} positions in real-time...")
    
    for position in positions:
        symbol = position['ticker']['symbol']
        ticker_id = position['ticker']['tickerId']
        shares = position['position']
        
        print(f"Subscribing to {symbol} ({shares} shares)")
        stream.subscribe(tId=ticker_id, level=105)

# Usage
monitor_portfolio_realtime(wb, stream)

Order Execution Monitoring

Track order fills in real-time:

class OrderTracker(StreamConn):
    def __init__(self, webull_client):
        super().__init__()
        self.wb = webull_client
        self.pending_orders = {}
    
    def track_order(self, order_id):
        """Start tracking a specific order."""
        self.pending_orders[order_id] = {
            'start_time': time.time(),
            'fills': []
        }
    
    def on_order_message(self, topic, data):
        """Track order execution."""
        order_id = data.get('orderId')
        
        if order_id in self.pending_orders:
            status = data.get('status')
            filled_qty = data.get('filledQuantity', 0)
            
            self.pending_orders[order_id]['fills'].append({
                'timestamp': time.time(),
                'status': status,
                'filled_qty': filled_qty
            })
            
            if status in ['FILLED', 'CANCELLED']:
                # Order completed, stop tracking
                order_info = self.pending_orders.pop(order_id)
                execution_time = time.time() - order_info['start_time']
                print(f"Order {order_id} completed in {execution_time:.1f}s: {status}")

# Usage
order_tracker = OrderTracker(wb)
# After placing order, track it
order_result = wb.place_order(stock='AAPL', price=150.0, action='BUY', quant=10)
order_tracker.track_order(order_result['orderId'])

Error Handling & Reconnection

Handle streaming connection issues:

class RobustStreamConn(StreamConn):
    def __init__(self, wb_client, max_retries=5):
        super().__init__(debug_flg=True)
        self.wb = wb_client
        self.max_retries = max_retries
        self.reconnect_count = 0
        self.subscriptions = set()  # Track active subscriptions
    
    def connect_with_retry(self):
        """Connect with automatic retry on failure."""
        for attempt in range(self.max_retries):
            try:
                did = self.wb._get_did()
                access_token = self.wb._access_token
                
                if self.connect(did, access_token):
                    print(f"Connected successfully (attempt {attempt + 1})")
                    self.reconnect_count = 0
                    
                    # Restore subscriptions after reconnection
                    self.restore_subscriptions()
                    return True
                    
            except Exception as e:
                print(f"Connection attempt {attempt + 1} failed: {e}")
                time.sleep(2 ** attempt)  # Exponential backoff
        
        print("Max connection retries exceeded")
        return False
    
    def subscribe(self, tId=None, level=105):
        """Subscribe and track subscription."""
        success = super().subscribe(tId, level)
        if success:
            self.subscriptions.add((tId, level))
        return success
    
    def restore_subscriptions(self):
        """Restore all subscriptions after reconnection."""
        print(f"Restoring {len(self.subscriptions)} subscriptions...")
        for tId, level in self.subscriptions:
            super().subscribe(tId, level)

# Usage
robust_stream = RobustStreamConn(wb)
robust_stream.connect_with_retry()

Install with Tessl CLI

npx tessl i tessl/pypi-webull

docs

alerts-screening.md

authentication.md

index.md

market-data.md

options.md

paper-trading.md

portfolio.md

streaming.md

trading.md

tile.json