CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-tweepy

Library for accessing the X API (Twitter)

Overview
Eval results
Files

async-interface.mddocs/

Asynchronous Interface

Tweepy provides a complete async/await interface for non-blocking access to all Twitter API functionality. The asynchronous interface mirrors the synchronous interface with identical method signatures, requiring only the addition of await keywords.

Note: Async functionality was added in Tweepy v4.10+ for Client, v4.11+ for Paginator, and v4.12+ for direct message support.

Capabilities

AsyncClient

The AsyncClient provides async access to all Twitter API v2 endpoints with identical functionality to the synchronous Client.

class AsyncClient:
    def __init__(self, bearer_token=None, consumer_key=None, consumer_secret=None,
                 access_token=None, access_token_secret=None, *, 
                 return_type=Response, wait_on_rate_limit=False):
        """
        Initialize asynchronous Twitter API v2 Client.
        
        Parameters: Identical to synchronous Client
        - bearer_token (str, optional): Bearer token for app-only authentication
        - consumer_key (str, optional): Consumer key for user authentication
        - consumer_secret (str, optional): Consumer secret for user authentication
        - access_token (str, optional): Access token for user authentication
        - access_token_secret (str, optional): Access token secret for user authentication
        - return_type (type): Response container type (default: Response)
        - wait_on_rate_limit (bool): Wait when rate limit hit (default: False)
        """

AsyncClient Methods

All Client methods are available as async methods requiring await:

# Tweet management (async versions)
async def create_tweet(self, text=None, **kwargs): ...
async def delete_tweet(self, id, **kwargs): ...
async def get_tweet(self, id, **kwargs): ...
async def get_tweets(self, ids, **kwargs): ...

# Search methods (async versions)  
async def search_recent_tweets(self, query, **kwargs): ...
async def search_all_tweets(self, query, **kwargs): ...

# User methods (async versions)
async def get_me(self, **kwargs): ...
async def get_user(self, **kwargs): ...
async def get_users(self, **kwargs): ...
async def follow_user(self, target_user_id, **kwargs): ...
async def unfollow_user(self, target_user_id, **kwargs): ...
# Note: follow() and unfollow() alias methods are NOT available in AsyncClient

# Timeline methods (async versions)
async def get_home_timeline(self, **kwargs): ...
async def get_users_tweets(self, id, **kwargs): ...
async def get_users_mentions(self, id, **kwargs): ...

# All other Client methods available as async versions...

AsyncStreamingClient

The AsyncStreamingClient provides async streaming with identical event handlers to the synchronous version.

class AsyncStreamingClient:
    def __init__(self, bearer_token, *, chunk_size=512, daemon=False,
                 max_retries=float('inf'), **kwargs):
        """
        Initialize asynchronous streaming client.
        
        Parameters: Identical to synchronous StreamingClient
        """

    def filter(self, *, threaded=False, **kwargs):
        """
        Start async filtered stream (returns coroutine).
        
        Parameters: Identical to synchronous filter()
        Note: This method returns a coroutine that should be awaited
        """

    def sample(self, *, threaded=False, **kwargs):
        """
        Start async sample stream (returns coroutine).
        
        Parameters: Identical to synchronous sample()
        Note: This method returns a coroutine that should be awaited
        """

    def disconnect(self):
        """
        Disconnect from the streaming endpoint.
        
        Returns:
        None
        """

    async def add_rules(self, add, **kwargs):
        """
        Add streaming rules asynchronously.
        
        Parameters:
        - add (list): List of StreamRule objects or rule dictionaries
        - **kwargs: Additional parameters
        
        Returns:
        Response with rule addition results
        """

    async def delete_rules(self, ids, **kwargs):
        """
        Delete streaming rules asynchronously.
        
        Parameters:
        - ids (list): List of rule IDs to delete
        - **kwargs: Additional parameters
        
        Returns:
        Response with deletion results
        """

    async def get_rules(self, **kwargs):
        """
        Get current streaming rules asynchronously.
        
        Parameters:
        - **kwargs: Additional parameters
        
        Returns:
        Response with current rules
        """

    # Event handlers remain the same (can be async or sync)
    async def on_tweet(self, tweet): ...
    async def on_connect(self): ...
    async def on_disconnect(self): ...
    # ... other event handlers

AsyncPaginator

The AsyncPaginator provides async pagination for API v2 endpoints.

