# Core Feed Management

Core functionality for managing cryptocurrency data feeds from multiple exchanges. It covers feed lifecycle management, connection handling, and event processing through the central `FeedHandler` orchestrator.

## Capabilities

### FeedHandler Class

Central orchestrator class that manages multiple exchange feeds, handles event loops, and coordinates data processing across all connected exchanges.

```python { .api }
class FeedHandler:
    def __init__(self, config=None, raw_data_collection=None):
        """
        Initialize the feed handler.

        Args:
            config (Config, optional): Configuration object or file path
            raw_data_collection (callable, optional): Callback for raw message collection
        """
```

### Feed Management

Add and manage individual exchange feeds with specific symbols, channels, and callbacks.

```python { .api }
def add_feed(self, feed, loop=None, **kwargs):
    """
    Add an exchange feed to the handler.

    Args:
        feed (Feed): Exchange feed instance (e.g., Coinbase, Binance)
        loop (asyncio.AbstractEventLoop, optional): Event loop to use
        **kwargs: Additional arguments passed to feed
    """
```

### Feed Execution

Start and stop feed processing with event loop management and signal handling.

```python { .api }
def run(self, start_loop=True, install_signal_handlers=True, exception_handler=None):
    """
    Start processing all configured feeds.

    Args:
        start_loop (bool): Whether to start the asyncio event loop
        install_signal_handlers (bool): Whether to install signal handlers for graceful shutdown
        exception_handler (callable, optional): Custom exception handler function
    """

def stop(self, loop=None):
    """
    Stop all feeds and close connections.

    Args:
        loop (asyncio.AbstractEventLoop, optional): Event loop to use
    """

async def stop_async(self, loop=None):
    """
    Asynchronously stop all feeds.

    Args:
        loop (asyncio.AbstractEventLoop, optional): Event loop to use
    """

def close(self, loop=None):
    """
    Close the event loop and clean up resources.

    Args:
        loop (asyncio.AbstractEventLoop, optional): Event loop to close
    """

def _stop(self, loop=None):
    """
    Internal method to stop all feeds and return shutdown tasks.

    Args:
        loop (asyncio.AbstractEventLoop, optional): Event loop to use

    Returns:
        List of shutdown tasks for feeds
    """
```
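
The split between `_stop` (collect shutdown tasks) and `stop_async` (await them) can be pictured with plain `asyncio`. The sketch below only illustrates that pattern: `ToyFeed` and `ToyHandler` are hypothetical stand-ins written for this example, and nothing here is taken from the library's actual implementation.

```python
import asyncio


class ToyFeed:
    """Hypothetical stand-in for an exchange feed."""

    def __init__(self, name):
        self.name = name
        self.running = True

    async def shutdown(self):
        # Yield to the event loop once, as a real connection close would.
        await asyncio.sleep(0)
        self.running = False


class ToyHandler:
    """Hypothetical orchestrator that shows the stop pattern only."""

    def __init__(self, feeds):
        self.feeds = list(feeds)

    def _stop(self):
        # Build one shutdown coroutine per feed without awaiting any of them.
        return [feed.shutdown() for feed in self.feeds]

    async def stop_async(self):
        # Await all shutdown coroutines concurrently.
        await asyncio.gather(*self._stop())


async def main():
    handler = ToyHandler([ToyFeed("a"), ToyFeed("b")])
    await handler.stop_async()
    # Report whether each feed is still running after shutdown.
    return [feed.running for feed in handler.feeds]
```

Calling `asyncio.run(main())` drives the whole shutdown. A synchronous `stop` would typically wrap the same awaitables with the loop's `run_until_complete`, which is why both variants can share one internal helper.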

### NBBO Integration

Add a National Best Bid and Offer (NBBO) calculation that tracks the highest bid and the lowest ask across multiple exchanges.

```python { .api }
def add_nbbo(self, feeds, symbols, callback, config=None):
    """
    Add NBBO calculation for specified feeds and symbols.

    Args:
        feeds (List[Feed]): List of exchange feed classes
        symbols (List[str]): List of symbols to track
        callback (callable): Function to call with NBBO updates
        config (Config, optional): Configuration for NBBO calculation
    """
```
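
The aggregation itself is simple to state: the best bid is the highest bid on any exchange, and the best ask is the lowest ask. The following is a minimal sketch of that rule, not the library's code. The function name `best_bid_offer`, the shape of the `quotes` mapping, and the sample prices are all assumptions made for illustration; the returned tuple mirrors the argument order of the NBBO callback.

```python
from decimal import Decimal


def best_bid_offer(quotes):
    """Pick the best bid and ask from per-exchange top-of-book quotes.

    `quotes` maps an exchange name to a (bid, bid_size, ask, ask_size) tuple.
    Returns (bid, bid_size, ask, ask_size, bid_feed, ask_feed).
    """
    # Highest bid wins on the bid side, lowest ask wins on the ask side.
    bid_feed = max(quotes, key=lambda name: quotes[name][0])
    ask_feed = min(quotes, key=lambda name: quotes[name][2])
    bid, bid_size = quotes[bid_feed][0], quotes[bid_feed][1]
    ask, ask_size = quotes[ask_feed][2], quotes[ask_feed][3]
    return bid, bid_size, ask, ask_size, bid_feed, ask_feed


# Illustrative numbers only, not real market data.
quotes = {
    "EXCHANGE_A": (Decimal("100.0"), Decimal("1.5"), Decimal("100.4"), Decimal("2.0")),
    "EXCHANGE_B": (Decimal("100.1"), Decimal("0.7"), Decimal("100.5"), Decimal("1.0")),
    "EXCHANGE_C": (Decimal("99.9"), Decimal("3.0"), Decimal("100.3"), Decimal("0.4")),
}
result = best_bid_offer(quotes)
```

With these sample quotes the best bid comes from `EXCHANGE_B` and the best ask from `EXCHANGE_C`, so the two sides of the NBBO can come from different venues.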

## Usage Examples

### Basic Feed Setup

```python
from cryptofeed import FeedHandler
from cryptofeed.defines import TICKER, TRADES
from cryptofeed.exchanges import Binance, Coinbase


# Callbacks receive the data object and the time the message was received
def trade_callback(trade, receipt_timestamp):
    print(f'{trade.exchange}: {trade.symbol} - {trade.side} {trade.amount}@{trade.price}')


def ticker_callback(ticker, receipt_timestamp):
    print(f'{ticker.exchange}: {ticker.symbol} - bid:{ticker.bid} ask:{ticker.ask}')


fh = FeedHandler()

# Add multiple feeds; symbols use the normalized BASE-QUOTE form
fh.add_feed(Coinbase(
    symbols=['BTC-USD'],
    channels=[TRADES],
    callbacks={TRADES: trade_callback}
))

fh.add_feed(Binance(
    symbols=['BTC-USDT'],
    channels=[TICKER],
    callbacks={TICKER: ticker_callback}
))

fh.run()
```
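
A callback does not have to be a plain function. As a sketch, a small stateful object can accumulate a volume-weighted average price (VWAP) from the `amount` and `price` attributes that `trade_callback` reads above. `VWAPTracker` is a hypothetical helper, not part of cryptofeed. The `SimpleNamespace` objects stand in for real trade objects, and the optional `receipt_timestamp` parameter is an assumption about what the library may pass as a second argument.

```python
from decimal import Decimal
from types import SimpleNamespace


class VWAPTracker:
    """Hypothetical stateful callback that tracks a running VWAP."""

    def __init__(self):
        self.notional = Decimal(0)  # running sum of price * amount
        self.volume = Decimal(0)    # running sum of amount

    def __call__(self, trade, receipt_timestamp=None):
        # Convert through str so floats, strings, and Decimals all work.
        amount = Decimal(str(trade.amount))
        price = Decimal(str(trade.price))
        self.notional += amount * price
        self.volume += amount

    @property
    def vwap(self):
        # None until at least one trade with non-zero volume has arrived.
        return self.notional / self.volume if self.volume else None


tracker = VWAPTracker()
# Stand-in trade objects with illustrative values.
tracker(SimpleNamespace(amount="1", price="100"))
tracker(SimpleNamespace(amount="3", price="200"))
```

If the library accepts any callable as a callback (an assumption worth checking against its documentation), an instance could be registered as `callbacks={TRADES: tracker}` and queried through `tracker.vwap` at any time.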

### Configuration-Based Setup

```python
from cryptofeed import FeedHandler
from cryptofeed.config import Config

# Load from YAML configuration file
config = Config('config.yaml')
fh = FeedHandler(config=config)
# ... add feeds ...
fh.run()
```

### NBBO Calculation

```python
from cryptofeed import FeedHandler
from cryptofeed.exchanges import Coinbase, Gemini, Kraken


def nbbo_callback(symbol, bid, bid_size, ask, ask_size, bid_feed, ask_feed):
    print(f'{symbol}: best bid {bid}@{bid_size} ({bid_feed}) | best ask {ask}@{ask_size} ({ask_feed})')


fh = FeedHandler()
fh.add_nbbo([Coinbase, Kraken, Gemini], ['BTC-USD'], nbbo_callback)
fh.run()
```

### Raw Data Collection

```python
from cryptofeed import FeedHandler
from cryptofeed.defines import TRADES  # required for the channels list below
from cryptofeed.exchanges import Coinbase


def raw_data_callback(msg, timestamp):
    print(f'Raw message at {timestamp}: {msg}')


fh = FeedHandler(raw_data_collection=raw_data_callback)
fh.add_feed(Coinbase(symbols=['BTC-USD'], channels=[TRADES]))
fh.run()
```
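
Printing raw messages is mostly useful for debugging. For later replay or inspection, one option is to append each message to a JSON Lines stream. The sketch below assumes the `(msg, timestamp)` signature shown above. `make_raw_writer` is a hypothetical helper, the sample message is invented for illustration, and the real library may pass additional arguments or expect a coroutine.

```python
import io
import json


def make_raw_writer(stream):
    """Return a callback that appends one JSON object per raw message."""

    def raw_data_callback(msg, timestamp):
        record = {"timestamp": timestamp, "msg": msg}
        # default=str keeps non-JSON types (e.g. bytes, Decimal) from raising.
        stream.write(json.dumps(record, default=str) + "\n")

    return raw_data_callback


# In-memory stream for demonstration; a file opened in append mode also works.
buffer = io.StringIO()
callback = make_raw_writer(buffer)
callback('{"type": "match", "price": "100.0"}', 1700000000.0)  # invented sample message
lines = buffer.getvalue().splitlines()
```

Keeping the stream outside the callback lets the caller decide where the data goes and when the stream is flushed or closed.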

### Exception Handling

```python
import logging

from cryptofeed import FeedHandler


def exception_handler(loop, context):
    # In asyncio handler contexts "message" is always set; "exception" is optional
    error = context.get('exception', context['message'])
    logging.error('Exception in feed: %s', error)
    # Custom recovery logic here


fh = FeedHandler()
# ... add feeds ...
fh.run(exception_handler=exception_handler)
```
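
The `(loop, context)` signature matches the one `asyncio` uses for loop-level exception handlers. Assuming the handler is installed that way, the standalone sketch below shows what the `context` dictionary looks like, using only the standard library. The `seen` list and the sample messages are illustrative, and no cryptofeed code is involved.

```python
import asyncio

seen = []


def exception_handler(loop, context):
    # "message" is always present; "exception" only when an exception object exists.
    exc = context.get("exception")
    seen.append((context["message"], type(exc).__name__ if exc else None))


loop = asyncio.new_event_loop()
loop.set_exception_handler(exception_handler)

# Report one context without an exception and one with an exception.
loop.call_exception_handler({"message": "manual report"})
loop.call_exception_handler(
    {"message": "task failed", "exception": RuntimeError("feed disconnected")}
)
loop.close()
```

Because `exception` may be missing, reading it with `context.get("exception")` avoids a `KeyError` inside the handler, which would otherwise hide the original problem.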