Cryptocurrency Exchange Websocket Data Feed Handler for normalized market data across 30+ exchanges
—
Core functionality for managing cryptocurrency data feeds from multiple exchanges, including feed lifecycle management, connection handling, and event processing through the central FeedHandler orchestrator.
Central orchestrator class that manages multiple exchange feeds, handles event loops, and coordinates data processing across all connected exchanges.
class FeedHandler:
def __init__(self, config=None, raw_data_collection=None):
"""
Initialize the feed handler.
Args:
config (Config, optional): Configuration object or file path
raw_data_collection (callable, optional): Callback for raw message collection
"""Add and manage individual exchange feeds with specific symbols, channels, and callbacks.
def add_feed(self, feed, loop=None, **kwargs):
"""
Add an exchange feed to the handler.
Args:
feed (Feed): Exchange feed instance (e.g., Coinbase, Binance)
loop (asyncio.AbstractEventLoop, optional): Event loop to use
**kwargs: Additional arguments passed to feed
"""Start and stop feed processing with event loop management and signal handling.
def run(self, start_loop=True, install_signal_handlers=True, exception_handler=None):
"""
Start processing all configured feeds.
Args:
start_loop (bool): Whether to start the asyncio event loop
install_signal_handlers (bool): Whether to install signal handlers for graceful shutdown
exception_handler (callable, optional): Custom exception handler function
"""
def stop(self, loop=None):
"""
Stop all feeds and close connections.
Args:
loop (asyncio.AbstractEventLoop, optional): Event loop to use
"""
async def stop_async(self, loop=None):
"""
Asynchronously stop all feeds.
Args:
loop (asyncio.AbstractEventLoop, optional): Event loop to use
"""
def close(self, loop=None):
"""
Close the event loop and clean up resources.
Args:
loop (asyncio.AbstractEventLoop, optional): Event loop to close
"""
def _stop(self, loop=None):
"""
Internal method to stop all feeds and return shutdown tasks.
Args:
loop (asyncio.AbstractEventLoop, optional): Event loop to use
Returns:
List of shutdown tasks for feeds
"""Add National Best Bid/Offer calculation that aggregates the best prices across multiple exchanges.
def add_nbbo(self, feeds, symbols, callback, config=None):
"""
Add NBBO calculation for specified feeds and symbols.
Args:
feeds (List[Feed]): List of exchange feed classes
symbols (List[str]): List of symbols to track
callback (callable): Function to call with NBBO updates
config (Config, optional): Configuration for NBBO calculation
"""from cryptofeed import FeedHandler
from cryptofeed.exchanges import Coinbase, Binance
from cryptofeed.defines import TRADES, TICKER
def trade_callback(trade):
print(f'{trade.exchange}: {trade.symbol} - {trade.side} {trade.amount}@{trade.price}')
def ticker_callback(ticker):
print(f'{ticker.exchange}: {ticker.symbol} - bid:{ticker.bid} ask:{ticker.ask}')
fh = FeedHandler()
# Add multiple feeds
fh.add_feed(Coinbase(
symbols=['BTC-USD'],
channels=[TRADES],
callbacks={TRADES: trade_callback}
))
fh.add_feed(Binance(
symbols=['BTCUSDT'],
channels=[TICKER],
callbacks={TICKER: ticker_callback}
))
fh.run()from cryptofeed import FeedHandler
from cryptofeed.config import Config
# Load from YAML configuration file
config = Config('config.yaml')
fh = FeedHandler(config=config)
fh.run()from cryptofeed import FeedHandler
from cryptofeed.exchanges import Coinbase, Kraken, Gemini
def nbbo_callback(symbol, bid, bid_size, ask, ask_size, bid_feed, ask_feed):
print(f'{symbol}: best bid {bid}@{bid_size} ({bid_feed}) | best ask {ask}@{ask_size} ({ask_feed})')
fh = FeedHandler()
fh.add_nbbo([Coinbase, Kraken, Gemini], ['BTC-USD'], nbbo_callback)
fh.run()from cryptofeed import FeedHandler
from cryptofeed.exchanges import Coinbase
def raw_data_callback(msg, timestamp):
print(f'Raw message at {timestamp}: {msg}')
fh = FeedHandler(raw_data_collection=raw_data_callback)
fh.add_feed(Coinbase(symbols=['BTC-USD'], channels=[TRADES]))
fh.run()from cryptofeed import FeedHandler
import logging
def exception_handler(loop, context):
logging.error(f'Exception in feed: {context["exception"]}')
# Custom recovery logic here
fh = FeedHandler()
# ... add feeds ...
fh.run(exception_handler=exception_handler)Install with Tessl CLI
npx tessl i tessl/pypi-cryptofeed