0
# WebSocket Streaming
1
2
Real-time market data and account updates through WebSocket connections. Provides automatic reconnection, subscription management, and comprehensive event handling for live cryptocurrency data streaming.
3
4
## Capabilities
5
6
### WebSocket Client
7
8
High-level WebSocket client for easy integration.
9
10
```python { .api }
11
class KucoinWsClient:
12
"""High-level WebSocket client for KuCoin real-time data."""
13
14
def __init__(self):
15
"""Initialize WebSocket client."""
16
17
@classmethod
18
async def create(cls, loop, client, callback, private: bool = False, sock=None):
19
"""
20
Create and initialize WebSocket client.
21
22
Args:
23
loop: Event loop for async operations
24
client: KuCoin API client instance (with authentication)
25
callback: Message callback function
26
private (bool): Enable private channel access
27
sock: Optional socket configuration
28
29
Returns:
30
KucoinWsClient: Initialized WebSocket client
31
"""
32
33
async def subscribe(self, topic: str):
34
"""
35
Subscribe to a WebSocket topic.
36
37
Args:
38
topic (str): Subscription topic (e.g., '/market/ticker:BTC-USDT')
39
"""
40
41
async def unsubscribe(self, topic: str):
42
"""
43
Unsubscribe from a WebSocket topic.
44
45
Args:
46
topic (str): Topic to unsubscribe from
47
"""
48
49
@property
50
def topics(self):
51
"""Get list of currently subscribed topics."""
52
```
53
54
### WebSocket Token Management
55
56
Manage authentication tokens for WebSocket connections.
57
58
```python { .api }
59
class GetToken:
60
"""WebSocket token management."""
61
62
def get_ws_token(self, is_private: bool = False):
63
"""
64
Get WebSocket token for public or private channels.
65
66
Args:
67
is_private (bool): Request private channel token
68
69
Returns:
70
dict: Token, endpoint, and connection parameters
71
"""
72
```
73
74
### Low-Level WebSocket Connection
75
76
Direct WebSocket connection management for advanced use cases.
77
78
```python { .api }
79
class ConnectWebsocket:
80
"""Low-level WebSocket connection handler."""
81
82
def __init__(self, loop, client, callback, private: bool = False, sock=None):
83
"""
84
Initialize WebSocket connection.
85
86
Args:
87
loop: Event loop for async operations
88
client: KuCoin API client instance
89
callback: Message callback function
90
private (bool): Enable private channel access
91
sock: Optional socket configuration
92
"""
93
94
async def send_message(self, msg: dict, retry_count: int = 0):
95
"""
96
Send message to WebSocket server.
97
98
Args:
99
msg (dict): Message to send
100
retry_count (int): Number of retry attempts
101
"""
102
103
async def send_ping(self):
104
"""Send ping message to maintain connection."""
105
106
@property
107
def topics(self):
108
"""Get list of current subscription topics."""
109
```
110
111
## Usage Examples
112
113
### Basic Market Data Streaming
114
115
```python
116
import asyncio
117
import json
118
from kucoin.ws_client import KucoinWsClient
119
from kucoin.client import Market
120
121
async def message_handler(message):
122
"""Handle incoming WebSocket messages."""
123
if 'data' in message:
124
data = message['data']
125
topic = message.get('topic', '')
126
127
if 'ticker' in topic:
128
# Handle ticker updates
129
ticker_data = data
130
print(f"Ticker Update - {ticker_data['symbol']}: ${ticker_data['price']}")
131
132
elif 'level2' in topic:
133
# Handle order book updates
134
ob_data = data
135
print(f"OrderBook Update - {ob_data['symbol']}")
136
print(f"Best Bid: {ob_data['bids'][0] if ob_data.get('bids') else 'N/A'}")
137
print(f"Best Ask: {ob_data['asks'][0] if ob_data.get('asks') else 'N/A'}")
138
139
async def main():
140
# Initialize market client for token generation
141
market = Market()
142
143
# Create WebSocket client
144
loop = asyncio.get_event_loop()
145
ws_client = await KucoinWsClient.create(loop, market, message_handler)
146
147
# Subscribe to BTC-USDT ticker
148
await ws_client.subscribe('/market/ticker:BTC-USDT')
149
150
# Subscribe to order book updates
151
await ws_client.subscribe('/market/level2:BTC-USDT')
152
153
# Keep connection alive
154
await asyncio.sleep(60) # Stream for 1 minute
155
156
# Run the async example
157
asyncio.run(main())
158
```
159
160
### Private Account Updates
161
162
```python
163
import asyncio
164
from kucoin.ws_client import KucoinWsClient
165
from kucoin.client import User
166
167
async def private_message_handler(message):
168
"""Handle private channel messages."""
169
if 'data' in message:
170
data = message['data']
171
topic = message.get('topic', '')
172
173
if 'account' in topic:
174
# Handle account balance updates
175
account_data = data
176
print(f"Balance Update - {account_data['currency']}: {account_data['available']}")
177
178
elif 'tradeOrders' in topic:
179
# Handle order updates
180
order_data = data
181
print(f"Order Update - {order_data['symbol']}: {order_data['type']} {order_data['side']}")
182
print(f"Status: {order_data['status']}, Size: {order_data['size']}")
183
184
async def private_streaming():
185
# Initialize authenticated client
186
user = User(
187
key='your-api-key',
188
secret='your-api-secret',
189
passphrase='your-passphrase',
190
is_sandbox=False
191
)
192
193
# Create WebSocket client with private channel access
194
loop = asyncio.get_event_loop()
195
ws_client = await KucoinWsClient.create(loop, user, private_message_handler, private=True)
196
197
# Subscribe to private account updates
198
await ws_client.subscribe('/account/balance')
199
200
# Subscribe to order updates
201
await ws_client.subscribe('/spotMarket/tradeOrders')
202
203
# Keep connection alive
204
await asyncio.sleep(300) # Stream for 5 minutes
205
206
# Run private streaming
207
asyncio.run(private_streaming())
208
```
209
210
### Advanced Multi-Symbol Streaming
211
212
```python
213
class KuCoinStreamer:
214
def __init__(self, symbols, api_credentials=None):
215
self.symbols = symbols
216
self.ws_client = KucoinWsClient(**api_credentials) if api_credentials else KucoinWsClient()
217
self.connections = {}
218
self.data_buffer = {}
219
220
def start_streaming(self):
221
"""Start streaming for all symbols."""
222
for symbol in self.symbols:
223
# Subscribe to ticker updates
224
ticker_topic = f'/market/ticker:{symbol}'
225
self.connections[f'{symbol}_ticker'] = self.ws_client._socket(
226
ticker_topic,
227
lambda msg, s=symbol: self.handle_ticker(msg, s)
228
)
229
230
# Subscribe to level2 order book updates
231
orderbook_topic = f'/market/level2:{symbol}'
232
self.connections[f'{symbol}_orderbook'] = self.ws_client._socket(
233
orderbook_topic,
234
lambda msg, s=symbol: self.handle_orderbook(msg, s)
235
)
236
237
def handle_ticker(self, message, symbol):
238
"""Handle ticker updates."""
239
data = json.loads(message)
240
if data['type'] == 'message':
241
ticker = data['data']
242
self.data_buffer[f'{symbol}_ticker'] = {
243
'symbol': symbol,
244
'price': float(ticker['price']),
245
'change': float(ticker['changeRate']),
246
'volume': float(ticker['vol']),
247
'timestamp': ticker['time']
248
}
249
self.process_ticker_update(symbol)
250
251
def handle_orderbook(self, message, symbol):
252
"""Handle order book updates."""
253
data = json.loads(message)
254
if data['type'] == 'message':
255
ob_data = data['data']
256
self.data_buffer[f'{symbol}_orderbook'] = {
257
'symbol': symbol,
258
'bids': ob_data['bids'][:5], # Top 5 bids
259
'asks': ob_data['asks'][:5], # Top 5 asks
260
'timestamp': ob_data['time']
261
}
262
self.process_orderbook_update(symbol)
263
264
def process_ticker_update(self, symbol):
265
"""Process ticker data for analysis."""
266
ticker = self.data_buffer.get(f'{symbol}_ticker')
267
if ticker:
268
print(f"{symbol}: ${ticker['price']} ({ticker['change']:+.2%})")
269
270
def process_orderbook_update(self, symbol):
271
"""Process order book data."""
272
ob = self.data_buffer.get(f'{symbol}_orderbook')
273
if ob and ob['bids'] and ob['asks']:
274
spread = float(ob['asks'][0][0]) - float(ob['bids'][0][0])
275
spread_pct = spread / float(ob['bids'][0][0]) * 100
276
print(f"{symbol} Spread: ${spread:.4f} ({spread_pct:.3f}%)")
277
278
def get_market_snapshot(self):
279
"""Get current market snapshot."""
280
snapshot = {}
281
for symbol in self.symbols:
282
ticker_key = f'{symbol}_ticker'
283
ob_key = f'{symbol}_orderbook'
284
285
if ticker_key in self.data_buffer and ob_key in self.data_buffer:
286
ticker = self.data_buffer[ticker_key]
287
orderbook = self.data_buffer[ob_key]
288
289
snapshot[symbol] = {
290
'price': ticker['price'],
291
'change_24h': ticker['change'],
292
'volume_24h': ticker['volume'],
293
'bid': float(orderbook['bids'][0][0]) if orderbook['bids'] else None,
294
'ask': float(orderbook['asks'][0][0]) if orderbook['asks'] else None,
295
'spread': None
296
}
297
298
if snapshot[symbol]['bid'] and snapshot[symbol]['ask']:
299
snapshot[symbol]['spread'] = snapshot[symbol]['ask'] - snapshot[symbol]['bid']
300
301
return snapshot
302
303
def close(self):
304
"""Close all connections."""
305
self.ws_client.close()
306
307
# Usage
308
symbols = ['BTC-USDT', 'ETH-USDT', 'ADA-USDT']
309
streamer = KuCoinStreamer(symbols)
310
streamer.start_streaming()
311
312
# Stream for 5 minutes
313
time.sleep(300)
314
315
# Get final snapshot
316
snapshot = streamer.get_market_snapshot()
317
print("\nMarket Snapshot:")
318
for symbol, data in snapshot.items():
319
print(f"{symbol}: ${data['price']} | Spread: ${data['spread']:.4f}")
320
321
streamer.close()
322
```
323
324
### Real-Time Trading Bot Integration
325
326
```python
327
class TradingBot:
328
def __init__(self, api_key, api_secret, api_passphrase, symbols):
329
self.ws_client = KucoinWsClient(api_key, api_secret, api_passphrase)
330
self.symbols = symbols
331
self.market_data = {}
332
self.positions = {}
333
334
def start_monitoring(self):
335
"""Start monitoring market data and account updates."""
336
# Monitor market data
337
for symbol in self.symbols:
338
topic = f'/market/ticker:{symbol}'
339
self.ws_client._socket(topic, self.handle_market_data)
340
341
# Monitor account balance changes
342
self.ws_client._socket('/account/balance', self.handle_balance_update, is_private=True)
343
344
# Monitor order execution
345
self.ws_client._socket('/spotMarket/tradeOrders', self.handle_order_update, is_private=True)
346
347
def handle_market_data(self, message):
348
"""Process market data for trading decisions."""
349
data = json.loads(message)
350
if data['type'] == 'message':
351
ticker = data['data']
352
symbol = ticker['symbol']
353
price = float(ticker['price'])
354
355
# Store market data
356
self.market_data[symbol] = {
357
'price': price,
358
'change': float(ticker['changeRate']),
359
'timestamp': ticker['time']
360
}
361
362
# Check for trading opportunities
363
self.check_trading_signals(symbol, price)
364
365
def handle_balance_update(self, message):
366
"""Handle account balance changes."""
367
data = json.loads(message)
368
if data['type'] == 'message':
369
balance_data = data['data']
370
currency = balance_data['currency']
371
available = float(balance_data['available'])
372
373
print(f"Balance Update: {currency} = {available}")
374
375
# Update position tracking
376
if currency in self.positions:
377
self.positions[currency]['balance'] = available
378
379
def handle_order_update(self, message):
380
"""Handle order execution updates."""
381
data = json.loads(message)
382
if data['type'] == 'message':
383
order_data = data['data']
384
385
print(f"Order Update: {order_data['symbol']} - {order_data['status']}")
386
387
if order_data['status'] == 'match':
388
# Order was filled
389
self.on_order_filled(order_data)
390
391
def check_trading_signals(self, symbol, price):
392
"""Check for trading opportunities."""
393
# Implement your trading logic here
394
# This is just a simple example
395
396
if symbol not in self.market_data:
397
return
398
399
# Simple momentum strategy example
400
change = self.market_data[symbol]['change']
401
402
if change > 0.05: # Price up 5%
403
print(f"Strong upward momentum detected for {symbol}")
404
# Consider buying logic here
405
406
elif change < -0.05: # Price down 5%
407
print(f"Strong downward momentum detected for {symbol}")
408
# Consider selling logic here
409
410
def on_order_filled(self, order_data):
411
"""Handle order fill events."""
412
symbol = order_data['symbol']
413
side = order_data['side']
414
size = float(order_data['dealSize'])
415
price = float(order_data['dealFunds']) / size if size > 0 else 0
416
417
print(f"Order Filled: {side} {size} {symbol} at ${price}")
418
419
# Update position tracking
420
base_currency = symbol.split('-')[0]
421
quote_currency = symbol.split('-')[1]
422
423
if side == 'buy':
424
# Bought base currency with quote currency
425
self.update_position(base_currency, size, price)
426
else:
427
# Sold base currency for quote currency
428
self.update_position(base_currency, -size, price)
429
430
def update_position(self, currency, size_change, price):
431
"""Update position tracking."""
432
if currency not in self.positions:
433
self.positions[currency] = {'size': 0, 'avg_price': 0}
434
435
pos = self.positions[currency]
436
437
if pos['size'] == 0:
438
# New position
439
pos['size'] = size_change
440
pos['avg_price'] = price
441
else:
442
# Update existing position
443
total_value = pos['size'] * pos['avg_price'] + size_change * price
444
pos['size'] += size_change
445
446
if pos['size'] != 0:
447
pos['avg_price'] = total_value / pos['size']
448
else:
449
pos['avg_price'] = 0
450
451
# # Usage (commented out for example)
452
# bot = TradingBot(api_key, api_secret, api_passphrase, ['BTC-USDT', 'ETH-USDT'])
453
# bot.start_monitoring()
454
```
455
456
## Types
457
458
```python { .api }
459
WebSocketToken = dict
460
# {
461
# "token": str, # Connection token
462
# "instanceServers": list, # Server endpoints
463
# "pingInterval": int, # Ping interval in ms
464
# "pingTimeout": int # Ping timeout in ms
465
# }
466
467
WebSocketMessage = dict
468
# {
469
# "id": str, # Message ID
470
# "type": str, # Message type ('message', 'welcome', 'ping', 'pong')
471
# "topic": str, # Subscription topic
472
# "subject": str, # Message subject
473
# "data": dict # Payload data
474
# }
475
476
TickerData = dict
477
# {
478
# "symbol": str, # Trading symbol
479
# "sequence": str, # Sequence number
480
# "price": str, # Last price
481
# "size": str, # Last size
482
# "bestAsk": str, # Best ask price
483
# "bestAskSize": str, # Best ask size
484
# "bestBid": str, # Best bid price
485
# "bestBidSize": str, # Best bid size
486
# "time": int # Timestamp
487
# }
488
489
OrderBookData = dict
490
# {
491
# "symbol": str, # Trading symbol
492
# "sequence": str, # Sequence number
493
# "asks": list, # Ask orders [[price, size], ...]
494
# "bids": list, # Bid orders [[price, size], ...]
495
# "time": int # Timestamp
496
# }
497
498
AccountBalanceData = dict
499
# {
500
# "accountId": str, # Account ID
501
# "currency": str, # Currency
502
# "total": str, # Total balance
503
# "available": str, # Available balance
504
# "holds": str, # Held balance
505
# "relationEvent": str, # Event that caused change
506
# "relationEventId": str, # Event ID
507
# "time": int # Timestamp
508
# }
509
510
OrderUpdateData = dict
511
# {
512
# "symbol": str, # Trading symbol
513
# "orderType": str, # Order type
514
# "side": str, # Order side
515
# "orderId": str, # Order ID
516
# "type": str, # Update type
517
# "orderTime": int, # Order timestamp
518
# "size": str, # Order size
519
# "filledSize": str, # Filled size
520
# "price": str, # Order price
521
# "clientOid": str, # Client order ID
522
# "remainSize": str, # Remaining size
523
# "status": str, # Order status
524
# "ts": int # Update timestamp
525
# }
526
```