or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

account-management.md earn-products.md index.md lending.md margin-trading.md market-data.md spot-trading.md websocket.md

docs/websocket.md

0

# WebSocket Streaming

1

2

Real-time market data and account updates over WebSocket connections. The client provides automatic reconnection, subscription management, and comprehensive event handling for live cryptocurrency data streaming.

3

4

## Capabilities

5

6

### WebSocket Client

7

8

High-level WebSocket client for easy integration.

9

10

```python { .api }

11

class KucoinWsClient:
    """High-level WebSocket client for KuCoin real-time data.

    Signature reference: each method body below consists of its docstring only.
    """

    def __init__(self):
        """Initialize WebSocket client."""

    @classmethod
    async def create(cls, loop, client, callback, private: bool = False, sock=None):
        """
        Create and initialize WebSocket client.

        Args:
            loop: Event loop for async operations
            client: KuCoin API client instance (with authentication)
            callback: Message callback function
            private (bool): Enable private channel access
            sock: Optional socket configuration

        Returns:
            KucoinWsClient: Initialized WebSocket client
        """

    async def subscribe(self, topic: str):
        """
        Subscribe to a WebSocket topic.

        Args:
            topic (str): Subscription topic (e.g., '/market/ticker:BTC-USDT')
        """

    async def unsubscribe(self, topic: str):
        """
        Unsubscribe from a WebSocket topic.

        Args:
            topic (str): Topic to unsubscribe from
        """

    @property
    def topics(self):
        """Get list of currently subscribed topics."""

52

```

53

54

### WebSocket Token Management

55

56

Manage authentication tokens for WebSocket connections.

57

58

```python { .api }

59

class GetToken:
    """WebSocket token management.

    Signature reference: the method body below consists of its docstring only.
    """

    def get_ws_token(self, is_private: bool = False):
        """
        Get WebSocket token for public or private channels.

        Args:
            is_private (bool): Request private channel token

        Returns:
            dict: Token, endpoint, and connection parameters
        """

72

```

73

74

### Low-Level WebSocket Connection

75

76

Direct WebSocket connection management for advanced use cases.

77

78

```python { .api }

79

class ConnectWebsocket:
    """Low-level WebSocket connection handler.

    Signature reference: each method body below consists of its docstring only.
    """

    def __init__(self, loop, client, callback, private: bool = False, sock=None):
        """
        Initialize WebSocket connection.

        Args:
            loop: Event loop for async operations
            client: KuCoin API client instance
            callback: Message callback function
            private (bool): Enable private channel access
            sock: Optional socket configuration
        """

    async def send_message(self, msg: dict, retry_count: int = 0):
        """
        Send message to WebSocket server.

        Args:
            msg (dict): Message to send
            retry_count (int): Number of retry attempts
        """

    async def send_ping(self):
        """Send ping message to maintain connection."""

    @property
    def topics(self):
        """Get list of current subscription topics."""

109

```

110

111

## Usage Examples

112

113

### Basic Market Data Streaming

114

115

```python

116

import asyncio

117

import json

118

from kucoin.ws_client import KucoinWsClient

119

from kucoin.client import Market

120

121

async def message_handler(message):
    """Handle incoming WebSocket messages.

    Prints a one-line summary for ticker messages and the best bid/ask for
    level2 order book messages. Messages without a 'data' key, and topics
    that match neither channel, are ignored.

    Args:
        message (dict): Decoded message; 'topic' and 'data' are read from it.
    """
    if 'data' not in message:
        return

    data = message['data']
    topic = message.get('topic', '')

    # The trading pair is also encoded after ':' in the topic, so fall back to
    # it when the payload itself carries no 'symbol' field.
    symbol = data.get('symbol') or topic.partition(':')[2]

    if 'ticker' in topic:
        # Handle ticker updates
        print(f"Ticker Update - {symbol}: ${data['price']}")

    elif 'level2' in topic:
        # Handle order book updates
        bids = data.get('bids')
        asks = data.get('asks')
        print(f"OrderBook Update - {symbol}")
        print(f"Best Bid: {bids[0] if bids else 'N/A'}")
        print(f"Best Ask: {asks[0] if asks else 'N/A'}")

138

139

async def main():
    """Stream BTC-USDT ticker and level2 updates for one minute, then unsubscribe."""
    # Function-scope import keeps this snippet self-contained.
    from kucoin.client import WsToken

    # The WebSocket client needs a client that can issue connection tokens
    # (get_ws_token, documented on GetToken above).
    # NOTE(review): WsToken is the token client shown in the SDK README;
    # confirm against the installed SDK version.
    token_client = WsToken()

    # Inside a coroutine the running loop is the one to hand over;
    # get_event_loop() is deprecated for this use.
    loop = asyncio.get_running_loop()
    ws_client = await KucoinWsClient.create(loop, token_client, message_handler)

    topics = (
        '/market/ticker:BTC-USDT',   # BTC-USDT ticker
        '/market/level2:BTC-USDT',   # order book updates
    )
    for topic in topics:
        await ws_client.subscribe(topic)

    try:
        # Keep connection alive
        await asyncio.sleep(60)  # Stream for 1 minute
    finally:
        # Release the subscriptions even if the wait is cancelled.
        for topic in topics:
            await ws_client.unsubscribe(topic)


# Run the async example
if __name__ == "__main__":
    asyncio.run(main())

158

```

