Library for accessing the X API (Twitter)
Tweepy's StreamingClient provides real-time access to Twitter data through the Twitter API v2 streaming endpoints. It supports both filtered streaming (based on rules) and sample streaming (1% random sample) with customizable event handlers and automatic reconnection.
Create a StreamingClient instance with authentication and configuration options.
class StreamingClient:
    def __init__(self, bearer_token, *, chunk_size=512, daemon=False,
                 max_retries=float('inf'), proxy=None, return_type=Response,
                 verify=True, wait_on_rate_limit=False):
        """
        Initialize a Twitter API v2 streaming client.

        Parameters:
        - bearer_token (str): Bearer token for authentication
        - chunk_size (int): Size of data chunks to read from the stream
          connection (default: 512)
        - daemon (bool): Whether the thread started by filter(threaded=True)
          or sample(threaded=True) is a daemon thread; irrelevant for
          blocking calls (default: False)
        - max_retries (int | float): Maximum number of attempts to
          (re)connect the stream after connection or HTTP errors
          (default: float('inf'), i.e. retry indefinitely)
        - proxy (str, optional): Proxy server URL
        - return_type (type): Container type for responses from the
          rule-management methods: Response (default), dict, or
          requests.Response
        - verify (bool): Verify SSL certificates (default: True)
        - wait_on_rate_limit (bool): Wait and retry when a rule-management
          request (add_rules / delete_rules / get_rules) is rate limited.
          Reconnecting the stream itself is governed by max_retries, not by
          this flag (default: False)
        """

    # Start and stop streaming connections with different stream types.

    def filter(self, *, backfill_minutes=None, expansions=None, media_fields=None,
               place_fields=None, poll_fields=None, space_fields=None,
               threaded=False, tweet_fields=None, user_fields=None):
        """
        Start the filtered stream, which delivers Tweets matching the rules
        added with add_rules().

        Parameters:
        - threaded (bool): Run the stream in a separate thread (default: False)
        - backfill_minutes (int, optional): Recover Tweets missed during a
          disconnection, up to 5 minutes back
        - expansions (list, optional): Objects to expand; expanded objects are
          delivered through on_includes()
        - tweet_fields (list, optional): Tweet fields to include
        - user_fields (list, optional): User fields to include
        - media_fields (list, optional): Media fields to include
        - poll_fields (list, optional): Poll fields to include
        - place_fields (list, optional): Place fields to include
        - space_fields (list, optional): Space fields to include

        Returns:
        The threading.Thread running the stream if threaded=True; otherwise
        None, returned only after the blocking stream disconnects.
        """

    def sample(self, *, backfill_minutes=None, expansions=None, media_fields=None,
               place_fields=None, poll_fields=None, space_fields=None,
               threaded=False, tweet_fields=None, user_fields=None):
        """
        Start the sample stream (a 1% random sample of all Tweets).

        Parameters: Same as filter()

        Returns:
        The threading.Thread running the stream if threaded=True; otherwise
        None, returned only after the blocking stream disconnects.
        """

    def disconnect(self):
        """
        Signal the stream to disconnect from the streaming endpoint.

        The stream stops once it finishes handling the current message or
        keep-alive; on_disconnect() is then called.

        Returns:
        None
        """

    # Manage filtering rules for the filtered stream.

    def add_rules(self, add, *, dry_run=None):
        """
        Add filtering rules for the filtered stream.

        Parameters:
        - add (list[StreamRule] | StreamRule): Rule(s) to add. Plain dicts are
          not accepted; wrap each rule in a StreamRule.
        - dry_run (bool, optional): Validate the rules without adding them

        Returns:
        Response whose data lists the created rules (with their assigned IDs)
        and whose errors describe any rejected rules
        """

    def delete_rules(self, ids, *, dry_run=None):
        """
        Delete filtering rules by their IDs.

        Parameters:
        - ids (int | str | StreamRule | list[int | str | StreamRule]): IDs of
          the rules to delete; StreamRule objects are deleted by their id
        - dry_run (bool, optional): Validate the deletion without removing rules

        Returns:
        Response with rule deletion results
        """

    def get_rules(self, *, ids=None):
        """
        Get the current filtering rules.

        Parameters:
        - ids (list, optional): Only return the rules with these IDs

        Returns:
        Response whose data is a list of StreamRule objects, or None when
        there are no rules
        """

    # Override these methods to handle different streaming events.

    def on_connect(self):
        """
        Called when the stream successfully connects.
        Override this method to handle connection events.
        """

    def on_disconnect(self):
        """
        Called when the stream is disconnected.
        Override this method to handle disconnection events.
        """

    def on_tweet(self, tweet):
        """
        Called when a Tweet is received.

        Parameters:
        - tweet (Tweet): Tweet object; fields that were not requested are None

        Override this method to process incoming Tweets.
        """

    def on_includes(self, includes):
        """
        Called when includes data is received.

        Parameters:
        - includes (dict): Expanded objects keyed by type ("users", "media",
          "polls", "places", ...)

        Override this method to process included data.
        """

    def on_errors(self, errors):
        """
        Called when errors are received in the stream.

        Parameters:
        - errors (list): List of error objects

        Override this method to handle stream errors.
        """

    def on_matching_rules(self, matching_rules):
        """
        Called when matching-rules information is received.

        Parameters:
        - matching_rules (list[StreamRule]): Rules that matched the Tweet.
          These are named tuples (use rule.tag, not rule["tag"]); the API
          reports only each rule's id and tag, so value is None.

        Override this method to process rule-matching information.
        """

    def on_response(self, response):
        """
        Called for each parsed message from the stream.

        Parameters:
        - response (StreamResponse): Named tuple of
          (data, includes, errors, matching_rules)

        Override this method for low-level response handling.
        """

    def on_data(self, raw_data):
        """
        Called with each raw JSON message from the stream.

        The default implementation parses the JSON and dispatches to
        on_response(), on_tweet(), on_includes(), on_errors() and
        on_matching_rules(). An override that does not call
        super().on_data(raw_data) disables those handlers.

        Parameters:
        - raw_data (JSON): Raw data for one stream message

        Override this method for custom data parsing.
        """

    def on_keep_alive(self):
        """
        Called when a keep-alive signal is received.
        Override this method to handle keep-alive events.
        """

    def on_connection_error(self):
        """
        Called when the stream connection errors or times out.

        The client then reconnects automatically, subject to max_retries.
        Override this method to handle connection errors.
        """

    def on_request_error(self, status_code):
        """
        Called when a non-200 HTTP status code is received.

        The client then backs off and retries, subject to max_retries.

        Parameters:
        - status_code (int): HTTP status code (429 signals rate limiting)

        Override this method to handle HTTP errors.
        """

    def on_exception(self, exception):
        """
        Called when an unhandled exception occurs during streaming.

        Parameters:
        - exception (Exception): Exception object

        Override this method to handle exceptions.
        """

    def on_closed(self, response):
        """
        Called when the stream is closed by Twitter.

        Parameters:
        - response (requests.Response): Final response from Twitter

        Override this method to handle stream closure.
        """


