CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-python-binance

Unofficial Python wrapper for the Binance cryptocurrency exchange REST API v3 and WebSocket APIs with comprehensive trading, market data, and account management functionality

Pending
Overview
Eval results
Files

websockets.mddocs/

WebSocket Streaming

Real-time market data and account update streaming with automatic connection management, reconnection logic, and message queuing. Supports all Binance WebSocket streams including user data, market data, and futures streams.

Capabilities

BinanceSocketManager

Async WebSocket manager for real-time data streaming with automatic connection management.

class BinanceSocketManager:
    def __init__(
        self, 
        client: AsyncClient, 
        user_timeout=KEEPALIVE_TIMEOUT,
        max_queue_size: int = 100,
    ): ...
    
    def symbol_ticker_socket(self, symbol: str): ...
    def all_ticker_socket(self): ...
    def symbol_mini_ticker_socket(self, symbol: str): ...
    def all_mini_ticker_socket(self): ...
    def kline_socket(self, symbol: str, interval: str): ...
    def aggtrade_socket(self, symbol: str): ...
    def trade_socket(self, symbol: str): ...
    def depth_socket(self, symbol: str, depth: Optional[str] = None): ...
    def diff_depth_socket(self, symbol: str): ...
    def user_socket(self): ...
    def futures_socket(self): ...
    def options_socket(self): ...

Basic Usage Example

import asyncio
from binance import AsyncClient, BinanceSocketManager

async def handle_socket_message(msg):
    print(f"Received: {msg}")

async def main():
    # Create async client and socket manager
    client = await AsyncClient.create()
    bm = BinanceSocketManager(client)
    
    # Start symbol ticker stream
    ts = bm.symbol_ticker_socket('BTCUSDT')
    
    async with ts as tscm:
        while True:
            res = await tscm.recv()
            await handle_socket_message(res)

asyncio.run(main())

Market Data Streams

Individual Symbol Ticker

async def ticker_stream():
    client = await AsyncClient.create()
    bm = BinanceSocketManager(client)
    
    # 24hr ticker statistics
    ts = bm.symbol_ticker_socket('BTCUSDT')
    
    async with ts as tscm:
        while True:
            msg = await tscm.recv()
            # Message format:
            # {
            #   "e": "24hrTicker",
            #   "E": 1672515782136,
            #   "s": "BTCUSDT",
            #   "p": "0.0015",  # Price change
            #   "P": "0.018",   # Price change percent
            #   "w": "0.0018",  # Weighted average price
            #   "x": "0.0009",  # Previous day's close price
            #   "c": "0.0025",  # Current day's close price
            #   "Q": "10",      # Close quantity
            #   "b": "0.0024",  # Best bid price
            #   "B": "10",      # Best bid quantity
            #   "a": "0.0026",  # Best ask price
            #   "A": "100",     # Best ask quantity
            #   "o": "0.0010",  # Open price
            #   "h": "0.0025",  # High price
            #   "l": "0.0010",  # Low price
            #   "v": "10000",   # Total traded base asset volume
            #   "q": "18",      # Total traded quote asset volume
            #   "O": 0,         # Statistics open time
            #   "C": 86400000,  # Statistics close time
            #   "F": 0,         # First trade ID
            #   "L": 18150,     # Last trade ID
            #   "n": 18151      # Total number of trades
            # }
            print(f"BTC Price: {msg['c']}, Change: {msg['P']}%")

All Symbol Tickers

async def all_tickers_stream():
    client = await AsyncClient.create()
    bm = BinanceSocketManager(client)
    
    # All 24hr ticker statistics
    ts = bm.all_ticker_socket()
    
    async with ts as tscm:
        while True:
            msg = await tscm.recv()
            # msg is a list of all ticker data
            btc_ticker = next((t for t in msg if t['s'] == 'BTCUSDT'), None)
            if btc_ticker:
                print(f"BTC: {btc_ticker['c']}")

Mini Tickers

