CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-cryptofeed

Cryptocurrency Exchange Websocket Data Feed Handler for normalized market data across 30+ exchanges

Pending
Overview
Eval results
Files

docs/backends.md

Storage Backends

Built-in backend implementations for storing and streaming cryptocurrency market data to various databases, message queues, and other systems with minimal setup and configuration.

Capabilities

Redis Backends

Redis-based storage and streaming backends supporting both data storage and real-time streaming.

# Redis Storage (Sorted Sets)
# Signature stubs only: every body is elided with `...`, so constructor
# arguments and stored record layout are not shown in this listing.
class TradeRedis(BackendCallback): ...
class TickerRedis(BackendCallback): ...
class FundingRedis(BackendCallback): ...
class BookRedis(BackendCallback): ...
class BookSnapshotRedisKey(BackendCallback): ...
class CandlesRedis(BackendCallback): ...
class OpenInterestRedis(BackendCallback): ...
class LiquidationsRedis(BackendCallback): ...
class OrderInfoRedis(BackendCallback): ...
class TransactionsRedis(BackendCallback): ...
class BalancesRedis(BackendCallback): ...
class FillsRedis(BackendCallback): ...

# Redis Streams
# Same data types as the storage list above, except that no stream
# counterpart of BookSnapshotRedisKey is listed.
class TradeStream(BackendCallback): ...
class TickerStream(BackendCallback): ...
class FundingStream(BackendCallback): ...
class BookStream(BackendCallback): ...
class CandlesStream(BackendCallback): ...
class OpenInterestStream(BackendCallback): ...
class LiquidationsStream(BackendCallback): ...
class OrderInfoStream(BackendCallback): ...
class TransactionsStream(BackendCallback): ...
class BalancesStream(BackendCallback): ...
class FillsStream(BackendCallback): ...

Database Backends

Database storage backends for persistent data storage with support for time-series and relational databases.

# Signature stubs only: every body is elided with `...`. Each database below
# lists the same seven data types (trade, ticker, funding, book, candles,
# open interest, liquidations).
# NOTE(review): confirm against the installed package whether further data
# types are supported by these backends.

# MongoDB
class TradeMongo(BackendCallback): ...
class TickerMongo(BackendCallback): ...
class FundingMongo(BackendCallback): ...
class BookMongo(BackendCallback): ...
class CandlesMongo(BackendCallback): ...
class OpenInterestMongo(BackendCallback): ...
class LiquidationsMongo(BackendCallback): ...

# PostgreSQL
class TradePostgres(BackendCallback): ...
class TickerPostgres(BackendCallback): ...
class FundingPostgres(BackendCallback): ...
class BookPostgres(BackendCallback): ...
class CandlesPostgres(BackendCallback): ...
class OpenInterestPostgres(BackendCallback): ...
class LiquidationsPostgres(BackendCallback): ...

# InfluxDB (Time Series)
class TradeInflux(BackendCallback): ...
class TickerInflux(BackendCallback): ...
class FundingInflux(BackendCallback): ...
class BookInflux(BackendCallback): ...
class CandlesInflux(BackendCallback): ...
class OpenInterestInflux(BackendCallback): ...
class LiquidationsInflux(BackendCallback): ...

Message Queue Backends

Message queue and streaming backends for real-time data distribution.

# Signature stubs only: every body is elided with `...`; connection and
# topic/queue parameters are not shown in this listing.

# Apache Kafka
class TradeKafka(BackendCallback): ...
class TickerKafka(BackendCallback): ...
class FundingKafka(BackendCallback): ...
class BookKafka(BackendCallback): ...
class CandlesKafka(BackendCallback): ...
class OpenInterestKafka(BackendCallback): ...
class LiquidationsKafka(BackendCallback): ...

# RabbitMQ
# Only four data types are listed here (and for ZeroMQ below).
# NOTE(review): confirm whether that is the full set in the installed package.
class TradeRabbitMQ(BackendCallback): ...
class TickerRabbitMQ(BackendCallback): ...
class FundingRabbitMQ(BackendCallback): ...
class BookRabbitMQ(BackendCallback): ...

# ZeroMQ
class TradeZMQ(BackendCallback): ...
class TickerZMQ(BackendCallback): ...
class FundingZMQ(BackendCallback): ...
class BookZMQ(BackendCallback): ...

