tessl/pypi-cbpro

The unofficial Python client for the Coinbase Pro API providing comprehensive trading and market data access

Order Book Management

The OrderBook class provides automated maintenance of real-time order book state by processing WebSocket messages from Coinbase Pro's feed. It maintains accurate bid/ask data and handles the complexity of order book updates, making it easy to access current market depth.

Capabilities

Order Book Initialization

Create an OrderBook instance to track real-time order book state for a specific product.

class OrderBook:
    def __init__(self, product_id: str = 'BTC-USD', log_to = None):
        """
        Initialize order book for real-time tracking of a specific product.

        Parameters:
        - product_id (str): Product to track (e.g., 'BTC-USD', 'ETH-USD')
        - log_to: Optional file-like object for logging order book messages
        """

Usage Example:

import cbpro

# Create order book for BTC-USD
order_book = cbpro.OrderBook(product_id='BTC-USD')

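# With logging to a file: the log must stay open for as long as the
# order book processes messages, so use the book inside the with block
with open('orderbook.log', 'wb') as log_file:
    order_book = cbpro.OrderBook(product_id='BTC-USD', log_to=log_file)
    # ... pass messages to order_book.process_message() here ...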
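
A log written this way can be replayed later. The sketch below assumes each message is written to the log with one `pickle.dump` call (the Performance Considerations section refers to pickle logging, but this page does not spell out the exact format). The helper name `read_logged_messages` and the sample messages are hypothetical, and an in-memory buffer stands in for the log file.

```python
import io
import pickle


def read_logged_messages(log_file):
    """Yield objects from a binary stream of back-to-back pickles."""
    while True:
        try:
            yield pickle.load(log_file)
        except EOFError:
            # End of stream: no more pickled messages to read.
            return


# Stand-in for a log file: two sample messages pickled one after another.
buffer = io.BytesIO()
sample_messages = [
    {"type": "open", "sequence": 1, "price": "43000.00"},
    {"type": "done", "sequence": 2, "price": "43000.00"},
]
for message in sample_messages:
    pickle.dump(message, buffer)

buffer.seek(0)
replayed = list(read_logged_messages(buffer))
print(f"Replayed {len(replayed)} messages")
```

Only unpickle log files you wrote yourself, since `pickle.load` can execute arbitrary code from untrusted input.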

Message Processing

Process WebSocket messages to maintain accurate order book state with automatic synchronization.

def process_message(self, message: dict):
    """
    Process a WebSocket message to update the order book state.

    Handles different message types:
    - 'open': Add new order to book
    - 'done': Remove completed order from book
    - 'match': Update order sizes after trade execution
    - 'change': Update order size changes

    Parameters:
    - message (dict): WebSocket message from Coinbase Pro feed

    Automatically handles:
    - Message sequence validation
    - Order book resyncing on sequence gaps
    - Initial book loading from REST API
    """

def reset_book(self):
    """
    Reset and reload the order book from Coinbase Pro REST API.
    
    Automatically called when:
    - First message is processed
    - Message sequence gaps are detected
    - Manual reset is needed
    """

def on_sequence_gap(self, gap_start: int, gap_end: int):
    """
    Called when a sequence gap is detected in WebSocket messages.
    
    Override this method to customize gap handling behavior. Default
    implementation calls reset_book() to resync.
    
    Parameters:
    - gap_start (int): Last processed sequence number
    - gap_end (int): Next received sequence number
    """

Usage Example:

import time

import cbpro

# Integration with WebSocket client
class OrderBookWebsocket(cbpro.WebsocketClient):
    def __init__(self, product_id):
        super().__init__(
            products=[product_id],
            channels=['full'],  # Full channel for complete order book updates
            should_print=False
        )
        self.order_book = cbpro.OrderBook(product_id=product_id)

    def on_message(self, msg):
        # Process each message through order book
        self.order_book.process_message(msg)

# Start tracking BTC-USD order book
ws_client = OrderBookWebsocket('BTC-USD')
ws_client.start()

# Access order book data
time.sleep(5)  # Give the feed time to deliver the first messages
current_book = ws_client.order_book.get_current_book()
print(f"Bids: {len(current_book['bids'])}")
print(f"Asks: {len(current_book['asks'])}")

# Stop the WebSocket thread when finished
ws_client.close()
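
The four message types listed for process_message can be pictured as operations on a map of resting orders. The following is a simplified sketch, not the library's implementation: `apply_message` and the `orders` mapping are hypothetical, and the field names (`order_id`, `remaining_size`, `maker_order_id`, `new_size`) are assumed from the public full-channel message format.

```python
from decimal import Decimal


def apply_message(orders, message):
    """Apply one full-channel style message to orders (order_id -> order dict)."""
    kind = message["type"]
    if kind == "open":
        # A new resting order appears on the book.
        orders[message["order_id"]] = {
            "side": message["side"],
            "price": Decimal(message["price"]),
            "size": Decimal(message["remaining_size"]),
        }
    elif kind == "done":
        # The order is filled or cancelled and leaves the book.
        orders.pop(message["order_id"], None)
    elif kind == "match":
        # A trade reduces the resting (maker) order's size.
        maker = orders.get(message["maker_order_id"])
        if maker is not None:
            maker["size"] -= Decimal(message["size"])
            if maker["size"] <= 0:
                del orders[message["maker_order_id"]]
    elif kind == "change":
        # The order's size is amended in place.
        order = orders.get(message["order_id"])
        if order is not None and "new_size" in message:
            order["size"] = Decimal(message["new_size"])
    return orders


orders = {}
feed = [
    {"type": "open", "order_id": "a", "side": "buy",
     "price": "100.00", "remaining_size": "2.0"},
    {"type": "open", "order_id": "b", "side": "sell",
     "price": "101.00", "remaining_size": "1.0"},
    {"type": "match", "maker_order_id": "a", "size": "0.5"},
    {"type": "change", "order_id": "b", "new_size": "0.4"},
    {"type": "done", "order_id": "b"},
]
for message in feed:
    apply_message(orders, message)

print(orders)
```

After the sample feed, only order `a` remains, with its size reduced by the match.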

Order Book Data Access

Retrieve current order book state and market data with convenient access methods.

def get_current_book(self) -> dict:
    """
    Get complete current order book state.

    Returns:
    dict: Complete order book containing:
        - sequence: Current message sequence number
        - bids: List of [price, size, order_id] for buy orders
        - asks: List of [price, size, order_id] for sell orders
    """

def get_ask(self):
    """
    Get the best (lowest) ask price.

    Returns:
    Decimal: Best ask price, or None if no asks available
    """

def get_bid(self):
    """
    Get the best (highest) bid price.

    Returns:
    Decimal: Best bid price, or None if no bids available
    """

def get_asks(self, price) -> list:
    """
    Get all ask orders at a specific price level.

    Parameters:
    - price: Price level to query

    Returns:
    list: List of orders at that price level, or None if no orders
    """

def get_bids(self, price) -> list:
    """
    Get all bid orders at a specific price level.

    Parameters:
    - price: Price level to query

    Returns:
    list: List of orders at that price level, or None if no orders
    """

def get_current_ticker(self) -> dict:
    """
    Get the most recent trade information.

    Returns:
    dict: Last trade data from 'match' messages, or None if no trades yet
    """

Usage Example:

from decimal import Decimal

# Get current market data
best_bid = order_book.get_bid()
best_ask = order_book.get_ask()

if best_bid and best_ask:
    spread = float(best_ask) - float(best_bid)
    mid_price = (float(best_bid) + float(best_ask)) / 2
    spread_bps = (spread / mid_price) * 10000
    
    print(f"Best Bid: ${best_bid}")
    print(f"Best Ask: ${best_ask}")
    print(f"Spread: ${spread:.2f} ({spread_bps:.1f} bps)")

# Get the orders resting at a specific price level. Use a Decimal price,
# matching the Decimal values returned by get_bid()/get_ask()
bid_orders_at_43000 = order_book.get_bids(Decimal('43000.00'))
if bid_orders_at_43000:
    total_size = sum(float(order['size']) for order in bid_orders_at_43000)
    print(f"Total size at $43,000 bid: {total_size} BTC")

# Get complete order book snapshot
# (bids and asks hold one [price, size, order_id] entry per order)
book_snapshot = order_book.get_current_book()
print(f"Order book sequence: {book_snapshot['sequence']}")
print(f"Total bid orders: {len(book_snapshot['bids'])}")
print(f"Total ask orders: {len(book_snapshot['asks'])}")
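
Because the snapshot lists individual orders rather than price levels, counting levels or totalling size per price takes one aggregation pass. A minimal sketch, assuming the `[price, size, order_id]` layout described for get_current_book(); `aggregate_levels` and the sample entries are hypothetical. It also shows why a float price can miss a Decimal key.

```python
from collections import defaultdict
from decimal import Decimal


def aggregate_levels(entries):
    """Sum order sizes per price for [price, size, order_id] entries."""
    levels = defaultdict(Decimal)  # Decimal() is 0, so new prices start at zero
    for price, size, _order_id in entries:
        levels[Decimal(str(price))] += Decimal(str(size))
    return dict(levels)


# Hypothetical snapshot entries in the [price, size, order_id] layout.
sample_bids = [
    ["43000.10", "0.50", "order-1"],
    ["43000.10", "0.25", "order-2"],
    ["42999.00", "1.00", "order-3"],
]

bid_levels = aggregate_levels(sample_bids)
best_bid = max(bid_levels)           # highest bid price
size_at_best = bid_levels[best_bid]  # combined size of both orders at that price

# The float 43000.10 is not exactly equal to Decimal("43000.10"),
# so a float lookup misses the key while a Decimal lookup finds it.
float_lookup = bid_levels.get(43000.10)
decimal_lookup = bid_levels.get(Decimal("43000.10"))

print(f"{len(sample_bids)} orders across {len(bid_levels)} price levels")
```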

Advanced Usage Patterns

Bid-Ask Spread Monitor

Track and log spread changes in real-time with automatic detection of significant moves.

import time

import cbpro


class SpreadMonitor(cbpro.OrderBook):
    def __init__(self, product_id, spread_threshold=0.01):
        super().__init__(product_id=product_id)
        self.spread_threshold = spread_threshold
        self.last_spread = None
        self.spread_history = []

    def process_message(self, message):
        super().process_message(message)
        
        # Check spread after each update
        bid = self.get_bid()
        ask = self.get_ask()
        
        if bid and ask:
            current_spread = float(ask) - float(bid)
            
            # Log significant spread changes
            if self.last_spread is not None:
                spread_change = abs(current_spread - self.last_spread)
                if spread_change > self.spread_threshold:
                    print(f"Spread change: ${self.last_spread:.2f} -> ${current_spread:.2f}")
            
            self.last_spread = current_spread
            self.spread_history.append({
                'timestamp': time.time(),
                'spread': current_spread,
                'bid': float(bid),
                'ask': float(ask)
            })
            
            # Keep only last 1000 spreads
            if len(self.spread_history) > 1000:
                self.spread_history = self.spread_history[-1000:]

# Use spread monitor
spread_monitor = SpreadMonitor('BTC-USD', spread_threshold=1.0)
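
The change detection and bounded history can also be tried without a live feed. The sketch below is a standalone stand-in, not part of cbpro: `SpreadTracker` is a hypothetical helper that uses `collections.deque(maxlen=...)` to cap the history instead of slicing a list, and Decimal arithmetic instead of floats.

```python
from collections import deque
from decimal import Decimal


class SpreadTracker:
    """Hypothetical helper: flags spread moves larger than a threshold."""

    def __init__(self, threshold, max_history=1000):
        self.threshold = Decimal(str(threshold))
        self.last_spread = None
        # deque drops the oldest entry automatically once maxlen is reached
        self.history = deque(maxlen=max_history)

    def record(self, bid, ask):
        """Store the current spread; return True if it moved past the threshold."""
        spread = Decimal(str(ask)) - Decimal(str(bid))
        moved = (self.last_spread is not None
                 and abs(spread - self.last_spread) > self.threshold)
        self.last_spread = spread
        self.history.append(spread)
        return moved


tracker = SpreadTracker(threshold="1.00", max_history=3)
quotes = [
    ("100.00", "100.50"),  # first observation, nothing to compare against
    ("100.00", "100.60"),  # spread moved by 0.10
    ("100.00", "102.00"),  # spread moved by 1.40
    ("100.00", "102.10"),  # spread moved by 0.10
]
flags = [tracker.record(bid, ask) for bid, ask in quotes]
print(flags)
```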

Market Depth Analysis

Analyze order book depth and liquidity at different price levels.

import time

import cbpro


class DepthAnalyzer(cbpro.OrderBook):
    def __init__(self, product_id):
        super().__init__(product_id=product_id)
        self.depth_levels = [0.1, 0.25, 0.5, 1.0, 2.0]  # Percentage levels

    def get_market_depth(self):
        """Calculate market depth at various percentage levels from best bid/ask."""
        bid = self.get_bid()
        ask = self.get_ask()
        
        if not bid or not ask:
            return None
        
        book = self.get_current_book()
        depth_analysis = {
            'timestamp': time.time(),
            'best_bid': float(bid),
            'best_ask': float(ask),
            'bid_depth': {},
            'ask_depth': {}
        }
        
        # Analyze bid depth
        for level_pct in self.depth_levels:
            price_threshold = float(bid) * (1 - level_pct / 100)
            total_size = 0
            total_value = 0
            
            for bid_level in book['bids']:
                price, size, _ = bid_level
                if float(price) >= price_threshold:
                    total_size += float(size)
                    total_value += float(price) * float(size)
            
            depth_analysis['bid_depth'][f'{level_pct}%'] = {
                'price_threshold': price_threshold,
                'total_size': total_size,
                'total_value': total_value
            }
        
        # Analyze ask depth
        for level_pct in self.depth_levels:
            price_threshold = float(ask) * (1 + level_pct / 100)
            total_size = 0
            total_value = 0
            
            for ask_level in book['asks']:
                price, size, _ = ask_level
                if float(price) <= price_threshold:
                    total_size += float(size)
                    total_value += float(price) * float(size)
            
            depth_analysis['ask_depth'][f'{level_pct}%'] = {
                'price_threshold': price_threshold,
                'total_size': total_size,
                'total_value': total_value
            }
        
        return depth_analysis

    def process_message(self, message):
        super().process_message(message)
        
        # Analyze depth every 100 messages
        self.message_count = getattr(self, 'message_count', 0) + 1

        if self.message_count % 100 == 0:
            depth = self.get_market_depth()
            if depth:
                print("Market depth analysis:")
                print(f"  1% bid depth: {depth['bid_depth']['1.0%']['total_size']:.4f} BTC")
                print(f"  1% ask depth: {depth['ask_depth']['1.0%']['total_size']:.4f} BTC")

depth_analyzer = DepthAnalyzer('BTC-USD')
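
The depth calculation can be checked against fixed data before attaching it to a live book. A minimal sketch, assuming entries in the `[price, size, order_id]` layout; `depth_within` and the sample book are hypothetical, and Decimal is used so the thresholds are not affected by float rounding.

```python
from decimal import Decimal


def depth_within(entries, best_price, pct, side):
    """Total size of [price, size, order_id] entries within pct% of best_price."""
    best = Decimal(str(best_price))
    offset = best * Decimal(str(pct)) / Decimal(100)
    total = Decimal(0)
    for price, size, _order_id in entries:
        price = Decimal(str(price))
        # Bids count from the best price downward, asks from it upward.
        if side == "bid" and price >= best - offset:
            total += Decimal(str(size))
        elif side == "ask" and price <= best + offset:
            total += Decimal(str(size))
    return total


sample_bids = [
    ["100.00", "1.0", "a"],
    ["99.50", "2.0", "b"],
    ["98.00", "4.0", "c"],
]
sample_asks = [
    ["100.10", "0.5", "d"],
    ["103.00", "1.5", "e"],
]

depth_1pct = depth_within(sample_bids, "100.00", 1, "bid")   # threshold 99.00
depth_5pct = depth_within(sample_bids, "100.00", 5, "bid")   # threshold 95.00
ask_depth_1pct = depth_within(sample_asks, "100.10", 1, "ask")
print(depth_1pct, depth_5pct, ask_depth_1pct)
```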

Order Book Console Display

Real-time console display of order book changes with bid-ask spread monitoring.

from datetime import datetime

import cbpro


class OrderBookConsole(cbpro.OrderBook):
    def __init__(self, product_id):
        super().__init__(product_id=product_id)
        self._bid = None
        self._ask = None
        self._bid_depth = None
        self._ask_depth = None

    def process_message(self, message):
        if message.get('product_id') == self.product_id:
            super().process_message(message)

            try:
                # Calculate current bid-ask spread
                bid = self.get_bid()
                ask = self.get_ask()
                
                if bid and ask:
                    bids = self.get_bids(bid)
                    asks = self.get_asks(ask)
                    
                    bid_depth = sum(float(b['size']) for b in bids) if bids else 0
                    ask_depth = sum(float(a['size']) for a in asks) if asks else 0

                    # Only print if there are changes
                    if (self._bid != bid or self._ask != ask or 
                        self._bid_depth != bid_depth or self._ask_depth != ask_depth):
                        
                        self._bid = bid
                        self._ask = ask
                        self._bid_depth = bid_depth
                        self._ask_depth = ask_depth
                        
                        spread = float(ask) - float(bid)
                        timestamp = datetime.now().strftime('%H:%M:%S')
                        
                        print(f'{timestamp} {self.product_id} '
                              f'bid: {bid_depth:.3f} @ ${float(bid):,.2f} | '
                              f'ask: {ask_depth:.3f} @ ${float(ask):,.2f} | '
                              f'spread: ${spread:.2f}')
                        
            except Exception as e:
                print(f"Display error: {e}")

# Usage with WebSocket
class ConsoleWebsocket(cbpro.WebsocketClient):
    def __init__(self, products):
        super().__init__(
            products=products,
            channels=['full'],
            should_print=False
        )
        self.order_books = {}
        for product in products:
            self.order_books[product] = OrderBookConsole(product)

    def on_message(self, msg):
        product_id = msg.get('product_id')
        if product_id in self.order_books:
            self.order_books[product_id].process_message(msg)

# Monitor multiple products
console_ws = ConsoleWebsocket(['BTC-USD', 'ETH-USD'])
console_ws.start()

Error Handling and Synchronization

The OrderBook class includes robust error handling:

  • Sequence Gap Detection: Automatically detects missing messages and resyncs
  • Book Reset: Reloads complete order book from REST API when needed
  • Message Validation: Handles malformed or out-of-sequence messages
  • State Consistency: Maintains consistent bid/ask ordering using SortedDict
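
To illustrate what sorted price levels provide, the sketch below keeps prices in ascending order so the best bid and best ask can be read from the ends. It is a stand-in built on the standard library's `bisect` module, not the SortedDict the class is described as using; `PriceLevels` and its methods are hypothetical.

```python
import bisect
from decimal import Decimal


class PriceLevels:
    """Price -> orders mapping that also keeps prices in ascending order."""

    def __init__(self):
        self._prices = []  # sorted list of Decimal prices
        self._orders = {}  # price -> list of orders at that price

    def add(self, price, order):
        price = Decimal(str(price))
        if price not in self._orders:
            bisect.insort(self._prices, price)  # insert while keeping order
            self._orders[price] = []
        self._orders[price].append(order)

    def remove_level(self, price):
        price = Decimal(str(price))
        if price in self._orders:
            del self._orders[price]
            index = bisect.bisect_left(self._prices, price)
            del self._prices[index]

    def lowest(self):
        """Lowest price (best ask on the ask side), or None if empty."""
        return self._prices[0] if self._prices else None

    def highest(self):
        """Highest price (best bid on the bid side), or None if empty."""
        return self._prices[-1] if self._prices else None


bids = PriceLevels()
for price, order_id in [("101.00", "a"), ("99.00", "b"), ("100.00", "c")]:
    bids.add(price, {"id": order_id})

top_before = bids.highest()
bids.remove_level("101.00")
top_after = bids.highest()
print(top_before, top_after)
```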

Sequence Gap Handling

def on_sequence_gap(self, gap_start, gap_end):
    """
    Called when a sequence gap is detected.
    Override this method to customize gap handling behavior.
    
    Parameters:
    - gap_start: Last processed sequence number
    - gap_end: Next received sequence number
    """
    print(f"Sequence gap detected: {gap_start} -> {gap_end}")
    print("Resetting order book...")
    self.reset_book()
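
The check that leads to on_sequence_gap can be sketched in isolation. The class below is a hypothetical stand-in, not the library's code: it assumes messages carry an integer `sequence` field, ignores stale messages, and records gaps instead of reloading the book.

```python
class SequenceTracker:
    """Stand-in for the sequence check; records gaps instead of resyncing."""

    def __init__(self, start_sequence):
        self.sequence = start_sequence
        self.gaps = []

    def on_sequence_gap(self, gap_start, gap_end):
        # A real handler would resync the book here.
        self.gaps.append((gap_start, gap_end))

    def accept(self, message):
        """Return True if the message is the next one in sequence."""
        sequence = message["sequence"]
        if sequence <= self.sequence:
            return False  # stale: already reflected in the current state
        if sequence > self.sequence + 1:
            self.on_sequence_gap(self.sequence, sequence)
            # Pretend a resync brought the state up to this message.
            self.sequence = sequence
            return False
        self.sequence = sequence
        return True


tracker = SequenceTracker(start_sequence=10)
results = [tracker.accept({"sequence": s}) for s in (11, 12, 12, 15, 16)]
print(results, tracker.gaps)
```

In this run the repeated message 12 is ignored, the jump from 12 to 15 is reported as a gap, and processing resumes with 16.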

Performance Considerations

  • Memory Usage: Order book size grows with market activity; consider periodic resets for long-running applications
  • Processing Speed: Keep message processing fast to avoid falling behind the feed
  • Data Structures: Uses SortedDict for efficient price-level access and maintenance
  • Logging: Optional pickle logging can impact performance; use only when needed
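  • Channel Selection: The 'level2' channel is lighter-weight when per-order depth isn't needed, but it delivers aggregated 'snapshot' and 'l2update' messages rather than the 'open'/'done'/'match'/'change' messages that process_message handles, so it needs separate handling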
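
For comparison, a price-level book driven by the 'level2' channel can be sketched as follows. This is not part of the OrderBook class: `apply_level2` is hypothetical, and the message shapes (a `snapshot` with `bids`/`asks` as `[price, size]` pairs, and an `l2update` with `changes` as `[side, price, size]` triples where size 0 removes the level) are assumed from the public feed format.

```python
from decimal import Decimal


def apply_level2(book, message):
    """Apply a level2-style message to book = {"bids": {...}, "asks": {...}}."""
    if message["type"] == "snapshot":
        # Replace both sides with the aggregated levels from the snapshot.
        book["bids"] = {Decimal(p): Decimal(s) for p, s in message["bids"]}
        book["asks"] = {Decimal(p): Decimal(s) for p, s in message["asks"]}
    elif message["type"] == "l2update":
        for side, price, size in message["changes"]:
            levels = book["bids"] if side == "buy" else book["asks"]
            price, size = Decimal(price), Decimal(size)
            if size == 0:
                levels.pop(price, None)  # size 0 means the level is gone
            else:
                levels[price] = size     # otherwise set the new total size
    return book


book = {"bids": {}, "asks": {}}
apply_level2(book, {
    "type": "snapshot",
    "bids": [["100.00", "1.5"], ["99.00", "2.0"]],
    "asks": [["101.00", "0.7"]],
})
apply_level2(book, {
    "type": "l2update",
    "changes": [["buy", "100.00", "0"], ["sell", "101.00", "0.9"]],
})

best_bid = max(book["bids"])
best_ask = min(book["asks"])
print(best_bid, best_ask)
```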

The OrderBook class is designed for high-frequency updates. Its accuracy depends on messages being processed in sequence without falling behind the feed; when a sequence gap is detected, the book is reloaded from the REST API.

Install with Tessl CLI

npx tessl i tessl/pypi-cbpro