CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-upstox-python-sdk

The official Python client for communicating with the Upstox API, providing complete trading and investment platform functionality.

Overview
Eval results
Files

websocket-streaming.mddocs/

WebSocket Streaming

Real-time data streaming for live market data feeds and portfolio updates using WebSocket connections with event-driven architecture. Support for both callback-based feeders and event-driven streamers.

Capabilities

Market Data Streaming

Stream live market data including last traded price, full market quotes, and market depth using WebSocket connections.

class MarketDataStreamer:
    def __init__(api_client: ApiClient = None, instrumentKeys: list = None, mode: str = None) -> None:
        """
        Initialize market data streamer.
        
        Parameters:
        - api_client: Authenticated API client
        - instrumentKeys: List of instrument tokens to subscribe
        - mode: Subscription mode ('ltpc', 'full', 'quote')
        """
    
    def connect() -> None:
        """Establish WebSocket connection to market data feed"""
    
    def subscribe(instrumentKeys: list, mode: str) -> None:
        """
        Subscribe to market data for instruments.
        
        Parameters:
        - instrumentKeys: List of instrument tokens
        - mode: Data mode ('ltpc', 'full', 'quote')
        """
    
    def unsubscribe(instrumentKeys: list) -> None:
        """
        Unsubscribe from market data.
        
        Parameters:
        - instrumentKeys: List of instrument tokens to unsubscribe
        """
    
    def change_mode(instrumentKeys: list, newMode: str) -> None:
        """
        Change subscription mode for instruments.
        
        Parameters:
        - instrumentKeys: List of instrument tokens
        - newMode: New subscription mode
        """
    
    def clear_subscriptions() -> None:
        """Remove all active subscriptions"""
    
    def on(event: str, listener: callable) -> None:
        """
        Register event listener.
        
        Parameters:
        - event: Event name ('open', 'message', 'error', 'close')
        - listener: Callback function
        """
    
    def disconnect() -> None:
        """Disconnect from WebSocket"""
    
    def auto_reconnect(enable: bool, interval: int = None, retry_count: int = None) -> None:
        """
        Configure automatic reconnection.
        
        Parameters:
        - enable: Enable/disable auto-reconnect
        - interval: Reconnection interval in seconds
        - retry_count: Maximum retry attempts
        """

Usage Example

from upstox_client.feeder import MarketDataStreamer
from upstox_client import Configuration, ApiClient
import json

# Setup
config = Configuration()
config.access_token = 'your_access_token'
api_client = ApiClient(config)

# Initialize streamer
instruments = ["NSE_EQ|INE002A01018", "NSE_EQ|INE009A01021"]  # Reliance, Infosys
streamer = MarketDataStreamer(api_client, instruments, mode='full')

# Event handlers
def on_open():
    print("WebSocket connection opened")
    print(f"Subscribed to {len(instruments)} instruments")

def on_message(data):
    """Handle incoming market data"""
    try:
        market_data = json.loads(data)
        if 'feeds' in market_data:
            for instrument_token, feed_data in market_data['feeds'].items():
                if 'ff' in feed_data:  # Full feed
                    ff = feed_data['ff']
                    print(f"{instrument_token}:")
                    print(f"  LTP: ₹{ff.get('ltp', 0):.2f}")
                    print(f"  Volume: {ff.get('v', 0)}")
                    print(f"  Change: {ff.get('nc', 0):.2f}")
                    print(f"  % Change: {ff.get('pc', 0):.2f}%")
                    
                    # Market depth
                    if 'marketFF' in ff:
                        depth = ff['marketFF']
                        print(f"  Best Bid: ₹{depth.get('bp1', 0):.2f} x {depth.get('bq1', 0)}")
                        print(f"  Best Ask: ₹{depth.get('sp1', 0):.2f} x {depth.get('sq1', 0)}")
                    print()
    except Exception as e:
        print(f"Error processing message: {e}")

def on_error(error):
    print(f"WebSocket error: {error}")

def on_close():
    print("WebSocket connection closed")

# Register event handlers
streamer.on('open', on_open)
streamer.on('message', on_message)
streamer.on('error', on_error)
streamer.on('close', on_close)

# Enable auto-reconnect
streamer.auto_reconnect(enable=True, interval=5, retry_count=10)

# Connect to start streaming
streamer.connect()

# Runtime subscription management
import time
time.sleep(10)  # Stream for 10 seconds

# Add more instruments
new_instruments = ["NSE_EQ|INE040A01034"]  # HDFC Bank
streamer.subscribe(new_instruments, mode='ltpc')

