or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-operations.md, cryptocurrency.md, index.md, market-data.md, streaming.md, trading-operations.md

docs/streaming.md

0

# Real-time Streaming

1

2

WebSocket streaming for real-time market data feeds and trading account updates. Supports subscription-based data streams with handler functions and decorator patterns for event processing.

3

4

## Capabilities

5

6

### Stream Initialization

7

8

Create and configure streaming connections with authentication and data feed selection.

9

10

```python { .api }

11

class Stream:

12

def __init__(

13

self,

14

key_id: str = None,

15

secret_key: str = None,

16

base_url: str = None,

17

data_stream_url: str = None,

18

data_feed: str = 'iex',

19

raw_data: bool = False,

20

crypto_exchanges: List[str] = None,

21

websocket_params: Dict = None

22

): ...

23

```

24

25

**Usage Example:**

26

27

```python

28

import alpaca_trade_api as tradeapi

29

30

# Initialize stream with credentials

31

stream = tradeapi.Stream('your-key-id', 'your-secret-key', data_feed='iex')

32

```

33

34

### Market Data Subscriptions

35

36

Subscribe to real-time market data feeds including trades, quotes, and bars.

37

38

```python { .api }

39

def subscribe_trades(handler: Callable, *symbols: str,

40

handler_cancel_errors: Callable = None,

41

handler_corrections: Callable = None) -> None:

42

"""Subscribe to trade data streams."""

43

44

def subscribe_quotes(handler: Callable, *symbols: str) -> None:

45

"""Subscribe to quote data streams."""

46

47

def subscribe_bars(handler: Callable, *symbols: str) -> None:

48

"""Subscribe to minute bar data streams."""

49

50

def subscribe_updated_bars(handler: Callable, *symbols: str) -> None:

51

"""Subscribe to updated bar data streams."""

52

53

def subscribe_daily_bars(handler: Callable, *symbols: str) -> None:

54

"""Subscribe to daily bar data streams."""

55

56

def subscribe_statuses(handler: Callable, *symbols: str) -> None:

57

"""Subscribe to trading status updates."""

58

59

def subscribe_lulds(handler: Callable, *symbols: str) -> None:

60

"""Subscribe to Limit Up Limit Down updates."""

61

```

62

63

**Usage Examples:**

64

65

```python

66

# Define handler functions

67

def handle_trade(trade):

68

print(f"Trade: {trade.symbol} {trade.size} @ ${trade.price}")

69

70

def handle_quote(quote):

71

spread = quote.ask_price - quote.bid_price

72

print(f"Quote: {quote.symbol} Bid=${quote.bid_price} Ask=${quote.ask_price} Spread=${spread:.2f}")

73

74

def handle_bar(bar):

75

print(f"Bar: {bar.symbol} O=${bar.open} H=${bar.high} L=${bar.low} C=${bar.close} V={bar.volume}")

76

77

# Subscribe to streams

78

stream.subscribe_trades(handle_trade, 'AAPL', 'TSLA')

79

stream.subscribe_quotes(handle_quote, 'AAPL', 'TSLA', 'GOOGL')

80

stream.subscribe_bars(handle_bar, 'SPY')

81

```

82

83

### Trading Account Updates

84

85

Subscribe to real-time updates for trading account events and order status changes.

86

87

```python { .api }

88

def subscribe_trade_updates(handler: Callable) -> None:

89

"""Subscribe to trading account updates."""

90

```

91

92

**Usage Example:**

93

94

```python

95

def handle_trade_update(update):

96

print(f"Trade Update: {update.event} - Order {update.order.id}")

97

if update.event == 'fill':

98

print(f" Filled: {update.order.symbol} {update.order.side} {update.order.filled_qty}")

99

elif update.event == 'partial_fill':

100

print(f" Partial Fill: {update.order.filled_qty}/{update.order.qty}")

101

elif update.event == 'canceled':

102

print(f" Canceled: {update.order.symbol}")

103

104

stream.subscribe_trade_updates(handle_trade_update)

105

```

106

