Cryptocurrency Exchange Websocket Data Feed Handler for normalized market data across 30+ exchanges
—
Built-in backend implementations for storing and streaming cryptocurrency market data to various databases, message queues, and other systems with minimal setup and configuration.
Redis-based storage and streaming backends supporting both data storage and real-time streaming.
# Redis Storage (Sorted Sets)
class TradeRedis(BackendCallback): ...
class TickerRedis(BackendCallback): ...
class FundingRedis(BackendCallback): ...
class BookRedis(BackendCallback): ...
class BookSnapshotRedisKey(BackendCallback): ...
class CandlesRedis(BackendCallback): ...
class OpenInterestRedis(BackendCallback): ...
class LiquidationsRedis(BackendCallback): ...
class OrderInfoRedis(BackendCallback): ...
class TransactionsRedis(BackendCallback): ...
class BalancesRedis(BackendCallback): ...
class FillsRedis(BackendCallback): ...
# Redis Streams
class TradeStream(BackendCallback): ...
class TickerStream(BackendCallback): ...
class FundingStream(BackendCallback): ...
class BookStream(BackendCallback): ...
class CandlesStream(BackendCallback): ...
class OpenInterestStream(BackendCallback): ...
class LiquidationsStream(BackendCallback): ...
class OrderInfoStream(BackendCallback): ...
class TransactionsStream(BackendCallback): ...
class BalancesStream(BackendCallback): ...
class FillsStream(BackendCallback): ...

Database storage backends for persistent data storage with support for time-series and relational databases.
# MongoDB
class TradeMongo(BackendCallback): ...
class TickerMongo(BackendCallback): ...
class FundingMongo(BackendCallback): ...
class BookMongo(BackendCallback): ...
class CandlesMongo(BackendCallback): ...
class OpenInterestMongo(BackendCallback): ...
class LiquidationsMongo(BackendCallback): ...
# PostgreSQL
class TradePostgres(BackendCallback): ...
class TickerPostgres(BackendCallback): ...
class FundingPostgres(BackendCallback): ...
class BookPostgres(BackendCallback): ...
class CandlesPostgres(BackendCallback): ...
class OpenInterestPostgres(BackendCallback): ...
class LiquidationsPostgres(BackendCallback): ...
# InfluxDB (Time Series)
class TradeInflux(BackendCallback): ...
class TickerInflux(BackendCallback): ...
class FundingInflux(BackendCallback): ...
class BookInflux(BackendCallback): ...
class CandlesInflux(BackendCallback): ...
class OpenInterestInflux(BackendCallback): ...
class LiquidationsInflux(BackendCallback): ...

Message queue and streaming backends for real-time data distribution.
# Apache Kafka
class TradeKafka(BackendCallback): ...
class TickerKafka(BackendCallback): ...
class FundingKafka(BackendCallback): ...
class BookKafka(BackendCallback): ...
class CandlesKafka(BackendCallback): ...
class OpenInterestKafka(BackendCallback): ...
class LiquidationsKafka(BackendCallback): ...
# RabbitMQ
class TradeRabbitMQ(BackendCallback): ...
class TickerRabbitMQ(BackendCallback): ...
class FundingRabbitMQ(BackendCallback): ...
class BookRabbitMQ(BackendCallback): ...
# ZeroMQ
class TradeZMQ(BackendCallback): ...
class TickerZMQ(BackendCallback): ...
class FundingZMQ(BackendCallback): ...
class BookZMQ(BackendCallback): ...

Network-based backends for real-time data streaming over various protocols.
# TCP/UDP Sockets
class TradeSocket(BackendCallback): ...
class TickerSocket(BackendCallback): ...
class FundingSocket(BackendCallback): ...
class BookSocket(BackendCallback): ...
# HTTP Callbacks
class HTTPCallback(BackendCallback): ...

Cloud service and specialized storage backends.
# Google Cloud Pub/Sub
class TradeGCPPubSub(BackendCallback): ...
class TickerGCPPubSub(BackendCallback): ...
class FundingGCPPubSub(BackendCallback): ...
class BookGCPPubSub(BackendCallback): ...
# Arctic (Time Series)
class TradeArctic(BackendCallback): ...
class TickerArctic(BackendCallback): ...
class FundingArctic(BackendCallback): ...
class BookArctic(BackendCallback): ...
# QuestDB
class TradeQuest(BackendCallback): ...
class TickerQuest(BackendCallback): ...
class FundingQuest(BackendCallback): ...
class BookQuest(BackendCallback): ...
# QuasarDB
class TradeQuasar(BackendCallback): ...
class TickerQuasar(BackendCallback): ...
class FundingQuasar(BackendCallback): ...
class BookQuasar(BackendCallback): ...
# Data Aggregation
class TradeAggregate(BackendCallback): ...
class TickerAggregate(BackendCallback): ...
class BookAggregate(BackendCallback): ...