Socket and Network Backends

Network-based backends for real-time data streaming over various protocols.

# TCP/UDP Sockets
# Signature stubs only: bodies are elided with `...`; how the transport and
# address are selected is not shown in this listing.
class TradeSocket(BackendCallback): ...
class TickerSocket(BackendCallback): ...
class FundingSocket(BackendCallback): ...
class BookSocket(BackendCallback): ...

# HTTP Callbacks
# A single class is listed; it is not specialised per data type.
class HTTPCallback(BackendCallback): ...

Cloud and Specialized Backends

Cloud service and specialized storage backends.

# Signature stubs only: every body is elided with `...`; credentials and
# connection parameters are not shown in this listing.

# Google Cloud Pub/Sub
class TradeGCPPubSub(BackendCallback): ...
class TickerGCPPubSub(BackendCallback): ...
class FundingGCPPubSub(BackendCallback): ...
class BookGCPPubSub(BackendCallback): ...

# Arctic (Time Series)
class TradeArctic(BackendCallback): ...
class TickerArctic(BackendCallback): ...
class FundingArctic(BackendCallback): ...
class BookArctic(BackendCallback): ...

# QuestDB
class TradeQuest(BackendCallback): ...
class TickerQuest(BackendCallback): ...
class FundingQuest(BackendCallback): ...
class BookQuest(BackendCallback): ...

# QuasarDB
class TradeQuasar(BackendCallback): ...
class TickerQuasar(BackendCallback): ...
class FundingQuasar(BackendCallback): ...
class BookQuasar(BackendCallback): ...

# Data Aggregation
# No funding variant is listed for this group.
# NOTE(review): confirm these three class names against the aggregation
# module of the installed package before relying on them.
class TradeAggregate(BackendCallback): ...
class TickerAggregate(BackendCallback): ...
class BookAggregate(BackendCallback): ...

Base Backend Classes

Base classes for creating custom backends.

class BackendCallback:
    """Base class for all backend callbacks.

    Signature stub only: method bodies are elided with ``...``.
    """
    # Accepts arbitrary keyword arguments; which keys are recognised is not
    # shown in this listing.
    def __init__(self, **kwargs): ...
    # Coroutine entry point: instances are awaited like a function.
    # NOTE(review): the feed may invoke callbacks as (data, receipt_timestamp);
    # confirm this three-argument form against the installed version.
    async def __call__(self, data, timestamp, receipt_timestamp): ...

class BackendBookCallback(BackendCallback):
    """Base class for order book backends.

    Adds an optional ``depth`` argument (default ``None``) on top of the
    keyword arguments taken by ``BackendCallback``.
    """
    # presumably `depth` limits how many book levels are handled — TODO confirm
    def __init__(self, depth=None, **kwargs): ...

class BackendQueue:
    """Base queue implementation for backends.

    Listed without a base class; it does not derive from ``BackendCallback``
    in this listing.
    """
    def __init__(self, **kwargs): ...

Usage Examples

Redis Storage

from cryptofeed import FeedHandler
from cryptofeed.exchanges import Coinbase
from cryptofeed.backends.redis import TradeRedis, BookRedis
from cryptofeed.defines import TRADES, L2_BOOK

fh = FeedHandler()

# One callback per channel; both target the same local Redis (port 6379, db 0).
redis_callbacks = {
    TRADES: TradeRedis(host='localhost', port=6379, db=0),
    L2_BOOK: BookRedis(host='localhost', port=6379, db=0, depth=20),
}
coinbase_feed = Coinbase(
    symbols=['BTC-USD', 'ETH-USD'],
    channels=[TRADES, L2_BOOK],
    callbacks=redis_callbacks,
)
fh.add_feed(coinbase_feed)

# Start the handler once every feed is registered.
fh.run()

MongoDB Storage

from cryptofeed import FeedHandler
from cryptofeed.exchanges import Binance
from cryptofeed.backends.mongo import TradeMongo, CandlesMongo
from cryptofeed.defines import TRADES, CANDLES

fh = FeedHandler()

