CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-tweepy

Library for accessing the X API (Twitter)

Overview
Eval results
Files

streaming.mddocs/

Streaming

Tweepy's StreamingClient provides real-time access to Twitter data through the Twitter API v2 streaming endpoints. It supports both filtered streaming (based on rules) and sample streaming (1% random sample) with customizable event handlers and automatic reconnection.

Capabilities

StreamingClient Initialization

Create a StreamingClient instance with authentication and configuration options.

class StreamingClient:
    def __init__(self, bearer_token, *, chunk_size=512, daemon=False, 
                 max_retries=float('inf'), proxy=None, return_type=Response,
                 verify=True, wait_on_rate_limit=False):
        """
        Initialize Twitter API v2 streaming client.
        
        Parameters:
        - bearer_token (str): Bearer token for authentication
        - chunk_size (int): Size of data chunks to read (default: 512)
        - daemon (bool): Run as daemon thread (default: False)
        - max_retries (int): Maximum reconnection attempts (default: infinite)
        - proxy (str, optional): Proxy server URL
        - return_type (type): Response container type (default: Response)
        - verify (bool): Verify SSL certificates (default: True)
        - wait_on_rate_limit (bool): Wait when rate limit hit (default: False)
        """

Stream Control

Start and stop streaming connections with different stream types.

def filter(self, *, backfill_minutes=None, expansions=None, media_fields=None,
           place_fields=None, poll_fields=None, space_fields=None, 
           threaded=False, tweet_fields=None, user_fields=None):
    """
    Start filtered stream based on added rules.
    
    Parameters:
    - threaded (bool): Run stream in separate thread (default: False)
    - backfill_minutes (int, optional): Backfill tweets from last N minutes (5 max)
    - expansions (list, optional): Additional data to include
    - tweet_fields (list, optional): Tweet fields to include
    - user_fields (list, optional): User fields to include
    - media_fields (list, optional): Media fields to include
    - poll_fields (list, optional): Poll fields to include
    - place_fields (list, optional): Place fields to include
    - space_fields (list, optional): Space fields to include
    
    Returns:
    None (blocking call unless threaded=True)
    """

def sample(self, *, backfill_minutes=None, expansions=None, media_fields=None,
           place_fields=None, poll_fields=None, space_fields=None,
           threaded=False, tweet_fields=None, user_fields=None):
    """
    Start sample stream (1% random sample of all tweets).
    
    Parameters: Same as filter()
    
    Returns:
    None (blocking call unless threaded=True)
    """

def disconnect(self):
    """
    Disconnect from the streaming endpoint.
    
    Returns:
    None
    """

Rule Management

Manage filtering rules for the filtered stream.

def add_rules(self, add, *, dry_run=None):
    """
    Add filtering rules for the stream.
    
    Parameters:
    - add (list): List of StreamRule objects or rule dictionaries
    - dry_run (bool, optional): Test rules without adding them
    
    Returns:
    Response with rule addition results
    """

def delete_rules(self, ids, *, dry_run=None):
    """
    Delete filtering rules by their IDs.
    
    Parameters:
    - ids (list): List of rule IDs to delete
    - dry_run (bool, optional): Test deletion without removing rules
    
    Returns:
    Response with rule deletion results
    """

def get_rules(self, *, ids=None):
    """
    Get current filtering rules.
    
    Parameters:
    - ids (list, optional): Specific rule IDs to retrieve
    
    Returns:
    Response with current rules
    """

Event Handlers

Override these methods to handle different streaming events.

def on_connect(self):
    """
    Called when stream successfully connects.
    Override this method to handle connection events.
    """

def on_disconnect(self):
    """
    Called when stream is disconnected.
    Override this method to handle disconnection events.
    """

def on_tweet(self, tweet):
    """
    Called when a tweet is received.
    
    Parameters:
    - tweet (Tweet): Tweet object with all requested fields
    
    Override this method to process incoming tweets.
    """

def on_includes(self, includes):
    """
    Called when includes data is received.
    
    Parameters:
    - includes (dict): Additional data (users, media, polls, places, spaces)
    
    Override this method to process included data.
    """

def on_errors(self, errors):
    """
    Called when errors are received in the stream.
    
    Parameters:
    - errors (list): List of error objects
    
    Override this method to handle stream errors.
    """

