or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

authentication.md · charges-analytics.md · index.md · market-data.md · order-management.md · portfolio-management.md · websocket-streaming.md

docs/websocket-streaming.md

0

# WebSocket Streaming

1

2

Real-time data streaming for live market data feeds and portfolio updates using WebSocket connections with an event-driven architecture. Both callback-based feeders and event-driven streamers are supported.

3

4

## Capabilities

5

6

### Market Data Streaming

7

8

Stream live market data including last traded price, full market quotes, and market depth using WebSocket connections.

9

10

```python { .api }

11

class MarketDataStreamer:

12

def __init__(api_client: ApiClient = None, instrumentKeys: list = None, mode: str = None) -> None:

13

"""

14

Initialize market data streamer.

15

16

Parameters:

17

- api_client: Authenticated API client

18

- instrumentKeys: List of instrument tokens to subscribe

19

- mode: Subscription mode ('ltpc', 'full', 'quote')

20

"""

21

22

def connect() -> None:

23

"""Establish WebSocket connection to market data feed"""

24

25

def subscribe(instrumentKeys: list, mode: str) -> None:

26

"""

27

Subscribe to market data for instruments.

28

29

Parameters:

30

- instrumentKeys: List of instrument tokens

31

- mode: Data mode ('ltpc', 'full', 'quote')

32

"""

33

34

def unsubscribe(instrumentKeys: list) -> None:

35

"""

36

Unsubscribe from market data.

37

38

Parameters:

39

- instrumentKeys: List of instrument tokens to unsubscribe

40

"""

41

42

def change_mode(instrumentKeys: list, newMode: str) -> None:

43

"""

44

Change subscription mode for instruments.

45

46

Parameters:

47

- instrumentKeys: List of instrument tokens

48

- newMode: New subscription mode

49

"""

50

51

def clear_subscriptions() -> None:

52

"""Remove all active subscriptions"""

53

54

def on(event: str, listener: callable) -> None:

55

"""

56

Register event listener.

57

58

Parameters:

59

- event: Event name ('open', 'message', 'error', 'close')

60

- listener: Callback function

61

"""

62

63

def disconnect() -> None:

64

"""Disconnect from WebSocket"""

65

66

def auto_reconnect(enable: bool, interval: int = None, retry_count: int = None) -> None:

67

"""

68

Configure automatic reconnection.

69

70

Parameters:

71

- enable: Enable/disable auto-reconnect

72

- interval: Reconnection interval in seconds

73

- retry_count: Maximum retry attempts

74

"""

75

```

76

77

#### Usage Example

78

79

```python

80

from upstox_client.feeder import MarketDataStreamer

81

from upstox_client import Configuration, ApiClient

82

import json

83

84

# Setup

85

config = Configuration()

86

config.access_token = 'your_access_token'

87

api_client = ApiClient(config)

88

89

# Initialize streamer

90

instruments = ["NSE_EQ|INE002A01018", "NSE_EQ|INE009A01021"] # Reliance, Infosys

91

streamer = MarketDataStreamer(api_client, instruments, mode='full')

92

93

# Event handlers

94

def on_open():

95

print("WebSocket connection opened")

96

print(f"Subscribed to {len(instruments)} instruments")

97

98

def on_message(data):

99

"""Handle incoming market data"""

100

try:

101

market_data = json.loads(data)

102

if 'feeds' in market_data:

103

for instrument_token, feed_data in market_data['feeds'].items():

104

if 'ff' in feed_data: # Full feed

105

ff = feed_data['ff']

106

print(f"{instrument_token}:")

107

print(f" LTP: ₹{ff.get('ltp', 0):.2f}")

108

print(f" Volume: {ff.get('v', 0)}")

109

print(f" Change: {ff.get('nc', 0):.2f}")

110

print(f" % Change: {ff.get('pc', 0):.2f}%")

111

112

# Market depth

113

if 'marketFF' in ff:

114

depth = ff['marketFF']

115

print(f" Best Bid: ₹{depth.get('bp1', 0):.2f} x {depth.get('bq1', 0)}")

116

print(f" Best Ask: ₹{depth.get('sp1', 0):.2f} x {depth.get('sq1', 0)}")

117

print()

118

except Exception as e:

119

print(f"Error processing message: {e}")

120

121

def on_error(error):

122

print(f"WebSocket error: {error}")

123

124

def on_close():

125

print("WebSocket connection closed")

126

127

# Register event handlers

128

streamer.on('open', on_open)

129

streamer.on('message', on_message)

130

streamer.on('error', on_error)

131

streamer.on('close', on_close)

132

133

# Enable auto-reconnect

134

streamer.auto_reconnect(enable=True, interval=5, retry_count=10)

135

136

# Connect to start streaming

137

streamer.connect()

138

139

# Runtime subscription management

140

import time

141

time.sleep(10) # Stream for 10 seconds

142

143

# Add more instruments

144

new_instruments = ["NSE_EQ|INE040A01034"] # HDFC Bank

145

streamer.subscribe(new_instruments, mode='ltpc')

146

147

# Change mode for existing subscription

148

streamer.change_mode(["NSE_EQ|INE002A01018"], newMode='ltpc')

149

150

# Unsubscribe from specific instruments

151

streamer.unsubscribe(["NSE_EQ|INE009A01021"])

152

153

# Clear all subscriptions

154

# streamer.clear_subscriptions()

155

156

# Disconnect when done

157

# streamer.disconnect()

158

```