async def mini_ticker_stream():
    client = await AsyncClient.create()
    bm = BinanceSocketManager(client)
    
    # Individual mini ticker (less data, more frequent)
    ts = bm.symbol_mini_ticker_socket('BTCUSDT')
    
    async with ts as tscm:
        while True:
            msg = await tscm.recv()
            # Message format:
            # {
            #   "e": "24hrMiniTicker",
            #   "E": 1672515782136,
            #   "s": "BTCUSDT",
            #   "c": "0.0025",  # Close price
            #   "o": "0.0010",  # Open price
            #   "h": "0.0025",  # High price
            #   "l": "0.0010",  # Low price
            #   "v": "10000",   # Total traded base asset volume
            #   "q": "18"       # Total traded quote asset volume
            # }
            print(f"BTC: O:{msg['o']} H:{msg['h']} L:{msg['l']} C:{msg['c']}")

Kline/Candlestick Streams

from binance import KLINE_INTERVAL_1MINUTE, KLINE_INTERVAL_1HOUR

async def kline_stream():
    client = await AsyncClient.create()
    bm = BinanceSocketManager(client)
    
    # Kline stream with 1-minute interval
    ts = bm.kline_socket('BTCUSDT', KLINE_INTERVAL_1MINUTE)
    
    async with ts as tscm:
        while True:
            msg = await tscm.recv()
            # Message format:
            # {
            #   "e": "kline",
            #   "E": 1672515782136,
            #   "s": "BTCUSDT",
            #   "k": {
            #     "t": 1672515780000,  # Kline start time
            #     "T": 1672515839999,  # Kline close time
            #     "s": "BTCUSDT",      # Symbol
            #     "i": "1m",           # Interval
            #     "f": 100,            # First trade ID
            #     "L": 200,            # Last trade ID
            #     "o": "0.0010",       # Open price
            #     "c": "0.0020",       # Close price
            #     "h": "0.0025",       # High price
            #     "l": "0.0010",       # Low price
            #     "v": "1000",         # Base asset volume
            #     "n": 100,            # Number of trades
            #     "x": false,          # Is this kline closed?
            #     "q": "1.0000",       # Quote asset volume
            #     "V": "500",          # Taker buy base asset volume
            #     "Q": "0.500",        # Taker buy quote asset volume
            #     "B": "123456"        # Ignore
            #   }
            # }
            kline_data = msg['k']
            if kline_data['x']:  # Only process closed klines
                print(f"Closed kline: O:{kline_data['o']} C:{kline_data['c']} V:{kline_data['v']}")

Trade Streams

async def trade_stream():
    client = await AsyncClient.create()
    bm = BinanceSocketManager(client)
    
    # Individual trade stream
    ts = bm.trade_socket('BTCUSDT')
    
    async with ts as tscm:
        while True:
            msg = await tscm.recv()
            # Message format:
            # {
            #   "e": "trade",
            #   "E": 1672515782136,
            #   "s": "BTCUSDT",
            #   "t": 12345,      # Trade ID
            #   "p": "0.001",    # Price
            #   "q": "100",      # Quantity
            #   "b": 88,         # Buyer order ID
            #   "a": 50,         # Seller order ID
            #   "T": 1672515782134, # Trade time
            #   "m": true,       # Is the buyer the market maker?
            #   "M": true        # Ignore
            # }
            print(f"Trade: {msg['p']} x {msg['q']} at {msg['T']}")

async def agg_trade_stream():
    client = await AsyncClient.create()
    bm = BinanceSocketManager(client)
    
    # Aggregate trade stream (combines small trades)
    ts = bm.aggtrade_socket('BTCUSDT')
    
    async with ts as tscm:
        while True:
            msg = await tscm.recv()
            # Message format:
            # {
            #   "e": "aggTrade",
            #   "E": 1672515782136,
            #   "s": "BTCUSDT",
            #   "a": 26129,      # Aggregate trade ID
            #   "p": "0.001",    # Price
            #   "q": "100",      # Quantity
            #   "f": 100,        # First trade ID
            #   "l": 105,        # Last trade ID
            #   "T": 1672515782134, # Trade time
            #   "m": true,       # Is the buyer the market maker?
            #   "M": true        # Ignore
            # }
            print(f"Agg Trade: {msg['p']} x {msg['q']}")