# Connection settings shared by both callbacks; only the collection differs.
mongo_target = {'host': 'localhost', 'db': 'cryptofeed'}
mongo_callbacks = {
    TRADES: TradeMongo(collection='trades', **mongo_target),
    CANDLES: CandlesMongo(collection='candles', **mongo_target),
}

# NOTE(review): confirm the symbol format the exchange class expects
# (exchange-native 'BTCUSDT' vs. a normalised form).
binance_feed = Binance(
    symbols=['BTCUSDT', 'ETHUSDT'],
    channels=[TRADES, CANDLES],
    callbacks=mongo_callbacks,
)
fh.add_feed(binance_feed)
fh.run()

PostgreSQL Storage

from cryptofeed import FeedHandler
from cryptofeed.exchanges import Kraken
from cryptofeed.backends.postgres import TradePostgres, TickerPostgres
from cryptofeed.defines import TRADES, TICKER

fh = FeedHandler()

# Credentials shared by both callbacks; each writes to its own table.
pg_connection = {
    'host': 'localhost',
    'user': 'postgres',
    'pw': 'password',
    'db': 'market_data',
}
postgres_callbacks = {
    TRADES: TradePostgres(table='trades', **pg_connection),
    TICKER: TickerPostgres(table='tickers', **pg_connection),
}

# NOTE(review): confirm the symbol format the exchange class expects
# (exchange-native 'XBT/USD' vs. a normalised form).
kraken_feed = Kraken(
    symbols=['XBT/USD', 'ETH/USD'],
    channels=[TRADES, TICKER],
    callbacks=postgres_callbacks,
)
fh.add_feed(kraken_feed)
fh.run()

Kafka Streaming

from cryptofeed import FeedHandler
from cryptofeed.exchanges import Coinbase, Binance
from cryptofeed.backends.kafka import TradeKafka, BookKafka
from cryptofeed.defines import TRADES, L2_BOOK

fh = FeedHandler()

# Both feeds publish to the same broker, each on its own topic.
# NOTE(review): confirm that the Kafka callbacks accept `bootstrap_servers`
# and `topic` keywords in the installed version.
kafka_broker = 'localhost:9092'

# Coinbase trades -> 'crypto-trades'
trade_callback = TradeKafka(bootstrap_servers=kafka_broker, topic='crypto-trades')
coinbase_trades = Coinbase(
    symbols=['BTC-USD'],
    channels=[TRADES],
    callbacks={TRADES: trade_callback},
)
fh.add_feed(coinbase_trades)

# Binance order books -> 'crypto-books'
book_callback = BookKafka(bootstrap_servers=kafka_broker, topic='crypto-books')
binance_books = Binance(
    symbols=['BTCUSDT'],
    channels=[L2_BOOK],
    callbacks={L2_BOOK: book_callback},
)
fh.add_feed(binance_books)

fh.run()

InfluxDB Time Series

from cryptofeed import FeedHandler
from cryptofeed.exchanges import Bitfinex
from cryptofeed.backends.influxdb import TradeInflux, CandlesInflux
from cryptofeed.defines import TRADES, CANDLES

fh = FeedHandler()

# Server and credentials shared by both callbacks; only the bucket differs.
# NOTE(review): confirm the keyword used for the server location (`host` here)
# against the InfluxDB backend's constructor.
influx_auth = {
    'host': 'localhost',
    'token': 'your-token',
    'org': 'your-org',
}
influx_callbacks = {
    TRADES: TradeInflux(bucket='crypto-trades', **influx_auth),
    CANDLES: CandlesInflux(bucket='crypto-candles', **influx_auth),
}

bitfinex_feed = Bitfinex(
    symbols=['BTC/USD', 'ETH/USD'],
    channels=[TRADES, CANDLES],
    callbacks=influx_callbacks,
)
fh.add_feed(bitfinex_feed)
fh.run()

Multiple Backends

from cryptofeed import FeedHandler
from cryptofeed.exchanges import Coinbase
from cryptofeed.backends.redis import TradeRedis
from cryptofeed.backends.mongo import TradeMongo
from cryptofeed.backends.kafka import TradeKafka
from cryptofeed.defines import TRADES