# Rules define what tweets to receive in filtered streaming.
from typing import NamedTuple, Optional


class StreamRule(NamedTuple):
    """
    A filtering rule for the filtered stream.

    StreamRule is an immutable named tuple. Read its fields as attributes
    (rule.value, rule.tag, rule.id) or by position, not by key (rule["tag"]
    raises TypeError).

    Parameters:
    - value (str): Rule filter expression
    - tag (str, optional): Rule identifier tag
    - id (str, optional): Rule ID (set by Twitter; leave unset when adding rules)
    """

    # Attributes
    value: Optional[str] = None  # Rule filter expression
    tag: Optional[str] = None  # Optional rule tag
    id: Optional[str] = None  # Rule ID (assigned by Twitter)


import tweepy
class MyStreamListener(tweepy.StreamingClient):
    def on_tweet(self, tweet):
        # author_id is a numeric user ID (not a @handle). It is only populated
        # when requested, hence tweet_fields=["author_id"] in filter() below.
        print(f"New tweet from user {tweet.author_id}: {tweet.text}")

    def on_connect(self):
        print("Connected to Twitter stream")

    def on_disconnect(self):
        print("Disconnected from Twitter stream")


# Initialize streaming client
stream = MyStreamListener(bearer_token="your_bearer_token")

# Add filtering rules
rules = [
    tweepy.StreamRule("python programming -is:retweet lang:en", tag="python"),
    tweepy.StreamRule("javascript OR nodejs", tag="js"),
]
stream.add_rules(rules)

# Start streaming (blocking call); request author_id so on_tweet can show it
stream.filter(tweet_fields=["author_id"])

import tweepy
class SampleStreamListener(tweepy.StreamingClient):
    def __init__(self, bearer_token, *, max_tweets=100, **kwargs):
        # Forward any other StreamingClient options (max_retries, proxy, ...)
        # instead of silently making them unavailable to callers.
        super().__init__(bearer_token, **kwargs)
        self.max_tweets = max_tweets
        self.tweet_count = 0

    def on_tweet(self, tweet):
        self.tweet_count += 1
        print(f"Tweet #{self.tweet_count}: {tweet.text[:50]}...")
        # Stop after max_tweets tweets (100 by default)
        if self.tweet_count >= self.max_tweets:
            self.disconnect()


# Start sample stream
stream = SampleStreamListener(bearer_token="your_bearer_token")
stream.sample()

import tweepy
import json


class AdvancedStreamListener(tweepy.StreamingClient):
    def on_tweet(self, tweet):
        # Access full tweet data
        print(f"Tweet ID: {tweet.id}")
        print(f"Author: {tweet.author_id}")
        print(f"Text: {tweet.text}")
        print(f"Created: {tweet.created_at}")
        # public_metrics may be missing or None when it was not returned, so
        # test the value itself rather than just the attribute's existence.
        metrics = getattr(tweet, "public_metrics", None)
        if metrics:
            print(f"Likes: {metrics.get('like_count', 0)}")
            print(f"Retweets: {metrics.get('retweet_count', 0)}")

    def on_includes(self, includes):
        # Process included data (users, media, etc.)
        if 'users' in includes:
            for user in includes['users']:
                print(f"User: {user.username} ({user.name})")

    def on_matching_rules(self, matching_rules):
        # See which rules matched. Each entry is a StreamRule named tuple, so
        # use attribute access; the API reports only id and tag (no value).
        for rule in matching_rules:
            print(f"Matched rule: {rule.tag} (id: {rule.id})")

    def on_errors(self, errors):
        # Handle errors
        for error in errors:
            print(f"Stream error: {error}")


