
# Real-time Data Streaming


The WebsocketClient provides real-time streaming of market data, order updates, and account changes via WebSocket connections. It supports multiple channels, authentication for private data, and optional MongoDB integration for data persistence.


## Capabilities


### Client Initialization


Create a WebSocket client for real-time data streaming with customizable channels and message handling.

```python { .api }
class WebsocketClient:
    def __init__(self, url: str = "wss://ws-feed.pro.coinbase.com",
                 products: list = None, message_type: str = "subscribe",
                 mongo_collection = None, should_print: bool = True,
                 auth: bool = False, api_key: str = "", api_secret: str = "",
                 api_passphrase: str = "", *, channels: list):
        """
        Initialize WebSocket client for real-time data streaming.

        Parameters:
        - url (str): WebSocket URL. Defaults to production feed.
        - products (list): List of products to subscribe to (e.g., ["BTC-USD", "ETH-USD"])
        - message_type (str): Message type, typically "subscribe"
        - mongo_collection: MongoDB collection for data persistence (optional)
        - should_print (bool): Whether to print messages to console
        - auth (bool): Whether to authenticate for private channels
        - api_key (str): API key for authentication
        - api_secret (str): API secret for authentication
        - api_passphrase (str): API passphrase for authentication
        - channels (list): Required. List of channels to subscribe to.
          Options: ['ticker', 'user', 'matches', 'level2', 'full']
        """
```


**Usage Example:**

```python
import cbpro

# Public market data streaming
ws_client = cbpro.WebsocketClient(
    products=['BTC-USD', 'ETH-USD'],
    channels=['ticker', 'matches']
)

# Authenticated streaming for private data
auth_ws_client = cbpro.WebsocketClient(
    products=['BTC-USD'],
    channels=['user', 'ticker'],
    auth=True,
    api_key=api_key,
    api_secret=api_secret,
    api_passphrase=api_passphrase
)

# With MongoDB integration
from pymongo import MongoClient
mongo_client = MongoClient('mongodb://localhost:27017/')
collection = mongo_client.crypto_db.btc_data

ws_client = cbpro.WebsocketClient(
    products=['BTC-USD'],
    channels=['ticker'],
    mongo_collection=collection,
    should_print=False
)
```


### Connection Management


Control WebSocket connection lifecycle with start, stop, and error handling.

```python { .api }
def start(self):
    """
    Start the WebSocket connection and begin listening for messages.
    Creates background threads for message processing and keepalive.
    """

def close(self):
    """
    Close the WebSocket connection and stop all background threads.
    Call this method to cleanly disconnect.
    """
```


**Usage Example:**

```python
import time

# Start streaming
ws_client.start()

try:
    # Keep main thread alive while streaming
    while True:
        time.sleep(1)
        # Can check ws_client.error for connection issues
        if ws_client.error:
            print(f"WebSocket error: {ws_client.error}")
            break
except KeyboardInterrupt:
    print("Stopping WebSocket client...")
finally:
    ws_client.close()
```


### Event Handlers


Override event handler methods to customize message processing and connection behavior.

```python { .api }
def on_open(self):
    """
    Called once, immediately before the socket connection is made.
    Override this method to set initial parameters or perform setup.
    """

def on_message(self, msg: dict):
    """
    Called once for every message that arrives.
    Override this method to process incoming messages.

    Parameters:
    - msg (dict): Message data containing channel-specific information
    """

def on_close(self):
    """
    Called once when the WebSocket connection is closed.
    Override this method to perform cleanup or logging.
    """

def on_error(self, e: Exception, data = None):
    """
    Called when an error occurs during WebSocket operation.
    Override this method to handle errors appropriately.

    Parameters:
    - e (Exception): The exception that occurred
    - data: Additional error data (optional)
    """
```


**Usage Example:**

```python
class CustomWebsocketClient(cbpro.WebsocketClient):
    def __init__(self):
        super().__init__(
            products=['BTC-USD', 'ETH-USD'],
            channels=['ticker', 'matches']
        )
        self.message_count = 0
        self.prices = {}

    def on_open(self):
        # Runs immediately before the socket connection is made
        print("Opening WebSocket connection...")
        print(f"Subscribing to: {self.products}")

    def on_message(self, msg):
        self.message_count += 1

        if msg.get('type') == 'ticker':
            product = msg.get('product_id')
            price = float(msg.get('price', 0))
            self.prices[product] = price
            print(f"{product}: ${price:,.2f}")

        elif msg.get('type') == 'match':
            product = msg.get('product_id')
            size = msg.get('size')
            price = msg.get('price')
            side = msg.get('side')
            print(f"Trade: {product} {side} {size} @ ${price}")

    def on_close(self):
        print(f"Connection closed. Processed {self.message_count} messages")

    def on_error(self, e, data=None):
        print(f"WebSocket error: {e}")
        if data:
            print(f"Error data: {data}")

# Use custom client
custom_client = CustomWebsocketClient()
custom_client.start()
```
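
As an `on_message` override gains more message types, the `if`/`elif` chain can be swapped for a lookup table. The following is a minimal sketch of that idea; `MessageDispatcher` and its methods are hypothetical helpers for illustration and are not part of `cbpro`. It only assumes that each message is a dict with a `type` key, as in the examples above.

```python
class MessageDispatcher:
    """Route feed messages to per-type handlers (hypothetical helper, not part of cbpro)."""

    def __init__(self):
        self.handlers = {}   # maps a message 'type' string to a callable
        self.unhandled = 0   # count of messages with no registered handler

    def register(self, msg_type, handler):
        self.handlers[msg_type] = handler

    def dispatch(self, msg):
        handler = self.handlers.get(msg.get('type'))
        if handler is None:
            self.unhandled += 1
            return False
        handler(msg)
        return True


prices = {}
dispatcher = MessageDispatcher()
dispatcher.register(
    'ticker',
    lambda m: prices.update({m['product_id']: float(m['price'])}),
)

# Inside a WebsocketClient subclass, on_message could simply call dispatcher.dispatch(msg).
dispatcher.dispatch({'type': 'ticker', 'product_id': 'BTC-USD', 'price': '43000.00'})
dispatcher.dispatch({'type': 'subscriptions', 'channels': []})
```

Counting unhandled messages rather than raising keeps the callback tolerant of message types the application does not care about.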


## Channel Types and Message Formats


### Ticker Channel


Real-time price updates and 24-hour statistics.

**Channel:** `ticker`
**Authentication:** Not required

**Message Format:**
```python
{
    "type": "ticker",
    "sequence": 5928281084,
    "product_id": "BTC-USD",
    "price": "43000.00",
    "open_24h": "42500.00",
    "volume_24h": "1234.56789",
    "low_24h": "42000.00",
    "high_24h": "44000.00",
    "volume_30d": "12345.67890",
    "best_bid": "42999.99",
    "best_ask": "43000.01",
    "side": "buy",
    "time": "2023-01-01T12:00:00.000000Z",
    "trade_id": 123456789,
    "last_size": "0.001"
}
```
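
Numeric fields in the ticker message arrive as strings. One possible way to work with them without floating-point rounding is Python's `decimal` module. The sketch below derives a mid price and spread from `best_bid` and `best_ask`; the helper name `ticker_spread` is an assumption made for illustration.

```python
from decimal import Decimal


def ticker_spread(msg):
    """Return (mid_price, spread) as Decimals from a ticker message dict."""
    bid = Decimal(msg['best_bid'])
    ask = Decimal(msg['best_ask'])
    return (bid + ask) / 2, ask - bid


# Sample values taken from the message format shown above
sample = {
    "type": "ticker",
    "product_id": "BTC-USD",
    "price": "43000.00",
    "best_bid": "42999.99",
    "best_ask": "43000.01",
}
mid, spread = ticker_spread(sample)
print(f"mid={mid} spread={spread}")
```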


### Matches Channel


Real-time trade execution data.

**Channel:** `matches`
**Authentication:** Not required

**Message Format:**
```python
{
    "type": "match",
    "trade_id": 123456789,
    "sequence": 5928281084,
    "maker_order_id": "ac928c66-ca53-498f-9c13-a110027a60e8",
    "taker_order_id": "132fb6ae-456b-4654-b4e0-d681ac05cea1",
    "time": "2023-01-01T12:00:00.000000Z",
    "product_id": "BTC-USD",
    "size": "0.001",
    "price": "43000.00",
    "side": "buy"
}
```


### Level2 Channel


Order book updates with top 50 bids and asks.

**Channel:** `level2`
**Authentication:** Not required

**Message Format:**
```python
# Snapshot message (initial)
{
    "type": "snapshot",
    "product_id": "BTC-USD",
    "bids": [["43000.00", "1.5"], ["42999.99", "2.0"]],
    "asks": [["43000.01", "0.5"], ["43000.02", "1.0"]]
}

# Update message (changes)
{
    "type": "l2update",
    "product_id": "BTC-USD",
    "time": "2023-01-01T12:00:00.000000Z",
    "changes": [
        ["buy", "43000.00", "1.2"],   # [side, price, new_size]
        ["sell", "43000.02", "0.0"]   # size "0.0" means removed
    ]
}
```
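
The snapshot and `l2update` messages above can be combined into a local view of the book. The following is a rough sketch under the assumption that messages have exactly the shapes shown; `L2Book` is a hypothetical helper, not a `cbpro` class, and it tracks a single product.

```python
from decimal import Decimal


class L2Book:
    """Keep price -> size maps for one product from level2 messages (illustrative only)."""

    def __init__(self):
        self.bids = {}
        self.asks = {}

    def apply(self, msg):
        if msg.get('type') == 'snapshot':
            # A snapshot replaces the whole book
            self.bids = {Decimal(p): Decimal(s) for p, s in msg['bids']}
            self.asks = {Decimal(p): Decimal(s) for p, s in msg['asks']}
        elif msg.get('type') == 'l2update':
            for side, price, size in msg['changes']:
                levels = self.bids if side == 'buy' else self.asks
                price, size = Decimal(price), Decimal(size)
                if size == 0:
                    levels.pop(price, None)   # size zero removes the level
                else:
                    levels[price] = size      # otherwise size is the new total

    def best_bid(self):
        return max(self.bids) if self.bids else None

    def best_ask(self):
        return min(self.asks) if self.asks else None


book = L2Book()
book.apply({
    "type": "snapshot",
    "product_id": "BTC-USD",
    "bids": [["43000.00", "1.5"], ["42999.99", "2.0"]],
    "asks": [["43000.01", "0.5"], ["43000.02", "1.0"]],
})
book.apply({
    "type": "l2update",
    "product_id": "BTC-USD",
    "changes": [["buy", "43000.00", "1.2"], ["sell", "43000.02", "0.0"]],
})
print(book.best_bid(), book.best_ask())
```

Plain dicts keep the sketch short; a sorted container would suit a book with many levels better, since `max` and `min` scan every price on each call.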


### User Channel


Private account and order updates (requires authentication).

**Channel:** `user`
**Authentication:** Required

**Message Format:**
```python
# Order received
{
    "type": "received",
    "time": "2023-01-01T12:00:00.000000Z",
    "product_id": "BTC-USD",
    "sequence": 5928281084,
    "order_id": "d50ec984-77a8-460a-b958-66f114b0de9b",
    "size": "0.001",
    "price": "43000.00",
    "side": "buy",
    "order_type": "limit"
}

# Order filled
{
    "type": "match",
    "time": "2023-01-01T12:00:00.000000Z",
    "product_id": "BTC-USD",
    "sequence": 5928281085,
    "order_id": "d50ec984-77a8-460a-b958-66f114b0de9b",
    "trade_id": 123456789,
    "size": "0.001",
    "price": "43000.00",
    "side": "buy",
    "liquidity": "T",
    "fee": "1.075",
    "funds": "43.001075"
}
```


### Full Channel


Complete order book with all orders (Level 3).

**Channel:** `full`
**Authentication:** Not required (but rate limited)


**Note:** Only recommended for maintaining full real-time order books. Abuse via polling will result in access limitations.


## Advanced Usage Patterns


### Price Monitoring and Alerts

```python
class PriceMonitor(cbpro.WebsocketClient):
    def __init__(self, product_id, alert_price):
        super().__init__(
            products=[product_id],
            channels=['ticker'],
            should_print=False
        )
        self.product_id = product_id
        self.alert_price = alert_price
        self.last_price = None

    def on_message(self, msg):
        if msg.get('type') == 'ticker' and msg.get('product_id') == self.product_id:
            current_price = float(msg.get('price', 0))

            if self.last_price and current_price >= self.alert_price and self.last_price < self.alert_price:
                print(f"ALERT: {self.product_id} crossed ${self.alert_price}!")
                print(f"Current price: ${current_price}")

            self.last_price = current_price

# Monitor BTC price
price_monitor = PriceMonitor('BTC-USD', 45000.0)
price_monitor.start()
```


### Trade Volume Analysis

```python
class VolumeAnalyzer(cbpro.WebsocketClient):
    def __init__(self, product_id):
        super().__init__(
            products=[product_id],
            channels=['matches'],
            should_print=False
        )
        self.product_id = product_id
        self.trades = []
        self.volume_1min = 0
        self.last_minute = None

    def on_message(self, msg):
        if msg.get('type') == 'match' and msg.get('product_id') == self.product_id:
            size = float(msg.get('size', 0))
            price = float(msg.get('price', 0))
            timestamp = msg.get('time')

            # Track volume per minute
            current_minute = timestamp[:16]  # YYYY-MM-DDTHH:MM

            if self.last_minute != current_minute:
                if self.last_minute:
                    print(f"Volume {self.last_minute}: {self.volume_1min:.4f} {self.product_id.split('-')[0]}")
                self.volume_1min = 0
                self.last_minute = current_minute

            self.volume_1min += size
            self.trades.append({
                'time': timestamp,
                'price': price,
                'size': size,
                'value': price * size
            })

volume_analyzer = VolumeAnalyzer('BTC-USD')
volume_analyzer.start()
```


### MongoDB Data Persistence

```python
from pymongo import MongoClient
import cbpro

# Setup MongoDB connection
mongo_client = MongoClient('mongodb://localhost:27017/')
db = mongo_client.crypto_data
btc_collection = db.btc_ticks

# Stream ticker data directly to MongoDB
ws_client = cbpro.WebsocketClient(
    products=['BTC-USD'],
    channels=['ticker'],
    mongo_collection=btc_collection,
    should_print=False
)

ws_client.start()

# Query stored data later
from datetime import datetime, timedelta
recent_data = btc_collection.find({
    'time': {'$gte': (datetime.utcnow() - timedelta(hours=1)).isoformat()}
}).sort('time', 1)

for tick in recent_data:
    print(f"Price: ${tick['price']} at {tick['time']}")
```
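
The query above compares timestamps as strings, so it helps when the cutoff uses the same layout as the stored `time` field. The sketch below assumes stored timestamps always look like the samples in this document (`YYYY-MM-DDTHH:MM:SS.ffffffZ`, UTC, with fractional seconds); `FEED_TIME_FORMAT`, `parse_feed_time`, and `format_feed_time` are hypothetical names introduced for illustration.

```python
from datetime import datetime, timedelta, timezone

# Assumed layout, matching the sample messages in this document
FEED_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"


def parse_feed_time(value):
    """Parse a feed timestamp string into a timezone-aware UTC datetime."""
    return datetime.strptime(value, FEED_TIME_FORMAT).replace(tzinfo=timezone.utc)


def format_feed_time(moment):
    """Format an aware datetime in the same layout as the feed timestamps."""
    return moment.astimezone(timezone.utc).strftime(FEED_TIME_FORMAT)


tick_time = parse_feed_time("2023-01-01T12:00:00.000000Z")
cutoff = format_feed_time(tick_time - timedelta(hours=1))
print(cutoff)
```

A cutoff built this way could be passed as the `$gte` value in place of the `isoformat()` string, which keeps both sides of the comparison in one format.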


## Error Handling and Reconnection

The `WebsocketClient` includes built-in error handling and keepalive mechanisms:

- **Automatic Keepalive**: Sends ping messages every 30 seconds to maintain the connection
- **Error Recovery**: Captures connection errors and exposes them via the `error` attribute
- **Clean Disconnect**: Properly closes connections and joins background threads
- **Message Validation**: Handles malformed JSON messages gracefully


For production applications, implement reconnection logic:

```python
import time

def run_websocket_with_reconnect(ws_class, max_retries=5):
    retries = 0
    while retries < max_retries:
        ws_client = ws_class()
        ws_client.start()

        # Monitor for errors
        while not ws_client.error:
            time.sleep(1)

        print(f"WebSocket error occurred: {ws_client.error}")
        ws_client.close()

        retries += 1
        if retries < max_retries:
            print(f"Reconnecting in 5 seconds... (attempt {retries + 1}/{max_retries})")
            time.sleep(5)
        else:
            print("Max retries exceeded. Giving up.")
            break
```
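
The loop above waits a fixed 5 seconds between attempts. A common alternative is exponential backoff, optionally with jitter, so repeated failures do not hammer the server. The generator below is a minimal sketch of that technique; `backoff_delays` and its parameters are assumptions made for illustration, not part of `cbpro`.

```python
import random


def backoff_delays(max_retries=5, base=1.0, cap=30.0, jitter=0.0, rng=random):
    """Yield one delay per retry: base * 2**attempt, capped at `cap`.

    `jitter` is the fraction of each delay that may be added at random
    (0.0 disables jitter and makes the sequence deterministic).
    """
    for attempt in range(max_retries):
        delay = min(cap, base * (2 ** attempt))
        yield delay + rng.uniform(0, jitter * delay)


# Deterministic example with jitter disabled
delays = list(backoff_delays(max_retries=5, base=1.0, cap=10.0))
print(delays)
```

In a reconnect loop, each value could replace the fixed `time.sleep(5)`, for example by iterating over `backoff_delays(...)` and sleeping for each delay before the next attempt.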


## Performance Considerations

- **Channel Selection**: Only subscribe to needed channels to reduce bandwidth
- **Product Filtering**: Limit products to reduce message volume
- **Message Processing**: Keep `on_message` processing fast to avoid blocking
- **Memory Management**: Implement data rotation for long-running applications
- **Rate Limiting**: Respect WebSocket rate limits to avoid disconnection
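
Two of the points above, fast message processing and data rotation, can be illustrated together. The sketch below hands messages to a worker thread through a `queue.Queue` and keeps only the most recent entries in a bounded `deque`. `BufferedProcessor` is a hypothetical helper, not part of `cbpro`; a real `on_message` override would call `submit(msg)` and return immediately.

```python
import queue
import threading
from collections import deque


class BufferedProcessor:
    """Move work off the callback thread and cap memory use (illustrative only)."""

    def __init__(self, history=1000):
        self.inbox = queue.Queue()
        self.recent = deque(maxlen=history)  # oldest entries are dropped automatically
        self.worker = threading.Thread(target=self._run, daemon=True)
        self.worker.start()

    def submit(self, msg):
        # Cheap enough to call directly from on_message
        self.inbox.put(msg)

    def _run(self):
        while True:
            msg = self.inbox.get()
            if msg is None:          # sentinel: stop the worker
                break
            self.recent.append(msg)  # slower work (parsing, I/O) would go here

    def stop(self):
        self.inbox.put(None)
        self.worker.join()


processor = BufferedProcessor(history=3)
for i in range(5):
    processor.submit({'type': 'ticker', 'sequence': i})
processor.stop()
print([m['sequence'] for m in processor.recent])
```

The queue here is unbounded for simplicity; passing `maxsize` to `queue.Queue` would be one way to apply backpressure if the worker falls behind.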