0
# WebSocket Streaming
1
2
Real-time streaming of market data and account updates, with automatic connection management, reconnection logic, and message queuing. All Binance WebSocket streams are supported, including user data, market data, and futures streams.
3
4
## Capabilities
5
6
### BinanceSocketManager
7
8
Async WebSocket manager for real-time data streaming with automatic connection management.
9
10
```python { .api }
11
class BinanceSocketManager:
12
def __init__(
13
self,
14
client: AsyncClient,
15
user_timeout=KEEPALIVE_TIMEOUT,
16
max_queue_size: int = 100,
17
): ...
18
19
def symbol_ticker_socket(self, symbol: str): ...
20
def all_ticker_socket(self): ...
21
def symbol_mini_ticker_socket(self, symbol: str): ...
22
def all_mini_ticker_socket(self): ...
23
def kline_socket(self, symbol: str, interval: str): ...
24
def aggtrade_socket(self, symbol: str): ...
25
def trade_socket(self, symbol: str): ...
26
def depth_socket(self, symbol: str, depth: Optional[str] = None): ...
27
def diff_depth_socket(self, symbol: str): ...
28
def user_socket(self): ...
29
def futures_socket(self): ...
30
def options_socket(self): ...
31
```
32
33
#### Basic Usage Example
34
35
```python
36
import asyncio
37
from binance import AsyncClient, BinanceSocketManager
38
39
async def handle_socket_message(msg):
40
print(f"Received: {msg}")
41
42
async def main():
    """Stream BTCUSDT ticker updates and pass each one to handle_socket_message.

    The client is closed in a ``finally`` block so its connection is released
    when the endless receive loop is cancelled or raises.
    """
    # Create async client and socket manager
    client = await AsyncClient.create()
    try:
        bm = BinanceSocketManager(client)

        # Start symbol ticker stream
        ts = bm.symbol_ticker_socket('BTCUSDT')

        async with ts as tscm:
            while True:
                res = await tscm.recv()
                await handle_socket_message(res)
    finally:
        # Runs on cancellation (e.g. Ctrl+C under asyncio.run) and on errors
        await client.close_connection()
54
55
asyncio.run(main())
56
```
57
58
### Market Data Streams
59
60
#### Individual Symbol Ticker
61
62
```python
63
async def ticker_stream():
    """Print the close price and percent change from each BTCUSDT ticker update.

    The client is closed in a ``finally`` block so its connection is released
    when the endless receive loop is cancelled or raises.
    """
    client = await AsyncClient.create()
    try:
        bm = BinanceSocketManager(client)

        # 24hr ticker statistics
        ts = bm.symbol_ticker_socket('BTCUSDT')

        async with ts as tscm:
            while True:
                msg = await tscm.recv()
                # Message format:
                # {
                #   "e": "24hrTicker",
                #   "E": 1672515782136,
                #   "s": "BTCUSDT",
                #   "p": "0.0015",      # Price change
                #   "P": "0.018",       # Price change percent
                #   "w": "0.0018",      # Weighted average price
                #   "x": "0.0009",      # Previous day's close price
                #   "c": "0.0025",      # Current day's close price
                #   "Q": "10",          # Close quantity
                #   "b": "0.0024",      # Best bid price
                #   "B": "10",          # Best bid quantity
                #   "a": "0.0026",      # Best ask price
                #   "A": "100",         # Best ask quantity
                #   "o": "0.0010",      # Open price
                #   "h": "0.0025",      # High price
                #   "l": "0.0010",      # Low price
                #   "v": "10000",       # Total traded base asset volume
                #   "q": "18",          # Total traded quote asset volume
                #   "O": 0,             # Statistics open time
                #   "C": 86400000,      # Statistics close time
                #   "F": 0,             # First trade ID
                #   "L": 18150,         # Last trade ID
                #   "n": 18151          # Total number of trades
                # }
                print(f"BTC Price: {msg['c']}, Change: {msg['P']}%")
    finally:
        await client.close_connection()
100
```
101
102
#### All Symbol Tickers
103
104
```python
105
async def all_tickers_stream():
    """Print the BTCUSDT close price from each all-symbols ticker update.

    The client is closed in a ``finally`` block so its connection is released
    when the endless receive loop is cancelled or raises.
    """
    client = await AsyncClient.create()
    try:
        bm = BinanceSocketManager(client)

        # All 24hr ticker statistics
        ts = bm.all_ticker_socket()

        async with ts as tscm:
            while True:
                msg = await tscm.recv()
                # msg is a list of all ticker data
                btc_ticker = next((t for t in msg if t['s'] == 'BTCUSDT'), None)
                if btc_ticker:
                    print(f"BTC: {btc_ticker['c']}")
    finally:
        await client.close_connection()
119
```
120
121
#### Mini Tickers
122
123
```python
124
async def mini_ticker_stream():
    """Print open/high/low/close from each BTCUSDT mini-ticker update.

    The client is closed in a ``finally`` block so its connection is released
    when the endless receive loop is cancelled or raises.
    """
    client = await AsyncClient.create()
    try:
        bm = BinanceSocketManager(client)

        # Individual mini ticker (a reduced subset of the 24hr ticker fields)
        ts = bm.symbol_mini_ticker_socket('BTCUSDT')

        async with ts as tscm:
            while True:
                msg = await tscm.recv()
                # Message format:
                # {
                #   "e": "24hrMiniTicker",
                #   "E": 1672515782136,
                #   "s": "BTCUSDT",
                #   "c": "0.0025",      # Close price
                #   "o": "0.0010",      # Open price
                #   "h": "0.0025",      # High price
                #   "l": "0.0010",      # Low price
                #   "v": "10000",       # Total traded base asset volume
                #   "q": "18"           # Total traded quote asset volume
                # }
                print(f"BTC: O:{msg['o']} H:{msg['h']} L:{msg['l']} C:{msg['c']}")
    finally:
        await client.close_connection()
147
```
148
149
### Kline/Candlestick Streams
150
151
```python
152
from binance import KLINE_INTERVAL_1MINUTE, KLINE_INTERVAL_1HOUR
153
154
async def kline_stream():
    """Print open, close and volume for each closed 1-minute BTCUSDT kline.

    Updates for a kline that is still open (``"x"`` false) are ignored. The
    client is closed in a ``finally`` block so its connection is released when
    the endless receive loop is cancelled or raises.
    """
    client = await AsyncClient.create()
    try:
        bm = BinanceSocketManager(client)

        # Kline stream with 1-minute interval
        ts = bm.kline_socket('BTCUSDT', KLINE_INTERVAL_1MINUTE)

        async with ts as tscm:
            while True:
                msg = await tscm.recv()
                # Message format:
                # {
                #   "e": "kline",
                #   "E": 1672515782136,
                #   "s": "BTCUSDT",
                #   "k": {
                #     "t": 1672515780000,  # Kline start time
                #     "T": 1672515839999,  # Kline close time
                #     "s": "BTCUSDT",      # Symbol
                #     "i": "1m",           # Interval
                #     "f": 100,            # First trade ID
                #     "L": 200,            # Last trade ID
                #     "o": "0.0010",       # Open price
                #     "c": "0.0020",       # Close price
                #     "h": "0.0025",       # High price
                #     "l": "0.0010",       # Low price
                #     "v": "1000",         # Base asset volume
                #     "n": 100,            # Number of trades
                #     "x": false,          # Is this kline closed?
                #     "q": "1.0000",       # Quote asset volume
                #     "V": "500",          # Taker buy base asset volume
                #     "Q": "0.500",        # Taker buy quote asset volume
                #     "B": "123456"        # Ignore
                #   }
                # }
                kline_data = msg['k']
                if kline_data['x']:  # Only process closed klines
                    print(f"Closed kline: O:{kline_data['o']} C:{kline_data['c']} V:{kline_data['v']}")
    finally:
        await client.close_connection()
192
```
193
194
### Trade Streams
195
196
```python
197
async def trade_stream():
    """Print price, quantity and trade time for each BTCUSDT trade.

    The client is closed in a ``finally`` block so its connection is released
    when the endless receive loop is cancelled or raises.
    """
    client = await AsyncClient.create()
    try:
        bm = BinanceSocketManager(client)

        # Individual trade stream
        ts = bm.trade_socket('BTCUSDT')

        async with ts as tscm:
            while True:
                msg = await tscm.recv()
                # Message format:
                # {
                #   "e": "trade",
                #   "E": 1672515782136,
                #   "s": "BTCUSDT",
                #   "t": 12345,          # Trade ID
                #   "p": "0.001",        # Price
                #   "q": "100",          # Quantity
                #   "b": 88,             # Buyer order ID
                #   "a": 50,             # Seller order ID
                #   "T": 1672515782134,  # Trade time
                #   "m": true,           # Is the buyer the market maker?
                #   "M": true            # Ignore
                # }
                print(f"Trade: {msg['p']} x {msg['q']} at {msg['T']}")
    finally:
        await client.close_connection()
222
223
async def agg_trade_stream():
    """Print price and quantity for each BTCUSDT aggregate trade.

    The client is closed in a ``finally`` block so its connection is released
    when the endless receive loop is cancelled or raises.
    """
    client = await AsyncClient.create()
    try:
        bm = BinanceSocketManager(client)

        # Aggregate trade stream (combines small trades)
        ts = bm.aggtrade_socket('BTCUSDT')

        async with ts as tscm:
            while True:
                msg = await tscm.recv()
                # Message format:
                # {
                #   "e": "aggTrade",
                #   "E": 1672515782136,
                #   "s": "BTCUSDT",
                #   "a": 26129,          # Aggregate trade ID
                #   "p": "0.001",        # Price
                #   "q": "100",          # Quantity
                #   "f": 100,            # First trade ID
                #   "l": 105,            # Last trade ID
                #   "T": 1672515782134,  # Trade time
                #   "m": true,           # Is the buyer the market maker?
                #   "M": true            # Ignore
                # }
                print(f"Agg Trade: {msg['p']} x {msg['q']}")
    finally:
        await client.close_connection()
248
```
249
250
### Depth/Order Book Streams
251
252
```python
253
async def depth_stream():
    """Print the first bid and ask level from each BTCUSDT partial-depth update.

    The client is closed in a ``finally`` block so its connection is released
    when the endless receive loop is cancelled or raises.
    """
    client = await AsyncClient.create()
    try:
        bm = BinanceSocketManager(client)

        # Partial book depth (top 20 levels)
        ts = bm.depth_socket('BTCUSDT', depth='20')

        async with ts as tscm:
            while True:
                msg = await tscm.recv()
                # Message format:
                # {
                #   "lastUpdateId": 160,
                #   "bids": [
                #     ["0.0024", "10"],   # [price, quantity]
                #     ["0.0023", "100"]
                #   ],
                #   "asks": [
                #     ["0.0026", "100"],
                #     ["0.0027", "10"]
                #   ]
                # }
                print(f"Best bid: {msg['bids'][0]}, Best ask: {msg['asks'][0]}")
    finally:
        await client.close_connection()
276
277
async def diff_depth_stream():
    """Print how many bid and ask levels changed in each BTCUSDT depth update.

    The client is closed in a ``finally`` block so its connection is released
    when the endless receive loop is cancelled or raises.
    """
    client = await AsyncClient.create()
    try:
        bm = BinanceSocketManager(client)

        # Differential depth stream (updates only)
        ts = bm.diff_depth_socket('BTCUSDT')

        async with ts as tscm:
            while True:
                msg = await tscm.recv()
                # Message format:
                # {
                #   "e": "depthUpdate",
                #   "E": 1672515782136,
                #   "s": "BTCUSDT",
                #   "U": 157,             # First update ID
                #   "u": 160,             # Final update ID
                #   "b": [                # Bids to be updated
                #     ["0.0024", "10"]
                #   ],
                #   "a": [                # Asks to be updated
                #     ["0.0026", "100"]
                #   ]
                # }
                print(f"Depth update: {len(msg['b'])} bid updates, {len(msg['a'])} ask updates")
    finally:
        await client.close_connection()
302
```
303
304
### User Data Streams
305
306
```python
307
async def user_data_stream():
    """Print order, account-position and balance events from the user data stream.

    The client is closed in a ``finally`` block so its connection is released
    when the endless receive loop is cancelled or raises.
    """
    # Requires API key and secret
    client = await AsyncClient.create(api_key='your_key', api_secret='your_secret')
    try:
        bm = BinanceSocketManager(client)

        # User data stream (account updates, order updates)
        ts = bm.user_socket()

        async with ts as tscm:
            while True:
                msg = await tscm.recv()

                if msg['e'] == 'executionReport':
                    # Order update
                    print(f"Order update: {msg['s']} {msg['S']} {msg['o']} Status: {msg['X']}")

                elif msg['e'] == 'outboundAccountPosition':
                    # Account balance update. The changed balances arrive as a
                    # list under "B"; each entry carries the asset ("a"), free
                    # amount ("f") and locked amount ("l"). Those keys are not
                    # present on the top-level message.
                    for balance in msg['B']:
                        print(f"Balance update: {balance['a']} Free: {balance['f']} Locked: {balance['l']}")

                elif msg['e'] == 'balanceUpdate':
                    # Individual balance update
                    print(f"Balance change: {msg['a']} Delta: {msg['d']}")
    finally:
        await client.close_connection()
330
```
331
332
### ThreadedWebsocketManager
333
334
Thread-based WebSocket manager for easier integration with non-async code.
335
336
```python { .api }
337
class ThreadedWebsocketManager:
338
def __init__(self, api_key: str, api_secret: str, testnet: bool = False): ...
339
340
def start(self): ...
341
def stop(self): ...
342
def start_symbol_ticker_socket(self, callback, symbol: str): ...
343
def start_all_ticker_socket(self, callback): ...
344
def start_kline_socket(self, callback, symbol: str, interval: str): ...
345
def start_depth_socket(self, callback, symbol: str, depth: Optional[str] = None): ...
346
def start_aggtrade_socket(self, callback, symbol: str): ...
347
def start_trade_socket(self, callback, symbol: str): ...
348
def start_user_socket(self, callback): ...
349
```
350
351
#### Threaded Usage Example
352
353
```python
354
from binance import ThreadedWebsocketManager
355
import time
356
357
def handle_ticker_message(msg):
358
print(f"Ticker: {msg['s']} Price: {msg['c']}")
359
360
def handle_kline_message(msg):
361
kline = msg['k']
362
if kline['x']: # Closed kline
363
print(f"Kline: {kline['s']} {kline['i']} O:{kline['o']} C:{kline['c']}")
364
365
def main():
366
# Initialize threaded manager
367
twm = ThreadedWebsocketManager(api_key='your_key', api_secret='your_secret')
368
369
# Start the manager
370
twm.start()
371
372
# Start streams
373
twm.start_symbol_ticker_socket(callback=handle_ticker_message, symbol='BTCUSDT')
374
twm.start_kline_socket(callback=handle_kline_message, symbol='BTCUSDT', interval='1m')
375
376
# Keep running
377
try:
378
while True:
379
time.sleep(1)
380
except KeyboardInterrupt:
381
print("Stopping...")
382
finally:
383
twm.stop()
384
385
if __name__ == "__main__":
386
main()
387
```
388
389
### Futures and Options Streams
390
391
```python
392
async def futures_stream():
    """Print account and order events from the futures user data stream.

    The client is closed in a ``finally`` block so its connection is released
    when the endless receive loop is cancelled or raises.
    """
    # Requires API key and secret: account and order events are private data
    client = await AsyncClient.create(api_key='your_key', api_secret='your_secret')
    try:
        bm = BinanceSocketManager(client)

        # Futures user data stream
        ts = bm.futures_socket()

        async with ts as tscm:
            while True:
                msg = await tscm.recv()
                # Handle futures-specific messages
                if msg['e'] == 'ACCOUNT_UPDATE':
                    print(f"Account update: {msg}")
                elif msg['e'] == 'ORDER_TRADE_UPDATE':
                    print(f"Order update: {msg}")
    finally:
        await client.close_connection()
407
408
async def options_stream():
    """Print every message received from the options user data stream.

    The client is closed in a ``finally`` block so its connection is released
    when the endless receive loop is cancelled or raises.
    """
    # Requires API key and secret, like the other user data streams
    client = await AsyncClient.create(api_key='your_key', api_secret='your_secret')
    try:
        bm = BinanceSocketManager(client)

        # Options user data stream
        ts = bm.options_socket()

        async with ts as tscm:
            while True:
                msg = await tscm.recv()
                print(f"Options update: {msg}")
    finally:
        await client.close_connection()
419
```
420
421
### Connection Management and Error Handling
422
423
```python
424
import asyncio
425
from binance import AsyncClient, BinanceSocketManager, BinanceWebsocketUnableToConnect
426
427
async def robust_stream():
    """Stream BTCUSDT ticker updates, reconnecting whenever the socket fails.

    The client is created once and closed once, when this coroutine exits.
    Closing it inside the retry loop would hand every later reconnection
    attempt a client whose connection has already been shut down.
    """
    client = await AsyncClient.create()
    bm = BinanceSocketManager(client, max_queue_size=200)

    try:
        while True:
            try:
                ts = bm.symbol_ticker_socket('BTCUSDT')

                async with ts as tscm:
                    while True:
                        try:
                            msg = await asyncio.wait_for(tscm.recv(), timeout=30.0)
                            print(f"Received: {msg['c']}")
                        except asyncio.TimeoutError:
                            # A quiet period is not an error; keep waiting
                            print("No message received in 30 seconds")
                            continue
                        except Exception as e:
                            # Leave the socket context so the outer loop reconnects
                            print(f"Message processing error: {e}")
                            break

            except BinanceWebsocketUnableToConnect:
                print("Unable to connect, retrying in 5 seconds...")
                await asyncio.sleep(5)
            except Exception as e:
                print(f"Stream error: {e}, retrying...")
                await asyncio.sleep(5)
    finally:
        # Runs only when the retry loop itself is abandoned (e.g. cancellation)
        await client.close_connection()
455
```
456
457
### Stream Configuration
458
459
```python
460
# Custom configuration
461
bm = BinanceSocketManager(
462
client,
463
user_timeout=30, # User stream timeout in seconds
464
max_queue_size=500 # Maximum message queue size
465
)
466
467
# Available depths for depth streams
468
WEBSOCKET_DEPTH_5 = "5"
469
WEBSOCKET_DEPTH_10 = "10"
470
WEBSOCKET_DEPTH_20 = "20"
471
472
# Depth stream with specific level
473
ts = bm.depth_socket('BTCUSDT', depth=WEBSOCKET_DEPTH_10)
474
```
475
476
### WebSocket API Methods
477
478
The client also exposes the WebSocket API directly: the `ws_*` methods below are called on the client object and place orders or run queries over a WebSocket connection, with lower latency than the REST API.
479
480
```python { .api }
481
def ws_create_order(self, **params): ...
482
def ws_create_test_order(self, **params): ...
483
def ws_order_limit(self, timeInForce=BaseClient.TIME_IN_FORCE_GTC, **params): ...
484
def ws_order_limit_buy(self, timeInForce=BaseClient.TIME_IN_FORCE_GTC, **params): ...
485
def ws_order_limit_sell(self, timeInForce=BaseClient.TIME_IN_FORCE_GTC, **params): ...
486
def ws_order_market(self, **params): ...
487
def ws_order_market_buy(self, **params): ...
488
def ws_order_market_sell(self, **params): ...
489
def ws_get_order(self, **params): ...
490
def ws_cancel_order(self, **params): ...
491
def ws_cancel_and_replace_order(self, **params): ...
492
def ws_get_open_orders(self, **params): ...
493
def ws_cancel_all_open_orders(self, **params): ...
494
def ws_create_oco_order(self, **params): ...
495
def ws_create_oto_order(self, **params): ...
496
def ws_create_otoco_order(self, **params): ...
497
def ws_create_sor_order(self, **params): ...
498
```
499
500
#### WebSocket API Usage Examples
501
502
```python
503
# Create market order via WebSocket API
504
ws_order = client.ws_create_order(
505
symbol='BTCUSDT',
506
side='BUY',
507
type='MARKET',
508
quantity=0.001
509
)
510
511
print(f"WebSocket Order ID: {ws_order['id']}")
512
print(f"Result: {ws_order['result']}")
513
514
# Create limit order via WebSocket API
515
ws_limit_order = client.ws_order_limit_buy(
516
symbol='BTCUSDT',
517
quantity=0.001,
518
price='45000.00'
519
)
520
521
# Create OCO (One-Cancels-Other) order via WebSocket
522
ws_oco = client.ws_create_oco_order(
523
symbol='BTCUSDT',
524
side='SELL',
525
quantity=0.001,
526
price='55000.00', # Take profit price
527
stopPrice='48000.00', # Stop loss trigger
528
stopLimitPrice='47500.00' # Stop loss limit price
529
)
530
531
# Create OTO (One-Triggers-Other) order via WebSocket
532
ws_oto = client.ws_create_oto_order(
533
symbol='BTCUSDT',
534
workingType='LIMIT',
535
workingSide='BUY',
536
workingQuantity=0.001,
537
workingPrice='45000.00',
538
pendingType='LIMIT',
539
pendingSide='SELL',
540
pendingQuantity=0.001,
541
pendingPrice='55000.00'
542
)
543
544
# Create SOR (Smart Order Routing) order via WebSocket
545
ws_sor = client.ws_create_sor_order(
546
symbol='BTCUSDT',
547
side='BUY',
548
type='LIMIT',
549
quantity=0.001,
550
price='50000.00'
551
)
552
553
# Get order status via WebSocket
554
ws_order_status = client.ws_get_order(
555
symbol='BTCUSDT',
556
orderId=12345678
557
)
558
559
# Cancel order via WebSocket
560
ws_cancel = client.ws_cancel_order(
561
symbol='BTCUSDT',
562
orderId=12345678
563
)
564
565
# Cancel and replace order atomically via WebSocket
566
ws_replace = client.ws_cancel_and_replace_order(
567
symbol='BTCUSDT',
568
side='BUY',
569
type='LIMIT',
570
cancelReplaceMode='STOP_ON_FAILURE',
571
timeInForce='GTC',
572
quantity=0.002, # New quantity
573
price='49500.00', # New price
574
cancelOrderId=12345678 # Order to cancel
575
)
576
577
# Get all open orders via WebSocket
578
ws_open_orders = client.ws_get_open_orders(symbol='BTCUSDT')
579
580
# Cancel all open orders via WebSocket
581
ws_cancel_all = client.ws_cancel_all_open_orders(symbol='BTCUSDT')
582
```
583
584
#### WebSocket API Response Format
585
586
WebSocket API responses follow a consistent format:
587
588
```python
589
{
590
"id": "request_id", # Request identifier
591
"status": 200, # HTTP status code
592
"result": { # Response data
593
"symbol": "BTCUSDT",
594
"orderId": 12345678,
595
"status": "FILLED",
596
# ... other order data
597
},
598
"rateLimits": [ # Current rate limit usage
599
{
600
"rateLimitType": "REQUEST_WEIGHT",
601
"interval": "MINUTE",
602
"intervalNum": 1,
603
"limit": 1200,
604
"count": 10
605
}
606
]
607
}
608
```
609
610
### WebSocket API vs REST API
611
612
**Advantages of WebSocket API:**
613
- Lower latency for trading operations
614
- Real-time rate limit information
615
- Persistent connection reduces overhead
616
- Better for high-frequency trading
617
618
**When to use WebSocket API:**
619
- Time-sensitive trading operations
620
- High-frequency order management
621
- Applications requiring minimal latency
622
- Systems with persistent WebSocket connections
623
624
**When to use REST API:**
625
- One-off operations
626
- Simple integrations
627
- Applications without persistent connections
628
- Non-time-critical operations
629
630
The WebSocket streaming system provides real-time access to all Binance market data and account updates with robust connection management and flexible async/threaded interfaces, plus direct WebSocket API functionality for low-latency trading operations.