async def custom_trade_handler(trade, receipt_timestamp):
    """Print a one-line summary of a trade delivered by the feed.

    Written as a coroutine taking ``(trade, receipt_timestamp)`` because
    cryptofeed awaits each user callback with the data object and the time
    the message was received; a plain one-argument function would fail when
    the first trade arrives.

    Args:
        trade: Object exposing ``symbol``, ``amount`` and ``price``.
        receipt_timestamp: Receipt time supplied by the feed; unused here.
    """
    print(f'Processing trade: {trade.symbol} {trade.amount}@{trade.price}')

fh = FeedHandler()

# A channel may map to a list of callbacks; this one mixes three backends
# with a user-defined handler.
# NOTE(review): confirm that each backend can be built from only the single
# keyword shown (other constructor arguments may be required).
trade_sinks = [
    TradeRedis(host='localhost'),   # Store in Redis
    TradeMongo(host='localhost'),   # Store in MongoDB
    TradeKafka(topic='trades'),     # Stream to Kafka
    custom_trade_handler,           # Custom processing
]

coinbase_feed = Coinbase(
    symbols=['BTC-USD'],
    channels=[TRADES],
    callbacks={TRADES: trade_sinks},
)
fh.add_feed(coinbase_feed)
fh.run()

Socket Streaming

from cryptofeed import FeedHandler
from cryptofeed.exchanges import Binance
from cryptofeed.backends.socket import TradeSocket
from cryptofeed.defines import TRADES

# Forward Binance trades to a TCP socket on localhost:9999.
# NOTE(review): confirm the socket backend's keyword names (`address`,
# `protocol`) against its constructor; it may expect a single URL-style
# address instead.
fh = FeedHandler()

socket_callback = TradeSocket(
    address='localhost',
    port=9999,
    protocol='tcp',
)
binance_feed = Binance(
    symbols=['BTCUSDT'],
    channels=[TRADES],
    callbacks={TRADES: socket_callback},
)
fh.add_feed(binance_feed)
fh.run()

HTTP Callbacks

from cryptofeed import FeedHandler
from cryptofeed.exchanges import Gemini
from cryptofeed.backends.http import HTTPCallback
from cryptofeed.defines import TRADES

fh = FeedHandler()

# Deliver Gemini trades to a webhook, sending a bearer token header.
# NOTE(review): confirm that HTTPCallback accepts `url` and `headers`
# keywords in the installed version.
webhook_headers = {'Authorization': 'Bearer your-token'}
webhook = HTTPCallback(
    url='https://api.example.com/webhook/trades',
    headers=webhook_headers,
)

gemini_feed = Gemini(
    symbols=['BTC-USD'],
    channels=[TRADES],
    callbacks={TRADES: webhook},
)
fh.add_feed(gemini_feed)
fh.run()

Custom Backend

from cryptofeed.backends import BackendCallback

class CustomBackend(BackendCallback):
    """Example backend that prints each item and passes it to ``store_data``."""

    def __init__(self, **kwargs):
        # Forward every keyword argument to the base class unchanged.
        super().__init__(**kwargs)
        # Set up custom storage/processing state here.

    async def store_data(self, data):
        """Placeholder for custom storage logic; currently does nothing."""
        return None

    async def __call__(self, data, timestamp, receipt_timestamp):
        """Print ``data`` and then await ``store_data`` with it.

        ``timestamp`` and ``receipt_timestamp`` are accepted but not used.
        NOTE(review): confirm the arity the feed uses when invoking backends;
        it may pass only ``(data, receipt_timestamp)``.
        """
        message = f'Custom processing: {data}'
        print(message)

        # Store or forward the item as needed.
        await self.store_data(data)

# Use custom backend
from cryptofeed import FeedHandler
from cryptofeed.exchanges import Coinbase
from cryptofeed.defines import TRADES

fh = FeedHandler()

# Route Coinbase trades to a CustomBackend built with no arguments.
custom_callbacks = {TRADES: CustomBackend()}
coinbase_feed = Coinbase(
    symbols=['BTC-USD'],
    channels=[TRADES],
    callbacks=custom_callbacks,
)
fh.add_feed(coinbase_feed)
fh.run()

Install with Tessl CLI

npx tessl i tessl/pypi-cryptofeed

docs

backends.md

constants.md

exchanges.md

feed-management.md

index.md

types.md

tile.json