# Change mode for existing subscription
streamer.change_mode(["NSE_EQ|INE002A01018"], newMode='ltpc')

# Unsubscribe from specific instruments
streamer.unsubscribe(["NSE_EQ|INE009A01021"])

# Clear all subscriptions
# streamer.clear_subscriptions()

# Disconnect when done
# streamer.disconnect()

Enhanced Market Data Streaming (V3)

Improved market data streamer with enhanced performance and features.

class MarketDataStreamerV3:
    def __init__(api_client: ApiClient = None, instrumentKeys: list = None, mode: str = None) -> None:
        """
        Initialize V3 market data streamer with enhanced modes.
        
        Parameters:
        - api_client: Authenticated API client
        - instrumentKeys: List of instrument tokens to subscribe
        - mode: Subscription mode ('ltpc', 'full', 'option_greeks', 'full_d30')
        """
    
    def connect() -> None:
        """Establish V3 WebSocket connection"""
    
    def subscribe(instrumentKeys: list, mode: str) -> None:
        """Subscribe to V3 market data feed"""
    
    def unsubscribe(instrumentKeys: list) -> None:
        """Unsubscribe from V3 market data feed"""
    
    def change_mode(instrumentKeys: list, newMode: str) -> None:
        """Change V3 subscription mode"""
    
    def clear_subscriptions() -> None:
        """Clear all V3 subscriptions"""

Portfolio Data Streaming

Stream live portfolio updates including order status changes, position updates, and holdings modifications.

class PortfolioDataStreamer:
    def __init__(api_client: ApiClient = None, order_update: bool = None, position_update: bool = None, holding_update: bool = None, gtt_update: bool = None) -> None:
        """
        Initialize portfolio data streamer.
        
        Parameters:
        - api_client: Authenticated API client
        - order_update: Enable order status updates
        - position_update: Enable position updates
        - holding_update: Enable holdings updates
        - gtt_update: Enable GTT order updates
        """
    
    def connect() -> None:
        """Connect to portfolio data stream"""
    
    def on(event: str, listener: callable) -> None:
        """
        Register event listener for portfolio updates.
        
        Parameters:
        - event: Event name ('open', 'message', 'error', 'close')
        - listener: Callback function
        """

Usage Example

from upstox_client.feeder import PortfolioDataStreamer
import json

# Initialize portfolio streamer
portfolio_streamer = PortfolioDataStreamer(
    api_client=api_client,
    order_update=True,
    position_update=True,
    holding_update=True,
    gtt_update=True
)

def on_portfolio_open():
    print("Portfolio WebSocket connection opened")

def on_portfolio_message(data):
    """Handle portfolio updates"""
    try:
        update = json.loads(data)
        update_type = update.get('type')
        
        if update_type == 'order':
            order_data = update.get('data', {})
            print(f"Order Update:")
            print(f"  Order ID: {order_data.get('order_id')}")
            print(f"  Status: {order_data.get('status')}")
            print(f"  Symbol: {order_data.get('tradingsymbol')}")
            print(f"  Quantity: {order_data.get('quantity')}")
            print(f"  Price: ₹{order_data.get('price', 0):.2f}")
            
        elif update_type == 'position':
            position_data = update.get('data', {})
            print(f"Position Update:")
            print(f"  Symbol: {position_data.get('tradingsymbol')}")
            print(f"  Net Quantity: {position_data.get('quantity')}")
            print(f"  P&L: ₹{position_data.get('pnl', 0):.2f}")
            
        elif update_type == 'holding':
            holding_data = update.get('data', {})
            print(f"Holding Update:")
            print(f"  Symbol: {holding_data.get('tradingsymbol')}")
            print(f"  Quantity: {holding_data.get('quantity')}")
            print(f"  Current Value: ₹{holding_data.get('current_value', 0):.2f}")
            
        elif update_type == 'gtt':
            gtt_data = update.get('data', {})
            print(f"GTT Update:")
            print(f"  GTT ID: {gtt_data.get('gtt_order_id')}")
            print(f"  Status: {gtt_data.get('status')}")
            
    except Exception as e:
        print(f"Error processing portfolio update: {e}")

def on_portfolio_error(error):
    print(f"Portfolio WebSocket error: {error}")

def on_portfolio_close():
    print("Portfolio WebSocket connection closed")

# Register event handlers
portfolio_streamer.on('open', on_portfolio_open)
portfolio_streamer.on('message', on_portfolio_message)
portfolio_streamer.on('error', on_portfolio_error)
portfolio_streamer.on('close', on_portfolio_close)

# Connect to start receiving updates
portfolio_streamer.connect()