159

160

### Private Account Updates

161

162

```python

163

import asyncio

164

from kucoin.ws_client import KucoinWsClient

165

from kucoin.client import User

166

167

async def private_message_handler(message):
    """Handle private channel messages.

    Prints balance changes for topics containing 'account' and order changes
    for topics containing 'tradeOrders'. Messages without a 'data' key, and
    other topics, are ignored.

    Args:
        message (dict): Decoded message; 'topic' and 'data' are read from it.
    """
    if 'data' not in message:
        return

    data = message['data']
    topic = message.get('topic', '')

    if 'account' in topic:
        # Handle account balance updates
        print(f"Balance Update - {data['currency']}: {data['available']}")

    elif 'tradeOrders' in topic:
        # Handle order updates
        print(f"Order Update - {data['symbol']}: {data['type']} {data['side']}")
        print(f"Status: {data['status']}, Size: {data['size']}")

183

184

async def private_streaming():
    """Stream private balance and order updates for five minutes, then unsubscribe."""
    # Function-scope import keeps this snippet self-contained.
    from kucoin.client import WsToken

    # Initialize authenticated client. The WebSocket client needs a client that
    # can issue connection tokens (get_ws_token, documented on GetToken above).
    # NOTE(review): WsToken is the token client shown in the SDK README;
    # confirm against the installed SDK version.
    token_client = WsToken(
        key='your-api-key',
        secret='your-api-secret',
        passphrase='your-passphrase',
        is_sandbox=False,
    )

    # Create WebSocket client with private channel access. Inside a coroutine
    # the running loop is the one to hand over.
    loop = asyncio.get_running_loop()
    ws_client = await KucoinWsClient.create(
        loop, token_client, private_message_handler, private=True
    )

    topics = (
        '/account/balance',          # private account updates
        '/spotMarket/tradeOrders',   # order updates
    )
    for topic in topics:
        await ws_client.subscribe(topic)

    try:
        # Keep connection alive
        await asyncio.sleep(300)  # Stream for 5 minutes
    finally:
        # Release the subscriptions even if the wait is cancelled.
        for topic in topics:
            await ws_client.unsubscribe(topic)


# Run private streaming
if __name__ == "__main__":
    asyncio.run(private_streaming())

208

```

209

210

### Advanced Multi-Symbol Streaming

211

212

