Library for accessing the X API (Twitter)
Tweepy provides a complete async/await interface for non-blocking access to all Twitter API functionality. The asynchronous interface mirrors the synchronous interface with identical method signatures, requiring only the addition of await keywords.
Note: Async functionality was added in Tweepy v4.10+ for Client, v4.11+ for Paginator, and v4.12+ for direct message support.
The AsyncClient provides async access to all Twitter API v2 endpoints with identical functionality to the synchronous Client.
class AsyncClient:
def __init__(self, bearer_token=None, consumer_key=None, consumer_secret=None,
access_token=None, access_token_secret=None, *,
return_type=Response, wait_on_rate_limit=False):
"""
Initialize asynchronous Twitter API v2 Client.
Parameters: Identical to synchronous Client
- bearer_token (str, optional): Bearer token for app-only authentication
- consumer_key (str, optional): Consumer key for user authentication
- consumer_secret (str, optional): Consumer secret for user authentication
- access_token (str, optional): Access token for user authentication
- access_token_secret (str, optional): Access token secret for user authentication
- return_type (type): Response container type (default: Response)
- wait_on_rate_limit (bool): Wait when rate limit hit (default: False)
"""All Client methods are available as async methods requiring await:
# Tweet management (async versions)
async def create_tweet(self, text=None, **kwargs): ...
async def delete_tweet(self, id, **kwargs): ...
async def get_tweet(self, id, **kwargs): ...
async def get_tweets(self, ids, **kwargs): ...
# Search methods (async versions)
async def search_recent_tweets(self, query, **kwargs): ...
async def search_all_tweets(self, query, **kwargs): ...
# User methods (async versions)
async def get_me(self, **kwargs): ...
async def get_user(self, **kwargs): ...
async def get_users(self, **kwargs): ...
async def follow_user(self, target_user_id, **kwargs): ...
async def unfollow_user(self, target_user_id, **kwargs): ...
# Note: follow() and unfollow() alias methods are NOT available in AsyncClient
# Timeline methods (async versions)
async def get_home_timeline(self, **kwargs): ...
async def get_users_tweets(self, id, **kwargs): ...
async def get_users_mentions(self, id, **kwargs): ...
# All other Client methods available as async versions...

The AsyncStreamingClient provides async streaming with identical event handlers to the synchronous version.
class AsyncStreamingClient:
def __init__(self, bearer_token, *, chunk_size=512, daemon=False,
max_retries=float('inf'), **kwargs):
"""
Initialize asynchronous streaming client.
Parameters: Identical to synchronous StreamingClient
"""
def filter(self, *, threaded=False, **kwargs):
"""
Start async filtered stream (returns coroutine).
Parameters: Identical to synchronous filter()
Note: This method returns a coroutine that should be awaited
"""
def sample(self, *, threaded=False, **kwargs):
"""
Start async sample stream (returns coroutine).
Parameters: Identical to synchronous sample()
Note: This method returns a coroutine that should be awaited
"""
def disconnect(self):
"""
Disconnect from the streaming endpoint.
Returns:
None
"""
async def add_rules(self, add, **kwargs):
"""
Add streaming rules asynchronously.
Parameters:
- add (list): List of StreamRule objects or rule dictionaries
- **kwargs: Additional parameters
Returns:
Response with rule addition results
"""
async def delete_rules(self, ids, **kwargs):
"""
Delete streaming rules asynchronously.
Parameters:
- ids (list): List of rule IDs to delete
- **kwargs: Additional parameters
Returns:
Response with deletion results
"""
async def get_rules(self, **kwargs):
"""
Get current streaming rules asynchronously.
Parameters:
- **kwargs: Additional parameters
Returns:
Response with current rules
"""
# Event handlers remain the same (can be async or sync)
async def on_tweet(self, tweet): ...
async def on_connect(self): ...
async def on_disconnect(self): ...
# ... other event handlers

The AsyncPaginator provides async pagination for API v2 endpoints.
class AsyncPaginator:
def __init__(self, method, *args, **kwargs):
"""
Initialize async paginator.
Parameters: Identical to synchronous Paginator
"""
async def flatten(self, limit=None):
"""
Async generator yielding individual items.
Parameters:
- limit (int, optional): Maximum number of items
Yields:
Individual items from paginated responses
"""
async def get_next(self):
"""Get next page asynchronously."""
async def get_previous(self):
"""Get previous page asynchronously."""AsyncClient uses aiohttp sessions for HTTP connections, providing connection pooling and reuse.
class AsyncClient:
# Session attribute for HTTP connections
session: aiohttp.ClientSession
# Context manager support (Note: not implemented in current version)
# async def __aenter__(self): ...
# async def __aexit__(self, *args): ...

Important Notes:
import asyncio
import tweepy
async def main():
# Initialize async client
client = tweepy.AsyncClient(bearer_token="your_bearer_token")
# All methods require await
tweet = await client.get_tweet("1234567890123456789")
print(f"Tweet: {tweet.data.text}")
# Search for tweets
search_results = await client.search_recent_tweets(
query="python programming",
max_results=10
)
for tweet in search_results.data:
print(f"- {tweet.text}")
# User operations
user = await client.get_user(username="python")
print(f"User: {user.data.name} (@{user.data.username})")
# Run async function
asyncio.run(main())