def on_matching_rules(self, matching_rules):
    """
    Called when matching rules information is received.
    
    Parameters:
    - matching_rules (list): List of rules that matched the tweet
    
    Override this method to process rule matching information.
    """

def on_response(self, response):
    """
    Called for each response from the stream.
    
    Parameters:
    - response (Response): Complete response object
    
    Override this method for low-level response handling.
    """

def on_data(self, raw_data):
    """
    Called with raw JSON data from the stream.
    
    Parameters:
    - raw_data (str): Raw JSON string
    
    Override this method for custom data parsing.
    """

def on_keep_alive(self):
    """
    Called when a keep-alive signal is received.
    Override this method to handle keep-alive events.
    """

def on_connection_error(self):
    """
    Called when a connection error occurs.
    Override this method to handle connection errors.
    """

def on_request_error(self, status_code):
    """
    Called when an HTTP error response is received.
    
    Parameters:
    - status_code (int): HTTP status code
    
    Override this method to handle HTTP errors.
    """

def on_exception(self, exception):
    """
    Called when an exception occurs during streaming.
    
    Parameters:
    - exception (Exception): Exception object
    
    Override this method to handle exceptions.
    """

def on_closed(self, response):
    """
    Called when the stream is closed by Twitter.
    
    Parameters:
    - response (requests.Response): Final response from Twitter
    
    Override this method to handle stream closure.
    """

Stream Rule Object

Rules define what tweets to receive in filtered streaming.

class StreamRule:
    def __init__(self, value=None, tag=None, id=None):
        """
        Create a streaming rule.
        
        Parameters:
        - value (str): Rule filter expression
        - tag (str, optional): Rule identifier tag
        - id (str, optional): Rule ID (set by Twitter)
        """
        
    # Attributes
    value: str  # Rule filter expression
    tag: str    # Optional rule tag
    id: str     # Rule ID (assigned by Twitter)

Usage Examples

Basic Filtered Streaming

import tweepy

class MyStreamListener(tweepy.StreamingClient):
    def on_tweet(self, tweet):
        print(f"New tweet from @{tweet.author_id}: {tweet.text}")
        
    def on_connect(self):
        print("Connected to Twitter stream")
        
    def on_disconnect(self):
        print("Disconnected from Twitter stream")

# Initialize streaming client
stream = MyStreamListener(bearer_token="your_bearer_token")

# Add filtering rules
rules = [
    tweepy.StreamRule("python programming -is:retweet lang:en", tag="python"),
    tweepy.StreamRule("javascript OR nodejs", tag="js"),
]
stream.add_rules(rules)

# Start streaming (blocking call)
stream.filter()

Sample Streaming

import tweepy

class SampleStreamListener(tweepy.StreamingClient):
    def __init__(self, bearer_token):
        super().__init__(bearer_token)
        self.tweet_count = 0
        
    def on_tweet(self, tweet):
        self.tweet_count += 1
        print(f"Tweet #{self.tweet_count}: {tweet.text[:50]}...")
        
        # Stop after 100 tweets
        if self.tweet_count >= 100:
            self.disconnect()

# Start sample stream
stream = SampleStreamListener(bearer_token="your_bearer_token")
stream.sample()

Advanced Streaming with Full Data

import tweepy
import json

class AdvancedStreamListener(tweepy.StreamingClient):
    def on_tweet(self, tweet):
        # Access full tweet data
        print(f"Tweet ID: {tweet.id}")
        print(f"Author: {tweet.author_id}")
        print(f"Text: {tweet.text}")
        print(f"Created: {tweet.created_at}")
        
        # Access metrics if available
        if hasattr(tweet, 'public_metrics'):
            metrics = tweet.public_metrics
            print(f"Likes: {metrics.get('like_count', 0)}")
            print(f"Retweets: {metrics.get('retweet_count', 0)}")
            
    def on_includes(self, includes):
        # Process included data (users, media, etc.)
        if 'users' in includes:
            for user in includes['users']:
                print(f"User: {user.username} ({user.name})")
                
    def on_matching_rules(self, matching_rules):
        # See which rules matched
        for rule in matching_rules:
            print(f"Matched rule: {rule['tag']} - {rule['value']}")
            
    def on_errors(self, errors):
        # Handle errors
        for error in errors:
            print(f"Stream error: {error}")