Callback-based Feeders

Alternative feeder classes that use callback functions instead of event listeners.

class MarketDataFeeder:
    def __init__(api_client: ApiClient = None, instrumentKeys: list = None, mode: str = None, on_open: callable = None, on_message: callable = None, on_error: callable = None, on_close: callable = None) -> None:
        """
        Initialize market data feeder with callbacks.
        
        Parameters:
        - api_client: Authenticated API client
        - instrumentKeys: List of instrument tokens
        - mode: Subscription mode
        - on_open: Connection opened callback
        - on_message: Message received callback
        - on_error: Error occurred callback
        - on_close: Connection closed callback
        """
    
    def connect() -> None:
        """Start market data feed with callbacks"""
    
    def subscribe(instrumentKeys: list, mode: str = None) -> None:
        """Subscribe to instruments with callback handling"""
    
    def unsubscribe(instrumentKeys: list) -> None:
        """Unsubscribe from instruments"""
    
    def change_mode(instrumentKeys: list, newMode: str) -> None:
        """Change subscription mode"""

class MarketDataFeederV3:
    def __init__(api_client: ApiClient = None, instrumentKeys: list = None, mode: str = None, on_open: callable = None, on_message: callable = None, on_error: callable = None, on_close: callable = None) -> None:
        """Initialize V3 market data feeder with callbacks"""

class PortfolioDataFeeder:
    def connect() -> None:
        """Connect to portfolio data feed with callbacks"""

Usage Example

from upstox_client.feeder import MarketDataFeeder

def on_feed_open():
    print("Market data feed connected")

def on_feed_message(data):
    print(f"Received market data: {data}")

def on_feed_error(error):
    print(f"Feed error: {error}")

def on_feed_close():
    print("Market data feed disconnected")

# Initialize feeder with callbacks
feeder = MarketDataFeeder(
    api_client=api_client,
    instrumentKeys=["NSE_EQ|INE002A01018"],
    mode='full',
    on_open=on_feed_open,
    on_message=on_feed_message,
    on_error=on_feed_error,
    on_close=on_feed_close
)

# Start feeding
feeder.connect()

WebSocket Authorization

Get WebSocket connection URLs and authorization for different data feeds.

def get_market_data_feed(api_version: str) -> WebsocketAuthRedirectResponse:
    """
    Get market data WebSocket feed URL.
    
    Parameters:
    - api_version: API version ('2.0')
    
    Returns:
    WebsocketAuthRedirectResponse with WebSocket URL
    """

def get_market_data_feed_authorize(api_version: str) -> WebsocketAuthRedirectResponse:
    """
    Get authorized market data WebSocket feed URL.
    
    Parameters:
    - api_version: API version ('2.0')
    
    Returns:
    WebsocketAuthRedirectResponse with authorized WebSocket URL
    """

def get_market_data_feed_v3() -> WebsocketAuthRedirectResponse:
    """
    Get V3 market data WebSocket feed URL.
    
    Returns:
    WebsocketAuthRedirectResponse with V3 WebSocket URL
    """

def get_market_data_feed_authorize_v3() -> WebsocketAuthRedirectResponse:
    """
    Get authorized V3 market data WebSocket feed URL.
    
    Returns:
    WebsocketAuthRedirectResponse with authorized V3 WebSocket URL
    """

def get_portfolio_stream_feed_authorize(api_version: str, order_update: bool = None, position_update: bool = None, holding_update: bool = None) -> WebsocketAuthRedirectResponse:
    """
    Get authorized portfolio stream WebSocket URL.
    
    Parameters:
    - api_version: API version ('2.0')
    - order_update: Enable order updates
    - position_update: Enable position updates
    - holding_update: Enable holding updates
    
    Returns:
    WebsocketAuthRedirectResponse with portfolio WebSocket URL
    """

Subscription Modes

Market Data Modes

Standard Modes

  • "ltpc" - Last Traded Price & Change: Minimal data with price and change information
  • "quote" - Quote: Price, volume, and basic market data
  • "full" - Full: Complete market data including depth, OHLC, and all available fields

V3 Enhanced Modes

  • "ltpc" - Last Traded Price & Change (V3)
  • "full" - Full market data with improved performance (V3)
  • "option_greeks" - Option Greeks data (Delta, Gamma, Theta, Vega)
  • "full_d30" - Full market data with 30-day historical context

Data Format Examples

LTPC Mode

{
  "feeds": {
    "NSE_EQ|INE002A01018": {
      "ltpc": {
        "ltp": 1520.50,
        "ltt": "2024-09-06T15:29:45.000Z",
        "ltq": 100,
        "cp": 1515.75
      }
    }
  }
}