import asyncio
import tweepy
async def get_user_info(client, username):
"""Get user info and recent tweets concurrently."""
# Start both requests concurrently
user_task = client.get_user(username=username, user_fields=["public_metrics"])
tweets_task = client.search_recent_tweets(f"from:{username}", max_results=5)
# Wait for both to complete
user_response, tweets_response = await asyncio.gather(user_task, tweets_task)
return {
'user': user_response.data,
'recent_tweets': tweets_response.data or []
}
async def analyze_multiple_users():
client = tweepy.AsyncClient(bearer_token="your_bearer_token")
usernames = ["python", "github", "stackoverflow", "nodejs", "docker"]
# Process all users concurrently
tasks = [get_user_info(client, username) for username in usernames]
results = await asyncio.gather(*tasks)
# Process results
for username, data in zip(usernames, results):
user = data['user']
tweets = data['recent_tweets']
print(f"\n@{username} ({user.name})")
print(f"Followers: {user.public_metrics['followers_count']:,}")
print(f"Recent tweets: {len(tweets)}")
for tweet in tweets[:3]: # Show first 3
print(f" - {tweet.text[:60]}...")
asyncio.run(analyze_multiple_users())

import asyncio
import tweepy
class AsyncTweetProcessor(tweepy.AsyncStreamingClient):
def __init__(self, bearer_token):
super().__init__(bearer_token)
self.tweet_count = 0
async def on_connect(self):
print("Connected to async stream")
async def on_tweet(self, tweet):
self.tweet_count += 1
print(f"Async tweet #{self.tweet_count}: {tweet.text[:50]}...")
# Perform async processing
await self.process_tweet(tweet)
# Stop after 50 tweets
if self.tweet_count >= 50:
self.disconnect()
async def process_tweet(self, tweet):
# Simulate async processing (e.g., database write, API call)
await asyncio.sleep(0.1)
# Could make additional async API calls here
# author = await some_client.get_user(id=tweet.author_id)
async def run_async_stream():
stream = AsyncTweetProcessor(bearer_token="your_bearer_token")
# Add rules
rules = [tweepy.StreamRule("python OR javascript", tag="programming")]
await stream.add_rules(rules)
# Start streaming
await stream.filter()
asyncio.run(run_async_stream())

import asyncio
import tweepy
async def analyze_user_followers():
client = tweepy.AsyncClient(bearer_token="your_bearer_token")
# Create async paginator
paginator = tweepy.AsyncPaginator(
client.get_users_followers,
id="783214", # Twitter's user ID
max_results=1000,
user_fields=["public_metrics", "verified", "created_at"]
)
follower_stats = {
'total': 0,
'verified': 0,
'high_followers': 0,
'recent_joiners': 0
}
# Process followers asynchronously
async for follower in paginator.flatten(limit=10000):
follower_stats['total'] += 1
if getattr(follower, 'verified', False):
follower_stats['verified'] += 1
if follower.public_metrics['followers_count'] > 10000:
follower_stats['high_followers'] += 1
# Check if joined in last year (simplified)
if '2023' in str(follower.created_at) or '2024' in str(follower.created_at):
follower_stats['recent_joiners'] += 1
print("Follower Analysis:")
print(f"Total analyzed: {follower_stats['total']:,}")
print(f"Verified: {follower_stats['verified']:,}")
print(f"High-influence (>10k followers): {follower_stats['high_followers']:,}")
print(f"Recent joiners (2023-2024): {follower_stats['recent_joiners']:,}")
asyncio.run(analyze_user_followers())

import asyncio
import tweepy
async def robust_async_operations():
client = tweepy.AsyncClient(
consumer_key="your_consumer_key",
consumer_secret="your_consumer_secret",
access_token="your_access_token",
access_token_secret="your_access_token_secret",
wait_on_rate_limit=True
)
try:
# Attempt to create tweet
response = await client.create_tweet(text="Hello from async Tweepy!")
print(f"Tweet created: {response.data['id']}")
except tweepy.BadRequest as e:
print(f"Bad request: {e}")
except tweepy.Unauthorized as e:
print("Authentication failed")
except tweepy.TooManyRequests as e:
print("Rate limited - will wait and retry")
# wait_on_rate_limit=True handles this automatically
except tweepy.HTTPException as e:
print(f"HTTP error: {e}")
except Exception as e:
print(f"Unexpected error: {e}")
asyncio.run(robust_async_operations())

import asyncio
import tweepy
async def manual_session_cleanup():
# For long-running applications, you may want to manage sessions manually
client = tweepy.AsyncClient(bearer_token="your_bearer_token")
try:
# Perform operations
tweets = await client.search_recent_tweets("python", max_results=10)
for tweet in tweets.data:
print(tweet.text)
finally:
# Manually close session if needed (advanced usage)
if hasattr(client, 'session') and client.session:
await client.session.close()
asyncio.run(manual_session_cleanup())

The async interface requires additional dependencies:
# Install with async support
pip install tweepy[async]
# Or install dependencies manually
pip install aiohttp async-lru

Performance tips:
- Use asyncio.gather() for concurrent API calls
- Use async iteration (async for) for memory-efficient iteration
- Use wait_on_rate_limit=True for automatic rate limit handling

import asyncio
import tweepy
async def controlled_concurrent_requests():
client = tweepy.AsyncClient(bearer_token="your_bearer_token")
# Limit concurrent requests
semaphore = asyncio.Semaphore(5) # Max 5 concurrent requests
async def get_user_with_semaphore(username):
async with semaphore:
return await client.get_user(username=username)
usernames = ["python", "github", "stackoverflow", "nodejs", "docker", "reactjs"]
# Execute with controlled concurrency
results = await asyncio.gather(*[
get_user_with_semaphore(username) for username in usernames
])
for result in results:
user = result.data
print(f"@{user.username}: {user.public_metrics['followers_count']:,} followers")
asyncio.run(controlled_concurrent_requests())

Install with Tessl CLI
npx tessl i tessl/pypi-tweepy