Depth/Order Book Streams

async def depth_stream():
    client = await AsyncClient.create()
    bm = BinanceSocketManager(client)
    
    # Partial book depth (top 20 levels)
    ts = bm.depth_socket('BTCUSDT', depth='20')
    
    async with ts as tscm:
        while True:
            msg = await tscm.recv()
            # Message format:
            # {
            #   "lastUpdateId": 160,
            #   "bids": [
            #     ["0.0024", "10"],  # [price, quantity]
            #     ["0.0023", "100"]
            #   ],
            #   "asks": [
            #     ["0.0026", "100"],
            #     ["0.0027", "10"]
            #   ]
            # }
            print(f"Best bid: {msg['bids'][0]}, Best ask: {msg['asks'][0]}")

async def diff_depth_stream():
    client = await AsyncClient.create()
    bm = BinanceSocketManager(client)
    
    # Differential depth stream (updates only)
    ts = bm.diff_depth_socket('BTCUSDT')
    
    async with ts as tscm:
        while True:
            msg = await tscm.recv()
            # Message format:
            # {
            #   "e": "depthUpdate",
            #   "E": 1672515782136,
            #   "s": "BTCUSDT",
            #   "U": 157,        # First update ID
            #   "u": 160,        # Final update ID
            #   "b": [           # Bids to be updated
            #     ["0.0024", "10"]
            #   ],
            #   "a": [           # Asks to be updated
            #     ["0.0026", "100"]
            #   ]
            # }
            print(f"Depth update: {len(msg['b'])} bid updates, {len(msg['a'])} ask updates")

User Data Streams

async def user_data_stream():
    # Requires API key and secret
    client = await AsyncClient.create(api_key='your_key', api_secret='your_secret')
    bm = BinanceSocketManager(client)
    
    # User data stream (account updates, order updates)
    ts = bm.user_socket()
    
    async with ts as tscm:
        while True:
            msg = await tscm.recv()
            
            if msg['e'] == 'executionReport':
                # Order update
                print(f"Order update: {msg['s']} {msg['S']} {msg['o']} Status: {msg['X']}")
                
            elif msg['e'] == 'outboundAccountPosition':
                # Account balance update
                print(f"Balance update: {msg['a']} Free: {msg['f']} Locked: {msg['l']}")
                
            elif msg['e'] == 'balanceUpdate':
                # Individual balance update
                print(f"Balance change: {msg['a']} Delta: {msg['d']}")

ThreadedWebsocketManager

Thread-based WebSocket manager for easier integration with non-async code.

class ThreadedWebsocketManager:
    def __init__(self, api_key: str, api_secret: str, testnet: bool = False): ...
    
    def start(self): ...
    def stop(self): ...
    def start_symbol_ticker_socket(self, callback, symbol: str): ...
    def start_all_ticker_socket(self, callback): ...
    def start_kline_socket(self, callback, symbol: str, interval: str): ...
    def start_depth_socket(self, callback, symbol: str, depth: Optional[str] = None): ...
    def start_aggtrade_socket(self, callback, symbol: str): ...
    def start_trade_socket(self, callback, symbol: str): ...
    def start_user_socket(self, callback): ...

Threaded Usage Example

from binance import ThreadedWebsocketManager
import time

def handle_ticker_message(msg):
    print(f"Ticker: {msg['s']} Price: {msg['c']}")

def handle_kline_message(msg):
    kline = msg['k']
    if kline['x']:  # Closed kline
        print(f"Kline: {kline['s']} {kline['i']} O:{kline['o']} C:{kline['c']}")