Base classes for creating custom backends.
class BackendCallback:
    """Base class for all backend callbacks."""

    # Signature-only listing: the bodies are elided (`...`) in this reference.
    def __init__(self, **kwargs): ...

    # Instances are awaited with the payload plus two timestamps.
    async def __call__(self, data, timestamp, receipt_timestamp): ...
class BackendBookCallback(BackendCallback):
    """Base class for order book backends."""

    # Signature-only listing: `depth` defaults to None and any other keyword
    # arguments are accepted through **kwargs; the body is elided (`...`).
    def __init__(self, depth=None, **kwargs): ...
class BackendQueue:
    """Base queue implementation for backends."""

    # Signature-only listing: the body is elided (`...`) in this reference.
    def __init__(self, **kwargs): ...


# Start of the next standalone example.
from cryptofeed import FeedHandler
from cryptofeed.exchanges import Coinbase
from cryptofeed.backends.redis import TradeRedis, BookRedis
from cryptofeed.defines import TRADES, L2_BOOK

fh = FeedHandler()
# Subscribe to two channels and give each its own Redis callback.
fh.add_feed(Coinbase(
    symbols=['BTC-USD', 'ETH-USD'],
    channels=[TRADES, L2_BOOK],
    callbacks={
        TRADES: TradeRedis(host='localhost', port=6379, db=0),
        L2_BOOK: BookRedis(host='localhost', port=6379, db=0, depth=20)
    }
))
fh.run()


# Start of the next standalone example.
from cryptofeed import FeedHandler
from cryptofeed.exchanges import Binance
from cryptofeed.backends.mongo import TradeMongo, CandlesMongo
from cryptofeed.defines import TRADES, CANDLES

fh = FeedHandler()
# Trades and candles go to separate collections of the same database.
fh.add_feed(Binance(
    symbols=['BTCUSDT', 'ETHUSDT'],
    channels=[TRADES, CANDLES],
    callbacks={
        TRADES: TradeMongo(
            host='localhost',
            db='cryptofeed',
            collection='trades'
        ),
        CANDLES: CandlesMongo(
            host='localhost',
            db='cryptofeed',
            collection='candles'
        )
    }
))
fh.run()


# Start of the next standalone example.
from cryptofeed import FeedHandler
from cryptofeed.exchanges import Kraken
from cryptofeed.backends.postgres import TradePostgres, TickerPostgres
from cryptofeed.defines import TRADES, TICKER

fh = FeedHandler()
# Trades and tickers go to separate tables of the same database.
fh.add_feed(Kraken(
    symbols=['XBT/USD', 'ETH/USD'],
    channels=[TRADES, TICKER],
    callbacks={
        TRADES: TradePostgres(
            host='localhost',
            user='postgres',
            pw='password',
            db='market_data',
            table='trades'
        ),
        TICKER: TickerPostgres(
            host='localhost',
            user='postgres',
            pw='password',
            db='market_data',
            table='tickers'
        )
    }
))
fh.run()


# Start of the next standalone example.
from cryptofeed import FeedHandler
from cryptofeed.exchanges import Coinbase, Binance
from cryptofeed.backends.kafka import TradeKafka, BookKafka
from cryptofeed.defines import TRADES, L2_BOOK

fh = FeedHandler()

# Stream trades to Kafka
fh.add_feed(Coinbase(
    symbols=['BTC-USD'],
    channels=[TRADES],
    callbacks={
        TRADES: TradeKafka(
            bootstrap_servers='localhost:9092',
            topic='crypto-trades'
        )
    }
))

# Stream order books to different topic
fh.add_feed(Binance(
    symbols=['BTCUSDT'],
    channels=[L2_BOOK],
    callbacks={
        L2_BOOK: BookKafka(
            bootstrap_servers='localhost:9092',
            topic='crypto-books'
        )
    }
))
fh.run()