```python

213

import json


class KuCoinStreamer:
    """Buffer the latest ticker and order book data for several symbols.

    Attributes are plain dicts: ``connections`` maps '<symbol>_<kind>' to
    whatever the socket call returns, and ``data_buffer`` maps the same keys
    to the most recent normalized update.
    """

    def __init__(self, symbols, api_credentials=None):
        """Store the symbols and build the WebSocket client.

        Args:
            symbols: Iterable of trading pairs, e.g. 'BTC-USDT'.
            api_credentials: Optional mapping passed as keyword arguments to
                the client constructor.
        """
        # Function-scope import keeps this snippet self-contained.
        from kucoin.ws_client import KucoinWsClient

        self.symbols = symbols
        # NOTE(review): the KucoinWsClient reference above documents a
        # no-argument __init__ and an async create() factory; confirm that
        # keyword credentials are accepted here.
        self.ws_client = KucoinWsClient(**api_credentials) if api_credentials else KucoinWsClient()
        self.connections = {}
        self.data_buffer = {}

    @staticmethod
    def _decode(message):
        """Return the message as a dict, parsing it first when it is JSON text."""
        if isinstance(message, (str, bytes, bytearray)):
            return json.loads(message)
        return message

    @staticmethod
    def _optional_float(value):
        """Convert to float, passing None through for absent fields."""
        return None if value is None else float(value)

    def start_streaming(self):
        """Start streaming for all symbols."""
        channels = (
            ('ticker', '/market/ticker', self.handle_ticker),
            ('orderbook', '/market/level2', self.handle_orderbook),
        )
        for symbol in self.symbols:
            for kind, prefix, handler in channels:
                # Defaults bind the current symbol/handler; a plain closure
                # would see only the last loop values.
                # NOTE(review): _socket is not part of the KucoinWsClient
                # reference above; confirm it exists.
                self.connections[f'{symbol}_{kind}'] = self.ws_client._socket(
                    f'{prefix}:{symbol}',
                    lambda msg, s=symbol, h=handler: h(msg, s),
                )

    def handle_ticker(self, message, symbol):
        """Handle ticker updates.

        Accepts a decoded dict or a JSON string. 'changeRate', 'vol' and
        'time' are optional and stored as None when the payload lacks them.
        """
        data = self._decode(message)
        if data.get('type') != 'message':
            return

        ticker = data['data']
        self.data_buffer[f'{symbol}_ticker'] = {
            'symbol': symbol,
            'price': float(ticker['price']),
            'change': self._optional_float(ticker.get('changeRate')),
            'volume': self._optional_float(ticker.get('vol')),
            'timestamp': ticker.get('time'),
        }
        self.process_ticker_update(symbol)

    def handle_orderbook(self, message, symbol):
        """Handle order book updates.

        Accepts a decoded dict or a JSON string and keeps the first five
        entries of each side.
        """
        data = self._decode(message)
        if data.get('type') != 'message':
            return

        ob_data = data['data']
        self.data_buffer[f'{symbol}_orderbook'] = {
            'symbol': symbol,
            'bids': (ob_data.get('bids') or [])[:5],  # Top 5 bids
            'asks': (ob_data.get('asks') or [])[:5],  # Top 5 asks
            'timestamp': ob_data.get('time'),
        }
        self.process_orderbook_update(symbol)

    def process_ticker_update(self, symbol):
        """Process ticker data for analysis."""
        ticker = self.data_buffer.get(f'{symbol}_ticker')
        if not ticker:
            return

        change = ticker['change']
        change_text = f" ({change:+.2%})" if change is not None else ""
        print(f"{symbol}: ${ticker['price']}{change_text}")

    def process_orderbook_update(self, symbol):
        """Process order book data."""
        ob = self.data_buffer.get(f'{symbol}_orderbook')
        if not ob or not ob['bids'] or not ob['asks']:
            return

        best_bid = float(ob['bids'][0][0])
        best_ask = float(ob['asks'][0][0])
        spread = best_ask - best_bid
        if best_bid:
            spread_pct = spread / best_bid * 100
            print(f"{symbol} Spread: ${spread:.4f} ({spread_pct:.3f}%)")
        else:
            # A zero bid has no meaningful percentage spread.
            print(f"{symbol} Spread: ${spread:.4f}")

    def get_market_snapshot(self):
        """Get current market snapshot.

        Returns:
            dict: One entry per symbol that has both a ticker and an order
            book buffered, with keys 'price', 'change_24h', 'volume_24h',
            'bid', 'ask' and 'spread' ('spread' is None unless both sides
            are present).
        """
        snapshot = {}
        for symbol in self.symbols:
            ticker = self.data_buffer.get(f'{symbol}_ticker')
            orderbook = self.data_buffer.get(f'{symbol}_orderbook')
            if ticker is None or orderbook is None:
                continue

            bid = float(orderbook['bids'][0][0]) if orderbook['bids'] else None
            ask = float(orderbook['asks'][0][0]) if orderbook['asks'] else None
            # Compare against None: a price of 0.0 is still a price.
            spread = ask - bid if bid is not None and ask is not None else None

            snapshot[symbol] = {
                'price': ticker['price'],
                'change_24h': ticker['change'],
                'volume_24h': ticker['volume'],
                'bid': bid,
                'ask': ask,
                'spread': spread,
            }

        return snapshot

    def close(self):
        """Close all connections."""
        # NOTE(review): close() is not part of the KucoinWsClient reference
        # above; confirm it exists.
        self.ws_client.close()

306

307

# Usage
import time

symbols = ['BTC-USDT', 'ETH-USDT', 'ADA-USDT']
streamer = KuCoinStreamer(symbols)

try:
    streamer.start_streaming()

    # Stream for 5 minutes
    time.sleep(300)

    # Get final snapshot
    snapshot = streamer.get_market_snapshot()
    print("\nMarket Snapshot:")
    for symbol, data in snapshot.items():
        # 'spread' is None when one side of the book is empty; formatting
        # None with ':.4f' would raise TypeError.
        spread = data['spread']
        spread_text = f"${spread:.4f}" if spread is not None else "N/A"
        print(f"{symbol}: ${data['price']} | Spread: {spread_text}")
finally:
    # Close the client even if streaming or reporting fails.
    streamer.close()

322

```

