CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-python-binance

Unofficial Python wrapper for the Binance cryptocurrency exchange REST API v3 and WebSocket APIs with comprehensive trading, market data, and account management functionality

Pending
Overview
Eval results
Files

depth-cache.mddocs/

Depth Caching

Efficient real-time order book management with automatic synchronization from WebSocket streams. Provides sorted bid/ask access and maintains consistent order book state with configurable precision and conversion types.

Capabilities

DepthCacheManager

Main class for managing real-time order book caches with automatic WebSocket synchronization.

class DepthCacheManager:
    def __init__(
        self,
        client: AsyncClient,
        symbol: str,
        refresh_interval: Optional[int] = None,
        bm: Optional[BinanceSocketManager] = None,
        limit: int = 500,
        conv_type: Callable = float
    ): ...
    
    async def __aenter__(self): ...
    async def __aexit__(self, exc_type, exc_val, exc_tb): ...
    def get_depth_cache(self) -> DepthCache: ...

Usage Examples

import asyncio
from binance import AsyncClient, DepthCacheManager

async def main():
    # Initialize async client
    client = await AsyncClient.create()
    
    try:
        # Create depth cache manager
        async with DepthCacheManager(client, symbol='BTCUSDT') as dcm:
            # Get the depth cache
            depth_cache = dcm.get_depth_cache()
            
            # Monitor order book changes
            for i in range(100):
                await asyncio.sleep(1)
                
                # Get current best bid/ask
                bids = depth_cache.get_bids()[:5]  # Top 5 bids
                asks = depth_cache.get_asks()[:5]  # Top 5 asks
                
                if bids and asks:
                    print(f"Best bid: {bids[0][0]} ({bids[0][1]})")
                    print(f"Best ask: {asks[0][0]} ({asks[0][1]})")
                    print(f"Spread: {float(asks[0][0]) - float(bids[0][0]):.2f}")
                    print(f"Update time: {depth_cache.update_time}")
                    print("---")
    
    finally:
        await client.close_connection()

asyncio.run(main())

DepthCache

Individual order book cache for managing bids and asks.

class DepthCache:
    def __init__(self, symbol, conv_type: Callable = float): ...
    
    def add_bid(self, bid): ...
    def add_ask(self, ask): ...
    def get_bids(self) -> List[List]: ...
    def get_asks(self) -> List[List]: ...
    def get_top_bids(self, k: int) -> List[List]: ...
    def get_top_asks(self, k: int) -> List[List]: ...
    @staticmethod
    def sort_depth(depth_dict, reverse: bool = False, conv_type: Callable = float) -> List[List]: ...

Usage Examples

from binance.ws import DepthCache

# Create depth cache with custom conversion type
from decimal import Decimal
depth_cache = DepthCache('BTCUSDT', conv_type=Decimal)

# Manually add bids and asks (normally done automatically)
depth_cache.add_bid(['50000.00', '0.5'])
depth_cache.add_bid(['49950.00', '1.0'])
depth_cache.add_ask(['50050.00', '0.3'])
depth_cache.add_ask(['50100.00', '0.8'])

# Get sorted order book data
bids = depth_cache.get_bids()
asks = depth_cache.get_asks()

print("Bids (price, quantity):")
for bid in bids[:10]:
    print(f"  {bid[0]} @ {bid[1]}")

print("Asks (price, quantity):")
for ask in asks[:10]:
    print(f"  {ask[0]} @ {ask[1]}")

# Get top N levels
top_5_bids = depth_cache.get_top_bids(5)
top_5_asks = depth_cache.get_top_asks(5)

# Calculate order book statistics
total_bid_volume = sum(float(bid[1]) for bid in bids)
total_ask_volume = sum(float(ask[1]) for ask in asks)

print(f"Total bid volume: {total_bid_volume}")
print(f"Total ask volume: {total_ask_volume}")

ThreadedDepthCacheManager

Thread-based depth cache manager for easier integration with non-async code.

class ThreadedDepthCacheManager:
    def __init__(
        self,
        api_key: str,
        api_secret: str,
        symbol: str,
        refresh_interval: Optional[int] = None,
        testnet: bool = False
    ): ...
    
    def start(self): ...
    def stop(self): ...
    def get_depth_cache(self) -> DepthCache: ...

Usage Examples

import time
from binance.ws import ThreadedDepthCacheManager

