CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-cryptofeed

Cryptocurrency Exchange Websocket Data Feed Handler for normalized market data across 30+ exchanges

Pending
Overview
Eval results
Files

docs/feed-management.md

Core Feed Management

Core functionality for managing cryptocurrency data feeds from multiple exchanges, including feed lifecycle management, connection handling, and event processing through the central FeedHandler orchestrator.

Capabilities

FeedHandler Class

Central orchestrator class that manages multiple exchange feeds, handles event loops, and coordinates data processing across all connected exchanges.

class FeedHandler:
    """
    Central orchestrator that manages multiple exchange feeds, handles
    event loops, and coordinates data processing across connected exchanges.
    """

    def __init__(self, config=None, raw_data_collection=None):
        """
        Initialize the feed handler.

        Args:
            config (Config or str, optional): Configuration object, or the
                path to a configuration file. Defaults to None.
            raw_data_collection (callable, optional): Callback used for raw
                message collection. Defaults to None.
        """

Feed Management

Add and manage individual exchange feeds with specific symbols, channels, and callbacks.

def add_feed(self, feed, loop=None, **kwargs):
    """
    Add an exchange feed to the handler.

    Feeds are added individually, each with its own symbols, channels,
    and callbacks.

    Args:
        feed (Feed): Exchange feed instance (e.g., Coinbase, Binance).
        loop (asyncio.AbstractEventLoop, optional): Event loop to use.
            Defaults to None.
        **kwargs: Additional arguments passed to the feed.
    """

Feed Execution

Start and stop feed processing with event loop management and signal handling.

def run(self, start_loop: bool = True, install_signal_handlers: bool = True, exception_handler=None):
    """
    Start processing all configured feeds.

    Args:
        start_loop (bool): Whether to start the asyncio event loop.
            Defaults to True.
        install_signal_handlers (bool): Whether to install signal handlers
            for graceful shutdown. Defaults to True.
        exception_handler (callable, optional): Custom exception handler
            function. Defaults to None.
    """

def stop(self, loop=None):
    """
    Stop all feeds and close connections.

    Args:
        loop (asyncio.AbstractEventLoop, optional): Event loop to use.
            Defaults to None.
    """

async def stop_async(self, loop=None):
    """
    Asynchronously stop all feeds.

    This is a coroutine and must be awaited.

    Args:
        loop (asyncio.AbstractEventLoop, optional): Event loop to use.
            Defaults to None.
    """

def close(self, loop=None):
    """
    Close the event loop and clean up resources.

    Args:
        loop (asyncio.AbstractEventLoop, optional): Event loop to close.
            Defaults to None.
    """

def _stop(self, loop=None):
    """
    Internal method to stop all feeds and return shutdown tasks.

    The leading underscore marks this as non-public.

    Args:
        loop (asyncio.AbstractEventLoop, optional): Event loop to use.
            Defaults to None.

    Returns:
        list: Shutdown tasks for the feeds.
    """

NBBO Integration

Add a National Best Bid and Offer (NBBO) calculation that aggregates the best bid and ask prices across multiple exchanges.

def add_nbbo(self, feeds, symbols, callback, config=None):
    """
    Add NBBO calculation for specified feeds and symbols.

    Args:
        feeds (List[Feed]): List of exchange feed classes (classes, not
            instances).
        symbols (List[str]): List of symbols to track.
        callback (callable): Function to call with NBBO updates.
        config (Config, optional): Configuration for NBBO calculation.
            Defaults to None.
    """

Usage Examples

Basic Feed Setup

from cryptofeed import FeedHandler
from cryptofeed.exchanges import Coinbase, Binance
from cryptofeed.defines import TRADES, TICKER

def trade_callback(trade, receipt_timestamp=None):
    """
    Print a one-line summary of a trade update.

    Args:
        trade: Trade object exposing ``exchange``, ``symbol``, ``side``,
            ``amount`` and ``price`` attributes.
        receipt_timestamp (float, optional): Time the message was received.
            cryptofeed passes this as a second positional argument to every
            callback; it is accepted here so the call does not fail, and is
            not used in the output.
    """
    print(f'{trade.exchange}: {trade.symbol} - {trade.side} {trade.amount}@{trade.price}')

