Unofficial Python wrapper for the Binance cryptocurrency exchange REST API v3 and WebSocket APIs with comprehensive trading, market data, and account management functionality
—
Efficient real-time order book management with automatic synchronization from WebSocket streams. Provides sorted bid/ask access and maintains consistent order book state with configurable precision and conversion types.
Main class for managing real-time order book caches with automatic WebSocket synchronization.
class DepthCacheManager:
    """Manage a real-time order book cache synchronized from a WebSocket stream.

    The usage examples drive it with ``async with`` and read the current book
    through :meth:`get_depth_cache`.
    """

    def __init__(
        self,
        client: AsyncClient,
        symbol: str,
        refresh_interval: Optional[int] = None,
        bm: Optional[BinanceSocketManager] = None,
        limit: int = 500,
        conv_type: Callable = float,
    ):
        """Configure the cache for one symbol.

        Args:
            client: Async API client, e.g. from ``await AsyncClient.create()``.
            symbol: Market symbol such as ``'BTCUSDT'``.
            refresh_interval: Seconds between order book snapshot refreshes.
                NOTE(review): behavior of the ``None`` default is not shown
                here -- confirm against the library.
            bm: Optional socket manager.
                NOTE(review): presumably reused instead of creating a new
                one -- confirm.
            limit: NOTE(review): presumably the snapshot depth -- confirm.
            conv_type: Callable used to convert book values; ``float`` by
                default, ``Decimal`` for high-precision arithmetic.
        """
        ...

    async def __aenter__(self):
        """Enter the ``async with`` block.

        NOTE(review): the examples bind the result with ``as dcm`` and call
        ``dcm.get_depth_cache()``, so this presumably returns the manager.
        """
        ...

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Leave the ``async with`` block."""
        ...

    def get_depth_cache(self) -> DepthCache:
        """Return the :class:`DepthCache` maintained by this manager."""
        ...


import asyncio
from binance import AsyncClient, DepthCacheManager
async def main():
    """Print the best BTCUSDT bid/ask and spread once per second, 100 times.

    The API client is always closed on exit, even if monitoring fails.
    """
    # Initialize async client
    client = await AsyncClient.create()
    try:
        # Create depth cache manager
        async with DepthCacheManager(client, symbol='BTCUSDT') as dcm:
            # Get the depth cache
            depth_cache = dcm.get_depth_cache()

            # Monitor order book changes
            for _ in range(100):
                await asyncio.sleep(1)

                # Get current best bid/ask
                bids = depth_cache.get_bids()[:5]  # Top 5 bids
                asks = depth_cache.get_asks()[:5]  # Top 5 asks

                # The book may still be empty before the first synchronization.
                if bids and asks:
                    print(f"Best bid: {bids[0][0]} ({bids[0][1]})")
                    print(f"Best ask: {asks[0][0]} ({asks[0][1]})")
                    print(f"Spread: {float(asks[0][0]) - float(bids[0][0]):.2f}")
                    print(f"Update time: {depth_cache.update_time}")
                    print("---")
    finally:
        await client.close_connection()
asyncio.run(main())

Individual order book cache for managing bids and asks.
class DepthCache:
    """Order book cache holding the bids and asks of a single symbol."""

    def __init__(self, symbol, conv_type: Callable = float):
        """Create an empty cache.

        Args:
            symbol: Market symbol such as ``'BTCUSDT'``.
            conv_type: Callable used to convert book values; ``float`` by
                default, ``Decimal`` for high-precision arithmetic.
        """
        ...

    def add_bid(self, bid):
        """Add a bid given as ``[price, quantity]``, e.g. ``['50000.00', '0.5']``."""
        ...

    def add_ask(self, ask):
        """Add an ask given as ``[price, quantity]``, e.g. ``['50050.00', '0.3']``."""
        ...

    def get_bids(self) -> List[List]:
        """Return the bids as ``[price, quantity]`` entries, best bid first."""
        ...

    def get_asks(self) -> List[List]:
        """Return the asks as ``[price, quantity]`` entries, best ask first."""
        ...

    def get_top_bids(self, k: int) -> List[List]:
        """Return the top ``k`` bid levels."""
        ...

    def get_top_asks(self, k: int) -> List[List]:
        """Return the top ``k`` ask levels."""
        ...

    @staticmethod
    def sort_depth(depth_dict, reverse: bool = False, conv_type: Callable = float) -> List[List]:
        """Return the entries of ``depth_dict`` as a sorted list.

        NOTE(review): presumably sorted by price, descending when ``reverse``
        is true, with values converted by ``conv_type`` -- confirm against the
        library implementation.
        """
        ...


from binance.ws import DepthCache
from decimal import Decimal

# Create depth cache with custom conversion type
depth_cache = DepthCache('BTCUSDT', conv_type=Decimal)

# Manually add bids and asks (normally done automatically)
depth_cache.add_bid(['50000.00', '0.5'])
depth_cache.add_bid(['49950.00', '1.0'])
depth_cache.add_ask(['50050.00', '0.3'])
depth_cache.add_ask(['50100.00', '0.8'])

# Get sorted order book data
bids = depth_cache.get_bids()
asks = depth_cache.get_asks()

print("Bids (price, quantity):")
for bid in bids[:10]:
    print(f" {bid[0]} @ {bid[1]}")

print("Asks (price, quantity):")
for ask in asks[:10]:
    print(f" {ask[0]} @ {ask[1]}")

# Get top N levels
top_5_bids = depth_cache.get_top_bids(5)
top_5_asks = depth_cache.get_top_asks(5)

# Calculate order book statistics
# (float() is used for display only; it gives up Decimal precision)
total_bid_volume = sum(float(bid[1]) for bid in bids)
total_ask_volume = sum(float(ask[1]) for ask in asks)
print(f"Total bid volume: {total_bid_volume}")
print(f"Total ask volume: {total_ask_volume}")

Thread-based depth cache manager for easier integration with non-async code.
class ThreadedDepthCacheManager:
    """Thread-based depth cache manager for integration with non-async code."""

    def __init__(
        self,
        api_key: str,
        api_secret: str,
        symbol: str,
        refresh_interval: Optional[int] = None,
        testnet: bool = False,
    ):
        """Configure the manager for one symbol.

        Args:
            api_key: Binance API key.
            api_secret: Binance API secret.
            symbol: Market symbol such as ``'BTCUSDT'``.
            refresh_interval: NOTE(review): presumably seconds between
                snapshot refreshes, as for ``DepthCacheManager`` -- confirm.
            testnet: NOTE(review): presumably selects the Binance testnet --
                confirm.
        """
        ...

    def start(self):
        """Start the manager."""
        ...

    def stop(self):
        """Stop the manager."""
        ...

    def get_depth_cache(self) -> DepthCache:
        """Return the current depth cache.

        The usage example checks the result for truthiness before reading it.
        """
        ...


import time
from binance.ws import ThreadedDepthCacheManager
def main():
    """Print the top three BTCUSDT bids/asks once per second for one minute.

    The manager is always stopped on exit, even if monitoring fails.
    """
    # Initialize threaded depth cache manager
    dcm = ThreadedDepthCacheManager(
        api_key='your_key',
        api_secret='your_secret',
        symbol='BTCUSDT',
    )
    try:
        # Start the manager
        dcm.start()

        # Wait for initial synchronization
        time.sleep(2)

        # Monitor order book
        for _ in range(60):  # Monitor for 1 minute
            depth_cache = dcm.get_depth_cache()
            if depth_cache:
                bids = depth_cache.get_bids()[:3]
                asks = depth_cache.get_asks()[:3]
                print(f"Time: {time.time()}")
                print(f"Top 3 bids: {bids}")
                print(f"Top 3 asks: {asks}")
                if bids and asks:
                    spread = float(asks[0][0]) - float(bids[0][0])
                    print(f"Spread: {spread:.2f}")
                print("---")
            time.sleep(1)
    finally:
        dcm.stop()
if __name__ == "__main__":
    main()

Different managers for various market types.
class FuturesDepthCacheManager:
    """Depth cache manager for the futures market.

    The constructor mirrors ``DepthCacheManager``; the usage example drives it
    with ``async with`` and ``get_depth_cache()`` in the same way.
    """

    def __init__(
        self,
        client: AsyncClient,
        symbol: str,
        refresh_interval: Optional[int] = None,
        bm: Optional[BinanceSocketManager] = None,
        limit: int = 500,
        conv_type: Callable = float,
    ):
        """Configure the cache for one futures symbol.

        Args:
            client: Async API client, e.g. from ``await AsyncClient.create()``.
            symbol: Futures symbol such as ``'BTCUSDT'``.
            refresh_interval: NOTE(review): presumably seconds between
                snapshot refreshes -- confirm.
            bm: Optional socket manager. NOTE(review): semantics not shown
                here -- confirm.
            limit: NOTE(review): presumably the snapshot depth -- confirm.
            conv_type: Callable used to convert book values (default ``float``).
        """
        ...
class OptionsDepthCacheManager:
    """Depth cache manager for the options market.

    The constructor mirrors ``DepthCacheManager``; the usage example drives it
    with ``async with`` and ``get_depth_cache()`` in the same way.
    """

    def __init__(
        self,
        client: AsyncClient,
        symbol: str,
        refresh_interval: Optional[int] = None,
        bm: Optional[BinanceSocketManager] = None,
        limit: int = 500,
        conv_type: Callable = float,
    ):
        """Configure the cache for one options symbol.

        Args:
            client: Async API client, e.g. from ``await AsyncClient.create()``.
            symbol: Options symbol such as ``'BTC-240329-70000-C'``.
            refresh_interval: NOTE(review): presumably seconds between
                snapshot refreshes -- confirm.
            bm: Optional socket manager. NOTE(review): semantics not shown
                here -- confirm.
            limit: NOTE(review): presumably the snapshot depth -- confirm.
            conv_type: Callable used to convert book values (default ``float``).
        """
        ...


from binance import AsyncClient, FuturesDepthCacheManager, OptionsDepthCacheManager
async def futures_depth_monitoring():
    """Print the best futures BTCUSDT bid and ask once per second for 60 seconds.

    Prints ``N/A`` for a side that is still empty. The API client is always
    closed on exit.
    """
    client = await AsyncClient.create()
    try:
        # Monitor futures order book
        async with FuturesDepthCacheManager(client, symbol='BTCUSDT') as dcm:
            depth_cache = dcm.get_depth_cache()

            for _ in range(60):
                await asyncio.sleep(1)
                bids = depth_cache.get_bids()[:5]
                asks = depth_cache.get_asks()[:5]
                print(f"Futures BTCUSDT - Best bid: {bids[0] if bids else 'N/A'}")
                print(f"Futures BTCUSDT - Best ask: {asks[0] if asks else 'N/A'}")
    finally:
        await client.close_connection()
async def options_depth_monitoring():
    """Print the best bid and ask of one options contract for 60 seconds.

    Prints ``N/A`` for a side that is still empty. The API client is always
    closed on exit.
    """
    client = await AsyncClient.create()
    try:
        # Monitor options order book
        async with OptionsDepthCacheManager(client, symbol='BTC-240329-70000-C') as dcm:
            depth_cache = dcm.get_depth_cache()

            for _ in range(60):
                await asyncio.sleep(1)
                bids = depth_cache.get_bids()[:3]
                asks = depth_cache.get_asks()[:3]
                print(f"Options - Best bid: {bids[0] if bids else 'N/A'}")
                print(f"Options - Best ask: {asks[0] if asks else 'N/A'}")
    finally:
        await client.close_connection()


from decimal import Decimal
import asyncio
from binance import AsyncClient, DepthCacheManager
async def high_precision_monitoring():
    """Print the top five BTCUSDT levels and per-level spreads using Decimal.

    The API client is always closed on exit.
    """
    client = await AsyncClient.create()
    try:
        # Use Decimal for high precision
        async with DepthCacheManager(
            client,
            symbol='BTCUSDT',
            conv_type=Decimal,  # High precision arithmetic
        ) as dcm:
            depth_cache = dcm.get_depth_cache()
            await asyncio.sleep(2)  # Wait for sync

            bids = depth_cache.get_bids()[:5]
            asks = depth_cache.get_asks()[:5]

            print("High precision order book:")
            # zip() stops at the shorter side, so uneven books are safe.
            for level, (bid, ask) in enumerate(zip(bids, asks), start=1):
                print(f"Level {level}: Bid {bid[0]} ({bid[1]}) | Ask {ask[0]} ({ask[1]})")

                # Calculate precise spread (values are Decimal via conv_type)
                spread = ask[0] - bid[0]
                print(f" Spread: {spread}")
    finally:
        await client.close_connection()
asyncio.run(high_precision_monitoring())


async def custom_refresh_monitoring():
    """Monitor BTCUSDT for two minutes with a 10-second snapshot refresh.

    Prints the top three levels every tenth second. The API client is always
    closed on exit.
    """
    client = await AsyncClient.create()
    try:
        # Refresh order book snapshot every 10 seconds
        async with DepthCacheManager(
            client,
            symbol='BTCUSDT',
            refresh_interval=10,  # Seconds
        ) as dcm:
            depth_cache = dcm.get_depth_cache()

            # Monitor for changes
            for i in range(120):  # 2 minutes
                await asyncio.sleep(1)
                if i % 10 == 0:  # Print every 10 seconds
                    bids = depth_cache.get_bids()[:3]
                    asks = depth_cache.get_asks()[:3]
                    print(f"Refresh cycle {i//10}: Update time {depth_cache.update_time}")
                    print(f"Bids: {bids}")
                    print(f"Asks: {asks}")
    finally:
        await client.close_connection()


def _average_fill_price(asks, size):
    """Return the average price of a market buy of ``size``, or None.

    Walks ``asks`` in the given order, consuming each ``(price, quantity)``
    level until ``size`` is filled. Returns ``None`` when the levels cannot
    fill the whole order.
    """
    remaining = size
    total_cost = 0.0
    for ask_price, ask_vol in asks:
        if remaining <= 0:
            break
        taken = min(remaining, float(ask_vol))
        total_cost += taken * float(ask_price)
        remaining -= taken
    if remaining > 0:
        return None
    return total_cost / size


async def market_depth_analysis():
    """Print a 10-level depth table and a market-impact estimate for BTCUSDT.

    The table lists price, volume and cumulative volume per level; the impact
    section estimates the average fill price of market buys of several sizes
    relative to the best ask. The API client is always closed on exit.
    """
    client = await AsyncClient.create()
    try:
        async with DepthCacheManager(client, symbol='BTCUSDT') as dcm:
            depth_cache = dcm.get_depth_cache()
            await asyncio.sleep(3)  # Wait for sync

            bids = depth_cache.get_bids()
            asks = depth_cache.get_asks()

            # Calculate cumulative volumes
            cumulative_bid_volume = 0.0
            cumulative_ask_volume = 0.0

            print("Market Depth Analysis:")
            print("Level | Bid Price | Bid Vol | Cum Bid | Ask Price | Ask Vol | Cum Ask | Spread")
            print("-" * 80)

            # zip() stops at the shorter side, so at most 10 paired levels.
            for level, (bid, ask) in enumerate(zip(bids[:10], asks[:10]), start=1):
                # Convert once: book values are conv_type instances (float by
                # default), so the string format code "s" would raise
                # ValueError; numeric format codes are required.
                bid_price, bid_vol = float(bid[0]), float(bid[1])
                ask_price, ask_vol = float(ask[0]), float(ask[1])

                cumulative_bid_volume += bid_vol
                cumulative_ask_volume += ask_vol
                spread = ask_price - bid_price

                print(
                    f"{level:5d} | {bid_price:9.2f} | {bid_vol:7.3f} | "
                    f"{cumulative_bid_volume:7.3f} | {ask_price:9.2f} | "
                    f"{ask_vol:7.3f} | {cumulative_ask_volume:7.3f} | {spread:6.2f}"
                )

            # Calculate market impact for different order sizes
            print("\nMarket Impact Analysis:")
            test_sizes = [0.1, 0.5, 1.0, 5.0, 10.0]  # BTC amounts

            for size in test_sizes:
                # Calculate average execution price for market buy
                avg_price = _average_fill_price(asks, size)
                if avg_price is None:
                    # Report instead of silently skipping an unfillable size.
                    print(f"Buy {size:4.1f} BTC: not enough ask liquidity in the cached book")
                    continue

                market_price = float(asks[0][0])
                impact = ((avg_price - market_price) / market_price) * 100
                print(f"Buy {size:4.1f} BTC: Avg price {avg_price:8.2f}, Impact {impact:5.2f}%")
    finally:
        await client.close_connection()
asyncio.run(market_depth_analysis())


import asyncio
import logging

from binance import AsyncClient, DepthCacheManager
from binance.exceptions import BinanceWebsocketUnableToConnect

logging.basicConfig(level=logging.INFO)


async def robust_depth_monitoring():
    """Monitor BTCUSDT for five minutes with reconnects and error handling.

    A failed WebSocket connection is retried up to five times with
    exponential backoff. More than ten consecutive empty order book reads, or
    any other error, ends monitoring. The API client is always closed on exit.
    """
    client = await AsyncClient.create()
    retry_count = 0
    max_retries = 5

    try:
        while retry_count < max_retries:
            try:
                async with DepthCacheManager(client, symbol='BTCUSDT') as dcm:
                    depth_cache = dcm.get_depth_cache()
                    consecutive_errors = 0
                    max_consecutive_errors = 10

                    for i in range(300):  # 5 minutes
                        try:
                            await asyncio.sleep(1)
                            bids = depth_cache.get_bids()
                            asks = depth_cache.get_asks()

                            if not bids or not asks:
                                consecutive_errors += 1
                                if consecutive_errors > max_consecutive_errors:
                                    raise RuntimeError("Too many consecutive empty order book updates")
                                continue

                            consecutive_errors = 0

                            # Normal processing
                            if i % 10 == 0:  # Log every 10 seconds
                                print(f"Status OK: Best bid {bids[0][0]}, Best ask {asks[0][0]}")
                        except Exception as e:
                            logging.error("Error during monitoring: %s", e)
                            break

                # Reached when the session ran to completion or was stopped by
                # a monitoring error above; neither case reconnects.
                break
            except BinanceWebsocketUnableToConnect as e:
                retry_count += 1
                logging.error("WebSocket connection failed (attempt %s): %s", retry_count, e)
                if retry_count < max_retries:
                    await asyncio.sleep(2 ** retry_count)  # Exponential backoff
                else:
                    logging.error("Max retries reached, giving up")
                    break
            except Exception as e:
                logging.error("Unexpected error: %s", e)
                break
    finally:
        # Close the client even on cancellation or an uncaught error.
        await client.close_connection()
asyncio.run(robust_depth_monitoring())

The depth cache system provides efficient real-time order book management with automatic synchronization, flexible precision control, and comprehensive market depth analysis capabilities for all Binance market types.
Install with Tessl CLI
npx tessl i tessl/pypi-python-binance