The unofficial Python client for the Coinbase Pro API providing comprehensive trading and market data access
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
The WebsocketClient provides real-time streaming of market data, order updates, and account changes via WebSocket connections. It supports multiple channels, authentication for private data, and optional MongoDB integration for data persistence.
Create a WebSocket client for real-time data streaming with customizable channels and message handling.
class WebsocketClient:
def __init__(self, url: str = "wss://ws-feed.pro.coinbase.com",
products: list = None, message_type: str = "subscribe",
mongo_collection = None, should_print: bool = True,
auth: bool = False, api_key: str = "", api_secret: str = "",
api_passphrase: str = "", *, channels: list):
"""
Initialize WebSocket client for real-time data streaming.
Parameters:
- url (str): WebSocket URL. Defaults to production feed.
- products (list): List of products to subscribe to (e.g., ["BTC-USD", "ETH-USD"])
- message_type (str): Message type, typically "subscribe"
- mongo_collection: MongoDB collection for data persistence (optional)
- should_print (bool): Whether to print messages to console
- auth (bool): Whether to authenticate for private channels
- api_key (str): API key for authentication
- api_secret (str): API secret for authentication
- api_passphrase (str): API passphrase for authentication
- channels (list): Required. List of channels to subscribe to.
Options: ['ticker', 'user', 'matches', 'level2', 'full']
"""Usage Example:
import cbpro
# Public market data streaming
ws_client = cbpro.WebsocketClient(
products=['BTC-USD', 'ETH-USD'],
channels=['ticker', 'matches']
)
# Authenticated streaming for private data
auth_ws_client = cbpro.WebsocketClient(
products=['BTC-USD'],
channels=['user', 'ticker'],
auth=True,
api_key=api_key,
api_secret=api_secret,
api_passphrase=api_passphrase
)
# With MongoDB integration
from pymongo import MongoClient
mongo_client = MongoClient('mongodb://localhost:27017/')
collection = mongo_client.crypto_db.btc_data
ws_client = cbpro.WebsocketClient(
products=['BTC-USD'],
channels=['ticker'],
mongo_collection=collection,
should_print=False
)Control WebSocket connection lifecycle with start, stop, and error handling.
def start(self):
"""
Start the WebSocket connection and begin listening for messages.
Creates background threads for message processing and keepalive.
"""
def close(self):
"""
Close the WebSocket connection and stop all background threads.
Call this method to cleanly disconnect.
"""Usage Example:
# Start streaming
ws_client.start()
try:
# Keep main thread alive while streaming
while True:
time.sleep(1)
# Can check ws_client.error for connection issues
if ws_client.error:
print(f"WebSocket error: {ws_client.error}")
break
except KeyboardInterrupt:
print("Stopping WebSocket client...")
finally:
ws_client.close()Override event handler methods to customize message processing and connection behavior.
def on_open(self):
"""
Called once, immediately before the socket connection is made.
Override this method to set initial parameters or perform setup.
"""
def on_message(self, msg: dict):
"""
Called once for every message that arrives.
Override this method to process incoming messages.
Parameters:
- msg (dict): Message data containing channel-specific information
"""
def on_close(self):
"""
Called once when the WebSocket connection is closed.
Override this method to perform cleanup or logging.
"""
def on_error(self, e: Exception, data = None):
"""
Called when an error occurs during WebSocket operation.
Override this method to handle errors appropriately.
Parameters:
- e (Exception): The exception that occurred
- data: Additional error data (optional)
"""Usage Example:
class CustomWebsocketClient(cbpro.WebsocketClient):
def __init__(self):
super().__init__(
products=['BTC-USD', 'ETH-USD'],
channels=['ticker', 'matches']
)
self.message_count = 0
self.prices = {}
def on_open(self):
print("WebSocket connection established")
print(f"Subscribed to: {self.products}")
def on_message(self, msg):
self.message_count += 1
if msg.get('type') == 'ticker':
product = msg.get('product_id')
price = float(msg.get('price', 0))
self.prices[product] = price
print(f"{product}: ${price:,.2f}")
elif msg.get('type') == 'match':
product = msg.get('product_id')
size = msg.get('size')
price = msg.get('price')
side = msg.get('side')
print(f"Trade: {product} {side} {size} @ ${price}")
def on_close(self):
print(f"Connection closed. Processed {self.message_count} messages")
def on_error(self, e, data=None):
print(f"WebSocket error: {e}")
if data:
print(f"Error data: {data}")
# Use custom client
custom_client = CustomWebsocketClient()
custom_client.start()Real-time price updates and 24-hour statistics.
Channel: ticker
Authentication: Not required
Message Format:
{
"type": "ticker",
"sequence": 5928281084,
"product_id": "BTC-USD",
"price": "43000.00",
"open_24h": "42500.00",
"volume_24h": "1234.56789",
"low_24h": "42000.00",
"high_24h": "44000.00",
"volume_30d": "12345.67890",
"best_bid": "42999.99",
"best_ask": "43000.01",
"side": "buy",
"time": "2023-01-01T12:00:00.000000Z",
"trade_id": 123456789,
"last_size": "0.001"
}Real-time trade execution data.
Channel: matches
Authentication: Not required
Message Format:
{
"type": "match",
"trade_id": 123456789,
"sequence": 5928281084,
"maker_order_id": "ac928c66-ca53-498f-9c13-a110027a60e8",
"taker_order_id": "132fb6ae-456b-4654-b4e0-d681ac05cea1",
"time": "2023-01-01T12:00:00.000000Z",
"product_id": "BTC-USD",
"size": "0.001",
"price": "43000.00",
"side": "buy"
}Order book updates with top 50 bids and asks.
Channel: level2
Authentication: Not required
Message Format:
# Snapshot message (initial)
{
"type": "snapshot",
"product_id": "BTC-USD",
"bids": [["43000.00", "1.5"], ["42999.99", "2.0"]],
"asks": [["43000.01", "0.5"], ["43000.02", "1.0"]]
}
# Update message (changes)
{
"type": "l2update",
"product_id": "BTC-USD",
"time": "2023-01-01T12:00:00.000000Z",
"changes": [
["buy", "43000.00", "1.2"], # [side, price, new_size]
["sell", "43000.02", "0.0"] # size "0.0" means removed
]
}Private account and order updates (requires authentication).
Channel: user
Authentication: Required
Message Format:
# Order received
{
"type": "received",
"time": "2023-01-01T12:00:00.000000Z",
"product_id": "BTC-USD",
"sequence": 5928281084,
"order_id": "d50ec984-77a8-460a-b958-66f114b0de9b",
"size": "0.001",
"price": "43000.00",
"side": "buy",
"order_type": "limit"
}
# Order filled
{
"type": "match",
"time": "2023-01-01T12:00:00.000000Z",
"product_id": "BTC-USD",
"sequence": 5928281085,
"order_id": "d50ec984-77a8-460a-b958-66f114b0de9b",
"trade_id": 123456789,
"size": "0.001",
"price": "43000.00",
"side": "buy",
"liquidity": "T",
"fee": "1.075",
"funds": "43.001075"
}Complete order book with all orders (Level 3).
Channel: full
Authentication: Not required (but rate limited)
Note: Only recommended for maintaining full real-time order books. Abuse via polling will result in access limitations.
class PriceMonitor(cbpro.WebsocketClient):
def __init__(self, product_id, alert_price):
super().__init__(
products=[product_id],
channels=['ticker'],
should_print=False
)
self.product_id = product_id
self.alert_price = alert_price
self.last_price = None
def on_message(self, msg):
if msg.get('type') == 'ticker' and msg.get('product_id') == self.product_id:
current_price = float(msg.get('price', 0))
if self.last_price and current_price >= self.alert_price and self.last_price < self.alert_price:
print(f"ALERT: {self.product_id} crossed ${self.alert_price}!")
print(f"Current price: ${current_price}")
self.last_price = current_price
# Monitor BTC price
price_monitor = PriceMonitor('BTC-USD', 45000.0)
price_monitor.start()class VolumeAnalyzer(cbpro.WebsocketClient):
def __init__(self, product_id):
super().__init__(
products=[product_id],
channels=['matches'],
should_print=False
)
self.product_id = product_id
self.trades = []
self.volume_1min = 0
self.last_minute = None
def on_message(self, msg):
if msg.get('type') == 'match' and msg.get('product_id') == self.product_id:
size = float(msg.get('size', 0))
price = float(msg.get('price', 0))
timestamp = msg.get('time')
# Track volume per minute
current_minute = timestamp[:16] # YYYY-MM-DDTHH:MM
if self.last_minute != current_minute:
if self.last_minute:
print(f"Volume {self.last_minute}: {self.volume_1min:.4f} {self.product_id.split('-')[0]}")
self.volume_1min = 0
self.last_minute = current_minute
self.volume_1min += size
self.trades.append({
'time': timestamp,
'price': price,
'size': size,
'value': price * size
})
volume_analyzer = VolumeAnalyzer('BTC-USD')
volume_analyzer.start()from pymongo import MongoClient
import cbpro
# Setup MongoDB connection
mongo_client = MongoClient('mongodb://localhost:27017/')
db = mongo_client.crypto_data
btc_collection = db.btc_ticks
# Stream ticker data directly to MongoDB
ws_client = cbpro.WebsocketClient(
products=['BTC-USD'],
channels=['ticker'],
mongo_collection=btc_collection,
should_print=False
)
ws_client.start()
# Query stored data later
from datetime import datetime, timedelta
recent_data = btc_collection.find({
'time': {'$gte': (datetime.utcnow() - timedelta(hours=1)).isoformat()}
}).sort('time', 1)
for tick in recent_data:
print(f"Price: ${tick['price']} at {tick['time']}")The WebSocketClient includes built-in error handling and keepalive mechanisms:
error attributeFor production applications, implement reconnection logic:
def run_websocket_with_reconnect(ws_class, max_retries=5):
retries = 0
while retries < max_retries:
ws_client = ws_class()
ws_client.start()
# Monitor for errors
while not ws_client.error:
time.sleep(1)
print(f"WebSocket error occurred: {ws_client.error}")
ws_client.close()
retries += 1
if retries < max_retries:
print(f"Reconnecting in 5 seconds... (attempt {retries + 1}/{max_retries})")
time.sleep(5)
else:
print("Max retries exceeded. Giving up.")
breakon_message processing fast to avoid blockingInstall with Tessl CLI
npx tessl i tessl/pypi-cbpro