0
# Storage Backends
1
2
Built-in backend implementations for storing and streaming cryptocurrency market data to various databases, message queues, and other systems with minimal setup and configuration.
3
4
## Capabilities
5
6
### Redis Backends
7
8
Redis-based storage and streaming backends supporting both data storage and real-time streaming.
9
10
```python { .api }
11
# Redis Storage (Sorted Sets)
12
class TradeRedis(BackendCallback): ...
13
class TickerRedis(BackendCallback): ...
14
class FundingRedis(BackendCallback): ...
15
class BookRedis(BackendCallback): ...
16
class BookSnapshotRedisKey(BackendCallback): ...
17
class CandlesRedis(BackendCallback): ...
18
class OpenInterestRedis(BackendCallback): ...
19
class LiquidationsRedis(BackendCallback): ...
20
class OrderInfoRedis(BackendCallback): ...
21
class TransactionsRedis(BackendCallback): ...
22
class BalancesRedis(BackendCallback): ...
23
class FillsRedis(BackendCallback): ...
24
25
# Redis Streams
26
class TradeStream(BackendCallback): ...
27
class TickerStream(BackendCallback): ...
28
class FundingStream(BackendCallback): ...
29
class BookStream(BackendCallback): ...
30
class CandlesStream(BackendCallback): ...
31
class OpenInterestStream(BackendCallback): ...
32
class LiquidationsStream(BackendCallback): ...
33
class OrderInfoStream(BackendCallback): ...
34
class TransactionsStream(BackendCallback): ...
35
class BalancesStream(BackendCallback): ...
36
class FillsStream(BackendCallback): ...
37
```
38
39
### Database Backends
40
41
Database storage backends for persistent data storage with support for time-series and relational databases.
42
43
```python { .api }
44
# MongoDB
45
class TradeMongo(BackendCallback): ...
46
class TickerMongo(BackendCallback): ...
47
class FundingMongo(BackendCallback): ...
48
class BookMongo(BackendCallback): ...
49
class CandlesMongo(BackendCallback): ...
50
class OpenInterestMongo(BackendCallback): ...
51
class LiquidationsMongo(BackendCallback): ...
52
53
# PostgreSQL
54
class TradePostgres(BackendCallback): ...
55
class TickerPostgres(BackendCallback): ...
56
class FundingPostgres(BackendCallback): ...
57
class BookPostgres(BackendCallback): ...
58
class CandlesPostgres(BackendCallback): ...
59
class OpenInterestPostgres(BackendCallback): ...
60
class LiquidationsPostgres(BackendCallback): ...
61
62
# InfluxDB (Time Series)
63
class TradeInflux(BackendCallback): ...
64
class TickerInflux(BackendCallback): ...
65
class FundingInflux(BackendCallback): ...
66
class BookInflux(BackendCallback): ...
67
class CandlesInflux(BackendCallback): ...
68
class OpenInterestInflux(BackendCallback): ...
69
class LiquidationsInflux(BackendCallback): ...
70
```
71
72
### Message Queue Backends
73
74
Message queue and streaming backends for real-time data distribution.
75
76
```python { .api }
77
# Apache Kafka
78
class TradeKafka(BackendCallback): ...
79
class TickerKafka(BackendCallback): ...
80
class FundingKafka(BackendCallback): ...
81
class BookKafka(BackendCallback): ...
82
class CandlesKafka(BackendCallback): ...
83
class OpenInterestKafka(BackendCallback): ...
84
class LiquidationsKafka(BackendCallback): ...
85
86
# RabbitMQ
87
class TradeRabbitMQ(BackendCallback): ...
88
class TickerRabbitMQ(BackendCallback): ...
89
class FundingRabbitMQ(BackendCallback): ...
90
class BookRabbitMQ(BackendCallback): ...
91
92
# ZeroMQ
93
class TradeZMQ(BackendCallback): ...
94
class TickerZMQ(BackendCallback): ...
95
class FundingZMQ(BackendCallback): ...
96
class BookZMQ(BackendCallback): ...
97
```
98
99
### Socket and Network Backends
100
101
Network-based backends for real-time data streaming over various protocols.
102
103
```python { .api }
104
# TCP/UDP Sockets
105
class TradeSocket(BackendCallback): ...
106
class TickerSocket(BackendCallback): ...
107
class FundingSocket(BackendCallback): ...
108
class BookSocket(BackendCallback): ...
109
110
# HTTP Callbacks
111
class HTTPCallback(BackendCallback): ...
112
```
113
114
### Cloud and Specialized Backends
115
116
Cloud service and specialized storage backends.
117
118
```python { .api }
119
# Google Cloud Pub/Sub
120
class TradeGCPPubSub(BackendCallback): ...
121
class TickerGCPPubSub(BackendCallback): ...
122
class FundingGCPPubSub(BackendCallback): ...
123
class BookGCPPubSub(BackendCallback): ...
124
125
# Arctic (Time Series)
126
class TradeArctic(BackendCallback): ...
127
class TickerArctic(BackendCallback): ...
128
class FundingArctic(BackendCallback): ...
129
class BookArctic(BackendCallback): ...
130
131
# QuestDB
132
class TradeQuest(BackendCallback): ...
133
class TickerQuest(BackendCallback): ...
134
class FundingQuest(BackendCallback): ...
135
class BookQuest(BackendCallback): ...
136
137
# QuasarDB
138
class TradeQuasar(BackendCallback): ...
139
class TickerQuasar(BackendCallback): ...
140
class FundingQuasar(BackendCallback): ...
141
class BookQuasar(BackendCallback): ...
142
143
# Data Aggregation
144
class TradeAggregate(BackendCallback): ...
145
class TickerAggregate(BackendCallback): ...
146
class BookAggregate(BackendCallback): ...
147
```
148
149
### Base Backend Classes
150
151
Base classes for creating custom backends.
152
153
```python { .api }
154
class BackendCallback:
155
"""Base class for all backend callbacks."""
156
def __init__(self, **kwargs): ...
157
async def __call__(self, data, timestamp, receipt_timestamp): ...
158
159
class BackendBookCallback(BackendCallback):
160
"""Base class for order book backends."""
161
def __init__(self, depth=None, **kwargs): ...
162
163
class BackendQueue:
164
"""Base queue implementation for backends."""
165
def __init__(self, **kwargs): ...
166
```
167
168
## Usage Examples
169
170
### Redis Storage
171
172
```python
173
from cryptofeed import FeedHandler
174
from cryptofeed.exchanges import Coinbase
175
from cryptofeed.backends.redis import TradeRedis, BookRedis
176
from cryptofeed.defines import TRADES, L2_BOOK
177
178
fh = FeedHandler()
179
fh.add_feed(Coinbase(
180
symbols=['BTC-USD', 'ETH-USD'],
181
channels=[TRADES, L2_BOOK],
182
callbacks={
183
TRADES: TradeRedis(host='localhost', port=6379, db=0),
184
L2_BOOK: BookRedis(host='localhost', port=6379, db=0, depth=20)
185
}
186
))
187
fh.run()
188
```
189
190
### MongoDB Storage
191
192
```python
193
from cryptofeed import FeedHandler
194
from cryptofeed.exchanges import Binance
195
from cryptofeed.backends.mongo import TradeMongo, CandlesMongo
196
from cryptofeed.defines import TRADES, CANDLES
197
198
fh = FeedHandler()
199
fh.add_feed(Binance(
200
symbols=['BTCUSDT', 'ETHUSDT'],
201
channels=[TRADES, CANDLES],
202
callbacks={
203
TRADES: TradeMongo(
204
host='localhost',
205
db='cryptofeed',
206
collection='trades'
207
),
208
CANDLES: CandlesMongo(
209
host='localhost',
210
db='cryptofeed',
211
collection='candles'
212
)
213
}
214
))
215
fh.run()
216
```
217
218
### PostgreSQL Storage
219
220
```python
221
from cryptofeed import FeedHandler
222
from cryptofeed.exchanges import Kraken
223
from cryptofeed.backends.postgres import TradePostgres, TickerPostgres
224
from cryptofeed.defines import TRADES, TICKER
225
226
fh = FeedHandler()
227
fh.add_feed(Kraken(
228
symbols=['XBT/USD', 'ETH/USD'],
229
channels=[TRADES, TICKER],
230
callbacks={
231
TRADES: TradePostgres(
232
host='localhost',
233
user='postgres',
234
pw='password',
235
db='market_data',
236
table='trades'
237
),
238
TICKER: TickerPostgres(
239
host='localhost',
240
user='postgres',
241
pw='password',
242
db='market_data',
243
table='tickers'
244
)
245
}
246
))
247
fh.run()
248
```
249
250
### Kafka Streaming
251
252
```python
253
from cryptofeed import FeedHandler
254
from cryptofeed.exchanges import Coinbase, Binance
255
from cryptofeed.backends.kafka import TradeKafka, BookKafka
256
from cryptofeed.defines import TRADES, L2_BOOK
257
258
fh = FeedHandler()
259
260
# Stream trades to Kafka
261
fh.add_feed(Coinbase(
262
symbols=['BTC-USD'],
263
channels=[TRADES],
264
callbacks={
265
TRADES: TradeKafka(
266
bootstrap_servers='localhost:9092',
267
topic='crypto-trades'
268
)
269
}
270
))
271
272
# Stream order books to different topic
273
fh.add_feed(Binance(
274
symbols=['BTCUSDT'],
275
channels=[L2_BOOK],
276
callbacks={
277
L2_BOOK: BookKafka(
278
bootstrap_servers='localhost:9092',
279
topic='crypto-books'
280
)
281
}
282
))
283
fh.run()
284
```
285
286
### InfluxDB Time Series
287
288
```python
289
from cryptofeed import FeedHandler
290
from cryptofeed.exchanges import Bitfinex
291
from cryptofeed.backends.influxdb import TradeInflux, CandlesInflux
292
from cryptofeed.defines import TRADES, CANDLES
293
294
fh = FeedHandler()
295
fh.add_feed(Bitfinex(
296
symbols=['BTC/USD', 'ETH/USD'],
297
channels=[TRADES, CANDLES],
298
callbacks={
299
TRADES: TradeInflux(
300
host='localhost',
301
token='your-token',
302
org='your-org',
303
bucket='crypto-trades'
304
),
305
CANDLES: CandlesInflux(
306
host='localhost',
307
token='your-token',
308
org='your-org',
309
bucket='crypto-candles'
310
)
311
}
312
))
313
fh.run()
314
```
315
316
### Multiple Backends
317
318
```python
319
from cryptofeed import FeedHandler
320
from cryptofeed.exchanges import Coinbase
321
from cryptofeed.backends.redis import TradeRedis
322
from cryptofeed.backends.mongo import TradeMongo
323
from cryptofeed.backends.kafka import TradeKafka
324
from cryptofeed.defines import TRADES
325
326
def custom_trade_handler(trade):
327
"""Custom processing logic"""
328
print(f'Processing trade: {trade.symbol} {trade.amount}@{trade.price}')
329
330
fh = FeedHandler()
331
fh.add_feed(Coinbase(
332
symbols=['BTC-USD'],
333
channels=[TRADES],
334
callbacks={
335
TRADES: [
336
TradeRedis(host='localhost'), # Store in Redis
337
TradeMongo(host='localhost'), # Store in MongoDB
338
TradeKafka(topic='trades'), # Stream to Kafka
339
custom_trade_handler # Custom processing
340
]
341
}
342
))
343
fh.run()
344
```
345
346
### Socket Streaming
347
348
```python
349
from cryptofeed import FeedHandler
350
from cryptofeed.exchanges import Binance
351
from cryptofeed.backends.socket import TradeSocket
352
from cryptofeed.defines import TRADES
353
354
# Stream trades over TCP socket
355
fh = FeedHandler()
356
fh.add_feed(Binance(
357
symbols=['BTCUSDT'],
358
channels=[TRADES],
359
callbacks={
360
TRADES: TradeSocket(
361
address='localhost',
362
port=9999,
363
protocol='tcp'
364
)
365
}
366
))
367
fh.run()
368
```
369
370
### HTTP Callbacks
371
372
```python
373
from cryptofeed import FeedHandler
374
from cryptofeed.exchanges import Gemini
375
from cryptofeed.backends.http import HTTPCallback
376
from cryptofeed.defines import TRADES
377
378
fh = FeedHandler()
379
fh.add_feed(Gemini(
380
symbols=['BTC-USD'],
381
channels=[TRADES],
382
callbacks={
383
TRADES: HTTPCallback(
384
url='https://api.example.com/webhook/trades',
385
headers={'Authorization': 'Bearer your-token'}
386
)
387
}
388
))
389
fh.run()
390
```
391
392
### Custom Backend
393
394
```python
395
from cryptofeed.backends import BackendCallback
396
397
class CustomBackend(BackendCallback):
398
def __init__(self, **kwargs):
399
super().__init__(**kwargs)
400
# Initialize custom storage/processing
401
402
async def __call__(self, data, timestamp, receipt_timestamp):
403
# Process the data
404
print(f'Custom processing: {data}')
405
406
# Store or forward data as needed
407
await self.store_data(data)
408
409
async def store_data(self, data):
410
# Custom storage logic
411
pass
412
413
# Use custom backend
414
from cryptofeed import FeedHandler
415
from cryptofeed.exchanges import Coinbase
416
from cryptofeed.defines import TRADES
417
418
fh = FeedHandler()
419
fh.add_feed(Coinbase(
420
symbols=['BTC-USD'],
421
channels=[TRADES],
422
callbacks={TRADES: CustomBackend()}
423
))
424
fh.run()
425
```