159

160

### Enhanced Market Data Streaming (V3)

161

162

Improved market data streamer with enhanced performance and additional subscription modes (`option_greeks`, `full_d30`).

163

164

```python { .api }

165

class MarketDataStreamerV3:

166

def __init__(api_client: ApiClient = None, instrumentKeys: list = None, mode: str = None) -> None:

167

"""

168

Initialize V3 market data streamer with enhanced modes.

169

170

Parameters:

171

- api_client: Authenticated API client

172

- instrumentKeys: List of instrument tokens to subscribe

173

- mode: Subscription mode ('ltpc', 'full', 'option_greeks', 'full_d30')

174

"""

175

176

def connect() -> None:

177

"""Establish V3 WebSocket connection"""

178

179

def subscribe(instrumentKeys: list, mode: str) -> None:

180

"""Subscribe to V3 market data feed"""

181

182

def unsubscribe(instrumentKeys: list) -> None:

183

"""Unsubscribe from V3 market data feed"""

184

185

def change_mode(instrumentKeys: list, newMode: str) -> None:

186

"""Change V3 subscription mode"""

187

188

def clear_subscriptions() -> None:

189

"""Clear all V3 subscriptions"""

190

```

191

192

### Portfolio Data Streaming

193

194

Stream live portfolio updates including order status changes, position updates, and holdings modifications.

195

196

```python { .api }

197

class PortfolioDataStreamer:

198

def __init__(api_client: ApiClient = None, order_update: bool = None, position_update: bool = None, holding_update: bool = None, gtt_update: bool = None) -> None:

199

"""

200

Initialize portfolio data streamer.

201

202

Parameters:

203

- api_client: Authenticated API client

204

- order_update: Enable order status updates

205

- position_update: Enable position updates

206

- holding_update: Enable holdings updates

207

- gtt_update: Enable GTT order updates

208

"""

209

210

def connect() -> None:

211

"""Connect to portfolio data stream"""

212

213

def on(event: str, listener: callable) -> None:

214

"""

215

Register event listener for portfolio updates.

216

217

Parameters:

218

- event: Event name ('open', 'message', 'error', 'close')

219

- listener: Callback function

220

"""

221

```

222

223

#### Usage Example

224

225

