0
# WebSocket Streaming
1
2
Real-time data streaming for live market data feeds and portfolio updates using WebSocket connections with event-driven architecture. Support for both callback-based feeders and event-driven streamers.
3
4
## Capabilities
5
6
### Market Data Streaming
7
8
Stream live market data including last traded price, full market quotes, and market depth using WebSocket connections.
9
10
```python { .api }
11
class MarketDataStreamer:
12
def __init__(api_client: ApiClient = None, instrumentKeys: list = None, mode: str = None) -> None:
13
"""
14
Initialize market data streamer.
15
16
Parameters:
17
- api_client: Authenticated API client
18
- instrumentKeys: List of instrument tokens to subscribe
19
- mode: Subscription mode ('ltpc', 'full', 'quote')
20
"""
21
22
def connect() -> None:
23
"""Establish WebSocket connection to market data feed"""
24
25
def subscribe(instrumentKeys: list, mode: str) -> None:
26
"""
27
Subscribe to market data for instruments.
28
29
Parameters:
30
- instrumentKeys: List of instrument tokens
31
- mode: Data mode ('ltpc', 'full', 'quote')
32
"""
33
34
def unsubscribe(instrumentKeys: list) -> None:
35
"""
36
Unsubscribe from market data.
37
38
Parameters:
39
- instrumentKeys: List of instrument tokens to unsubscribe
40
"""
41
42
def change_mode(instrumentKeys: list, newMode: str) -> None:
43
"""
44
Change subscription mode for instruments.
45
46
Parameters:
47
- instrumentKeys: List of instrument tokens
48
- newMode: New subscription mode
49
"""
50
51
def clear_subscriptions() -> None:
52
"""Remove all active subscriptions"""
53
54
def on(event: str, listener: callable) -> None:
55
"""
56
Register event listener.
57
58
Parameters:
59
- event: Event name ('open', 'message', 'error', 'close')
60
- listener: Callback function
61
"""
62
63
def disconnect() -> None:
64
"""Disconnect from WebSocket"""
65
66
def auto_reconnect(enable: bool, interval: int = None, retry_count: int = None) -> None:
67
"""
68
Configure automatic reconnection.
69
70
Parameters:
71
- enable: Enable/disable auto-reconnect
72
- interval: Reconnection interval in seconds
73
- retry_count: Maximum retry attempts
74
"""
75
```
76
77
#### Usage Example
78
79
```python
80
from upstox_client.feeder import MarketDataStreamer
81
from upstox_client import Configuration, ApiClient
82
import json
83
84
# Setup
85
config = Configuration()
86
config.access_token = 'your_access_token'
87
api_client = ApiClient(config)
88
89
# Initialize streamer
90
instruments = ["NSE_EQ|INE002A01018", "NSE_EQ|INE009A01021"] # Reliance, Infosys
91
streamer = MarketDataStreamer(api_client, instruments, mode='full')
92
93
# Event handlers
94
def on_open():
95
print("WebSocket connection opened")
96
print(f"Subscribed to {len(instruments)} instruments")
97
98
def on_message(data):
99
"""Handle incoming market data"""
100
try:
101
market_data = json.loads(data)
102
if 'feeds' in market_data:
103
for instrument_token, feed_data in market_data['feeds'].items():
104
if 'ff' in feed_data: # Full feed
105
ff = feed_data['ff']
106
print(f"{instrument_token}:")
107
print(f" LTP: ₹{ff.get('ltp', 0):.2f}")
108
print(f" Volume: {ff.get('v', 0)}")
109
print(f" Change: {ff.get('nc', 0):.2f}")
110
print(f" % Change: {ff.get('pc', 0):.2f}%")
111
112
# Market depth
113
if 'marketFF' in ff:
114
depth = ff['marketFF']
115
print(f" Best Bid: ₹{depth.get('bp1', 0):.2f} x {depth.get('bq1', 0)}")
116
print(f" Best Ask: ₹{depth.get('sp1', 0):.2f} x {depth.get('sq1', 0)}")
117
print()
118
except Exception as e:
119
print(f"Error processing message: {e}")
120
121
def on_error(error):
122
print(f"WebSocket error: {error}")
123
124
def on_close():
125
print("WebSocket connection closed")
126
127
# Register event handlers
128
streamer.on('open', on_open)
129
streamer.on('message', on_message)
130
streamer.on('error', on_error)
131
streamer.on('close', on_close)
132
133
# Enable auto-reconnect
134
streamer.auto_reconnect(enable=True, interval=5, retry_count=10)
135
136
# Connect to start streaming
137
streamer.connect()
138
139
# Runtime subscription management
140
import time
141
time.sleep(10) # Stream for 10 seconds
142
143
# Add more instruments
144
new_instruments = ["NSE_EQ|INE040A01034"] # HDFC Bank
145
streamer.subscribe(new_instruments, mode='ltpc')
146
147
# Change mode for existing subscription
148
streamer.change_mode(["NSE_EQ|INE002A01018"], newMode='ltpc')
149
150
# Unsubscribe from specific instruments
151
streamer.unsubscribe(["NSE_EQ|INE009A01021"])
152
153
# Clear all subscriptions
154
# streamer.clear_subscriptions()
155
156
# Disconnect when done
157
# streamer.disconnect()
158
```
159
160
### Enhanced Market Data Streaming (V3)
161
162
Improved market data streamer with enhanced performance and features.
163
164
```python { .api }
165
class MarketDataStreamerV3:
166
def __init__(api_client: ApiClient = None, instrumentKeys: list = None, mode: str = None) -> None:
167
"""
168
Initialize V3 market data streamer with enhanced modes.
169
170
Parameters:
171
- api_client: Authenticated API client
172
- instrumentKeys: List of instrument tokens to subscribe
173
- mode: Subscription mode ('ltpc', 'full', 'option_greeks', 'full_d30')
174
"""
175
176
def connect() -> None:
177
"""Establish V3 WebSocket connection"""
178
179
def subscribe(instrumentKeys: list, mode: str) -> None:
180
"""Subscribe to V3 market data feed"""
181
182
def unsubscribe(instrumentKeys: list) -> None:
183
"""Unsubscribe from V3 market data feed"""
184
185
def change_mode(instrumentKeys: list, newMode: str) -> None:
186
"""Change V3 subscription mode"""
187
188
def clear_subscriptions() -> None:
189
"""Clear all V3 subscriptions"""
190
```
191
192
### Portfolio Data Streaming
193
194
Stream live portfolio updates including order status changes, position updates, and holdings modifications.
195
196
```python { .api }
197
class PortfolioDataStreamer:
198
def __init__(api_client: ApiClient = None, order_update: bool = None, position_update: bool = None, holding_update: bool = None, gtt_update: bool = None) -> None:
199
"""
200
Initialize portfolio data streamer.
201
202
Parameters:
203
- api_client: Authenticated API client
204
- order_update: Enable order status updates
205
- position_update: Enable position updates
206
- holding_update: Enable holdings updates
207
- gtt_update: Enable GTT order updates
208
"""
209
210
def connect() -> None:
211
"""Connect to portfolio data stream"""
212
213
def on(event: str, listener: callable) -> None:
214
"""
215
Register event listener for portfolio updates.
216
217
Parameters:
218
- event: Event name ('open', 'message', 'error', 'close')
219
- listener: Callback function
220
"""
221
```
222
223
#### Usage Example
224
225
```python
226
from upstox_client.feeder import PortfolioDataStreamer
227
import json
228
229
# Initialize portfolio streamer
230
portfolio_streamer = PortfolioDataStreamer(
231
api_client=api_client,
232
order_update=True,
233
position_update=True,
234
holding_update=True,
235
gtt_update=True
236
)
237
238
def on_portfolio_open():
239
print("Portfolio WebSocket connection opened")
240
241
def on_portfolio_message(data):
242
"""Handle portfolio updates"""
243
try:
244
update = json.loads(data)
245
update_type = update.get('type')
246
247
if update_type == 'order':
248
order_data = update.get('data', {})
249
print(f"Order Update:")
250
print(f" Order ID: {order_data.get('order_id')}")
251
print(f" Status: {order_data.get('status')}")
252
print(f" Symbol: {order_data.get('tradingsymbol')}")
253
print(f" Quantity: {order_data.get('quantity')}")
254
print(f" Price: ₹{order_data.get('price', 0):.2f}")
255
256
elif update_type == 'position':
257
position_data = update.get('data', {})
258
print(f"Position Update:")
259
print(f" Symbol: {position_data.get('tradingsymbol')}")
260
print(f" Net Quantity: {position_data.get('quantity')}")
261
print(f" P&L: ₹{position_data.get('pnl', 0):.2f}")
262
263
elif update_type == 'holding':
264
holding_data = update.get('data', {})
265
print(f"Holding Update:")
266
print(f" Symbol: {holding_data.get('tradingsymbol')}")
267
print(f" Quantity: {holding_data.get('quantity')}")
268
print(f" Current Value: ₹{holding_data.get('current_value', 0):.2f}")
269
270
elif update_type == 'gtt':
271
gtt_data = update.get('data', {})
272
print(f"GTT Update:")
273
print(f" GTT ID: {gtt_data.get('gtt_order_id')}")
274
print(f" Status: {gtt_data.get('status')}")
275
276
except Exception as e:
277
print(f"Error processing portfolio update: {e}")
278
279
def on_portfolio_error(error):
280
print(f"Portfolio WebSocket error: {error}")
281
282
def on_portfolio_close():
283
print("Portfolio WebSocket connection closed")
284
285
# Register event handlers
286
portfolio_streamer.on('open', on_portfolio_open)
287
portfolio_streamer.on('message', on_portfolio_message)
288
portfolio_streamer.on('error', on_portfolio_error)
289
portfolio_streamer.on('close', on_portfolio_close)
290
291
# Connect to start receiving updates
292
portfolio_streamer.connect()
293
```
294
295
### Callback-based Feeders
296
297
Alternative feeder classes that use callback functions instead of event listeners.
298
299
```python { .api }
300
class MarketDataFeeder:
301
def __init__(api_client: ApiClient = None, instrumentKeys: list = None, mode: str = None, on_open: callable = None, on_message: callable = None, on_error: callable = None, on_close: callable = None) -> None:
302
"""
303
Initialize market data feeder with callbacks.
304
305
Parameters:
306
- api_client: Authenticated API client
307
- instrumentKeys: List of instrument tokens
308
- mode: Subscription mode
309
- on_open: Connection opened callback
310
- on_message: Message received callback
311
- on_error: Error occurred callback
312
- on_close: Connection closed callback
313
"""
314
315
def connect() -> None:
316
"""Start market data feed with callbacks"""
317
318
def subscribe(instrumentKeys: list, mode: str = None) -> None:
319
"""Subscribe to instruments with callback handling"""
320
321
def unsubscribe(instrumentKeys: list) -> None:
322
"""Unsubscribe from instruments"""
323
324
def change_mode(instrumentKeys: list, newMode: str) -> None:
325
"""Change subscription mode"""
326
327
class MarketDataFeederV3:
328
def __init__(api_client: ApiClient = None, instrumentKeys: list = None, mode: str = None, on_open: callable = None, on_message: callable = None, on_error: callable = None, on_close: callable = None) -> None:
329
"""Initialize V3 market data feeder with callbacks"""
330
331
class PortfolioDataFeeder:
332
def connect() -> None:
333
"""Connect to portfolio data feed with callbacks"""
334
```
335
336
#### Usage Example
337
338
```python
339
from upstox_client.feeder import MarketDataFeeder
340
341
def on_feed_open():
342
print("Market data feed connected")
343
344
def on_feed_message(data):
345
print(f"Received market data: {data}")
346
347
def on_feed_error(error):
348
print(f"Feed error: {error}")
349
350
def on_feed_close():
351
print("Market data feed disconnected")
352
353
# Initialize feeder with callbacks
354
feeder = MarketDataFeeder(
355
api_client=api_client,
356
instrumentKeys=["NSE_EQ|INE002A01018"],
357
mode='full',
358
on_open=on_feed_open,
359
on_message=on_feed_message,
360
on_error=on_feed_error,
361
on_close=on_feed_close
362
)
363
364
# Start feeding
365
feeder.connect()
366
```
367
368
### WebSocket Authorization
369
370
Get WebSocket connection URLs and authorization for different data feeds.
371
372
```python { .api }
373
def get_market_data_feed(api_version: str) -> WebsocketAuthRedirectResponse:
374
"""
375
Get market data WebSocket feed URL.
376
377
Parameters:
378
- api_version: API version ('2.0')
379
380
Returns:
381
WebsocketAuthRedirectResponse with WebSocket URL
382
"""
383
384
def get_market_data_feed_authorize(api_version: str) -> WebsocketAuthRedirectResponse:
385
"""
386
Get authorized market data WebSocket feed URL.
387
388
Parameters:
389
- api_version: API version ('2.0')
390
391
Returns:
392
WebsocketAuthRedirectResponse with authorized WebSocket URL
393
"""
394
395
def get_market_data_feed_v3() -> WebsocketAuthRedirectResponse:
396
"""
397
Get V3 market data WebSocket feed URL.
398
399
Returns:
400
WebsocketAuthRedirectResponse with V3 WebSocket URL
401
"""
402
403
def get_market_data_feed_authorize_v3() -> WebsocketAuthRedirectResponse:
404
"""
405
Get authorized V3 market data WebSocket feed URL.
406
407
Returns:
408
WebsocketAuthRedirectResponse with authorized V3 WebSocket URL
409
"""
410
411
def get_portfolio_stream_feed_authorize(api_version: str, order_update: bool = None, position_update: bool = None, holding_update: bool = None) -> WebsocketAuthRedirectResponse:
412
"""
413
Get authorized portfolio stream WebSocket URL.
414
415
Parameters:
416
- api_version: API version ('2.0')
417
- order_update: Enable order updates
418
- position_update: Enable position updates
419
- holding_update: Enable holding updates
420
421
Returns:
422
WebsocketAuthRedirectResponse with portfolio WebSocket URL
423
"""
424
```
425
426
## Subscription Modes
427
428
### Market Data Modes
429
430
#### Standard Modes
431
- `"ltpc"` - Last Traded Price & Change: Minimal data with price and change information
432
- `"quote"` - Quote: Price, volume, and basic market data
433
- `"full"` - Full: Complete market data including depth, OHLC, and all available fields
434
435
#### V3 Enhanced Modes
436
- `"ltpc"` - Last Traded Price & Change (V3)
437
- `"full"` - Full market data with improved performance (V3)
438
- `"option_greeks"` - Option Greeks data (Delta, Gamma, Theta, Vega)
439
- `"full_d30"` - Full market data with 30-day historical context
440
441
### Data Format Examples
442
443
#### LTPC Mode
444
```json
445
{
446
"feeds": {
447
"NSE_EQ|INE002A01018": {
448
"ltpc": {
449
"ltp": 1520.50,
450
"ltt": "2024-09-06T15:29:45.000Z",
451
"ltq": 100,
452
"cp": 1515.75
453
}
454
}
455
}
456
}
457
```
458
459
#### Full Mode
460
```json
461
{
462
"feeds": {
463
"NSE_EQ|INE002A01018": {
464
"ff": {
465
"ltp": 1520.50,
466
"ltt": "2024-09-06T15:29:45.000Z",
467
"ltq": 100,
468
"cp": 1515.75,
469
"v": 1250000,
470
"o": 1518.00,
471
"h": 1525.00,
472
"l": 1510.00,
473
"c": 1515.75,
474
"ap": 1520.25,
475
"oi": 0,
476
"marketFF": {
477
"bp1": 1520.00, "bq1": 500,
478
"bp2": 1519.50, "bq2": 300,
479
"sp1": 1520.50, "sq1": 400,
480
"sp2": 1521.00, "sq2": 600
481
}
482
}
483
}
484
}
485
}
486
```
487
488
## Error Handling & Reconnection
489
490
```python
491
# Configure automatic reconnection
492
streamer.auto_reconnect(
493
enable=True,
494
interval=5, # Reconnect after 5 seconds
495
retry_count=10 # Maximum 10 retry attempts
496
)
497
498
# Handle connection errors
499
def on_error(error):
500
print(f"WebSocket error: {error}")
501
# Implement custom error handling logic
502
if "authentication" in str(error).lower():
503
# Handle auth errors - refresh token
504
pass
505
elif "network" in str(error).lower():
506
# Handle network issues
507
pass
508
509
# Handle disconnections
510
def on_close():
511
print("Connection closed - auto-reconnect will attempt to reconnect")
512
# Log disconnection, update UI state, etc.
513
```
514
515
## Best Practices
516
517
### Performance Optimization
518
```python
519
# 1. Subscribe to only required instruments
520
essential_instruments = ["NSE_EQ|INE002A01018", "NSE_EQ|INE009A01021"]
521
streamer = MarketDataStreamer(api_client, essential_instruments, mode='ltpc')
522
523
# 2. Use appropriate subscription mode
524
streamer.subscribe(essential_instruments, mode='ltpc') # For price tracking
525
streamer.subscribe(depth_instruments, mode='full') # For depth analysis
526
527
# 3. Implement efficient message handling
528
def on_message(data):
529
# Process data efficiently
530
market_data = json.loads(data)
531
# Update only changed values in UI
532
update_ui_efficiently(market_data)
533
```
534
535
### Connection Management
536
```python
537
# 1. Enable auto-reconnect for production
538
streamer.auto_reconnect(enable=True, interval=3, retry_count=5)
539
540
# 2. Graceful disconnection
541
def cleanup():
542
streamer.clear_subscriptions()
543
streamer.disconnect()
544
portfolio_streamer.disconnect()
545
546
# 3. Connection monitoring
547
def on_open():
548
# Reset connection error counters
549
connection_errors = 0
550
551
def on_error(error):
552
connection_errors += 1
553
if connection_errors > 5:
554
# Implement fallback mechanism
555
switch_to_rest_api_polling()
556
```
557
558
### Subscription Management
559
```python
560
# Dynamic subscription based on user activity
561
active_instruments = get_user_watchlist()
562
streamer.clear_subscriptions()
563
streamer.subscribe(active_instruments, mode='full')
564
565
# Efficient mode switching
566
def switch_to_trading_mode():
567
# Switch to full mode for active trading
568
streamer.change_mode(trading_instruments, newMode='full')
569
570
def switch_to_monitoring_mode():
571
# Switch to LTPC for passive monitoring
572
streamer.change_mode(all_instruments, newMode='ltpc')
573
```
574
575
## WebSocket Response Types
576
577
```python { .api }
578
class WebsocketAuthRedirectResponse:
579
status: str
580
data: WebsocketAuthRedirectResponseData
581
582
class WebsocketAuthRedirectResponseData:
583
authorized_redirect_uri: str # WebSocket connection URL
584
585
# Market Data Feed Structure (parsed from JSON)
586
class MarketDataFeed:
587
feeds: dict[str, FeedData]
588
589
class FeedData:
590
ltpc: LTPCData # For LTPC mode
591
ff: FullFeedData # For full mode
592
593
class LTPCData:
594
ltp: float # Last traded price
595
ltt: str # Last trade time
596
ltq: int # Last trade quantity
597
cp: float # Close price
598
599
class FullFeedData:
600
ltp: float # Last traded price
601
v: int # Volume
602
o: float # Open
603
h: float # High
604
l: float # Low
605
c: float # Close
606
ap: float # Average price
607
oi: int # Open interest
608
marketFF: MarketDepth # Market depth
609
610
class MarketDepth:
611
bp1: float # Best bid price 1
612
bq1: int # Best bid quantity 1
613
sp1: float # Best ask price 1
614
sq1: int # Best ask quantity 1
615
# ... up to 5 levels (bp1-bp5, bq1-bq5, sp1-sp5, sq1-sq5)
616
```