def main():
    # Initialize threaded depth cache manager  
    dcm = ThreadedDepthCacheManager(
        api_key='your_key',
        api_secret='your_secret',
        symbol='BTCUSDT'
    )
    
    try:
        # Start the manager
        dcm.start()
        
        # Wait for initial synchronization
        time.sleep(2)
        
        # Monitor order book
        for i in range(60):  # Monitor for 1 minute
            depth_cache = dcm.get_depth_cache()
            
            if depth_cache:
                bids = depth_cache.get_bids()[:3]
                asks = depth_cache.get_asks()[:3]
                
                print(f"Time: {time.time()}")
                print(f"Top 3 bids: {bids}")
                print(f"Top 3 asks: {asks}")
                
                if bids and asks:
                    spread = float(asks[0][0]) - float(bids[0][0])
                    print(f"Spread: {spread:.2f}")
                
                print("---")
            
            time.sleep(1)
    
    finally:
        dcm.stop()

if __name__ == "__main__":
    main()

Specialized Depth Cache Managers

Different managers for various market types.

class FuturesDepthCacheManager:
    def __init__(
        self,
        client: AsyncClient,
        symbol: str,
        refresh_interval: Optional[int] = None,
        bm: Optional[BinanceSocketManager] = None,
        limit: int = 500,
        conv_type: Callable = float
    ): ...

class OptionsDepthCacheManager:
    def __init__(
        self,
        client: AsyncClient,
        symbol: str,
        refresh_interval: Optional[int] = None,
        bm: Optional[BinanceSocketManager] = None,
        limit: int = 500,
        conv_type: Callable = float
    ): ...

Specialized Usage Examples

from binance import AsyncClient, FuturesDepthCacheManager, OptionsDepthCacheManager

async def futures_depth_monitoring():
    client = await AsyncClient.create()
    
    try:
        # Monitor futures order book
        async with FuturesDepthCacheManager(client, symbol='BTCUSDT') as dcm:
            depth_cache = dcm.get_depth_cache()
            
            for i in range(60):
                await asyncio.sleep(1)
                
                bids = depth_cache.get_bids()[:5]
                asks = depth_cache.get_asks()[:5]
                
                print(f"Futures BTCUSDT - Best bid: {bids[0] if bids else 'N/A'}")
                print(f"Futures BTCUSDT - Best ask: {asks[0] if asks else 'N/A'}")
    
    finally:
        await client.close_connection()

async def options_depth_monitoring():
    client = await AsyncClient.create()
    
    try:
        # Monitor options order book
        async with OptionsDepthCacheManager(client, symbol='BTC-240329-70000-C') as dcm:
            depth_cache = dcm.get_depth_cache()
            
            for i in range(60):
                await asyncio.sleep(1)
                
                bids = depth_cache.get_bids()[:3]
                asks = depth_cache.get_asks()[:3]
                
                print(f"Options - Best bid: {bids[0] if bids else 'N/A'}")
                print(f"Options - Best ask: {asks[0] if asks else 'N/A'}")
    
    finally:
        await client.close_connection()

Advanced Depth Cache Configuration

Custom Conversion Types

from decimal import Decimal
import asyncio
from binance import AsyncClient, DepthCacheManager

async def high_precision_monitoring():
    client = await AsyncClient.create()
    
    try:
        # Use Decimal for high precision
        async with DepthCacheManager(
            client, 
            symbol='BTCUSDT',
            conv_type=Decimal  # High precision arithmetic
        ) as dcm:
            depth_cache = dcm.get_depth_cache()
            
            await asyncio.sleep(2)  # Wait for sync
            
            bids = depth_cache.get_bids()[:5]
            asks = depth_cache.get_asks()[:5]
            
            print("High precision order book:")
            for i, (bid, ask) in enumerate(zip(bids, asks)):
                print(f"Level {i+1}: Bid {bid[0]} ({bid[1]}) | Ask {ask[0]} ({ask[1]})")
                
                # Calculate precise spread
                spread = ask[0] - bid[0]
                print(f"  Spread: {spread}")
    
    finally:
        await client.close_connection()

asyncio.run(high_precision_monitoring())

Custom Refresh Intervals

async def custom_refresh_monitoring():
    client = await AsyncClient.create()
    
    try:
        # Refresh order book snapshot every 10 seconds
        async with DepthCacheManager(
            client, 
            symbol='BTCUSDT',
            refresh_interval=10  # Seconds
        ) as dcm:
            depth_cache = dcm.get_depth_cache()
            
            # Monitor for changes
            for i in range(120):  # 2 minutes
                await asyncio.sleep(1)
                
                if i % 10 == 0:  # Print every 10 seconds
                    bids = depth_cache.get_bids()[:3]
                    asks = depth_cache.get_asks()[:3]
                    
                    print(f"Refresh cycle {i//10}: Update time {depth_cache.update_time}")
                    print(f"Bids: {bids}")
                    print(f"Asks: {asks}")
    
    finally:
        await client.close_connection()