323

324

### Real-Time Trading Bot Integration

325

326

```python

327

import json


class TradingBot:
    """Example bot that tracks ticker data, balances and filled orders.

    ``market_data`` maps symbol -> latest price/change/timestamp and
    ``positions`` maps currency -> {'size', 'avg_price'} (plus 'balance' once
    a balance update arrives for a tracked currency).
    """

    def __init__(self, api_key, api_secret, api_passphrase, symbols):
        """Build the WebSocket client and empty tracking state."""
        # Function-scope import keeps this snippet self-contained.
        from kucoin.ws_client import KucoinWsClient

        # NOTE(review): the KucoinWsClient reference above documents a
        # no-argument __init__ and an async create() factory; confirm that
        # positional credentials are accepted here.
        self.ws_client = KucoinWsClient(api_key, api_secret, api_passphrase)
        self.symbols = symbols
        self.market_data = {}
        self.positions = {}

    @staticmethod
    def _decode(message):
        """Return the message as a dict, parsing it first when it is JSON text."""
        if isinstance(message, (str, bytes, bytearray)):
            return json.loads(message)
        return message

    def start_monitoring(self):
        """Start monitoring market data and account updates."""
        # NOTE(review): _socket is not part of the KucoinWsClient reference
        # above; confirm it exists and accepts is_private.
        # Monitor market data
        for symbol in self.symbols:
            self.ws_client._socket(f'/market/ticker:{symbol}', self.handle_market_data)

        # Monitor account balance changes
        self.ws_client._socket('/account/balance', self.handle_balance_update, is_private=True)

        # Monitor order execution
        self.ws_client._socket('/spotMarket/tradeOrders', self.handle_order_update, is_private=True)

    def handle_market_data(self, message):
        """Process market data for trading decisions.

        Accepts a decoded dict or a JSON string. 'changeRate' and 'time' are
        optional and stored as None when the payload lacks them.
        """
        data = self._decode(message)
        if data.get('type') != 'message':
            return

        ticker = data['data']
        symbol = ticker['symbol']
        price = float(ticker['price'])
        change = ticker.get('changeRate')

        # Store market data
        self.market_data[symbol] = {
            'price': price,
            'change': float(change) if change is not None else None,
            'timestamp': ticker.get('time'),
        }

        # Check for trading opportunities
        self.check_trading_signals(symbol, price)

    def handle_balance_update(self, message):
        """Handle account balance changes."""
        data = self._decode(message)
        if data.get('type') != 'message':
            return

        balance_data = data['data']
        currency = balance_data['currency']
        available = float(balance_data['available'])

        print(f"Balance Update: {currency} = {available}")

        # Update position tracking (only for currencies already tracked)
        if currency in self.positions:
            self.positions[currency]['balance'] = available

    def handle_order_update(self, message):
        """Handle order execution updates."""
        data = self._decode(message)
        if data.get('type') != 'message':
            return

        order_data = data['data']
        print(f"Order Update: {order_data['symbol']} - {order_data['status']}")

        if order_data['status'] == 'match':
            # Order was filled
            self.on_order_filled(order_data)

    def check_trading_signals(self, symbol, price):
        """Check for trading opportunities.

        Simple momentum example: reports when the stored change rate for the
        symbol is above +5% or below -5%. Does nothing when no data or no
        change rate is stored for the symbol.
        """
        entry = self.market_data.get(symbol)
        if entry is None:
            return

        change = entry['change']
        if change is None:
            return

        if change > 0.05:  # Price up 5%
            print(f"Strong upward momentum detected for {symbol}")
            # Consider buying logic here

        elif change < -0.05:  # Price down 5%
            print(f"Strong downward momentum detected for {symbol}")
            # Consider selling logic here

    def on_order_filled(self, order_data):
        """Handle order fill events."""
        symbol = order_data['symbol']
        side = order_data['side']
        # NOTE(review): 'dealSize' / 'dealFunds' are not listed in the
        # OrderUpdateData type below; confirm the field names against the
        # actual order-change payload.
        size = float(order_data['dealSize'])
        price = float(order_data['dealFunds']) / size if size > 0 else 0

        print(f"Order Filled: {side} {size} {symbol} at ${price}")

        # Update position tracking: buys add base currency, sells remove it.
        base_currency = symbol.split('-')[0]
        signed_size = size if side == 'buy' else -size
        self.update_position(base_currency, signed_size, price)

    def update_position(self, currency, size_change, price):
        """Update position tracking using average-cost accounting.

        Args:
            currency: Currency whose position changes.
            size_change: Signed quantity (positive buy, negative sell).
            price: Fill price of this change.
        """
        pos = self.positions.setdefault(currency, {'size': 0, 'avg_price': 0})
        old_size = pos['size']
        new_size = old_size + size_change

        if new_size == 0:
            # Flat: no cost basis left.
            pos['avg_price'] = 0
        elif old_size == 0 or old_size * new_size < 0:
            # Opened from flat, or crossed through zero: the remaining
            # position was entered entirely at this fill's price.
            pos['avg_price'] = price
        elif abs(new_size) > abs(old_size):
            # Added to the position: size-weighted average of old and new.
            pos['avg_price'] = (old_size * pos['avg_price'] + size_change * price) / new_size
        # Otherwise the position was only reduced; the entry price of what
        # remains is unchanged.

        pos['size'] = new_size

450

451

# Usage (commented out for example)

452

# bot = TradingBot(api_key, api_secret, api_passphrase, ['BTC-USDT', 'ETH-USDT'])

453

# bot.start_monitoring()

454

```