```python

226

from upstox_client.feeder import PortfolioDataStreamer

227

import json

228

229

# Initialize portfolio streamer

230

portfolio_streamer = PortfolioDataStreamer(

231

api_client=api_client,

232

order_update=True,

233

position_update=True,

234

holding_update=True,

235

gtt_update=True

236

)

237

238

def on_portfolio_open():

239

print("Portfolio WebSocket connection opened")

240

241

def on_portfolio_message(data):

242

"""Handle portfolio updates"""

243

try:

244

update = json.loads(data)

245

update_type = update.get('type')

246

247

if update_type == 'order':

248

order_data = update.get('data', {})

249

print(f"Order Update:")

250

print(f" Order ID: {order_data.get('order_id')}")

251

print(f" Status: {order_data.get('status')}")

252

print(f" Symbol: {order_data.get('tradingsymbol')}")

253

print(f" Quantity: {order_data.get('quantity')}")

254

print(f" Price: ₹{order_data.get('price', 0):.2f}")

255

256

elif update_type == 'position':

257

position_data = update.get('data', {})

258

print(f"Position Update:")

259

print(f" Symbol: {position_data.get('tradingsymbol')}")

260

print(f" Net Quantity: {position_data.get('quantity')}")

261

print(f" P&L: ₹{position_data.get('pnl', 0):.2f}")

262

263

elif update_type == 'holding':

264

holding_data = update.get('data', {})

265

print(f"Holding Update:")

266

print(f" Symbol: {holding_data.get('tradingsymbol')}")

267

print(f" Quantity: {holding_data.get('quantity')}")

268

print(f" Current Value: ₹{holding_data.get('current_value', 0):.2f}")

269

270

elif update_type == 'gtt':

271

gtt_data = update.get('data', {})

272

print(f"GTT Update:")

273

print(f" GTT ID: {gtt_data.get('gtt_order_id')}")

274

print(f" Status: {gtt_data.get('status')}")

275

276

except Exception as e:

277

print(f"Error processing portfolio update: {e}")

278

279

def on_portfolio_error(error):

280

print(f"Portfolio WebSocket error: {error}")

281

282

def on_portfolio_close():

283

print("Portfolio WebSocket connection closed")

284

285

# Register event handlers

286

portfolio_streamer.on('open', on_portfolio_open)

287

portfolio_streamer.on('message', on_portfolio_message)

288

portfolio_streamer.on('error', on_portfolio_error)

289

portfolio_streamer.on('close', on_portfolio_close)

290

291

# Connect to start receiving updates

292

portfolio_streamer.connect()

293

```

294

295

### Callback-based Feeders

296

297

Alternative feeder classes that use callback functions instead of event listeners.

298

299

```python { .api }

300

class MarketDataFeeder:

301

def __init__(api_client: ApiClient = None, instrumentKeys: list = None, mode: str = None, on_open: callable = None, on_message: callable = None, on_error: callable = None, on_close: callable = None) -> None:

302

"""

303

Initialize market data feeder with callbacks.

304

305

Parameters:

306

- api_client: Authenticated API client

307

- instrumentKeys: List of instrument tokens

308

- mode: Subscription mode

309

- on_open: Connection opened callback

310

- on_message: Message received callback

311

- on_error: Error occurred callback

312

- on_close: Connection closed callback

313

"""

314

315

def connect() -> None:

316

"""Start market data feed with callbacks"""

317

318

def subscribe(instrumentKeys: list, mode: str = None) -> None:

319

"""Subscribe to instruments with callback handling"""

320

321

def unsubscribe(instrumentKeys: list) -> None:

322

"""Unsubscribe from instruments"""

323

324

def change_mode(instrumentKeys: list, newMode: str) -> None:

325

"""Change subscription mode"""

326

327

class MarketDataFeederV3:

328

def __init__(api_client: ApiClient = None, instrumentKeys: list = None, mode: str = None, on_open: callable = None, on_message: callable = None, on_error: callable = None, on_close: callable = None) -> None:

329

"""Initialize V3 market data feeder with callbacks"""

330

331

class PortfolioDataFeeder:

332

def connect() -> None:

333

"""Connect to portfolio data feed with callbacks"""

334

```

335

336

#### Usage Example

337

338

```python

339

from upstox_client.feeder import MarketDataFeeder

340

341

def on_feed_open():

342

print("Market data feed connected")

343

344

def on_feed_message(data):

345

print(f"Received market data: {data}")

346

347

def on_feed_error(error):

348

print(f"Feed error: {error}")

349

350

def on_feed_close():

351

print("Market data feed disconnected")

352

353

# Initialize feeder with callbacks

354

feeder = MarketDataFeeder(

355

api_client=api_client,

356

instrumentKeys=["NSE_EQ|INE002A01018"],

357

mode='full',

358

on_open=on_feed_open,

359

on_message=on_feed_message,

360

on_error=on_feed_error,

361

on_close=on_feed_close

362

)

363

364

# Start feeding

365

feeder.connect()

366

```

