# Streaming

Tweepy's `StreamingClient` provides real-time access to Twitter data through the Twitter API v2 streaming endpoints. It supports both filtered streaming (based on rules) and sample streaming (a roughly 1% random sample), with customizable event handlers and automatic reconnection.

## Capabilities

### StreamingClient Initialization

Create a `StreamingClient` instance with authentication and configuration options.

```python { .api }
class StreamingClient:
    def __init__(self, bearer_token, *, chunk_size=512, daemon=False,
                 max_retries=float('inf'), proxy=None, return_type=Response,
                 verify=True, wait_on_rate_limit=False):
        """
        Initialize Twitter API v2 streaming client.

        Parameters:
        - bearer_token (str): Bearer token for authentication
        - chunk_size (int): Size of data chunks to read (default: 512)
        - daemon (bool): Use a daemon thread when the stream runs threaded (default: False)
        - max_retries (int): Maximum reconnection attempts (default: infinite)
        - proxy (str, optional): Proxy server URL
        - return_type (type): Response container type (default: Response)
        - verify (bool): Verify SSL certificates (default: True)
        - wait_on_rate_limit (bool): Wait when the rate limit is hit (default: False)
        """
```

### Stream Control

Start and stop streaming connections with different stream types.

```python { .api }
def filter(self, *, backfill_minutes=None, expansions=None, media_fields=None,
           place_fields=None, poll_fields=None, space_fields=None,
           threaded=False, tweet_fields=None, user_fields=None):
    """
    Start filtered stream based on added rules.

    Parameters:
    - threaded (bool): Run stream in separate thread (default: False)
    - backfill_minutes (int, optional): Backfill tweets from last N minutes (5 max)
    - expansions (list, optional): Additional data to include
    - tweet_fields (list, optional): Tweet fields to include
    - user_fields (list, optional): User fields to include
    - media_fields (list, optional): Media fields to include
    - poll_fields (list, optional): Poll fields to include
    - place_fields (list, optional): Place fields to include
    - space_fields (list, optional): Space fields to include

    Returns:
    threading.Thread if threaded=True, otherwise None (the call blocks
    until the stream ends)
    """

def sample(self, *, backfill_minutes=None, expansions=None, media_fields=None,
           place_fields=None, poll_fields=None, space_fields=None,
           threaded=False, tweet_fields=None, user_fields=None):
    """
    Start sample stream (roughly 1% random sample of all tweets).

    Parameters: Same as filter()

    Returns:
    threading.Thread if threaded=True, otherwise None (the call blocks
    until the stream ends)
    """

def disconnect(self):
    """
    Disconnect from the streaming endpoint.

    Returns:
    None
    """
```
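The field arguments above are Python lists, while the v2 HTTP endpoints take comma-separated strings under dotted names such as `tweet.fields`. The sketch below illustrates that mapping only. `build_stream_params` is a hypothetical helper written for this page, not part of Tweepy, which performs its own conversion internally.

```python
def build_stream_params(**kwargs):
    """Hypothetical helper: turn keyword arguments into v2-style query parameters."""
    params = {}
    for name, value in kwargs.items():
        if value is None:
            continue  # omitted arguments are not sent at all
        # tweet_fields -> tweet.fields, user_fields -> user.fields, and so on
        key = name.replace("_fields", ".fields")
        if isinstance(value, (list, tuple)):
            value = ",".join(value)  # lists become comma-separated strings
        params[key] = value
    return params


params = build_stream_params(
    expansions=["author_id", "attachments.media_keys"],
    tweet_fields=["created_at", "public_metrics"],
    user_fields=None,
    backfill_minutes=2,
)
print(params)
```

Arguments left as `None` are dropped, so only the fields you name are requested.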

### Rule Management

Manage filtering rules for the filtered stream.

```python { .api }
def add_rules(self, add, *, dry_run=None):
    """
    Add filtering rules for the stream.

    Parameters:
    - add (StreamRule or list): A StreamRule or a list of StreamRule objects
    - dry_run (bool, optional): Test rules without adding them

    Returns:
    Response with rule addition results
    """

def delete_rules(self, ids, *, dry_run=None):
    """
    Delete filtering rules by their IDs.

    Parameters:
    - ids (list): List of rule IDs to delete
    - dry_run (bool, optional): Test deletion without removing rules

    Returns:
    Response with rule deletion results
    """

def get_rules(self, *, ids=None):
    """
    Get current filtering rules.

    Parameters:
    - ids (list, optional): Specific rule IDs to retrieve

    Returns:
    Response with current rules
    """
```

### Event Handlers

Override these methods to handle different streaming events.

```python { .api }
def on_connect(self):
    """
    Called when stream successfully connects.
    Override this method to handle connection events.
    """

def on_disconnect(self):
    """
    Called when stream is disconnected.
    Override this method to handle disconnection events.
    """

def on_tweet(self, tweet):
    """
    Called when a tweet is received.

    Parameters:
    - tweet (Tweet): Tweet object with all requested fields

    Override this method to process incoming tweets.
    """

def on_includes(self, includes):
    """
    Called when includes data is received.

    Parameters:
    - includes (dict): Additional data (users, media, polls, places, spaces)

    Override this method to process included data.
    """

def on_errors(self, errors):
    """
    Called when errors are received in the stream.

    Parameters:
    - errors (list): List of error objects

    Override this method to handle stream errors.
    """

def on_matching_rules(self, matching_rules):
    """
    Called when matching rules information is received.

    Parameters:
    - matching_rules (list): StreamRule objects for the rules that matched the tweet

    Override this method to process rule matching information.
    """

def on_response(self, response):
    """
    Called for each response from the stream.

    Parameters:
    - response (StreamResponse): Tweet, includes, errors, and matching rules
      for one payload

    Override this method for low-level response handling.
    """

def on_data(self, raw_data):
    """
    Called with raw JSON data from the stream.

    Parameters:
    - raw_data (bytes): Raw JSON payload

    Override this method for custom data parsing.
    """

def on_keep_alive(self):
    """
    Called when a keep-alive signal is received.
    Override this method to handle keep-alive events.
    """

def on_connection_error(self):
    """
    Called when a connection error occurs.
    Override this method to handle connection errors.
    """

def on_request_error(self, status_code):
    """
    Called when an HTTP error response is received.

    Parameters:
    - status_code (int): HTTP status code

    Override this method to handle HTTP errors.
    """

def on_exception(self, exception):
    """
    Called when an exception occurs during streaming.

    Parameters:
    - exception (Exception): Exception object

    Override this method to handle exceptions.
    """

def on_closed(self, response):
    """
    Called when the stream is closed by Twitter.

    Parameters:
    - response (requests.Response): Final response from Twitter

    Override this method to handle stream closure.
    """
```
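To make the relationship between `on_data` and the higher-level handlers concrete, here is a simplified, standard-library-only model of how one raw payload could be routed by its top-level keys. `PayloadRouter` is a hypothetical class for illustration. It is not Tweepy's implementation, and it passes plain dictionaries where Tweepy would build `Tweet` and `StreamRule` objects.

```python
import json


class PayloadRouter:
    """Simplified model of routing one stream payload to handlers."""

    def __init__(self):
        self.calls = []  # records (handler_name, argument) for inspection

    def on_tweet(self, tweet):
        self.calls.append(("on_tweet", tweet))

    def on_includes(self, includes):
        self.calls.append(("on_includes", includes))

    def on_errors(self, errors):
        self.calls.append(("on_errors", errors))

    def on_matching_rules(self, matching_rules):
        self.calls.append(("on_matching_rules", matching_rules))

    def on_data(self, raw_data):
        # json.loads accepts both str and bytes input
        payload = json.loads(raw_data)
        # Each top-level key, when present, goes to its own handler.
        if "data" in payload:
            self.on_tweet(payload["data"])
        if "includes" in payload:
            self.on_includes(payload["includes"])
        if "errors" in payload:
            self.on_errors(payload["errors"])
        if "matching_rules" in payload:
            self.on_matching_rules(payload["matching_rules"])


router = PayloadRouter()
router.on_data(
    b'{"data": {"id": "1", "text": "hello"},'
    b' "matching_rules": [{"id": "42", "tag": "demo"}]}'
)
print([name for name, _ in router.calls])
```

Overriding `on_data` in a real subclass replaces this kind of routing entirely, so the per-event handlers are then only called if your override calls them.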

## Stream Rule Object

Rules define which tweets the filtered stream delivers.

```python { .api }
class StreamRule:
    def __init__(self, value=None, tag=None, id=None):
        """
        Create a streaming rule.

        Parameters:
        - value (str): Rule filter expression
        - tag (str, optional): Rule identifier tag
        - id (str, optional): Rule ID (set by Twitter)
        """

    # Attributes
    value: str  # Rule filter expression
    tag: str    # Optional rule tag
    id: str     # Rule ID (assigned by Twitter)
```

## Usage Examples

### Basic Filtered Streaming

```python
import tweepy

class MyStreamListener(tweepy.StreamingClient):
    def on_tweet(self, tweet):
        # author_id is a numeric user ID, populated because it is requested below
        print(f"New tweet from user {tweet.author_id}: {tweet.text}")

    def on_connect(self):
        print("Connected to Twitter stream")

    def on_disconnect(self):
        print("Disconnected from Twitter stream")

# Initialize streaming client
stream = MyStreamListener(bearer_token="your_bearer_token")

# Add filtering rules
rules = [
    tweepy.StreamRule("python programming -is:retweet lang:en", tag="python"),
    tweepy.StreamRule("javascript OR nodejs", tag="js"),
]
stream.add_rules(rules)

# Start streaming (blocking call); request author_id so on_tweet can print it
stream.filter(tweet_fields=["author_id"])
```

### Sample Streaming

```python
import tweepy

class SampleStreamListener(tweepy.StreamingClient):
    def __init__(self, bearer_token):
        super().__init__(bearer_token)
        self.tweet_count = 0

    def on_tweet(self, tweet):
        self.tweet_count += 1
        print(f"Tweet #{self.tweet_count}: {tweet.text[:50]}...")

        # Stop after 100 tweets
        if self.tweet_count >= 100:
            self.disconnect()

# Start sample stream
stream = SampleStreamListener(bearer_token="your_bearer_token")
stream.sample()
```

### Advanced Streaming with Full Data

```python
import tweepy

class AdvancedStreamListener(tweepy.StreamingClient):
    def on_tweet(self, tweet):
        # Access full tweet data
        print(f"Tweet ID: {tweet.id}")
        print(f"Author: {tweet.author_id}")
        print(f"Text: {tweet.text}")
        print(f"Created: {tweet.created_at}")

        # public_metrics is None unless requested via tweet_fields
        metrics = tweet.public_metrics
        if metrics:
            print(f"Likes: {metrics.get('like_count', 0)}")
            print(f"Retweets: {metrics.get('retweet_count', 0)}")

    def on_includes(self, includes):
        # Process included data (users, media, etc.)
        if 'users' in includes:
            for user in includes['users']:
                print(f"User: {user.username} ({user.name})")

    def on_matching_rules(self, matching_rules):
        # See which rules matched (StreamRule objects carrying id and tag)
        for rule in matching_rules:
            print(f"Matched rule: {rule.tag} (id: {rule.id})")

    def on_errors(self, errors):
        # Handle errors
        for error in errors:
            print(f"Stream error: {error}")

# Initialize with expanded data
stream = AdvancedStreamListener(bearer_token="your_bearer_token")

# Add rules
rules = [
    tweepy.StreamRule("(AI OR \"artificial intelligence\") lang:en -is:retweet", tag="ai_tweets")
]
stream.add_rules(rules)

# Start with expanded fields
stream.filter(
    expansions=["author_id", "attachments.media_keys"],
    tweet_fields=["created_at", "public_metrics", "context_annotations"],
    user_fields=["name", "username", "verified", "public_metrics"]
)
```

### Threaded Streaming

```python
import tweepy
import time

class ThreadedStreamListener(tweepy.StreamingClient):
    def on_tweet(self, tweet):
        print(f"Background tweet: {tweet.text[:30]}...")

# Create the client; the stream itself starts below
stream = ThreadedStreamListener(bearer_token="your_bearer_token")

# Add rules
rules = [tweepy.StreamRule("music", tag="music_tweets")]
stream.add_rules(rules)

# Start threaded stream (returns immediately)
stream.filter(threaded=True)

# Continue with other work
for i in range(10):
    print(f"Main thread working... {i}")
    time.sleep(2)

# Stop streaming
stream.disconnect()
```

### Rule Management

```python
import tweepy

# Initialize client
stream = tweepy.StreamingClient(bearer_token="your_bearer_token")

# Get current rules
response = stream.get_rules()
if response.data:
    print("Current rules:")
    for rule in response.data:
        print(f"- {rule.id}: {rule.value} (tag: {rule.tag})")

# Add new rules
new_rules = [
    tweepy.StreamRule("cats OR dogs", tag="pets"),
    tweepy.StreamRule("football OR soccer", tag="sports"),
]
result = stream.add_rules(new_rules)
# result.data can be None if no rule was created (for example, duplicates)
print(f"Added {len(result.data or [])} rules")

# Delete the rules that existed before this run
if response.data:
    rule_ids = [rule.id for rule in response.data]
    stream.delete_rules(rule_ids)
    print(f"Deleted {len(rule_ids)} rules")
```
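When rules are managed across repeated runs, it can help to compute the difference between the registered rules and the desired set before calling `delete_rules` and `add_rules`. The sketch below works on plain `(rule_id, value)` pairs so it runs without network access. `plan_rule_sync` is a hypothetical helper, not a Tweepy function.

```python
def plan_rule_sync(current, desired):
    """Hypothetical helper that plans a rule update.

    current: list of (rule_id, value) pairs already registered.
    desired: list of rule value strings that should be active.
    Returns (ids_to_delete, values_to_add).
    """
    desired_set = set(desired)
    current_values = {value for _, value in current}
    # Registered rules that are no longer wanted
    ids_to_delete = [rule_id for rule_id, value in current if value not in desired_set]
    # Wanted rules that are not registered yet, in the caller's order
    values_to_add = [value for value in desired if value not in current_values]
    return ids_to_delete, values_to_add


current = [("101", "cats OR dogs"), ("102", "music")]
desired = ["cats OR dogs", "football OR soccer"]
to_delete, to_add = plan_rule_sync(current, desired)
print(to_delete, to_add)
```

In a real script, `current` could be built from `get_rules().data`, `to_delete` passed to `delete_rules`, and each entry of `to_add` wrapped in a `StreamRule` for `add_rules`.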

## Rule Syntax

Twitter streaming rules support a rich query syntax:

### Basic Operators
- `cats dogs` - AND (tweets containing both "cats" and "dogs")
- `cats OR dogs` - OR (tweets containing either "cats" or "dogs")
- `-cats` - NOT (tweets not containing "cats")
- `(cats OR dogs) -puppies` - Grouping with parentheses

### Field-specific Operators
- `from:username` - Tweets from specific user
- `to:username` - Tweets replying to specific user
- `@username` - Tweets mentioning specific user
- `#hashtag` - Tweets with specific hashtag
- `lang:en` - Tweets in specific language
- `is:retweet` - Only retweets
- `-is:retweet` - Exclude retweets
- `is:reply` - Only replies
- `has:images` - Tweets with images
- `has:videos` - Tweets with videos
- `has:links` - Tweets with URLs

### Advanced Features
- Exact phrase matching: `"exact phrase"`
- Emoji support: `🔥` or `:fire:`
- Keyword proximity: `"word1 word2"~10`
- Regular expressions (limited support)
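The basic operators above compose mechanically, so rule strings can be assembled from parts instead of being written by hand. The following is a minimal sketch: `build_rule` and `_quote` are hypothetical helpers, and they cover only the OR, AND, NOT, exact-phrase, `lang:`, and `-is:retweet` forms listed in this section.

```python
def _quote(term):
    """Wrap multi-word terms in double quotes for exact phrase matching."""
    return f'"{term}"' if " " in term else term


def build_rule(any_of=(), all_of=(), exclude=(), lang=None, include_retweets=True):
    """Hypothetical helper: compose a rule string from its parts."""
    parts = []
    if any_of:
        terms = " OR ".join(_quote(t) for t in any_of)
        # Parentheses keep the OR group separate from the implicit ANDs
        parts.append(f"({terms})" if len(any_of) > 1 else terms)
    parts.extend(_quote(t) for t in all_of)
    parts.extend(f"-{_quote(t)}" for t in exclude)
    if lang:
        parts.append(f"lang:{lang}")
    if not include_retweets:
        parts.append("-is:retweet")
    return " ".join(parts)


print(build_rule(any_of=["cats", "dogs"], exclude=["puppies"]))
print(build_rule(any_of=["AI", "artificial intelligence"], lang="en",
                 include_retweets=False))
```

The resulting string can be passed as the `value` of a `StreamRule`.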

## Error Handling and Reconnection

`StreamingClient` reconnects automatically, backing off with increasing delays between attempts until `max_retries` is reached. Override the error handlers to log or react to failures:

```python
import tweepy

class RobustStreamListener(tweepy.StreamingClient):
    def on_connection_error(self):
        print("Connection error - will retry automatically")

    def on_request_error(self, status_code):
        print(f"HTTP error {status_code}")
        if status_code in (420, 429):  # Rate limit
            print("Rate limited - backing off")

    def on_exception(self, exception):
        print(f"Exception occurred: {exception}")

    def on_closed(self, response):
        print(f"Stream closed by Twitter: {response.status_code}")

# Configure retry behavior
stream = RobustStreamListener(
    bearer_token="your_bearer_token",
    max_retries=10,           # Limit retry attempts
    wait_on_rate_limit=True   # Wait when rate limited
)
```
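For readers unfamiliar with backoff, the sketch below shows the general shape of a capped exponential schedule. The starting delay, growth factor, and cap are illustrative assumptions chosen for this example, not a statement of Tweepy's internal values, and `backoff_delays` is a hypothetical function.

```python
def backoff_delays(initial=5.0, factor=2.0, maximum=320.0, max_retries=6):
    """Yield the wait (in seconds) before each retry, doubling up to a cap."""
    delay = initial
    for _ in range(max_retries):
        yield delay
        # Grow the delay, but never beyond the configured maximum
        delay = min(delay * factor, maximum)


print(list(backoff_delays()))
print(list(backoff_delays(max_retries=8)))
```

Capping the delay keeps a long outage from producing unbounded waits, while `max_retries` bounds the total number of attempts.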

## Performance Considerations

- **Chunk Size**: The default `chunk_size` of 512 bytes favors low latency; larger values mean fewer socket reads and can raise throughput on busy streams, at the cost of waiting longer for data to arrive
- **Threading**: Use `threaded=True` for non-blocking streaming
- **Field Selection**: Request only needed fields to reduce bandwidth
- **Rule Efficiency**: Write specific rules so the stream delivers only relevant tweets
- **Backfill**: Use `backfill_minutes` sparingly as it counts against rate limits