# Real-time Data Streaming

`WebsocketClient` streams market data, order updates, and account changes in real time over a WebSocket connection. It supports multiple channels, authenticates for private data, and can optionally persist messages to MongoDB.

## Capabilities

### Client Initialization

Create a WebSocket client for real-time data streaming with customizable channels and message handling.

```python { .api }
class WebsocketClient:
    def __init__(self, url: str = "wss://ws-feed.pro.coinbase.com",
                 products: list = None, message_type: str = "subscribe",
                 mongo_collection = None, should_print: bool = True,
                 auth: bool = False, api_key: str = "", api_secret: str = "",
                 api_passphrase: str = "", *, channels: list):
        """
        Initialize WebSocket client for real-time data streaming.

        Parameters:
        - url (str): WebSocket URL. Defaults to production feed.
        - products (list): List of products to subscribe to (e.g., ["BTC-USD", "ETH-USD"])
        - message_type (str): Message type, typically "subscribe"
        - mongo_collection: MongoDB collection for data persistence (optional)
        - should_print (bool): Whether to print messages to console
        - auth (bool): Whether to authenticate for private channels
        - api_key (str): API key for authentication
        - api_secret (str): API secret for authentication
        - api_passphrase (str): API passphrase for authentication
        - channels (list): Required. List of channels to subscribe to.
          Options: ['ticker', 'user', 'matches', 'level2', 'full']
        """
```

**Usage Example:**
```python
import cbpro

# Public market data streaming
ws_client = cbpro.WebsocketClient(
    products=['BTC-USD', 'ETH-USD'],
    channels=['ticker', 'matches']
)

# Authenticated streaming for private data
# (api_key, api_secret, and api_passphrase hold your API credentials)
auth_ws_client = cbpro.WebsocketClient(
    products=['BTC-USD'],
    channels=['user', 'ticker'],
    auth=True,
    api_key=api_key,
    api_secret=api_secret,
    api_passphrase=api_passphrase
)

# With MongoDB integration
from pymongo import MongoClient
mongo_client = MongoClient('mongodb://localhost:27017/')
collection = mongo_client.crypto_db.btc_data

ws_client = cbpro.WebsocketClient(
    products=['BTC-USD'],
    channels=['ticker'],
    mongo_collection=collection,
    should_print=False
)
```

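To make the `auth=True` option less opaque, here is a minimal sketch of what an authenticated subscribe payload might look like, built with the standard library only. It assumes the signing scheme publicly documented for the Coinbase Pro feed (HMAC-SHA256 over `timestamp + "GET" + "/users/self/verify"`, keyed with the base64-decoded secret). `build_subscribe_message` is a hypothetical helper, not part of `cbpro`, and the exact payload the client sends may differ.

```python
import base64
import hashlib
import hmac
import time


def build_subscribe_message(products, channels, api_key="", api_secret="",
                            api_passphrase="", timestamp=None):
    """Hypothetical helper: build a subscribe payload, signed if a key is given."""
    message = {"type": "subscribe", "product_ids": products, "channels": channels}
    if api_key:
        # Assumed signing scheme: HMAC-SHA256 over timestamp + method + path.
        timestamp = timestamp or str(time.time())
        prehash = timestamp + "GET" + "/users/self/verify"
        secret = base64.b64decode(api_secret)
        digest = hmac.new(secret, prehash.encode("utf-8"), hashlib.sha256).digest()
        message.update({
            "signature": base64.b64encode(digest).decode("utf-8"),
            "key": api_key,
            "passphrase": api_passphrase,
            "timestamp": timestamp,
        })
    return message


# Unauthenticated payload for public channels
print(build_subscribe_message(["BTC-USD"], ["ticker"]))
```

In normal use the client handles this step itself when `auth=True` is passed, so the helper is only useful for understanding or debugging the handshake.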
### Connection Management

Control the WebSocket connection lifecycle with `start()` and `close()`, and check the `error` attribute for connection failures.

```python { .api }
def start(self):
    """
    Start the WebSocket connection and begin listening for messages.
    Creates background threads for message processing and keepalive.
    """

def close(self):
    """
    Close the WebSocket connection and stop all background threads.
    Call this method to cleanly disconnect.
    """
```

**Usage Example:**
```python
import time

# Start streaming
ws_client.start()

try:
    # Keep main thread alive while streaming
    while True:
        time.sleep(1)
        # Can check ws_client.error for connection issues
        if ws_client.error:
            print(f"WebSocket error: {ws_client.error}")
            break
except KeyboardInterrupt:
    print("Stopping WebSocket client...")
finally:
    ws_client.close()
```

### Event Handlers

Override event handler methods to customize message processing and connection behavior.

```python { .api }
def on_open(self):
    """
    Called once, immediately before the socket connection is made.
    Override this method to set initial parameters or perform setup.
    """

def on_message(self, msg: dict):
    """
    Called once for every message that arrives.
    Override this method to process incoming messages.

    Parameters:
    - msg (dict): Message data containing channel-specific information
    """

def on_close(self):
    """
    Called once when the WebSocket connection is closed.
    Override this method to perform cleanup or logging.
    """

def on_error(self, e: Exception, data = None):
    """
    Called when an error occurs during WebSocket operation.
    Override this method to handle errors appropriately.

    Parameters:
    - e (Exception): The exception that occurred
    - data: Additional error data (optional)
    """
```

**Usage Example:**
```python
import cbpro


class CustomWebsocketClient(cbpro.WebsocketClient):
    def __init__(self):
        super().__init__(
            products=['BTC-USD', 'ETH-USD'],
            channels=['ticker', 'matches']
        )
        self.message_count = 0
        self.prices = {}

    def on_open(self):
        # Runs just before the socket connects, so nothing is established yet
        print("Opening WebSocket connection")
        print(f"Subscribing to: {self.products}")

    def on_message(self, msg):
        self.message_count += 1

        if msg.get('type') == 'ticker':
            product = msg.get('product_id')
            price = float(msg.get('price', 0))
            self.prices[product] = price
            print(f"{product}: ${price:,.2f}")

        elif msg.get('type') == 'match':
            product = msg.get('product_id')
            size = msg.get('size')
            price = msg.get('price')
            side = msg.get('side')
            print(f"Trade: {product} {side} {size} @ ${price}")

    def on_close(self):
        print(f"Connection closed. Processed {self.message_count} messages")

    def on_error(self, e, data=None):
        print(f"WebSocket error: {e}")
        if data:
            print(f"Error data: {data}")


# Use custom client
custom_client = CustomWebsocketClient()
custom_client.start()
```

## Channel Types and Message Formats

### Ticker Channel

Real-time price updates and 24-hour statistics.

**Channel:** `ticker`
**Authentication:** Not required

**Message Format:**
```python
{
    "type": "ticker",
    "sequence": 5928281084,
    "product_id": "BTC-USD",
    "price": "43000.00",
    "open_24h": "42500.00",
    "volume_24h": "1234.56789",
    "low_24h": "42000.00",
    "high_24h": "44000.00",
    "volume_30d": "12345.67890",
    "best_bid": "42999.99",
    "best_ask": "43000.01",
    "side": "buy",
    "time": "2023-01-01T12:00:00.000000Z",
    "trade_id": 123456789,
    "last_size": "0.001"
}
```

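Numeric fields in the ticker message arrive as strings, so they need converting before any arithmetic. The sketch below uses `Decimal` to avoid floating-point rounding when computing the bid/ask spread and midpoint. `ticker_spread` is a hypothetical helper name, and the sample values are taken from the message format above.

```python
from decimal import Decimal


def ticker_spread(msg):
    """Return (spread, midpoint) from a ticker message's best bid and ask."""
    bid = Decimal(msg["best_bid"])
    ask = Decimal(msg["best_ask"])
    return ask - bid, (ask + bid) / 2


sample = {"type": "ticker", "product_id": "BTC-USD",
          "best_bid": "42999.99", "best_ask": "43000.01"}
spread, mid = ticker_spread(sample)
print(f"spread={spread} mid={mid}")
```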
### Matches Channel

Real-time trade execution data.

**Channel:** `matches`
**Authentication:** Not required

**Message Format:**
```python
{
    "type": "match",
    "trade_id": 123456789,
    "sequence": 5928281084,
    "maker_order_id": "ac928c66-ca53-498f-9c13-a110027a60e8",
    "taker_order_id": "132fb6ae-456b-4654-b4e0-d681ac05cea1",
    "time": "2023-01-01T12:00:00.000000Z",
    "product_id": "BTC-USD",
    "size": "0.001",
    "price": "43000.00",
    "side": "buy"
}
```

### Level2 Channel

Order book updates with top 50 bids and asks.

**Channel:** `level2`
**Authentication:** Not required

**Message Format:**
```python
# Snapshot message (initial)
{
    "type": "snapshot",
    "product_id": "BTC-USD",
    "bids": [["43000.00", "1.5"], ["42999.99", "2.0"]],
    "asks": [["43000.01", "0.5"], ["43000.02", "1.0"]]
}

# Update message (changes)
{
    "type": "l2update",
    "product_id": "BTC-USD",
    "time": "2023-01-01T12:00:00.000000Z",
    "changes": [
        ["buy", "43000.00", "1.2"],   # [side, price, new_size]
        ["sell", "43000.02", "0.0"]   # size "0.0" means removed
    ]
}
```

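The snapshot and update messages above are meant to be combined into a local copy of the book. The following is a minimal sketch of one way to do that, assuming a single product per book and the message shapes shown above. `Level2Book` is a hypothetical class, not part of `cbpro`.

```python
from decimal import Decimal


class Level2Book:
    """Hypothetical helper mirroring one product's aggregated order book."""

    def __init__(self):
        self.bids = {}  # price -> size
        self.asks = {}  # price -> size

    def apply(self, msg):
        if msg["type"] == "snapshot":
            # A snapshot replaces whatever state we had
            self.bids = {Decimal(p): Decimal(s) for p, s in msg["bids"]}
            self.asks = {Decimal(p): Decimal(s) for p, s in msg["asks"]}
        elif msg["type"] == "l2update":
            for side, price, size in msg["changes"]:
                levels = self.bids if side == "buy" else self.asks
                price, size = Decimal(price), Decimal(size)
                if size == 0:
                    levels.pop(price, None)  # size zero removes the level
                else:
                    levels[price] = size     # otherwise set the new size

    def best_bid(self):
        return max(self.bids) if self.bids else None

    def best_ask(self):
        return min(self.asks) if self.asks else None


book = Level2Book()
book.apply({"type": "snapshot", "product_id": "BTC-USD",
            "bids": [["43000.00", "1.5"], ["42999.99", "2.0"]],
            "asks": [["43000.01", "0.5"], ["43000.02", "1.0"]]})
book.apply({"type": "l2update", "product_id": "BTC-USD",
            "changes": [["buy", "43000.00", "1.2"], ["sell", "43000.02", "0.0"]]})
print(book.best_bid(), book.best_ask())
```

Calling `book.apply(msg)` from an `on_message` override would keep the book current. A plain dict is enough for a sketch, though a sorted structure may suit larger books better.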
### User Channel

Private account and order updates (requires authentication).

**Channel:** `user`
**Authentication:** Required

**Message Format:**
```python
# Order received
{
    "type": "received",
    "time": "2023-01-01T12:00:00.000000Z",
    "product_id": "BTC-USD",
    "sequence": 5928281084,
    "order_id": "d50ec984-77a8-460a-b958-66f114b0de9b",
    "size": "0.001",
    "price": "43000.00",
    "side": "buy",
    "order_type": "limit"
}

# Order filled
{
    "type": "match",
    "time": "2023-01-01T12:00:00.000000Z",
    "product_id": "BTC-USD",
    "sequence": 5928281085,
    "order_id": "d50ec984-77a8-460a-b958-66f114b0de9b",
    "trade_id": 123456789,
    "size": "0.001",
    "price": "43000.00",
    "side": "buy",
    "liquidity": "T",
    "fee": "1.075",
    "funds": "43.001075"
}
```

307
### Full Channel
308
309
Complete order book with all orders (Level 3).
310
311
**Channel:** `full`
312
**Authentication:** Not required (but rate limited)
313
314
**Note:** Only recommended for maintaining full real-time order books. Abuse via polling will result in access limitations.
315
316
## Advanced Usage Patterns
317
318
### Price Monitoring and Alerts
319
320
```python
321
class PriceMonitor(cbpro.WebsocketClient):
322
def __init__(self, product_id, alert_price):
323
super().__init__(
324
products=[product_id],
325
channels=['ticker'],
326
should_print=False
327
)
328
self.product_id = product_id
329
self.alert_price = alert_price
330
self.last_price = None
331
332
def on_message(self, msg):
333
if msg.get('type') == 'ticker' and msg.get('product_id') == self.product_id:
334
current_price = float(msg.get('price', 0))
335
336
if self.last_price and current_price >= self.alert_price and self.last_price < self.alert_price:
337
print(f"ALERT: {self.product_id} crossed ${self.alert_price}!")
338
print(f"Current price: ${current_price}")
339
340
self.last_price = current_price
341
342
# Monitor BTC price
343
price_monitor = PriceMonitor('BTC-USD', 45000.0)
344
price_monitor.start()
345
```
346
### Trade Volume Analysis

```python
import cbpro


class VolumeAnalyzer(cbpro.WebsocketClient):
    def __init__(self, product_id):
        super().__init__(
            products=[product_id],
            channels=['matches'],
            should_print=False
        )
        self.product_id = product_id
        self.trades = []
        self.volume_1min = 0
        self.last_minute = None

    def on_message(self, msg):
        if msg.get('type') == 'match' and msg.get('product_id') == self.product_id:
            size = float(msg.get('size', 0))
            price = float(msg.get('price', 0))
            timestamp = msg.get('time')

            # Track volume per minute
            current_minute = timestamp[:16]  # YYYY-MM-DDTHH:MM

            if self.last_minute != current_minute:
                # Report the minute that just ended, then reset the counter
                if self.last_minute:
                    print(f"Volume {self.last_minute}: {self.volume_1min:.4f} {self.product_id.split('-')[0]}")
                self.volume_1min = 0
                self.last_minute = current_minute

            self.volume_1min += size
            self.trades.append({
                'time': timestamp,
                'price': price,
                'size': size,
                'value': price * size
            })


volume_analyzer = VolumeAnalyzer('BTC-USD')
volume_analyzer.start()
```

### MongoDB Data Persistence

```python
from datetime import datetime, timedelta

from pymongo import MongoClient
import cbpro

# Setup MongoDB connection
mongo_client = MongoClient('mongodb://localhost:27017/')
db = mongo_client.crypto_data
btc_collection = db.btc_ticks

# Stream ticker data directly to MongoDB
ws_client = cbpro.WebsocketClient(
    products=['BTC-USD'],
    channels=['ticker'],
    mongo_collection=btc_collection,
    should_print=False
)

ws_client.start()

# Query stored data later, once ticks have been collected.
# 'time' is stored as an ISO 8601 string, so it is compared as a string.
recent_data = btc_collection.find({
    'time': {'$gte': (datetime.utcnow() - timedelta(hours=1)).isoformat()}
}).sort('time', 1)

for tick in recent_data:
    print(f"Price: ${tick['price']} at {tick['time']}")
```

## Error Handling and Reconnection

`WebsocketClient` includes built-in error handling and keepalive mechanisms:

- **Automatic Keepalive**: Sends ping messages every 30 seconds to maintain the connection
- **Error Recovery**: Captures connection errors and exposes them via the `error` attribute
- **Clean Disconnect**: Properly closes connections and joins background threads
- **Message Validation**: Handles malformed JSON messages gracefully

For production applications, implement reconnection logic:

```python
import time


def run_websocket_with_reconnect(ws_class, max_retries=5):
    retries = 0
    while retries < max_retries:
        ws_client = ws_class()
        ws_client.start()

        # Monitor for errors
        while not ws_client.error:
            time.sleep(1)

        print(f"WebSocket error occurred: {ws_client.error}")
        ws_client.close()

        retries += 1
        if retries < max_retries:
            print(f"Reconnecting in 5 seconds... (attempt {retries + 1}/{max_retries})")
            time.sleep(5)
        else:
            print("Max retries exceeded. Giving up.")
            break
```

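The fixed 5-second delay above is simple, but a growing delay is often gentler on a struggling server. As an optional variation, the sketch below computes an exponential backoff delay with an upper cap and optional random jitter. `backoff_delay` and its parameters are hypothetical names chosen for illustration and are not part of `cbpro`.

```python
import random


def backoff_delay(attempt, base=1.0, cap=60.0, jitter=False, rng=random):
    """Return the wait in seconds before reconnect attempt number `attempt`.

    The delay doubles with each attempt (base, 2*base, 4*base, ...) up to `cap`.
    With jitter=True, a random value between 0 and that delay is returned so
    that many clients do not all reconnect at the same moment.
    """
    delay = min(cap, base * (2 ** attempt))
    return rng.uniform(0, delay) if jitter else delay


# Delays for the first five attempts without jitter
print([backoff_delay(n) for n in range(5)])
```

In the reconnection loop, `time.sleep(backoff_delay(retries))` could stand in for the fixed `time.sleep(5)`.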
## Performance Considerations

- **Channel Selection**: Only subscribe to needed channels to reduce bandwidth
- **Product Filtering**: Limit products to reduce message volume
- **Message Processing**: Keep `on_message` processing fast to avoid blocking
- **Memory Management**: Implement data rotation for long-running applications
- **Rate Limiting**: Respect WebSocket rate limits to avoid disconnection
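
The message-processing and memory-management points can be sketched with the standard library: hand each message to a queue so `on_message` returns immediately, process it on a worker thread, and keep only a bounded window of recent messages. `MessageBuffer` is a hypothetical helper, not part of `cbpro`, and the window size is an arbitrary example value.

```python
import queue
import threading
from collections import deque


class MessageBuffer:
    """Hypothetical helper: queue messages for a worker, keep a bounded history."""

    def __init__(self, maxlen=1000):
        self.pending = queue.Queue()
        self.recent = deque(maxlen=maxlen)  # oldest entries are dropped automatically
        self._worker = threading.Thread(target=self._drain, daemon=True)

    def start(self):
        self._worker.start()

    def submit(self, msg):
        # Cheap enough to call directly from on_message
        self.pending.put(msg)

    def stop(self):
        self.pending.put(None)  # sentinel tells the worker to exit
        self._worker.join()

    def _drain(self):
        while True:
            msg = self.pending.get()
            if msg is None:
                break
            self.recent.append(msg)  # slower processing would go here


buffer = MessageBuffer(maxlen=3)
buffer.start()
for i in range(5):
    buffer.submit({"type": "ticker", "sequence": i})
buffer.stop()
print([m["sequence"] for m in buffer.recent])
```

An `on_message` override would then only need to call `buffer.submit(msg)`.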