455

456

## Types

457

458

```python { .api }

459

# Each alias below is a plain dict; the comment under it lists the keys
# the dict carries.

WebSocketToken = dict
# {
#     "token": str,             # Connection token
#     "instanceServers": list,  # Server endpoints
#     "pingInterval": int,      # Ping interval in ms
#     "pingTimeout": int        # Ping timeout in ms
# }

WebSocketMessage = dict
# {
#     "id": str,       # Message ID
#     "type": str,     # Message type ('message', 'welcome', 'ping', 'pong')
#     "topic": str,    # Subscription topic
#     "subject": str,  # Message subject
#     "data": dict     # Payload data
# }

TickerData = dict
# {
#     "symbol": str,       # Trading symbol
#     "sequence": str,     # Sequence number
#     "price": str,        # Last price
#     "size": str,         # Last size
#     "bestAsk": str,      # Best ask price
#     "bestAskSize": str,  # Best ask size
#     "bestBid": str,      # Best bid price
#     "bestBidSize": str,  # Best bid size
#     "time": int          # Timestamp
# }

OrderBookData = dict
# {
#     "symbol": str,    # Trading symbol
#     "sequence": str,  # Sequence number
#     "asks": list,     # Ask orders [[price, size], ...]
#     "bids": list,     # Bid orders [[price, size], ...]
#     "time": int       # Timestamp
# }

AccountBalanceData = dict
# {
#     "accountId": str,        # Account ID
#     "currency": str,         # Currency
#     "total": str,            # Total balance
#     "available": str,        # Available balance
#     "holds": str,            # Held balance
#     "relationEvent": str,    # Event that caused change
#     "relationEventId": str,  # Event ID
#     "time": int              # Timestamp
# }

OrderUpdateData = dict
# {
#     "symbol": str,      # Trading symbol
#     "orderType": str,   # Order type
#     "side": str,        # Order side
#     "orderId": str,     # Order ID
#     "type": str,        # Update type
#     "orderTime": int,   # Order timestamp
#     "size": str,        # Order size
#     "filledSize": str,  # Filled size
#     "price": str,       # Order price
#     "clientOid": str,   # Client order ID
#     "remainSize": str,  # Remaining size
#     "status": str,      # Order status
#     "ts": int           # Update timestamp
# }

526

```