# Start of the next standalone example.
from cryptofeed import FeedHandler
from cryptofeed.exchanges import Bitfinex
from cryptofeed.backends.influxdb import TradeInflux, CandlesInflux
from cryptofeed.defines import TRADES, CANDLES

fh = FeedHandler()
# Trades and candles are written to separate buckets.
fh.add_feed(Bitfinex(
    symbols=['BTC/USD', 'ETH/USD'],
    channels=[TRADES, CANDLES],
    callbacks={
        TRADES: TradeInflux(
            host='localhost',
            token='your-token',
            org='your-org',
            bucket='crypto-trades'
        ),
        CANDLES: CandlesInflux(
            host='localhost',
            token='your-token',
            org='your-org',
            bucket='crypto-candles'
        )
    }
))
fh.run()


# Start of the next standalone example.
from cryptofeed import FeedHandler
from cryptofeed.exchanges import Coinbase
from cryptofeed.backends.redis import TradeRedis
from cryptofeed.backends.mongo import TradeMongo
from cryptofeed.backends.kafka import TradeKafka
from cryptofeed.defines import TRADES
def custom_trade_handler(trade):
    """Custom processing logic: print the symbol, amount and price of a trade.

    Reads the ``symbol``, ``amount`` and ``price`` attributes of ``trade`` and
    returns None.
    """
    # NOTE(review): the backend callbacks in this document are awaitables taking
    # (data, timestamp, receipt_timestamp); confirm that a plain one-argument
    # function is accepted by the installed cryptofeed version.
    print(f'Processing trade: {trade.symbol} {trade.amount}@{trade.price}')
fh = FeedHandler()
# A channel may map to a list of callbacks instead of a single one.
fh.add_feed(Coinbase(
    symbols=['BTC-USD'],
    channels=[TRADES],
    callbacks={
        TRADES: [
            TradeRedis(host='localhost'),  # Store in Redis
            TradeMongo(host='localhost'),  # Store in MongoDB
            TradeKafka(topic='trades'),  # Stream to Kafka
            custom_trade_handler  # Custom processing
        ]
    }
))
fh.run()


# Start of the next standalone example.
from cryptofeed import FeedHandler
from cryptofeed.exchanges import Binance
from cryptofeed.backends.socket import TradeSocket
from cryptofeed.defines import TRADES

# Stream trades over TCP socket
fh = FeedHandler()
fh.add_feed(Binance(
    symbols=['BTCUSDT'],
    channels=[TRADES],
    callbacks={
        TRADES: TradeSocket(
            address='localhost',
            port=9999,
            protocol='tcp'
        )
    }
))
fh.run()


# Start of the next standalone example.
from cryptofeed import FeedHandler
from cryptofeed.exchanges import Gemini
from cryptofeed.backends.http import HTTPCallback
from cryptofeed.defines import TRADES

fh = FeedHandler()
# Trades are handed to an HTTP callback configured with a URL and headers.
fh.add_feed(Gemini(
    symbols=['BTC-USD'],
    channels=[TRADES],
    callbacks={
        TRADES: HTTPCallback(
            url='https://api.example.com/webhook/trades',
            headers={'Authorization': 'Bearer your-token'}
        )
    }
))
fh.run()


# Start of the next standalone example (custom backend).
from cryptofeed.backends import BackendCallback
class CustomBackend(BackendCallback):
    """Example backend that prints each update and passes it to ``store_data``."""

    def __init__(self, **kwargs):
        # Forward every keyword argument to the base class unchanged.
        super().__init__(**kwargs)
        # Initialize custom storage/processing

    async def __call__(self, data, timestamp, receipt_timestamp):
        # Process the data
        print(f'Custom processing: {data}')
        # Store or forward data as needed
        await self.store_data(data)

    async def store_data(self, data):
        # Custom storage logic
        pass
# Use custom backend
from cryptofeed import FeedHandler
from cryptofeed.exchanges import Coinbase
from cryptofeed.defines import TRADES

fh = FeedHandler()
# Register a Coinbase trades feed whose only callback is the custom backend.
fh.add_feed(
    Coinbase(
        symbols=['BTC-USD'],
        channels=[TRADES],
        callbacks={TRADES: CustomBackend()},
    )
)
fh.run()

Install with Tessl CLI:
npx tessl i tessl/pypi-cryptofeed