107

### Decorator Pattern

108

109

Use decorators for clean event handler registration.

110

111

```python { .api }

112

def on_trade(*symbols: str) -> Callable:

113

"""Decorator for trade event handlers."""

114

115

def on_quote(*symbols: str) -> Callable:

116

"""Decorator for quote event handlers."""

117

118

def on_bar(*symbols: str) -> Callable:

119

"""Decorator for bar event handlers."""

120

121

def on_updated_bar(*symbols: str) -> Callable:

122

"""Decorator for updated bar event handlers."""

123

124

def on_daily_bar(*symbols: str) -> Callable:

125

"""Decorator for daily bar event handlers."""

126

127

def on_status(*symbols: str) -> Callable:

128

"""Decorator for status event handlers."""

129

130

def on_luld(*symbols: str) -> Callable:

131

"""Decorator for LULD event handlers."""

132

133

def on_trade_update() -> Callable:

134

"""Decorator for trade update handlers."""

135

136

def on_cancel_error(*symbols: str) -> Callable:

137

"""Decorator for cancel error handlers."""

138

139

def on_correction(*symbols: str) -> Callable:

140

"""Decorator for correction handlers."""

141

```

142

143

**Usage Examples:**

144

145

```python

146

# Using decorators for clean handler registration

147

@stream.on_trade('AAPL', 'TSLA')

148

def trade_handler(trade):

149

if trade.size >= 1000: # Large trades only

150

print(f"Large trade: {trade.symbol} {trade.size} shares @ ${trade.price}")

151

152

@stream.on_quote('AAPL', 'MSFT', 'GOOGL')

153

def quote_handler(quote):

154

spread_bps = ((quote.ask_price - quote.bid_price) / quote.bid_price) * 10000

155

if spread_bps > 10: # Wide spreads

156

print(f"Wide spread: {quote.symbol} {spread_bps:.1f} bps")

157

158

@stream.on_bar('SPY', 'QQQ')

159

def bar_handler(bar):

160

# Calculate momentum

161

body = abs(bar.close - bar.open)

162

range_size = bar.high - bar.low

163

momentum = body / range_size if range_size > 0 else 0

164

print(f"Bar momentum: {bar.symbol} {momentum:.2f}")

165

166

@stream.on_trade_update()

167

def trade_update_handler(update):

168

if update.event in ['fill', 'partial_fill']:

169

print(f"Execution: {update.order.symbol} {update.order.side} {update.order.filled_qty} @ ${update.price}")

170

```

171

172

### Stream Management

173

174

Control streaming connections and manage subscriptions.

175

176

```python { .api }

177

def run() -> None:

178

"""Start the streaming connection (blocking)."""

179

180

def stop() -> None:

181

"""Stop the streaming connection."""

182

183

async def stop_ws() -> None:

184

"""Stop WebSocket connections (async)."""

185

186

def is_open() -> bool:

187

"""Check if WebSocket connection is open."""

188

```

189

190

**Usage Examples:**

191

192

```python

193

# Start streaming (blocking)

194

try:

195

print("Starting stream...")

196

stream.run()

197

except KeyboardInterrupt:

198

print("Stream stopped by user")

199

200

# Non-blocking usage with threading

201

import threading
import time

202

203

def start_stream():

204

stream.run()

205

206

stream_thread = threading.Thread(target=start_stream)

207

stream_thread.daemon = True

208

stream_thread.start()

209

210

# Your main application logic here

211

time.sleep(60)

212

stream.stop()

213

```

214

215

### Unsubscription

216

217

Remove subscriptions when no longer needed.

218

219