367

368

### WebSocket Authorization

369

370

Get WebSocket connection URLs and authorization for different data feeds.

371

372

```python { .api }

373

def get_market_data_feed(api_version: str) -> WebsocketAuthRedirectResponse:

374

"""

375

Get market data WebSocket feed URL.

376

377

Parameters:

378

- api_version: API version ('2.0')

379

380

Returns:

381

WebsocketAuthRedirectResponse with WebSocket URL

382

"""

383

384

def get_market_data_feed_authorize(api_version: str) -> WebsocketAuthRedirectResponse:

385

"""

386

Get authorized market data WebSocket feed URL.

387

388

Parameters:

389

- api_version: API version ('2.0')

390

391

Returns:

392

WebsocketAuthRedirectResponse with authorized WebSocket URL

393

"""

394

395

def get_market_data_feed_v3() -> WebsocketAuthRedirectResponse:

396

"""

397

Get V3 market data WebSocket feed URL.

398

399

Returns:

400

WebsocketAuthRedirectResponse with V3 WebSocket URL

401

"""

402

403

def get_market_data_feed_authorize_v3() -> WebsocketAuthRedirectResponse:

404

"""

405

Get authorized V3 market data WebSocket feed URL.

406

407

Returns:

408

WebsocketAuthRedirectResponse with authorized V3 WebSocket URL

409

"""

410

411

def get_portfolio_stream_feed_authorize(api_version: str, order_update: bool = None, position_update: bool = None, holding_update: bool = None) -> WebsocketAuthRedirectResponse:

412

"""

413

Get authorized portfolio stream WebSocket URL.

414

415

Parameters:

416

- api_version: API version ('2.0')

417

- order_update: Enable order updates

418

- position_update: Enable position updates

419

- holding_update: Enable holding updates

420

421

Returns:

422

WebsocketAuthRedirectResponse with portfolio WebSocket URL

423

"""

424

```

425

426

## Subscription Modes

427

428

### Market Data Modes

429

430

#### Standard Modes

431

- `"ltpc"` - Last Traded Price & Close: Minimal data with last traded price, last trade time, last trade quantity, and close price

432

- `"quote"` - Quote: Price, volume, and basic market data

433

- `"full"` - Full: Complete market data including depth, OHLC, and all available fields

434

435

#### V3 Enhanced Modes

436

- `"ltpc"` - Last Traded Price & Close (V3)

437

- `"full"` - Full market data with improved performance (V3)

438

- `"option_greeks"` - Option Greeks data (Delta, Gamma, Theta, Vega)

439

- `"full_d30"` - Full market data with 30 levels of market depth

440

441

### Data Format Examples

442

443

#### LTPC Mode

444

```json

445

{

446

"feeds": {

447

"NSE_EQ|INE002A01018": {

448

"ltpc": {

449

"ltp": 1520.50,

450

"ltt": "2024-09-06T15:29:45.000Z",

451

"ltq": 100,

452

"cp": 1515.75

453

}

454

}

455

}

456

}

457

```

458

459

#### Full Mode

460

```json

461

{

462

"feeds": {

463

"NSE_EQ|INE002A01018": {

464

"ff": {

465

"ltp": 1520.50,

466

"ltt": "2024-09-06T15:29:45.000Z",

467

"ltq": 100,

468

"cp": 1515.75,

469

"v": 1250000,

470

"o": 1518.00,

471

"h": 1525.00,

472

"l": 1510.00,

473

"c": 1515.75,

474

"ap": 1520.25,

475

"oi": 0,

476

"marketFF": {

477

"bp1": 1520.00, "bq1": 500,

478

"bp2": 1519.50, "bq2": 300,

479

"sp1": 1520.50, "sq1": 400,

480

"sp2": 1521.00, "sq2": 600

481

}

482

}

483

}

484

}

485

}

486

```

