The official Python client for communicating with the Upstox API, providing complete trading and investment platform functionality.
Real-time data streaming for live market data feeds and portfolio updates using WebSocket connections with event-driven architecture. Support for both callback-based feeders and event-driven streamers.
Stream live market data including last traded price, full market quotes, and market depth using WebSocket connections.
class MarketDataStreamer:
def __init__(api_client: ApiClient = None, instrumentKeys: list = None, mode: str = None) -> None:
"""
Initialize market data streamer.
Parameters:
- api_client: Authenticated API client
- instrumentKeys: List of instrument tokens to subscribe
- mode: Subscription mode ('ltpc', 'full', 'quote')
"""
def connect() -> None:
"""Establish WebSocket connection to market data feed"""
def subscribe(instrumentKeys: list, mode: str) -> None:
"""
Subscribe to market data for instruments.
Parameters:
- instrumentKeys: List of instrument tokens
- mode: Data mode ('ltpc', 'full', 'quote')
"""
def unsubscribe(instrumentKeys: list) -> None:
"""
Unsubscribe from market data.
Parameters:
- instrumentKeys: List of instrument tokens to unsubscribe
"""
def change_mode(instrumentKeys: list, newMode: str) -> None:
"""
Change subscription mode for instruments.
Parameters:
- instrumentKeys: List of instrument tokens
- newMode: New subscription mode
"""
def clear_subscriptions() -> None:
"""Remove all active subscriptions"""
def on(event: str, listener: callable) -> None:
"""
Register event listener.
Parameters:
- event: Event name ('open', 'message', 'error', 'close')
- listener: Callback function
"""
def disconnect() -> None:
"""Disconnect from WebSocket"""
def auto_reconnect(enable: bool, interval: int = None, retry_count: int = None) -> None:
"""
Configure automatic reconnection.
Parameters:
- enable: Enable/disable auto-reconnect
- interval: Reconnection interval in seconds
- retry_count: Maximum retry attempts
"""from upstox_client.feeder import MarketDataStreamer
from upstox_client import Configuration, ApiClient
import json
# Setup
config = Configuration()
config.access_token = 'your_access_token'
api_client = ApiClient(config)
# Initialize streamer
instruments = ["NSE_EQ|INE002A01018", "NSE_EQ|INE009A01021"] # Reliance, Infosys
streamer = MarketDataStreamer(api_client, instruments, mode='full')
# Event handlers
def on_open():
print("WebSocket connection opened")
print(f"Subscribed to {len(instruments)} instruments")
def on_message(data):
"""Handle incoming market data"""
try:
market_data = json.loads(data)
if 'feeds' in market_data:
for instrument_token, feed_data in market_data['feeds'].items():
if 'ff' in feed_data: # Full feed
ff = feed_data['ff']
print(f"{instrument_token}:")
print(f" LTP: ₹{ff.get('ltp', 0):.2f}")
print(f" Volume: {ff.get('v', 0)}")
print(f" Change: {ff.get('nc', 0):.2f}")
print(f" % Change: {ff.get('pc', 0):.2f}%")
# Market depth
if 'marketFF' in ff:
depth = ff['marketFF']
print(f" Best Bid: ₹{depth.get('bp1', 0):.2f} x {depth.get('bq1', 0)}")
print(f" Best Ask: ₹{depth.get('sp1', 0):.2f} x {depth.get('sq1', 0)}")
print()
except Exception as e:
print(f"Error processing message: {e}")
def on_error(error):
print(f"WebSocket error: {error}")
def on_close():
print("WebSocket connection closed")
# Register event handlers
streamer.on('open', on_open)
streamer.on('message', on_message)
streamer.on('error', on_error)
streamer.on('close', on_close)
# Enable auto-reconnect
streamer.auto_reconnect(enable=True, interval=5, retry_count=10)
# Connect to start streaming
streamer.connect()
# Runtime subscription management
import time
time.sleep(10) # Stream for 10 seconds
# Add more instruments
new_instruments = ["NSE_EQ|INE040A01034"] # HDFC Bank
streamer.subscribe(new_instruments, mode='ltpc')
# Change mode for existing subscription
streamer.change_mode(["NSE_EQ|INE002A01018"], newMode='ltpc')
# Unsubscribe from specific instruments
streamer.unsubscribe(["NSE_EQ|INE009A01021"])
# Clear all subscriptions
# streamer.clear_subscriptions()
# Disconnect when done
# streamer.disconnect()Improved market data streamer with enhanced performance and features.
class MarketDataStreamerV3:
def __init__(api_client: ApiClient = None, instrumentKeys: list = None, mode: str = None) -> None:
"""
Initialize V3 market data streamer with enhanced modes.
Parameters:
- api_client: Authenticated API client
- instrumentKeys: List of instrument tokens to subscribe
- mode: Subscription mode ('ltpc', 'full', 'option_greeks', 'full_d30')
"""
def connect() -> None:
"""Establish V3 WebSocket connection"""
def subscribe(instrumentKeys: list, mode: str) -> None:
"""Subscribe to V3 market data feed"""
def unsubscribe(instrumentKeys: list) -> None:
"""Unsubscribe from V3 market data feed"""
def change_mode(instrumentKeys: list, newMode: str) -> None:
"""Change V3 subscription mode"""
def clear_subscriptions() -> None:
"""Clear all V3 subscriptions"""Stream live portfolio updates including order status changes, position updates, and holdings modifications.
class PortfolioDataStreamer:
def __init__(api_client: ApiClient = None, order_update: bool = None, position_update: bool = None, holding_update: bool = None, gtt_update: bool = None) -> None:
"""
Initialize portfolio data streamer.
Parameters:
- api_client: Authenticated API client
- order_update: Enable order status updates
- position_update: Enable position updates
- holding_update: Enable holdings updates
- gtt_update: Enable GTT order updates
"""
def connect() -> None:
"""Connect to portfolio data stream"""
def on(event: str, listener: callable) -> None:
"""
Register event listener for portfolio updates.
Parameters:
- event: Event name ('open', 'message', 'error', 'close')
- listener: Callback function
"""from upstox_client.feeder import PortfolioDataStreamer
import json
# Initialize portfolio streamer
portfolio_streamer = PortfolioDataStreamer(
api_client=api_client,
order_update=True,
position_update=True,
holding_update=True,
gtt_update=True
)
def on_portfolio_open():
print("Portfolio WebSocket connection opened")
def on_portfolio_message(data):
"""Handle portfolio updates"""
try:
update = json.loads(data)
update_type = update.get('type')
if update_type == 'order':
order_data = update.get('data', {})
print(f"Order Update:")
print(f" Order ID: {order_data.get('order_id')}")
print(f" Status: {order_data.get('status')}")
print(f" Symbol: {order_data.get('tradingsymbol')}")
print(f" Quantity: {order_data.get('quantity')}")
print(f" Price: ₹{order_data.get('price', 0):.2f}")
elif update_type == 'position':
position_data = update.get('data', {})
print(f"Position Update:")
print(f" Symbol: {position_data.get('tradingsymbol')}")
print(f" Net Quantity: {position_data.get('quantity')}")
print(f" P&L: ₹{position_data.get('pnl', 0):.2f}")
elif update_type == 'holding':
holding_data = update.get('data', {})
print(f"Holding Update:")
print(f" Symbol: {holding_data.get('tradingsymbol')}")
print(f" Quantity: {holding_data.get('quantity')}")
print(f" Current Value: ₹{holding_data.get('current_value', 0):.2f}")
elif update_type == 'gtt':
gtt_data = update.get('data', {})
print(f"GTT Update:")
print(f" GTT ID: {gtt_data.get('gtt_order_id')}")
print(f" Status: {gtt_data.get('status')}")
except Exception as e:
print(f"Error processing portfolio update: {e}")
def on_portfolio_error(error):
print(f"Portfolio WebSocket error: {error}")
def on_portfolio_close():
print("Portfolio WebSocket connection closed")
# Register event handlers
portfolio_streamer.on('open', on_portfolio_open)
portfolio_streamer.on('message', on_portfolio_message)
portfolio_streamer.on('error', on_portfolio_error)
portfolio_streamer.on('close', on_portfolio_close)
# Connect to start receiving updates
portfolio_streamer.connect()Alternative feeder classes that use callback functions instead of event listeners.
class MarketDataFeeder:
def __init__(api_client: ApiClient = None, instrumentKeys: list = None, mode: str = None, on_open: callable = None, on_message: callable = None, on_error: callable = None, on_close: callable = None) -> None:
"""
Initialize market data feeder with callbacks.
Parameters:
- api_client: Authenticated API client
- instrumentKeys: List of instrument tokens
- mode: Subscription mode
- on_open: Connection opened callback
- on_message: Message received callback
- on_error: Error occurred callback
- on_close: Connection closed callback
"""
def connect() -> None:
"""Start market data feed with callbacks"""
def subscribe(instrumentKeys: list, mode: str = None) -> None:
"""Subscribe to instruments with callback handling"""
def unsubscribe(instrumentKeys: list) -> None:
"""Unsubscribe from instruments"""
def change_mode(instrumentKeys: list, newMode: str) -> None:
"""Change subscription mode"""
class MarketDataFeederV3:
def __init__(api_client: ApiClient = None, instrumentKeys: list = None, mode: str = None, on_open: callable = None, on_message: callable = None, on_error: callable = None, on_close: callable = None) -> None:
"""Initialize V3 market data feeder with callbacks"""
class PortfolioDataFeeder:
def connect() -> None:
"""Connect to portfolio data feed with callbacks"""from upstox_client.feeder import MarketDataFeeder
def on_feed_open():
print("Market data feed connected")
def on_feed_message(data):
print(f"Received market data: {data}")
def on_feed_error(error):
print(f"Feed error: {error}")
def on_feed_close():
print("Market data feed disconnected")
# Initialize feeder with callbacks
feeder = MarketDataFeeder(
api_client=api_client,
instrumentKeys=["NSE_EQ|INE002A01018"],
mode='full',
on_open=on_feed_open,
on_message=on_feed_message,
on_error=on_feed_error,
on_close=on_feed_close
)
# Start feeding
feeder.connect()Get WebSocket connection URLs and authorization for different data feeds.
def get_market_data_feed(api_version: str) -> WebsocketAuthRedirectResponse:
"""
Get market data WebSocket feed URL.
Parameters:
- api_version: API version ('2.0')
Returns:
WebsocketAuthRedirectResponse with WebSocket URL
"""
def get_market_data_feed_authorize(api_version: str) -> WebsocketAuthRedirectResponse:
"""
Get authorized market data WebSocket feed URL.
Parameters:
- api_version: API version ('2.0')
Returns:
WebsocketAuthRedirectResponse with authorized WebSocket URL
"""
def get_market_data_feed_v3() -> WebsocketAuthRedirectResponse:
"""
Get V3 market data WebSocket feed URL.
Returns:
WebsocketAuthRedirectResponse with V3 WebSocket URL
"""
def get_market_data_feed_authorize_v3() -> WebsocketAuthRedirectResponse:
"""
Get authorized V3 market data WebSocket feed URL.
Returns:
WebsocketAuthRedirectResponse with authorized V3 WebSocket URL
"""
def get_portfolio_stream_feed_authorize(api_version: str, order_update: bool = None, position_update: bool = None, holding_update: bool = None) -> WebsocketAuthRedirectResponse:
"""
Get authorized portfolio stream WebSocket URL.
Parameters:
- api_version: API version ('2.0')
- order_update: Enable order updates
- position_update: Enable position updates
- holding_update: Enable holding updates
Returns:
WebsocketAuthRedirectResponse with portfolio WebSocket URL
""""ltpc" - Last Traded Price & Change: Minimal data with price and change information"quote" - Quote: Price, volume, and basic market data"full" - Full: Complete market data including depth, OHLC, and all available fields"ltpc" - Last Traded Price & Change (V3)"full" - Full market data with improved performance (V3)"option_greeks" - Option Greeks data (Delta, Gamma, Theta, Vega)"full_d30" - Full market data with 30-day historical context{
"feeds": {
"NSE_EQ|INE002A01018": {
"ltpc": {
"ltp": 1520.50,
"ltt": "2024-09-06T15:29:45.000Z",
"ltq": 100,
"cp": 1515.75
}
}
}
}{
"feeds": {
"NSE_EQ|INE002A01018": {
"ff": {
"ltp": 1520.50,
"ltt": "2024-09-06T15:29:45.000Z",
"ltq": 100,
"cp": 1515.75,
"v": 1250000,
"o": 1518.00,
"h": 1525.00,
"l": 1510.00,
"c": 1515.75,
"ap": 1520.25,
"oi": 0,
"marketFF": {
"bp1": 1520.00, "bq1": 500,
"bp2": 1519.50, "bq2": 300,
"sp1": 1520.50, "sq1": 400,
"sp2": 1521.00, "sq2": 600
}
}
}
}
}# Configure automatic reconnection
streamer.auto_reconnect(
enable=True,
interval=5, # Reconnect after 5 seconds
retry_count=10 # Maximum 10 retry attempts
)
# Handle connection errors
def on_error(error):
print(f"WebSocket error: {error}")
# Implement custom error handling logic
if "authentication" in str(error).lower():
# Handle auth errors - refresh token
pass
elif "network" in str(error).lower():
# Handle network issues
pass
# Handle disconnections
def on_close():
print("Connection closed - auto-reconnect will attempt to reconnect")
# Log disconnection, update UI state, etc.# 1. Subscribe to only required instruments
essential_instruments = ["NSE_EQ|INE002A01018", "NSE_EQ|INE009A01021"]
streamer = MarketDataStreamer(api_client, essential_instruments, mode='ltpc')
# 2. Use appropriate subscription mode
streamer.subscribe(essential_instruments, mode='ltpc') # For price tracking
streamer.subscribe(depth_instruments, mode='full') # For depth analysis
# 3. Implement efficient message handling
def on_message(data):
# Process data efficiently
market_data = json.loads(data)
# Update only changed values in UI
update_ui_efficiently(market_data)# 1. Enable auto-reconnect for production
streamer.auto_reconnect(enable=True, interval=3, retry_count=5)
# 2. Graceful disconnection
def cleanup():
streamer.clear_subscriptions()
streamer.disconnect()
portfolio_streamer.disconnect()
# 3. Connection monitoring
def on_open():
# Reset connection error counters
connection_errors = 0
def on_error(error):
connection_errors += 1
if connection_errors > 5:
# Implement fallback mechanism
switch_to_rest_api_polling()# Dynamic subscription based on user activity
active_instruments = get_user_watchlist()
streamer.clear_subscriptions()
streamer.subscribe(active_instruments, mode='full')
# Efficient mode switching
def switch_to_trading_mode():
# Switch to full mode for active trading
streamer.change_mode(trading_instruments, newMode='full')
def switch_to_monitoring_mode():
# Switch to LTPC for passive monitoring
streamer.change_mode(all_instruments, newMode='ltpc')class WebsocketAuthRedirectResponse:
status: str
data: WebsocketAuthRedirectResponseData
class WebsocketAuthRedirectResponseData:
authorized_redirect_uri: str # WebSocket connection URL
# Market Data Feed Structure (parsed from JSON)
class MarketDataFeed:
feeds: dict[str, FeedData]
class FeedData:
ltpc: LTPCData # For LTPC mode
ff: FullFeedData # For full mode
class LTPCData:
ltp: float # Last traded price
ltt: str # Last trade time
ltq: int # Last trade quantity
cp: float # Close price
class FullFeedData:
ltp: float # Last traded price
v: int # Volume
o: float # Open
h: float # High
l: float # Low
c: float # Close
ap: float # Average price
oi: int # Open interest
marketFF: MarketDepth # Market depth
class MarketDepth:
bp1: float # Best bid price 1
bq1: int # Best bid quantity 1
sp1: float # Best ask price 1
sq1: int # Best ask quantity 1
# ... up to 5 levels (bp1-bp5, bq1-bq5, sp1-sp5, sq1-sq5)Install with Tessl CLI
npx tessl i tessl/pypi-upstox-python-sdk