class AsyncPaginator:
    def __init__(self, method, *args, **kwargs):
        """
        Initialize async paginator.
        
        Parameters: Identical to synchronous Paginator
        """

    async def flatten(self, limit=None):
        """
        Async generator yielding individual items.
        
        Parameters:
        - limit (int, optional): Maximum number of items
        
        Yields:
        Individual items from paginated responses
        """

    async def get_next(self):
        """Get next page asynchronously."""

    async def get_previous(self):
        """Get previous page asynchronously."""

Session Management

AsyncClient uses aiohttp sessions for HTTP connections, providing connection pooling and reuse.

class AsyncClient:
    # Session attribute for HTTP connections
    session: aiohttp.ClientSession
    
    # Context manager support (Note: not implemented in current version)
    # async def __aenter__(self): ...
    # async def __aexit__(self, *args): ...

Important Notes:

  • AsyncClient maintains an internal aiohttp session
  • Session is created automatically on first request
  • Sessions are not automatically closed (manual cleanup may be needed for long-running applications)
  • Context manager support is not currently implemented
  • For advanced use cases, consider session configuration through aiohttp parameters

Usage Examples

Basic AsyncClient Usage

import asyncio
import tweepy

async def main():
    # Initialize async client
    client = tweepy.AsyncClient(bearer_token="your_bearer_token")
    
    # All methods require await
    tweet = await client.get_tweet("1234567890123456789")
    print(f"Tweet: {tweet.data.text}")
    
    # Search for tweets
    search_results = await client.search_recent_tweets(
        query="python programming",
        max_results=10
    )
    
    for tweet in search_results.data:
        print(f"- {tweet.text}")
    
    # User operations
    user = await client.get_user(username="python")
    print(f"User: {user.data.name} (@{user.data.username})")

# Run async function
asyncio.run(main())

Concurrent API Calls

import asyncio
import tweepy

async def get_user_info(client, username):
    """Get user info and recent tweets concurrently."""
    # Start both requests concurrently
    user_task = client.get_user(username=username, user_fields=["public_metrics"])
    tweets_task = client.search_recent_tweets(f"from:{username}", max_results=5)
    
    # Wait for both to complete
    user_response, tweets_response = await asyncio.gather(user_task, tweets_task)
    
    return {
        'user': user_response.data,
        'recent_tweets': tweets_response.data or []
    }

async def analyze_multiple_users():
    client = tweepy.AsyncClient(bearer_token="your_bearer_token")
    
    usernames = ["python", "github", "stackoverflow", "nodejs", "docker"]
    
    # Process all users concurrently
    tasks = [get_user_info(client, username) for username in usernames]
    results = await asyncio.gather(*tasks)
    
    # Process results
    for username, data in zip(usernames, results):
        user = data['user']
        tweets = data['recent_tweets']
        
        print(f"\n@{username} ({user.name})")
        print(f"Followers: {user.public_metrics['followers_count']:,}")
        print(f"Recent tweets: {len(tweets)}")
        
        for tweet in tweets[:3]:  # Show first 3
            print(f"  - {tweet.text[:60]}...")

asyncio.run(analyze_multiple_users())

Async Streaming

import asyncio
import tweepy

class AsyncTweetProcessor(tweepy.AsyncStreamingClient):
    def __init__(self, bearer_token):
        super().__init__(bearer_token)
        self.tweet_count = 0
        
    async def on_connect(self):
        print("Connected to async stream")
        
    async def on_tweet(self, tweet):
        self.tweet_count += 1
        print(f"Async tweet #{self.tweet_count}: {tweet.text[:50]}...")
        
        # Perform async processing
        await self.process_tweet(tweet)
        
        # Stop after 50 tweets
        if self.tweet_count >= 50:
            self.disconnect()
            
    async def process_tweet(self, tweet):
        # Simulate async processing (e.g., database write, API call)
        await asyncio.sleep(0.1)
        
        # Could make additional async API calls here
        # author = await some_client.get_user(id=tweet.author_id)

async def run_async_stream():
    stream = AsyncTweetProcessor(bearer_token="your_bearer_token")
    
    # Add rules
    rules = [tweepy.StreamRule("python OR javascript", tag="programming")]
    await stream.add_rules(rules)
    
    # Start streaming
    await stream.filter()

asyncio.run(run_async_stream())

Async Pagination

import asyncio
import tweepy