Full Mode

{
  "feeds": {
    "NSE_EQ|INE002A01018": {
      "ff": {
        "ltp": 1520.50,
        "ltt": "2024-09-06T15:29:45.000Z",
        "ltq": 100,
        "cp": 1515.75,
        "v": 1250000,
        "o": 1518.00,
        "h": 1525.00,
        "l": 1510.00,
        "c": 1515.75,
        "ap": 1520.25,
        "oi": 0,
        "marketFF": {
          "bp1": 1520.00, "bq1": 500,
          "bp2": 1519.50, "bq2": 300,
          "sp1": 1520.50, "sq1": 400,
          "sp2": 1521.00, "sq2": 600
        }
      }
    }
  }
}

Error Handling & Reconnection

# Configure automatic reconnection
streamer.auto_reconnect(
    enable=True,
    interval=5,  # Reconnect after 5 seconds
    retry_count=10  # Maximum 10 retry attempts
)

# Handle connection errors
def on_error(error):
    print(f"WebSocket error: {error}")
    # Implement custom error handling logic
    if "authentication" in str(error).lower():
        # Handle auth errors - refresh token
        pass
    elif "network" in str(error).lower():
        # Handle network issues
        pass

# Handle disconnections
def on_close():
    print("Connection closed - auto-reconnect will attempt to reconnect")
    # Log disconnection, update UI state, etc.

Best Practices

Performance Optimization

# 1. Subscribe to only required instruments
essential_instruments = ["NSE_EQ|INE002A01018", "NSE_EQ|INE009A01021"]
streamer = MarketDataStreamer(api_client, essential_instruments, mode='ltpc')

# 2. Use appropriate subscription mode
streamer.subscribe(essential_instruments, mode='ltpc')  # For price tracking
streamer.subscribe(depth_instruments, mode='full')     # For depth analysis

# 3. Implement efficient message handling
def on_message(data):
    # Process data efficiently
    market_data = json.loads(data)
    # Update only changed values in UI
    update_ui_efficiently(market_data)

Connection Management

# 1. Enable auto-reconnect for production
streamer.auto_reconnect(enable=True, interval=3, retry_count=5)

# 2. Graceful disconnection
def cleanup():
    streamer.clear_subscriptions()
    streamer.disconnect()
    portfolio_streamer.disconnect()

# 3. Connection monitoring
def on_open():
    # Reset connection error counters
    connection_errors = 0
    
def on_error(error):
    connection_errors += 1
    if connection_errors > 5:
        # Implement fallback mechanism
        switch_to_rest_api_polling()

Subscription Management

# Dynamic subscription based on user activity
active_instruments = get_user_watchlist()
streamer.clear_subscriptions()
streamer.subscribe(active_instruments, mode='full')

# Efficient mode switching
def switch_to_trading_mode():
    # Switch to full mode for active trading
    streamer.change_mode(trading_instruments, newMode='full')

def switch_to_monitoring_mode():
    # Switch to LTPC for passive monitoring
    streamer.change_mode(all_instruments, newMode='ltpc')

WebSocket Response Types

class WebsocketAuthRedirectResponse:
    status: str
    data: WebsocketAuthRedirectResponseData

class WebsocketAuthRedirectResponseData:
    authorized_redirect_uri: str  # WebSocket connection URL
    
# Market Data Feed Structure (parsed from JSON)
class MarketDataFeed:
    feeds: dict[str, FeedData]

class FeedData:
    ltpc: LTPCData  # For LTPC mode
    ff: FullFeedData  # For full mode

class LTPCData:
    ltp: float  # Last traded price
    ltt: str   # Last trade time
    ltq: int   # Last trade quantity
    cp: float  # Close price

class FullFeedData:
    ltp: float  # Last traded price
    v: int     # Volume
    o: float   # Open
    h: float   # High
    l: float   # Low
    c: float   # Close
    ap: float  # Average price
    oi: int    # Open interest
    marketFF: MarketDepth  # Market depth

class MarketDepth:
    bp1: float  # Best bid price 1
    bq1: int    # Best bid quantity 1
    sp1: float  # Best ask price 1
    sq1: int    # Best ask quantity 1
    # ... up to 5 levels (bp1-bp5, bq1-bq5, sp1-sp5, sq1-sq5)

Install with Tessl CLI

npx tessl i tessl/pypi-upstox-python-sdk

docs

authentication.md

charges-analytics.md

index.md

market-data.md

order-management.md

portfolio-management.md

websocket-streaming.md

tile.json