or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

backends.mdconstants.mdexchanges.mdfeed-management.mdindex.mdtypes.md

feed-management.mddocs/

0

# Core Feed Management

1

2

Core functionality for managing cryptocurrency data feeds from multiple exchanges, including feed lifecycle management, connection handling, and event processing through the central FeedHandler orchestrator.

3

4

## Capabilities

5

6

### FeedHandler Class

7

8

Central orchestrator class that manages multiple exchange feeds, handles event loops, and coordinates data processing across all connected exchanges.

9

10

```python { .api }

11

class FeedHandler:

12

def __init__(self, config=None, raw_data_collection=None):

13

"""

14

Initialize the feed handler.

15

16

Args:

17

config (Config, optional): Configuration object or file path

18

raw_data_collection (callable, optional): Callback for raw message collection

19

"""

20

```

21

22

### Feed Management

23

24

Add and manage individual exchange feeds with specific symbols, channels, and callbacks.

25

26

```python { .api }

27

def add_feed(self, feed, loop=None, **kwargs):

28

"""

29

Add an exchange feed to the handler.

30

31

Args:

32

feed (Feed): Exchange feed instance (e.g., Coinbase, Binance)

33

loop (asyncio.AbstractEventLoop, optional): Event loop to use

34

**kwargs: Additional arguments passed to feed

35

"""

36

```

37

38

### Feed Execution

39

40

Start and stop feed processing with event loop management and signal handling.

41

42

```python { .api }

43

def run(self, start_loop=True, install_signal_handlers=True, exception_handler=None):

44

"""

45

Start processing all configured feeds.

46

47

Args:

48

start_loop (bool): Whether to start the asyncio event loop

49

install_signal_handlers (bool): Whether to install signal handlers for graceful shutdown

50

exception_handler (callable, optional): Custom exception handler function

51

"""

52

53

def stop(self, loop=None):

54

"""

55

Stop all feeds and close connections.

56

57

Args:

58

loop (asyncio.AbstractEventLoop, optional): Event loop to use

59

"""

60

61

async def stop_async(self, loop=None):

62

"""

63

Asynchronously stop all feeds.

64

65

Args:

66

loop (asyncio.AbstractEventLoop, optional): Event loop to use

67

"""

68

69

def close(self, loop=None):

70

"""

71

Close the event loop and clean up resources.

72

73

Args:

74

loop (asyncio.AbstractEventLoop, optional): Event loop to close

75

"""

76

77

def _stop(self, loop=None):

78

"""

79

Internal method to stop all feeds and return shutdown tasks.

80

81

Args:

82

loop (asyncio.AbstractEventLoop, optional): Event loop to use

83

84

Returns:

85

List of shutdown tasks for feeds

86

"""

87

```

88

89

### NBBO Integration

90

91

Add National Best Bid/Offer calculation that aggregates the best prices across multiple exchanges.

92

93

```python { .api }

94

def add_nbbo(self, feeds, symbols, callback, config=None):

95

"""

96

Add NBBO calculation for specified feeds and symbols.

97

98

Args:

99

feeds (List[Feed]): List of exchange feed classes

100

symbols (List[str]): List of symbols to track

101

callback (callable): Function to call with NBBO updates

102

config (Config, optional): Configuration for NBBO calculation

103

"""

104

```

105

106

## Usage Examples

107

108

### Basic Feed Setup

109

110

```python

111

from cryptofeed import FeedHandler

112

from cryptofeed.exchanges import Coinbase, Binance

113

from cryptofeed.defines import TRADES, TICKER

114

115

def trade_callback(trade):

116

print(f'{trade.exchange}: {trade.symbol} - {trade.side} {trade.amount}@{trade.price}')

117

118

def ticker_callback(ticker):

119

print(f'{ticker.exchange}: {ticker.symbol} - bid:{ticker.bid} ask:{ticker.ask}')

120

121

fh = FeedHandler()

122

123

# Add multiple feeds

124

fh.add_feed(Coinbase(

125

symbols=['BTC-USD'],

126

channels=[TRADES],

127

callbacks={TRADES: trade_callback}

128

))

129

130

fh.add_feed(Binance(

131

symbols=['BTCUSDT'],

132

channels=[TICKER],

133

callbacks={TICKER: ticker_callback}

134

))

135

136

fh.run()

137

```

138

139

### Configuration-Based Setup

140

141

```python

142

from cryptofeed import FeedHandler

143

from cryptofeed.config import Config

144

145

# Load from YAML configuration file

146

config = Config('config.yaml')

147

fh = FeedHandler(config=config)

148

fh.run()

149

```

150

151

### NBBO Calculation

152

153

```python

154

from cryptofeed import FeedHandler

155

from cryptofeed.exchanges import Coinbase, Kraken, Gemini

156

157

def nbbo_callback(symbol, bid, bid_size, ask, ask_size, bid_feed, ask_feed):

158

print(f'{symbol}: best bid {bid}@{bid_size} ({bid_feed}) | best ask {ask}@{ask_size} ({ask_feed})')

159

160

fh = FeedHandler()

161

fh.add_nbbo([Coinbase, Kraken, Gemini], ['BTC-USD'], nbbo_callback)

162

fh.run()

163

```

164

165

### Raw Data Collection

166

167

```python

168

from cryptofeed import FeedHandler

169

from cryptofeed.exchanges import Coinbase

170

171

def raw_data_callback(msg, timestamp):

172

print(f'Raw message at {timestamp}: {msg}')

173

174

fh = FeedHandler(raw_data_collection=raw_data_callback)

175

fh.add_feed(Coinbase(symbols=['BTC-USD'], channels=[TRADES]))

176

fh.run()

177

```

178

179

### Exception Handling

180

181

```python

182

from cryptofeed import FeedHandler

183

import logging

184

185

def exception_handler(loop, context):

186

logging.error(f'Exception in feed: {context["exception"]}')

187

# Custom recovery logic here

188

189

fh = FeedHandler()

190

# ... add feeds ...

191

fh.run(exception_handler=exception_handler)

192

```