Order Book Analysis

Market Depth Analysis

async def market_depth_analysis():
    client = await AsyncClient.create()
    
    try:
        async with DepthCacheManager(client, symbol='BTCUSDT') as dcm:
            depth_cache = dcm.get_depth_cache()
            
            await asyncio.sleep(3)  # Wait for sync
            
            bids = depth_cache.get_bids()
            asks = depth_cache.get_asks()
            
            # Calculate cumulative volumes
            cumulative_bid_volume = 0
            cumulative_ask_volume = 0
            
            print("Market Depth Analysis:")
            print("Level | Bid Price | Bid Vol | Cum Bid | Ask Price | Ask Vol | Cum Ask | Spread")
            print("-" * 80)
            
            for i in range(min(10, len(bids), len(asks))):
                bid_price, bid_vol = bids[i]
                ask_price, ask_vol = asks[i]
                
                cumulative_bid_volume += float(bid_vol)
                cumulative_ask_volume += float(ask_vol)
                
                spread = float(ask_price) - float(bid_price)
                
                print(f"{i+1:5d} | {bid_price:9s} | {bid_vol:7s} | {cumulative_bid_volume:7.3f} | {ask_price:9s} | {ask_vol:7s} | {cumulative_ask_volume:7.3f} | {spread:6.2f}")
            
            # Calculate market impact for different order sizes
            print("\nMarket Impact Analysis:")
            test_sizes = [0.1, 0.5, 1.0, 5.0, 10.0]  # BTC amounts
            
            for size in test_sizes:
                # Calculate average execution price for market buy
                remaining_size = size
                total_cost = 0
                
                for ask_price, ask_vol in asks:
                    if remaining_size <= 0:
                        break
                    
                    vol_to_take = min(remaining_size, float(ask_vol))
                    total_cost += vol_to_take * float(ask_price)
                    remaining_size -= vol_to_take
                
                if remaining_size <= 0:
                    avg_price = total_cost / size
                    market_price = float(asks[0][0])
                    impact = ((avg_price - market_price) / market_price) * 100
                    print(f"Buy {size:4.1f} BTC: Avg price {avg_price:8.2f}, Impact {impact:5.2f}%")
    
    finally:
        await client.close_connection()

asyncio.run(market_depth_analysis())

Error Handling and Monitoring

import asyncio
import logging
from binance import AsyncClient, DepthCacheManager
from binance.exceptions import BinanceWebsocketUnableToConnect

logging.basicConfig(level=logging.INFO)

async def robust_depth_monitoring():
    client = await AsyncClient.create()
    
    retry_count = 0
    max_retries = 5
    
    while retry_count < max_retries:
        try:
            async with DepthCacheManager(client, symbol='BTCUSDT') as dcm:
                depth_cache = dcm.get_depth_cache()
                
                consecutive_errors = 0
                max_consecutive_errors = 10
                
                for i in range(300):  # 5 minutes
                    try:
                        await asyncio.sleep(1)
                        
                        bids = depth_cache.get_bids()
                        asks = depth_cache.get_asks()
                        
                        if not bids or not asks:
                            consecutive_errors += 1
                            if consecutive_errors > max_consecutive_errors:
                                raise Exception("Too many consecutive empty order book updates")
                            continue
                        
                        consecutive_errors = 0
                        
                        # Normal processing
                        if i % 10 == 0:  # Log every 10 seconds
                            print(f"Status OK: Best bid {bids[0][0]}, Best ask {asks[0][0]}")
                    
                    except Exception as e:
                        logging.error(f"Error during monitoring: {e}")
                        break
                
                # If we get here, monitoring completed successfully
                break
                
        except BinanceWebsocketUnableToConnect as e:
            retry_count += 1
            logging.error(f"WebSocket connection failed (attempt {retry_count}): {e}")
            if retry_count < max_retries:
                await asyncio.sleep(2 ** retry_count)  # Exponential backoff
            else:
                logging.error("Max retries reached, giving up")
                break
        
        except Exception as e:
            logging.error(f"Unexpected error: {e}")
            break
    
    await client.close_connection()

asyncio.run(robust_depth_monitoring())

The depth cache system provides efficient real-time order book management with automatic synchronization, flexible precision control, and comprehensive market depth analysis capabilities for all Binance market types.

Install with Tessl CLI

npx tessl i tessl/pypi-python-binance

docs

account.md

convert-api.md

depth-cache.md

futures.md

index.md

margin.md

market-data.md

rest-clients.md

staking-mining.md

trading.md

websockets.md

tile.json