# Initialize with expanded data
stream = AdvancedStreamListener(bearer_token="your_bearer_token")

# Add rules
rules = [
    tweepy.StreamRule("(AI OR \"artificial intelligence\") lang:en -is:retweet", tag="ai_tweets")
]
stream.add_rules(rules)

# Start with expanded fields
stream.filter(
    expansions=["author_id", "attachments.media_keys"],
    tweet_fields=["created_at", "public_metrics", "context_annotations"],
    user_fields=["name", "username", "verified", "public_metrics"]
)

Threaded Streaming

import tweepy
import time

class ThreadedStreamListener(tweepy.StreamingClient):
    def on_tweet(self, tweet):
        print(f"Background tweet: {tweet.text[:30]}...")

# Start streaming in background thread
stream = ThreadedStreamListener(bearer_token="your_bearer_token")

# Add rules
rules = [tweepy.StreamRule("music", tag="music_tweets")]
stream.add_rules(rules)

# Start threaded stream
stream.filter(threaded=True)

# Continue with other work
for i in range(10):
    print(f"Main thread working... {i}")
    time.sleep(2)

# Stop streaming
stream.disconnect()

Rule Management

import tweepy

# Initialize client
stream = tweepy.StreamingClient(bearer_token="your_bearer_token")

# Get current rules
response = stream.get_rules()
if response.data:
    print("Current rules:")
    for rule in response.data:
        print(f"- {rule.id}: {rule.value} (tag: {rule.tag})")

# Add new rules
new_rules = [
    tweepy.StreamRule("cats OR dogs", tag="pets"),
    tweepy.StreamRule("football OR soccer", tag="sports"),
]
result = stream.add_rules(new_rules)
print(f"Added {len(result.data)} rules")

# Delete specific rules
if response.data:
    rule_ids = [rule.id for rule in response.data]
    stream.delete_rules(rule_ids)
    print(f"Deleted {len(rule_ids)} rules")

Rule Syntax

Twitter streaming rules support a rich query syntax:

Basic Operators

  • cats dogs - AND (tweets containing both "cats" and "dogs")
  • cats OR dogs - OR (tweets containing either "cats" or "dogs")
  • -cats - NOT (tweets not containing "cats")
  • (cats OR dogs) -puppies - Grouping with parentheses

Field-specific Operators

  • from:username - Tweets from specific user
  • to:username - Tweets replying to specific user
  • @username - Tweets mentioning specific user
  • #hashtag - Tweets with specific hashtag
  • lang:en - Tweets in specific language
  • is:retweet - Only retweets
  • -is:retweet - Exclude retweets
  • is:reply - Only replies
  • has:images - Tweets with images
  • has:videos - Tweets with videos
  • has:links - Tweets with URLs

Advanced Features

  • Exact phrase matching: "exact phrase"
  • Emoji support: 🔥 or :fire:
  • Keyword proximity: "word1 word2"~10
  • Regular expressions (limited support)

Error Handling and Reconnection

StreamingClient automatically handles reconnection with exponential backoff:

class RobustStreamListener(tweepy.StreamingClient):
    def on_connection_error(self):
        print("Connection error - will retry automatically")
        
    def on_request_error(self, status_code):
        print(f"HTTP error {status_code}")
        if status_code == 420:  # Rate limit
            print("Rate limited - backing off")
            
    def on_exception(self, exception):
        print(f"Exception occurred: {exception}")
        
    def on_closed(self, response):
        print(f"Stream closed by Twitter: {response.status_code}")

# Configure retry behavior
stream = RobustStreamListener(
    bearer_token="your_bearer_token",
    max_retries=10,  # Limit retry attempts
    wait_on_rate_limit=True  # Wait when rate limited
)

Performance Considerations

  • Chunk Size: Adjust chunk_size parameter for optimal performance
  • Threading: Use threaded=True for non-blocking streaming
  • Field Selection: Request only needed fields to reduce bandwidth
  • Rule Efficiency: Write efficient rules to match relevant tweets
  • Backfill: Use backfill_minutes sparingly as it counts against rate limits

Install with Tessl CLI

npx tessl i tessl/pypi-tweepy

docs

async-interface.md

authentication.md

client-v2.md

data-models.md

index.md

legacy-api.md

streaming.md

utilities.md

tile.json