def main():
    # Initialize threaded manager
    twm = ThreadedWebsocketManager(api_key='your_key', api_secret='your_secret')
    
    # Start the manager
    twm.start()
    
    # Start streams
    twm.start_symbol_ticker_socket(callback=handle_ticker_message, symbol='BTCUSDT')
    twm.start_kline_socket(callback=handle_kline_message, symbol='BTCUSDT', interval='1m')
    
    # Keep running
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("Stopping...")
    finally:
        twm.stop()

if __name__ == "__main__":
    main()

Futures and Options Streams

async def futures_stream():
    client = await AsyncClient.create()
    bm = BinanceSocketManager(client)
    
    # Futures user data stream
    ts = bm.futures_socket()
    
    async with ts as tscm:
        while True:
            msg = await tscm.recv()
            # Handle futures-specific messages
            if msg['e'] == 'ACCOUNT_UPDATE':
                print(f"Account update: {msg}")
            elif msg['e'] == 'ORDER_TRADE_UPDATE':
                print(f"Order update: {msg}")

async def options_stream():
    client = await AsyncClient.create()
    bm = BinanceSocketManager(client)
    
    # Options user data stream
    ts = bm.options_socket()
    
    async with ts as tscm:
        while True:
            msg = await tscm.recv()
            print(f"Options update: {msg}")

Connection Management and Error Handling

import asyncio
from binance import AsyncClient, BinanceSocketManager, BinanceWebsocketUnableToConnect

async def robust_stream():
    client = await AsyncClient.create()
    bm = BinanceSocketManager(client, max_queue_size=200)
    
    while True:
        try:
            ts = bm.symbol_ticker_socket('BTCUSDT')
            
            async with ts as tscm:
                while True:
                    try:
                        msg = await asyncio.wait_for(tscm.recv(), timeout=30.0)
                        print(f"Received: {msg['c']}")
                    except asyncio.TimeoutError:
                        print("No message received in 30 seconds")
                        continue
                    except Exception as e:
                        print(f"Message processing error: {e}")
                        break
                        
        except BinanceWebsocketUnableToConnect:
            print("Unable to connect, retrying in 5 seconds...")
            await asyncio.sleep(5)
        except Exception as e:
            print(f"Stream error: {e}, retrying...")
            await asyncio.sleep(5)
        finally:
            await client.close_connection()

Stream Configuration

# Custom configuration
bm = BinanceSocketManager(
    client,
    user_timeout=30,  # User stream timeout in seconds
    max_queue_size=500  # Maximum message queue size
)

# Available depths for depth streams
WEBSOCKET_DEPTH_5 = "5"
WEBSOCKET_DEPTH_10 = "10" 
WEBSOCKET_DEPTH_20 = "20"

# Depth stream with specific level
ts = bm.depth_socket('BTCUSDT', depth=WEBSOCKET_DEPTH_10)

WebSocket API Methods

Direct WebSocket API functionality for executing orders and queries through WebSocket connections with lower latency than REST API.

def ws_create_order(self, **params): ...
def ws_create_test_order(self, **params): ...
def ws_order_limit(self, timeInForce=BaseClient.TIME_IN_FORCE_GTC, **params): ...
def ws_order_limit_buy(self, timeInForce=BaseClient.TIME_IN_FORCE_GTC, **params): ...
def ws_order_limit_sell(self, timeInForce=BaseClient.TIME_IN_FORCE_GTC, **params): ...
def ws_order_market(self, **params): ...
def ws_order_market_buy(self, **params): ...
def ws_order_market_sell(self, **params): ...
def ws_get_order(self, **params): ...
def ws_cancel_order(self, **params): ...
def ws_cancel_and_replace_order(self, **params): ...
def ws_get_open_orders(self, **params): ...
def ws_cancel_all_open_orders(self, **params): ...
def ws_create_oco_order(self, **params): ...
def ws_create_oto_order(self, **params): ...
def ws_create_otoco_order(self, **params): ...
def ws_create_sor_order(self, **params): ...