def ticker_callback(ticker, receipt_timestamp=None):
    """
    Print a one-line summary of a ticker update.

    Args:
        ticker: Ticker object exposing ``exchange``, ``symbol``, ``bid`` and
            ``ask`` attributes.
        receipt_timestamp (float, optional): Time the message was received.
            cryptofeed passes this as a second positional argument to every
            callback; it is accepted here so the call does not fail, and is
            not used in the output.
    """
    print(f'{ticker.exchange}: {ticker.symbol} - bid:{ticker.bid} ask:{ticker.ask}')

fh = FeedHandler()

# Add multiple feeds; each feed is given its own symbols, channels, and callbacks
fh.add_feed(Coinbase(
    symbols=['BTC-USD'],
    channels=[TRADES],
    callbacks={TRADES: trade_callback},
))

# cryptofeed uses normalized BASE-QUOTE symbols for every exchange, so the
# Binance BTC/USDT pair is requested as 'BTC-USDT' rather than 'BTCUSDT'
fh.add_feed(Binance(
    symbols=['BTC-USDT'],
    channels=[TICKER],
    callbacks={TICKER: ticker_callback},
))

# Start processing all configured feeds
fh.run()

Configuration-Based Setup

from cryptofeed import FeedHandler
from cryptofeed.config import Config

# Load from YAML configuration file and pass the result to the handler
config = Config('config.yaml')
fh = FeedHandler(config=config)
# Start processing all configured feeds
fh.run()

NBBO Calculation

from cryptofeed import FeedHandler
from cryptofeed.exchanges import Coinbase, Kraken, Gemini

def nbbo_callback(symbol, bid, bid_size, ask, ask_size, bid_feed, ask_feed):
    """
    Print the best bid and best ask for a symbol on one line.

    Args:
        symbol: Symbol the update refers to.
        bid: Best bid price.
        bid_size: Size shown with the best bid.
        ask: Best ask price.
        ask_size: Size shown with the best ask.
        bid_feed: Feed shown as the source of the best bid.
        ask_feed: Feed shown as the source of the best ask.
    """
    summary = f'{symbol}: best bid {bid}@{bid_size} ({bid_feed}) | best ask {ask}@{ask_size} ({ask_feed})'
    print(summary)

fh = FeedHandler()
# Feeds are passed as classes (not instances), followed by the symbols to track
# and the function to call with NBBO updates
fh.add_nbbo([Coinbase, Kraken, Gemini], ['BTC-USD'], nbbo_callback)
# Start processing all configured feeds
fh.run()

Raw Data Collection

from cryptofeed import FeedHandler
from cryptofeed.defines import TRADES
from cryptofeed.exchanges import Coinbase

def raw_data_callback(msg, timestamp):
    """Print a raw message together with its timestamp."""
    print(f'Raw message at {timestamp}: {msg}')

# Register the raw-message callback on the handler itself
fh = FeedHandler(raw_data_collection=raw_data_callback)
# TRADES must be imported from cryptofeed.defines for this snippet to run on its own
fh.add_feed(Coinbase(symbols=['BTC-USD'], channels=[TRADES]))
fh.run()

Exception Handling

from cryptofeed import FeedHandler
import logging

def exception_handler(loop, context):
    """
    Log an error reported through an asyncio-style exception handler.

    Args:
        loop: Event loop that reported the error (unused here).
        context (dict): Error details. The ``'exception'`` key is optional in
            asyncio's exception context, so the ``'message'`` text is logged
            when no exception object is attached.
    """
    # Indexing context["exception"] directly raises KeyError inside the
    # handler whenever the loop reports an error without an exception object.
    error = context.get("exception") or context.get("message")
    logging.error(f'Exception in feed: {error}')
    # Custom recovery logic here

fh = FeedHandler()
# ... add feeds ...
# Pass the custom exception handler function when starting the feeds
fh.run(exception_handler=exception_handler)

Install with Tessl CLI

npx tessl i tessl/pypi-cryptofeed

docs

backends.md

constants.md

exchanges.md

feed-management.md

index.md

types.md

tile.json