tessl/pypi-cbpro

The unofficial Python client for the Coinbase Pro API providing comprehensive trading and market data access

Real-time Data Streaming

The WebsocketClient provides real-time streaming of market data, order updates, and account changes via WebSocket connections. It supports multiple channels, authentication for private data, and optional MongoDB integration for data persistence.

Capabilities

Client Initialization

Create a WebSocket client for real-time data streaming with customizable channels and message handling.

class WebsocketClient:
    def __init__(self, url: str = "wss://ws-feed.pro.coinbase.com", 
                 products: list = None, message_type: str = "subscribe",
                 mongo_collection = None, should_print: bool = True,
                 auth: bool = False, api_key: str = "", api_secret: str = "", 
                 api_passphrase: str = "", *, channels: list):
        """
        Initialize WebSocket client for real-time data streaming.

        Parameters:
        - url (str): WebSocket URL. Defaults to production feed.
        - products (list): List of products to subscribe to (e.g., ["BTC-USD", "ETH-USD"])
        - message_type (str): Message type, typically "subscribe"
        - mongo_collection: MongoDB collection for data persistence (optional)
        - should_print (bool): Whether to print messages to console
        - auth (bool): Whether to authenticate for private channels
        - api_key (str): API key for authentication
        - api_secret (str): API secret for authentication
        - api_passphrase (str): API passphrase for authentication
        - channels (list): Required. List of channels to subscribe to.
            Options: ['ticker', 'user', 'matches', 'level2', 'full']
        """

Usage Example:

import cbpro

# Public market data streaming
ws_client = cbpro.WebsocketClient(
    products=['BTC-USD', 'ETH-USD'],
    channels=['ticker', 'matches']
)

# Authenticated streaming for private data
auth_ws_client = cbpro.WebsocketClient(
    products=['BTC-USD'],
    channels=['user', 'ticker'],
    auth=True,
    api_key=api_key,
    api_secret=api_secret,
    api_passphrase=api_passphrase
)

# With MongoDB integration
from pymongo import MongoClient
mongo_client = MongoClient('mongodb://localhost:27017/')
collection = mongo_client.crypto_db.btc_data

ws_client = cbpro.WebsocketClient(
    products=['BTC-USD'],
    channels=['ticker'],
    mongo_collection=collection,
    should_print=False
)
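The constructor arguments above end up in a subscribe message sent over the socket. The following is a minimal sketch of what such a payload might look like, not the library's actual code: the helper name `build_subscribe_message` is hypothetical, and the signing scheme (HMAC-SHA256 over timestamp + "GET" + "/users/self/verify", keyed with the base64-decoded secret) is an assumption based on the public Coinbase Pro API conventions.

```python
import base64
import hashlib
import hmac
import json
import time


def build_subscribe_message(products, channels, api_key="", api_secret="",
                            api_passphrase="", timestamp=None):
    """Hypothetical helper: build a subscribe payload, signed if a key is given."""
    message = {
        "type": "subscribe",
        "product_ids": list(products),
        "channels": list(channels),
    }
    if api_key:
        # Assumed signing scheme: HMAC-SHA256 over timestamp + method + path,
        # keyed with the base64-decoded secret, then base64-encoded.
        timestamp = str(time.time()) if timestamp is None else str(timestamp)
        prehash = timestamp + "GET" + "/users/self/verify"
        digest = hmac.new(base64.b64decode(api_secret),
                          prehash.encode("utf-8"), hashlib.sha256).digest()
        message.update({
            "signature": base64.b64encode(digest).decode("utf-8"),
            "key": api_key,
            "passphrase": api_passphrase,
            "timestamp": timestamp,
        })
    return message


# Placeholder credentials, for illustration only
demo_secret = base64.b64encode(b"not-a-real-secret").decode("utf-8")
payload = build_subscribe_message(["BTC-USD"], ["user"], api_key="demo-key",
                                  api_secret=demo_secret,
                                  api_passphrase="demo-pass",
                                  timestamp="1672574400")
print(json.dumps(payload, indent=2))
```

Calling the helper without credentials returns only the type, product_ids, and channels fields, which is the shape a public subscription would need.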

Connection Management

Control WebSocket connection lifecycle with start, stop, and error handling.

def start(self):
    """
    Start the WebSocket connection and begin listening for messages.
    Creates background threads for message processing and keepalive.
    """

def close(self):
    """
    Close the WebSocket connection and stop all background threads.
    Call this method to cleanly disconnect.
    """

Usage Example:

import time

# Start streaming
ws_client.start()

try:
    # Keep main thread alive while streaming
    while True:
        time.sleep(1)
        # Can check ws_client.error for connection issues
        if ws_client.error:
            print(f"WebSocket error: {ws_client.error}")
            break
except KeyboardInterrupt:
    print("Stopping WebSocket client...")
finally:
    ws_client.close()

Event Handlers

Override event handler methods to customize message processing and connection behavior.

def on_open(self):
    """
    Called once, immediately before the socket connection is made.
    Override this method to set initial parameters or perform setup.
    """

def on_message(self, msg: dict):
    """
    Called once for every message that arrives.
    Override this method to process incoming messages.

    Parameters:
    - msg (dict): Message data containing channel-specific information
    """

def on_close(self):
    """
    Called once when the WebSocket connection is closed.
    Override this method to perform cleanup or logging.
    """

def on_error(self, e: Exception, data = None):
    """
    Called when an error occurs during WebSocket operation.
    Override this method to handle errors appropriately.

    Parameters:
    - e (Exception): The exception that occurred
    - data: Additional error data (optional)
    """

Usage Example:

class CustomWebsocketClient(cbpro.WebsocketClient):
    def __init__(self):
        super().__init__(
            products=['BTC-USD', 'ETH-USD'],
            channels=['ticker', 'matches']
        )
        self.message_count = 0
        self.prices = {}

    def on_open(self):
        # Runs before the socket connects, so nothing is established yet
        print("Opening WebSocket connection")
        print(f"Subscribing to: {self.products}")

    def on_message(self, msg):
        self.message_count += 1
        
        if msg.get('type') == 'ticker':
            product = msg.get('product_id')
            price = float(msg.get('price', 0))
            self.prices[product] = price
            print(f"{product}: ${price:,.2f}")
            
        elif msg.get('type') == 'match':
            product = msg.get('product_id')
            size = msg.get('size')
            price = msg.get('price')
            side = msg.get('side')
            print(f"Trade: {product} {side} {size} @ ${price}")

    def on_close(self):
        print(f"Connection closed. Processed {self.message_count} messages")

    def on_error(self, e, data=None):
        print(f"WebSocket error: {e}")
        if data:
            print(f"Error data: {data}")
        # Delegate so the base class can still record the error and stop
        super().on_error(e, data)

# Use custom client
custom_client = CustomWebsocketClient()
custom_client.start()

Channel Types and Message Formats

Ticker Channel

Real-time price updates and 24-hour statistics.

Channel: ticker
Authentication: Not required

Message Format:

{
    "type": "ticker",
    "sequence": 5928281084,
    "product_id": "BTC-USD",
    "price": "43000.00",
    "open_24h": "42500.00",
    "volume_24h": "1234.56789",
    "low_24h": "42000.00",
    "high_24h": "44000.00",
    "volume_30d": "12345.67890",
    "best_bid": "42999.99",
    "best_ask": "43000.01",
    "side": "buy",
    "time": "2023-01-01T12:00:00.000000Z",
    "trade_id": 123456789,
    "last_size": "0.001"
}
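Numeric fields in the ticker message arrive as strings. As a small sketch of working with them, the hypothetical helper `ticker_spread` below derives the bid/ask spread and mid price with `Decimal`, which avoids the rounding noise that `float` can introduce for prices.

```python
from decimal import Decimal


def ticker_spread(msg):
    """Return (spread, mid_price) as Decimals for a ticker message."""
    bid = Decimal(msg["best_bid"])
    ask = Decimal(msg["best_ask"])
    return ask - bid, (ask + bid) / 2


# Subset of the sample ticker message shown above
sample = {
    "type": "ticker",
    "product_id": "BTC-USD",
    "price": "43000.00",
    "best_bid": "42999.99",
    "best_ask": "43000.01",
}
spread, mid = ticker_spread(sample)
print(f"spread={spread} mid={mid}")
```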

Matches Channel

Real-time trade execution data.

Channel: matches
Authentication: Not required

Message Format:

{
    "type": "match",
    "trade_id": 123456789,
    "sequence": 5928281084,
    "maker_order_id": "ac928c66-ca53-498f-9c13-a110027a60e8",
    "taker_order_id": "132fb6ae-456b-4654-b4e0-d681ac05cea1",
    "time": "2023-01-01T12:00:00.000000Z",
    "product_id": "BTC-USD",
    "size": "0.001",
    "price": "43000.00",
    "side": "buy"
}
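A stream of match messages can be reduced to summary statistics. The sketch below computes a volume-weighted average price (VWAP) from the `size` and `price` fields shown above; the function name `vwap` is illustrative and not part of cbpro.

```python
from decimal import Decimal


def vwap(matches):
    """Volume-weighted average price over an iterable of match messages."""
    total_size = Decimal("0")
    total_value = Decimal("0")
    for msg in matches:
        if msg.get("type") != "match":
            continue  # ignore anything that is not a trade
        size = Decimal(msg["size"])
        total_size += size
        total_value += size * Decimal(msg["price"])
    # None signals that no trades were seen
    return total_value / total_size if total_size else None


trades = [
    {"type": "match", "size": "1", "price": "100"},
    {"type": "match", "size": "3", "price": "200"},
    {"type": "subscriptions"},
]
print(vwap(trades))
```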

Level2 Channel

Aggregated order book data: an initial snapshot of bids and asks, followed by incremental price-level updates.

Channel: level2
Authentication: Not required

Message Format:

# Snapshot message (initial)
{
    "type": "snapshot",
    "product_id": "BTC-USD",
    "bids": [["43000.00", "1.5"], ["42999.99", "2.0"]],
    "asks": [["43000.01", "0.5"], ["43000.02", "1.0"]]
}

# Update message (changes)
{
    "type": "l2update",
    "product_id": "BTC-USD",
    "time": "2023-01-01T12:00:00.000000Z",
    "changes": [
        ["buy", "43000.00", "1.2"],    # [side, price, new_size]
        ["sell", "43000.02", "0.0"]    # size "0.0" means removed
    ]
}
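The snapshot and update messages above can be folded into a local price-level book. This is a minimal sketch under the assumption that messages have exactly the shapes shown; `Level2Book` is a hypothetical class, unrelated to the order book helper that ships with cbpro.

```python
from decimal import Decimal


class Level2Book:
    """Price-level book built from snapshot and l2update messages."""

    def __init__(self):
        self.bids = {}  # price -> size
        self.asks = {}  # price -> size

    def apply(self, msg):
        if msg["type"] == "snapshot":
            # A snapshot replaces whatever state we had
            self.bids = {Decimal(p): Decimal(s) for p, s in msg["bids"]}
            self.asks = {Decimal(p): Decimal(s) for p, s in msg["asks"]}
        elif msg["type"] == "l2update":
            for side, price, size in msg["changes"]:
                levels = self.bids if side == "buy" else self.asks
                price, size = Decimal(price), Decimal(size)
                if size == 0:
                    levels.pop(price, None)  # zero size removes the level
                else:
                    levels[price] = size

    def best_bid(self):
        return max(self.bids) if self.bids else None

    def best_ask(self):
        return min(self.asks) if self.asks else None


book = Level2Book()
book.apply({
    "type": "snapshot",
    "product_id": "BTC-USD",
    "bids": [["43000.00", "1.5"], ["42999.99", "2.0"]],
    "asks": [["43000.01", "0.5"], ["43000.02", "1.0"]],
})
book.apply({
    "type": "l2update",
    "product_id": "BTC-USD",
    "changes": [["buy", "43000.00", "1.2"], ["sell", "43000.02", "0.0"]],
})
print(book.best_bid(), book.best_ask())
```

A subclass of WebsocketClient could call `book.apply(msg)` from `on_message` for messages of these two types.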

User Channel

Private account and order updates (requires authentication).

Channel: user
Authentication: Required

Message Format:

# Order received
{
    "type": "received",
    "time": "2023-01-01T12:00:00.000000Z",
    "product_id": "BTC-USD",
    "sequence": 5928281084,
    "order_id": "d50ec984-77a8-460a-b958-66f114b0de9b",
    "size": "0.001",
    "price": "43000.00",
    "side": "buy",
    "order_type": "limit"
}

# Order filled
{
    "type": "match",
    "time": "2023-01-01T12:00:00.000000Z",
    "product_id": "BTC-USD",
    "sequence": 5928281085,
    "order_id": "d50ec984-77a8-460a-b958-66f114b0de9b",
    "trade_id": 123456789,
    "size": "0.001",
    "price": "43000.00",
    "side": "buy",
    "liquidity": "T",
    "fee": "1.075",
    "funds": "43.001075"
}
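User-channel messages can be aggregated per order. The sketch below sums filled size and fees by `order_id`, relying only on the field names in the sample messages above; the real feed may carry different or additional fields, so treat the keys as assumptions. `summarize_fills` is a hypothetical helper.

```python
from collections import defaultdict
from decimal import Decimal


def summarize_fills(messages):
    """Total filled size and fees per order_id from user-channel messages."""
    totals = defaultdict(lambda: {"size": Decimal("0"), "fee": Decimal("0")})
    for msg in messages:
        if msg.get("type") != "match":
            continue  # only fills contribute to the totals
        entry = totals[msg["order_id"]]
        entry["size"] += Decimal(msg["size"])
        entry["fee"] += Decimal(msg.get("fee", "0"))
    return dict(totals)


events = [
    {"type": "received", "order_id": "order-a", "size": "0.003"},
    {"type": "match", "order_id": "order-a", "size": "0.001", "fee": "0.10"},
    {"type": "match", "order_id": "order-a", "size": "0.002", "fee": "0.20"},
]
fills = summarize_fills(events)
print(fills)
```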

Full Channel

Complete order book with all orders (Level 3).

Channel: full
Authentication: Not required (but rate limited)

Note: Only recommended for maintaining full real-time order books. Abuse via polling will result in access limitations.
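Maintaining a book from the full channel generally depends on noticing missed messages. The sketch below tracks the `sequence` field per product and reports gaps. It assumes sequence numbers increase by exactly one per message for each product; `SequenceTracker` is a hypothetical helper.

```python
class SequenceTracker:
    """Track the last sequence number seen per product and report gaps."""

    def __init__(self):
        self.last_seen = {}  # product_id -> last sequence number

    def check(self, msg):
        """Return how many messages were missed before this one."""
        product = msg.get("product_id")
        sequence = msg.get("sequence")
        if product is None or sequence is None:
            return 0  # message carries no sequence information
        previous = self.last_seen.get(product)
        if previous is not None and sequence <= previous:
            return 0  # stale or duplicate message; keep the newer state
        self.last_seen[product] = sequence
        if previous is None:
            return 0  # first message for this product
        return sequence - previous - 1


tracker = SequenceTracker()
for seq in (10, 11, 14):
    missed = tracker.check({"product_id": "BTC-USD", "sequence": seq})
    if missed:
        print(f"Missed {missed} message(s) before sequence {seq}")
```

When a gap is reported, a typical response is to discard local state and rebuild it from a fresh snapshot.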

Advanced Usage Patterns

Price Monitoring and Alerts

class PriceMonitor(cbpro.WebsocketClient):
    def __init__(self, product_id, alert_price):
        super().__init__(
            products=[product_id],
            channels=['ticker'],
            should_print=False
        )
        self.product_id = product_id
        self.alert_price = alert_price
        self.last_price = None

    def on_message(self, msg):
        if msg.get('type') == 'ticker' and msg.get('product_id') == self.product_id:
            current_price = float(msg.get('price', 0))
            
            if self.last_price and current_price >= self.alert_price and self.last_price < self.alert_price:
                print(f"ALERT: {self.product_id} crossed ${self.alert_price}!")
                print(f"Current price: ${current_price}")
            
            self.last_price = current_price

# Monitor BTC price
price_monitor = PriceMonitor('BTC-USD', 45000.0)
price_monitor.start()

Trade Volume Analysis

class VolumeAnalyzer(cbpro.WebsocketClient):
    def __init__(self, product_id):
        super().__init__(
            products=[product_id],
            channels=['matches'],
            should_print=False
        )
        self.product_id = product_id
        self.trades = []
        self.volume_1min = 0
        self.last_minute = None

    def on_message(self, msg):
        if msg.get('type') == 'match' and msg.get('product_id') == self.product_id:
            size = float(msg.get('size', 0))
            price = float(msg.get('price', 0))
            timestamp = msg.get('time')
            
            # Track volume per minute
            current_minute = timestamp[:16]  # YYYY-MM-DDTHH:MM
            
            if self.last_minute != current_minute:
                if self.last_minute:
                    print(f"Volume {self.last_minute}: {self.volume_1min:.4f} {self.product_id.split('-')[0]}")
                self.volume_1min = 0
                self.last_minute = current_minute
            
            self.volume_1min += size
            self.trades.append({
                'time': timestamp,
                'price': price,
                'size': size,
                'value': price * size
            })

volume_analyzer = VolumeAnalyzer('BTC-USD')
volume_analyzer.start()

MongoDB Data Persistence

from pymongo import MongoClient
import cbpro

# Setup MongoDB connection
mongo_client = MongoClient('mongodb://localhost:27017/')
db = mongo_client.crypto_data
btc_collection = db.btc_ticks

# Stream ticker data directly to MongoDB
ws_client = cbpro.WebsocketClient(
    products=['BTC-USD'],
    channels=['ticker'],
    mongo_collection=btc_collection,
    should_print=False
)

ws_client.start()

# Query stored data later
from datetime import datetime, timedelta
recent_data = btc_collection.find({
    'time': {'$gte': (datetime.utcnow() - timedelta(hours=1)).isoformat()}
}).sort('time', 1)

for tick in recent_data:
    print(f"Price: ${tick['price']} at {tick['time']}")
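The query above compares `time` values as ISO-8601 strings. When real datetime arithmetic is needed, the strings can be parsed first. This is a small sketch assuming timestamps always use the fractional-seconds format with a trailing "Z" shown in the sample messages; `parse_feed_time` and `within_last` are hypothetical helpers.

```python
from datetime import datetime, timedelta, timezone


def parse_feed_time(value):
    """Parse a timestamp such as '2023-01-01T12:00:00.000000Z' as UTC."""
    parsed = datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%fZ")
    return parsed.replace(tzinfo=timezone.utc)


def within_last(value, window, now=None):
    """True if the timestamp is no older than `window` relative to `now`."""
    now = now or datetime.now(timezone.utc)
    return now - parse_feed_time(value) <= window


reference = datetime(2023, 1, 1, 12, 30, tzinfo=timezone.utc)
print(within_last("2023-01-01T12:00:00.000000Z", timedelta(hours=1), now=reference))
```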

Error Handling and Reconnection

The WebsocketClient includes built-in error handling and keepalive mechanisms:

  • Automatic Keepalive: Sends ping messages every 30 seconds to maintain connection
  • Error Recovery: Captures connection errors and provides them via the error attribute
  • Clean Disconnect: Properly closes connections and joins background threads
  • Message Validation: Handles malformed JSON messages gracefully

For production applications, implement reconnection logic:

import time

def run_websocket_with_reconnect(ws_class, max_retries=5):
    retries = 0
    while retries < max_retries:
        ws_client = ws_class()
        ws_client.start()
        
        # Monitor for errors
        while not ws_client.error:
            time.sleep(1)
        
        print(f"WebSocket error occurred: {ws_client.error}")
        ws_client.close()
        
        retries += 1
        if retries < max_retries:
            print(f"Reconnecting in 5 seconds... (attempt {retries + 1}/{max_retries})")
            time.sleep(5)
        else:
            print("Max retries exceeded. Giving up.")
            break
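The loop above pauses a fixed five seconds between attempts. One common alternative is exponential backoff with jitter, sketched below. The function name `backoff_delay` and its default values are illustrative assumptions, not cbpro settings.

```python
import random


def backoff_delay(attempt, base=1.0, cap=60.0, jitter=0.5, rng=random.random):
    """Delay in seconds before reconnect attempt `attempt` (0-based)."""
    # Double the delay on every attempt, but never exceed the cap
    delay = min(cap, base * (2 ** attempt))
    # Add up to `jitter` times the delay so clients do not reconnect in lockstep
    return delay * (1 + jitter * rng())


for attempt in range(4):
    print(f"attempt {attempt}: wait about {backoff_delay(attempt):.1f}s")
```

Inside a reconnect loop, `time.sleep(backoff_delay(retries))` could take the place of the fixed sleep.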

Performance Considerations

  • Channel Selection: Only subscribe to needed channels to reduce bandwidth
  • Product Filtering: Limit products to reduce message volume
  • Message Processing: Keep on_message processing fast to avoid blocking
  • Memory Management: Implement data rotation for long-running applications
  • Rate Limiting: Respect WebSocket rate limits to avoid disconnection
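One way to keep on_message fast is to hand messages to a worker thread through a bounded queue. The sketch below shows the idea with the standard library only; `MessageWorker` is a hypothetical helper, and dropping messages when the queue is full is a design choice that may not suit every application.

```python
import queue
import threading


class MessageWorker:
    """Process messages on a worker thread so the producer never blocks."""

    def __init__(self, handler, maxsize=1000):
        self._queue = queue.Queue(maxsize=maxsize)
        self._handler = handler
        self.dropped = 0  # messages discarded because the queue was full
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def submit(self, msg):
        """Enqueue without blocking; count the message as dropped if full."""
        try:
            self._queue.put_nowait(msg)
        except queue.Full:
            self.dropped += 1

    def _run(self):
        while True:
            msg = self._queue.get()
            if msg is None:  # sentinel: stop the worker
                break
            self._handler(msg)

    def stop(self):
        """Process everything already queued, then stop the thread."""
        self._queue.put(None)
        self._thread.join()


processed = []
worker = MessageWorker(processed.append)
for i in range(5):
    worker.submit({"type": "ticker", "sequence": i})
worker.stop()
print(len(processed), worker.dropped)
```

In a WebsocketClient subclass, on_message would then only call `worker.submit(msg)`, and the slow work would live in the handler.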

Install with Tessl CLI

npx tessl i tessl/pypi-cbpro
