0
# Real-time Streaming
1
2
WebSocket streaming for real-time market data feeds and trading account updates. Supports subscription-based data streams with handler functions and decorator patterns for event processing.
3
4
## Capabilities
5
6
### Stream Initialization
7
8
Create and configure streaming connections with authentication and data feed selection.
9
10
```python { .api }
11
class Stream:
12
def __init__(
13
self,
14
key_id: str = None,
15
secret_key: str = None,
16
base_url: str = None,
17
data_stream_url: str = None,
18
data_feed: str = 'iex',
19
raw_data: bool = False,
20
crypto_exchanges: List[str] = None,
21
websocket_params: Dict = None
22
): ...
23
```
24
25
**Usage Example:**
26
27
```python
28
import alpaca_trade_api as tradeapi
29
30
# Initialize stream with credentials
31
stream = tradeapi.Stream('your-key-id', 'your-secret-key', data_feed='iex')
32
```
33
34
### Market Data Subscriptions
35
36
Subscribe to real-time market data feeds including trades, quotes, and bars.
37
38
```python { .api }
39
def subscribe_trades(handler: Callable, *symbols: str,
40
handler_cancel_errors: Callable = None,
41
handler_corrections: Callable = None) -> None:
42
"""Subscribe to trade data streams."""
43
44
def subscribe_quotes(handler: Callable, *symbols: str) -> None:
45
"""Subscribe to quote data streams."""
46
47
def subscribe_bars(handler: Callable, *symbols: str) -> None:
48
"""Subscribe to minute bar data streams."""
49
50
def subscribe_updated_bars(handler: Callable, *symbols: str) -> None:
51
"""Subscribe to updated bar data streams."""
52
53
def subscribe_daily_bars(handler: Callable, *symbols: str) -> None:
54
"""Subscribe to daily bar data streams."""
55
56
def subscribe_statuses(handler: Callable, *symbols: str) -> None:
57
"""Subscribe to trading status updates."""
58
59
def subscribe_lulds(handler: Callable, *symbols: str) -> None:
60
"""Subscribe to Limit Up Limit Down updates."""
61
```
62
63
**Usage Examples:**
64
65
```python
66
# Define handler functions
67
def handle_trade(trade):
68
print(f"Trade: {trade.symbol} {trade.size} @ ${trade.price}")
69
70
def handle_quote(quote):
71
spread = quote.ask_price - quote.bid_price
72
print(f"Quote: {quote.symbol} Bid=${quote.bid_price} Ask=${quote.ask_price} Spread=${spread:.2f}")
73
74
def handle_bar(bar):
75
print(f"Bar: {bar.symbol} O=${bar.open} H=${bar.high} L=${bar.low} C=${bar.close} V={bar.volume}")
76
77
# Subscribe to streams
78
stream.subscribe_trades(handle_trade, 'AAPL', 'TSLA')
79
stream.subscribe_quotes(handle_quote, 'AAPL', 'TSLA', 'GOOGL')
80
stream.subscribe_bars(handle_bar, 'SPY')
81
```
82
83
### Trading Account Updates
84
85
Subscribe to real-time updates for trading account events and order status changes.
86
87
```python { .api }
88
def subscribe_trade_updates(handler: Callable) -> None:
89
"""Subscribe to trading account updates."""
90
```
91
92
**Usage Example:**
93
94
```python
95
def handle_trade_update(update):
96
print(f"Trade Update: {update.event} - Order {update.order.id}")
97
if update.event == 'fill':
98
print(f" Filled: {update.order.symbol} {update.order.side} {update.order.filled_qty}")
99
elif update.event == 'partial_fill':
100
print(f" Partial Fill: {update.order.filled_qty}/{update.order.qty}")
101
elif update.event == 'canceled':
102
print(f" Canceled: {update.order.symbol}")
103
104
stream.subscribe_trade_updates(handle_trade_update)
105
```
106
107
### Decorator Pattern
108
109
Use decorators for clean event handler registration.
110
111
```python { .api }
112
def on_trade(*symbols: str) -> Callable:
113
"""Decorator for trade event handlers."""
114
115
def on_quote(*symbols: str) -> Callable:
116
"""Decorator for quote event handlers."""
117
118
def on_bar(*symbols: str) -> Callable:
119
"""Decorator for bar event handlers."""
120
121
def on_updated_bar(*symbols: str) -> Callable:
122
"""Decorator for updated bar event handlers."""
123
124
def on_daily_bar(*symbols: str) -> Callable:
125
"""Decorator for daily bar event handlers."""
126
127
def on_status(*symbols: str) -> Callable:
128
"""Decorator for status event handlers."""
129
130
def on_luld(*symbols: str) -> Callable:
131
"""Decorator for LULD event handlers."""
132
133
def on_trade_update() -> Callable:
134
"""Decorator for trade update handlers."""
135
136
def on_cancel_error(*symbols: str) -> Callable:
137
"""Decorator for cancel error handlers."""
138
139
def on_correction(*symbols: str) -> Callable:
140
"""Decorator for correction handlers."""
141
```
142
143
**Usage Examples:**
144
145
```python
146
# Using decorators for clean handler registration
147
@stream.on_trade('AAPL', 'TSLA')
148
def trade_handler(trade):
149
if trade.size >= 1000: # Large trades only
150
print(f"Large trade: {trade.symbol} {trade.size} shares @ ${trade.price}")
151
152
@stream.on_quote('AAPL', 'MSFT', 'GOOGL')
153
def quote_handler(quote):
154
spread_bps = ((quote.ask_price - quote.bid_price) / quote.bid_price) * 10000
155
if spread_bps > 10: # Wide spreads
156
print(f"Wide spread: {quote.symbol} {spread_bps:.1f} bps")
157
158
@stream.on_bar('SPY', 'QQQ')
159
def bar_handler(bar):
160
# Calculate momentum
161
body = abs(bar.close - bar.open)
162
range_size = bar.high - bar.low
163
momentum = body / range_size if range_size > 0 else 0
164
print(f"Bar momentum: {bar.symbol} {momentum:.2f}")
165
166
@stream.on_trade_update()
167
def trade_update_handler(update):
168
if update.event in ['fill', 'partial_fill']:
169
print(f"Execution: {update.order.symbol} {update.order.side} {update.order.filled_qty} @ ${update.price}")
170
```
171
172
### Stream Management
173
174
Control streaming connections and manage subscriptions.
175
176
```python { .api }
177
def run() -> None:
178
"""Start the streaming connection (blocking)."""
179
180
def stop() -> None:
181
"""Stop the streaming connection."""
182
183
async def stop_ws() -> None:
184
"""Stop WebSocket connections (async)."""
185
186
def is_open() -> bool:
187
"""Check if WebSocket connection is open."""
188
```
189
190
**Usage Examples:**
191
192
```python
193
# Start streaming (blocking)
194
try:
195
print("Starting stream...")
196
stream.run()
197
except KeyboardInterrupt:
198
print("Stream stopped by user")
199
200
# Non-blocking usage with threading
201
import threading
202
203
def start_stream():
204
stream.run()
205
206
stream_thread = threading.Thread(target=start_stream)
207
stream_thread.daemon = True
208
stream_thread.start()
209
210
# Your main application logic here
211
time.sleep(60)
212
stream.stop()
213
```
214
215
### Unsubscription
216
217
Remove subscriptions when no longer needed.
218
219
```python { .api }
220
def unsubscribe_trades(*symbols: str) -> None:
221
"""Unsubscribe from trade streams."""
222
223
def unsubscribe_quotes(*symbols: str) -> None:
224
"""Unsubscribe from quote streams."""
225
226
def unsubscribe_bars(*symbols: str) -> None:
227
"""Unsubscribe from bar streams."""
228
229
def unsubscribe_updated_bars(*symbols: str) -> None:
230
"""Unsubscribe from updated bar streams."""
231
232
def unsubscribe_daily_bars(*symbols: str) -> None:
233
"""Unsubscribe from daily bar streams."""
234
235
def unsubscribe_statuses(*symbols: str) -> None:
236
"""Unsubscribe from status streams."""
237
238
def unsubscribe_lulds(*symbols: str) -> None:
239
"""Unsubscribe from LULD streams."""
240
241
def unsubscribe_crypto_trades(*symbols: str) -> None:
242
"""Unsubscribe from crypto trade streams."""
243
244
def unsubscribe_crypto_quotes(*symbols: str) -> None:
245
"""Unsubscribe from crypto quote streams."""
246
247
def unsubscribe_crypto_bars(*symbols: str) -> None:
248
"""Unsubscribe from crypto bar streams."""
249
250
def unsubscribe_crypto_updated_bars(*symbols: str) -> None:
251
"""Unsubscribe from crypto updated bar streams."""
252
253
def unsubscribe_crypto_daily_bars(*symbols: str) -> None:
254
"""Unsubscribe from crypto daily bar streams."""
255
256
def unsubscribe_crypto_orderbooks(*symbols: str) -> None:
257
"""Unsubscribe from crypto orderbook streams."""
258
259
def unsubscribe_news(*symbols: str) -> None:
260
"""Unsubscribe from news streams."""
261
```
262
263
**Usage Example:**
264
265
```python
266
# Dynamic subscription management
267
def manage_subscriptions(portfolio_symbols):
268
# Unsubscribe from old symbols
269
stream.unsubscribe_trades('OLD1', 'OLD2')
270
stream.unsubscribe_quotes('OLD1', 'OLD2')
271
272
# Subscribe to new portfolio
273
stream.subscribe_trades(handle_trade, *portfolio_symbols)
274
stream.subscribe_quotes(handle_quote, *portfolio_symbols)
275
276
# Update subscriptions based on portfolio changes
277
new_portfolio = ['AAPL', 'TSLA', 'NVDA']
278
manage_subscriptions(new_portfolio)
279
```
280
281
### News Streaming
282
283
Subscribe to real-time financial news feeds.
284
285
```python { .api }
286
def subscribe_news(handler: Callable, *symbols: str) -> None:
287
"""Subscribe to news data streams."""
288
289
def on_news(*symbols: str) -> Callable:
290
"""Decorator for news event handlers."""
291
292
def unsubscribe_news(*symbols: str) -> None:
293
"""Unsubscribe from news streams."""
294
```
295
296
**Usage Examples:**
297
298
```python
299
@stream.on_news('AAPL', 'TSLA')
300
def news_handler(news):
301
print(f"News Alert: {news.headline}")
302
print(f"Symbols: {', '.join(news.symbols)}")
303
if any(word in news.headline.lower() for word in ['earnings', 'acquisition', 'merger']):
304
print("⚠️ Market-moving news detected!")
305
306
# Or with handler function
307
def handle_market_news(news):
308
# Process news for sentiment analysis or trading signals
309
sentiment_score = analyze_sentiment(news.summary) # Your function
310
for symbol in news.symbols:
311
update_symbol_sentiment(symbol, sentiment_score) # Your function
312
313
stream.subscribe_news(handle_market_news, 'AAPL', 'TSLA', 'GOOGL')
314
```
315
316
## Types
317
318
```python { .api }
319
class TradeUpdate:
320
@property
321
def event(self) -> str: ... # 'new', 'fill', 'partial_fill', 'canceled', 'expired', etc.
322
@property
323
def order(self) -> Order: ...
324
@property
325
def timestamp(self) -> pd.Timestamp: ...
326
@property
327
def position_qty(self) -> int: ...
328
@property
329
def price(self) -> float: ...
330
@property
331
def qty(self) -> int: ...
332
333
# Market data types are the same as in Market Data section:
334
# TradeV2, QuoteV2, BarV2, StatusV2, LULDV2, NewsV2, etc.
335
```