487

488

## Error Handling & Reconnection

489

490

```python

491

# Configure automatic reconnection

492

streamer.auto_reconnect(

493

enable=True,

494

interval=5, # Reconnect after 5 seconds

495

retry_count=10 # Maximum 10 retry attempts

496

)

497

498

# Handle connection errors

499

def on_error(error):

500

print(f"WebSocket error: {error}")

501

# Implement custom error handling logic

502

if "authentication" in str(error).lower():

503

# Handle auth errors - refresh token

504

pass

505

elif "network" in str(error).lower():

506

# Handle network issues

507

pass

508

509

# Handle disconnections

510

def on_close():

511

print("Connection closed - auto-reconnect will attempt to reconnect")

512

# Log disconnection, update UI state, etc.

513

```

514

515

## Best Practices

516

517

### Performance Optimization

518

```python

519

# 1. Subscribe to only required instruments

520

essential_instruments = ["NSE_EQ|INE002A01018", "NSE_EQ|INE009A01021"]

521

streamer = MarketDataStreamer(api_client, essential_instruments, mode='ltpc')

522

523

# 2. Use appropriate subscription mode

524

streamer.subscribe(essential_instruments, mode='ltpc') # For price tracking

525

streamer.subscribe(depth_instruments, mode='full') # For depth analysis

526

527

# 3. Implement efficient message handling

528

def on_message(data):

529

# Process data efficiently

530

market_data = json.loads(data)

531

# Update only changed values in UI

532

update_ui_efficiently(market_data)

533

```

534

535

### Connection Management

536

```python

537

# 1. Enable auto-reconnect for production

538

streamer.auto_reconnect(enable=True, interval=3, retry_count=5)

539

540

# 2. Graceful disconnection

541

def cleanup():

542

streamer.clear_subscriptions()

543

streamer.disconnect()

544

portfolio_streamer.disconnect()

545

546

# 3. Connection monitoring

547

def on_open():

548

# Reset connection error counters

549

connection_errors = 0

550

551

def on_error(error):

552

connection_errors += 1

553

if connection_errors > 5:

554

# Implement fallback mechanism

555

switch_to_rest_api_polling()

556

```

557

558

### Subscription Management

559

```python

560

# Dynamic subscription based on user activity

561

active_instruments = get_user_watchlist()

562

streamer.clear_subscriptions()

563

streamer.subscribe(active_instruments, mode='full')

564

565

# Efficient mode switching

566

def switch_to_trading_mode():

567

# Switch to full mode for active trading

568

streamer.change_mode(trading_instruments, newMode='full')

569

570

def switch_to_monitoring_mode():

571

# Switch to LTPC for passive monitoring

572

streamer.change_mode(all_instruments, newMode='ltpc')

573

```

574

575

## WebSocket Response Types

576

577

```python { .api }

578

class WebsocketAuthRedirectResponse:

579

status: str

580

data: WebsocketAuthRedirectResponseData

581

582

class WebsocketAuthRedirectResponseData:

583

authorized_redirect_uri: str # WebSocket connection URL

584

585

# Market Data Feed Structure (parsed from JSON)

586

class MarketDataFeed:

587

feeds: dict[str, FeedData]

588

589

class FeedData:

590

ltpc: LTPCData # For LTPC mode

591

ff: FullFeedData # For full mode

592

593

class LTPCData:

594

ltp: float # Last traded price

595

ltt: str # Last trade time

596

ltq: int # Last trade quantity

597

cp: float # Close price

598

599

class FullFeedData:

600

ltp: float # Last traded price

601

v: int # Volume

602

o: float # Open

603

h: float # High

604

l: float # Low

605

c: float # Close

606

ap: float # Average price

607

oi: int # Open interest

608

marketFF: MarketDepth # Market depth

609

610

class MarketDepth:

611

bp1: float # Best bid price 1

612

bq1: int # Best bid quantity 1

613

sp1: float # Best ask price 1

614

sq1: int # Best ask quantity 1

615

# ... up to 5 levels (bp1-bp5, bq1-bq5, sp1-sp5, sq1-sq5)

616

```