# Initialize with expanded data
stream = AdvancedStreamListener(bearer_token="your_bearer_token")

# Add rules
rules = [
    tweepy.StreamRule("(AI OR \"artificial intelligence\") lang:en -is:retweet", tag="ai_tweets")
]
stream.add_rules(rules)

# Start with expanded fields
stream.filter(
    expansions=["author_id", "attachments.media_keys"],
    tweet_fields=["created_at", "public_metrics", "context_annotations"],
    user_fields=["name", "username", "verified", "public_metrics"]
)

import tweepy
import time


class ThreadedStreamListener(tweepy.StreamingClient):
    def on_tweet(self, tweet):
        print(f"Background tweet: {tweet.text[:30]}...")


# Start streaming in background thread
stream = ThreadedStreamListener(bearer_token="your_bearer_token")

# Add rules
rules = [tweepy.StreamRule("music", tag="music_tweets")]
stream.add_rules(rules)

# Start threaded stream; filter(threaded=True) returns the worker thread
thread = stream.filter(threaded=True)

# Continue with other work
for i in range(10):
    print(f"Main thread working... {i}")
    time.sleep(2)

# Stop streaming, then wait for the background thread to exit cleanly.
# The stream notices the disconnect only after the next message or
# keep-alive arrives, so join() may take a short while.
stream.disconnect()
thread.join()

import tweepy
# Initialize client
stream = tweepy.StreamingClient(bearer_token="your_bearer_token")

# Get current rules (response.data is None when no rules exist)
response = stream.get_rules()
if response.data:
    print("Current rules:")
    for rule in response.data:
        print(f"- {rule.id}: {rule.value} (tag: {rule.tag})")

# Add new rules
new_rules = [
    tweepy.StreamRule("cats OR dogs", tag="pets"),
    tweepy.StreamRule("football OR soccer", tag="sports"),
]
result = stream.add_rules(new_rules)
# result.data is None when no rule was created (e.g. all were duplicates or
# invalid); rejected rules are reported in result.errors instead.
added = result.data or []
print(f"Added {len(added)} rules")
for error in result.errors or []:
    print(f"Rule not added: {error}")

# Delete the rules that existed before the new ones were added
# (the snapshot returned by get_rules() above)
if response.data:
    rule_ids = [rule.id for rule in response.data]
    stream.delete_rules(rule_ids)
    print(f"Deleted {len(rule_ids)} rules")

# Twitter streaming rules support a rich query syntax:
- `cats dogs` - AND (tweets containing both "cats" and "dogs")
- `cats OR dogs` - OR (tweets containing either "cats" or "dogs")
- `-cats` - NOT (tweets not containing "cats")
- `(cats OR dogs) -puppies` - Grouping with parentheses
- `from:username` - Tweets from a specific user
- `to:username` - Tweets replying to a specific user
- `@username` - Tweets mentioning a specific user
- `#hashtag` - Tweets with a specific hashtag
- `lang:en` - Tweets in a specific language
- `is:retweet` - Only retweets
- `-is:retweet` - Exclude retweets
- `is:reply` - Only replies
- `has:images` - Tweets with images
- `has:videos` - Tweets with videos
- `has:links` - Tweets with URLs
- `"exact phrase"` - Exact phrase match
- `🔥` or `:fire:` - Emoji match
- `"word1 word2"~10` - Proximity match (the terms within 10 tokens of each other)

StreamingClient automatically reconnects with backoff (linear for network errors, exponential for HTTP errors, with a longer initial wait when rate limited):
class RobustStreamListener(tweepy.StreamingClient):
    def on_connection_error(self):
        # Connection errors and timeouts are retried automatically,
        # subject to max_retries.
        print("Connection error - will retry automatically")

    def on_request_error(self, status_code):
        print(f"HTTP error {status_code}")
        # API v2 signals rate limiting with 429 Too Many Requests; 420 is the
        # legacy v1.1 "Enhance Your Calm" code, kept for completeness.
        if status_code in (420, 429):
            print("Rate limited - backing off")

    def on_exception(self, exception):
        print(f"Exception occurred: {exception}")

    def on_closed(self, response):
        print(f"Stream closed by Twitter: {response.status_code}")


# Configure retry behavior
stream = RobustStreamListener(
    bearer_token="your_bearer_token",
    max_retries=10,  # Limit stream reconnection attempts
    wait_on_rate_limit=True,  # Wait when rule-management requests are rate limited
)

# - Tune the chunk_size parameter for optimal performance
# - Use threaded=True for non-blocking streaming
# - Use backfill_minutes sparingly as it counts against rate limits
# Install with Tessl CLI:
npx tessl i tessl/pypi-tweepy