async def analyze_user_followers():
    client = tweepy.AsyncClient(bearer_token="your_bearer_token")
    
    # Create async paginator
    paginator = tweepy.AsyncPaginator(
        client.get_users_followers,
        id="783214",  # Twitter's user ID
        max_results=1000,
        user_fields=["public_metrics", "verified", "created_at"]
    )
    
    follower_stats = {
        'total': 0,
        'verified': 0,
        'high_followers': 0,
        'recent_joiners': 0
    }
    
    # Process followers asynchronously
    async for follower in paginator.flatten(limit=10000):
        follower_stats['total'] += 1
        
        if getattr(follower, 'verified', False):
            follower_stats['verified'] += 1
            
        if follower.public_metrics['followers_count'] > 10000:
            follower_stats['high_followers'] += 1
            
        # Check if joined in last year (simplified)
        if '2023' in str(follower.created_at) or '2024' in str(follower.created_at):
            follower_stats['recent_joiners'] += 1
    
    print("Follower Analysis:")
    print(f"Total analyzed: {follower_stats['total']:,}")
    print(f"Verified: {follower_stats['verified']:,}")
    print(f"High-influence (>10k followers): {follower_stats['high_followers']:,}")
    print(f"Recent joiners (2023-2024): {follower_stats['recent_joiners']:,}")

asyncio.run(analyze_user_followers())

Error Handling in Async Code

import asyncio
import tweepy

async def robust_async_operations():
    client = tweepy.AsyncClient(
        consumer_key="your_consumer_key",
        consumer_secret="your_consumer_secret",
        access_token="your_access_token",
        access_token_secret="your_access_token_secret",
        wait_on_rate_limit=True
    )
    
    try:
        # Attempt to create tweet
        response = await client.create_tweet(text="Hello from async Tweepy!")
        print(f"Tweet created: {response.data['id']}")
        
    except tweepy.BadRequest as e:
        print(f"Bad request: {e}")
        
    except tweepy.Unauthorized as e:
        print("Authentication failed")
        
    except tweepy.TooManyRequests as e:
        print("Rate limited - will wait and retry")
        # wait_on_rate_limit=True handles this automatically
        
    except tweepy.HTTPException as e:
        print(f"HTTP error: {e}")
        
    except Exception as e:
        print(f"Unexpected error: {e}")

asyncio.run(robust_async_operations())

Session Cleanup (Advanced)

import asyncio
import tweepy

async def manual_session_cleanup():
    # For long-running applications, you may want to manage sessions manually
    client = tweepy.AsyncClient(bearer_token="your_bearer_token")
    
    try:
        # Perform operations
        tweets = await client.search_recent_tweets("python", max_results=10)
        
        for tweet in tweets.data:
            print(tweet.text)
            
    finally:
        # Manually close session if needed (advanced usage)
        if hasattr(client, 'session') and client.session:
            await client.session.close()

asyncio.run(manual_session_cleanup())

Dependencies

The async interface requires additional dependencies:

# Install with async support
pip install tweepy[async]

# Or install dependencies manually
pip install aiohttp async-lru

Performance Considerations

Concurrent Operations

  • Use asyncio.gather() for concurrent API calls
  • Limit concurrent requests to respect rate limits
  • Consider using semaphores to control concurrency

Memory Usage

  • Async operations can use more memory due to coroutine overhead
  • Consider processing results in batches for large datasets
  • Use async generators (async for) for memory-efficient iteration

Rate Limiting

  • Set wait_on_rate_limit=True for automatic rate limit handling
  • Consider implementing custom backoff strategies for high-throughput applications
  • Monitor rate limit headers in responses
import asyncio
import tweepy

async def controlled_concurrent_requests():
    client = tweepy.AsyncClient(bearer_token="your_bearer_token")
    
    # Limit concurrent requests
    semaphore = asyncio.Semaphore(5)  # Max 5 concurrent requests
    
    async def get_user_with_semaphore(username):
        async with semaphore:
            return await client.get_user(username=username)
    
    usernames = ["python", "github", "stackoverflow", "nodejs", "docker", "reactjs"]
    
    # Execute with controlled concurrency
    results = await asyncio.gather(*[
        get_user_with_semaphore(username) for username in usernames
    ])
    
    for result in results:
        user = result.data
        print(f"@{user.username}: {user.public_metrics['followers_count']:,} followers")

asyncio.run(controlled_concurrent_requests())

Install with Tessl CLI

npx tessl i tessl/pypi-tweepy

docs

async-interface.md

authentication.md

client-v2.md

data-models.md

index.md

legacy-api.md

streaming.md

utilities.md

tile.json