WebSocket API Usage Examples

# Create market order via WebSocket API
ws_order = client.ws_create_order(
    symbol='BTCUSDT',
    side='BUY',
    type='MARKET',
    quantity=0.001
)

print(f"WebSocket Order ID: {ws_order['id']}")
print(f"Result: {ws_order['result']}")

# Create limit order via WebSocket API
ws_limit_order = client.ws_order_limit_buy(
    symbol='BTCUSDT',
    quantity=0.001,
    price='45000.00'
)

# Create OCO (One-Cancels-Other) order via WebSocket
ws_oco = client.ws_create_oco_order(
    symbol='BTCUSDT',
    side='SELL',
    quantity=0.001,
    price='55000.00',        # Take profit price
    stopPrice='48000.00',    # Stop loss trigger
    stopLimitPrice='47500.00'  # Stop loss limit price
)

# Create OTO (One-Triggers-Other) order via WebSocket
ws_oto = client.ws_create_oto_order(
    symbol='BTCUSDT',
    workingType='LIMIT',
    workingSide='BUY',
    workingQuantity=0.001,
    workingPrice='45000.00',
    pendingType='LIMIT',
    pendingSide='SELL',
    pendingQuantity=0.001,
    pendingPrice='55000.00'
)

# Create SOR (Smart Order Routing) order via WebSocket
ws_sor = client.ws_create_sor_order(
    symbol='BTCUSDT',
    side='BUY',
    type='LIMIT',
    quantity=0.001,
    price='50000.00'
)

# Get order status via WebSocket
ws_order_status = client.ws_get_order(
    symbol='BTCUSDT',
    orderId=12345678
)

# Cancel order via WebSocket
ws_cancel = client.ws_cancel_order(
    symbol='BTCUSDT',
    orderId=12345678
)

# Cancel and replace order atomically via WebSocket
ws_replace = client.ws_cancel_and_replace_order(
    symbol='BTCUSDT',
    side='BUY',
    type='LIMIT',
    cancelReplaceMode='STOP_ON_FAILURE',
    timeInForce='GTC',
    quantity=0.002,  # New quantity
    price='49500.00',  # New price
    cancelOrderId=12345678  # Order to cancel
)

# Get all open orders via WebSocket
ws_open_orders = client.ws_get_open_orders(symbol='BTCUSDT')

# Cancel all open orders via WebSocket
ws_cancel_all = client.ws_cancel_all_open_orders(symbol='BTCUSDT')

WebSocket API Response Format

WebSocket API responses follow a consistent format:

{
    "id": "request_id",           # Request identifier
    "status": 200,                # HTTP status code
    "result": {                   # Response data
        "symbol": "BTCUSDT",
        "orderId": 12345678,
        "status": "FILLED",
        # ... other order data
    },
    "rateLimits": [              # Current rate limit usage
        {
            "rateLimitType": "REQUEST_WEIGHT",
            "interval": "MINUTE",
            "intervalNum": 1,
            "limit": 1200,
            "count": 10
        }
    ]
}

WebSocket API vs REST API

Advantages of WebSocket API:

  • Lower latency for trading operations
  • Real-time rate limit information
  • Persistent connection reduces overhead
  • Better for high-frequency trading

When to use WebSocket API:

  • Time-sensitive trading operations
  • High-frequency order management
  • Applications requiring minimal latency
  • Systems with persistent WebSocket connections

When to use REST API:

  • One-off operations
  • Simple integrations
  • Applications without persistent connections
  • Non-time-critical operations

The WebSocket streaming system provides real-time access to all Binance market data and account updates with robust connection management and flexible async/threaded interfaces, plus direct WebSocket API functionality for low-latency trading operations.

Install with Tessl CLI

npx tessl i tessl/pypi-python-binance

docs

account.md

convert-api.md

depth-cache.md

futures.md

index.md

margin.md

market-data.md

rest-clients.md

staking-mining.md

trading.md

websockets.md

tile.json