```python { .api }

220

def unsubscribe_trades(*symbols: str) -> None:

221

"""Unsubscribe from trade streams."""

222

223

def unsubscribe_quotes(*symbols: str) -> None:

224

"""Unsubscribe from quote streams."""

225

226

def unsubscribe_bars(*symbols: str) -> None:

227

"""Unsubscribe from bar streams."""

228

229

def unsubscribe_updated_bars(*symbols: str) -> None:

230

"""Unsubscribe from updated bar streams."""

231

232

def unsubscribe_daily_bars(*symbols: str) -> None:

233

"""Unsubscribe from daily bar streams."""

234

235

def unsubscribe_statuses(*symbols: str) -> None:

236

"""Unsubscribe from status streams."""

237

238

def unsubscribe_lulds(*symbols: str) -> None:

239

"""Unsubscribe from LULD streams."""

240

241

def unsubscribe_crypto_trades(*symbols: str) -> None:

242

"""Unsubscribe from crypto trade streams."""

243

244

def unsubscribe_crypto_quotes(*symbols: str) -> None:

245

"""Unsubscribe from crypto quote streams."""

246

247

def unsubscribe_crypto_bars(*symbols: str) -> None:

248

"""Unsubscribe from crypto bar streams."""

249

250

def unsubscribe_crypto_updated_bars(*symbols: str) -> None:

251

"""Unsubscribe from crypto updated bar streams."""

252

253

def unsubscribe_crypto_daily_bars(*symbols: str) -> None:

254

"""Unsubscribe from crypto daily bar streams."""

255

256

def unsubscribe_crypto_orderbooks(*symbols: str) -> None:

257

"""Unsubscribe from crypto orderbook streams."""

258

259

def unsubscribe_news(*symbols: str) -> None:

260

"""Unsubscribe from news streams."""

261

```

262

263

**Usage Example:**

264

265

```python

266

# Dynamic subscription management

267

def manage_subscriptions(portfolio_symbols):

268

# Unsubscribe from old symbols

269

stream.unsubscribe_trades('OLD1', 'OLD2')

270

stream.unsubscribe_quotes('OLD1', 'OLD2')

271

272

# Subscribe to new portfolio

273

stream.subscribe_trades(handle_trade, *portfolio_symbols)

274

stream.subscribe_quotes(handle_quote, *portfolio_symbols)

275

276

# Update subscriptions based on portfolio changes

277

new_portfolio = ['AAPL', 'TSLA', 'NVDA']

278

manage_subscriptions(new_portfolio)

279

```

280

281

### News Streaming

282

283

Subscribe to real-time financial news feeds.

284

285

```python { .api }

286

def subscribe_news(handler: Callable, *symbols: str) -> None:

287

"""Subscribe to news data streams."""

288

289

def on_news(*symbols: str) -> Callable:

290

"""Decorator for news event handlers."""

291

292

def unsubscribe_news(*symbols: str) -> None:

293

"""Unsubscribe from news streams."""

294

```

295

296

**Usage Examples:**

297

298

```python

299

@stream.on_news('AAPL', 'TSLA')

300

def news_handler(news):

301

print(f"News Alert: {news.headline}")

302

print(f"Symbols: {', '.join(news.symbols)}")

303

if any(word in news.headline.lower() for word in ['earnings', 'acquisition', 'merger']):

304

print("⚠️ Market-moving news detected!")

305

306

# Or with handler function

307

def handle_market_news(news):

308

# Process news for sentiment analysis or trading signals

309

sentiment_score = analyze_sentiment(news.summary) # Your function

310

for symbol in news.symbols:

311

update_symbol_sentiment(symbol, sentiment_score) # Your function

312

313

stream.subscribe_news(handle_market_news, 'AAPL', 'TSLA', 'GOOGL')

314

```

315

316

## Types

317

318

```python { .api }

319

class TradeUpdate:

320

@property

321

def event(self) -> str: ... # 'new', 'fill', 'partial_fill', 'canceled', 'expired', etc.

322

@property

323

def order(self) -> Order: ...

324

@property

325

def timestamp(self) -> pd.Timestamp: ...

326

@property

327

def position_qty(self) -> int: ...

328

@property

329

def price(self) -> float: ...

330

@property

331

def qty(self) -> int: ...

332

333

# Market data types are the same as in the Market Data section:

334

# TradeV2, QuoteV2, BarV2, StatusV2, LULDV2, NewsV2, etc.

335

```