Python Reddit API Wrapper - simple access to Reddit's API
—
PRAW provides comprehensive support for Reddit Live Threads, which enable real-time event coverage and collaborative updates. Live threads are perfect for breaking news, sports events, AMAs, and any situation requiring real-time community updates.
Create and discover live threads across Reddit.
class LiveHelper:
def __init__(self, reddit): ...
def create(
self,
title: str,
*,
description: str = None,
nsfw: bool = False,
resources: str = None,
**kwargs
):
"""
Create a new live thread.
Parameters:
- title: Live thread title
- description: Thread description (markdown supported)
- nsfw: Whether thread contains NSFW content
- resources: Resource links or information
Returns:
LiveThread instance
"""
def info(self, ids: list):
"""
Get information about live threads by ID.
Parameters:
- ids: List of live thread IDs
Returns:
Generator of LiveThread instances
"""
def now(self):
"""
Get currently happening live threads.
Returns:
Generator of active LiveThread instances
"""Manage individual live threads including updates, contributors, and settings.
class LiveThread:
def __init__(self, reddit, id: str = None): ...
def contributor(self):
"""
Get LiveThreadContribution instance for managing contributors.
Returns:
LiveThreadContribution instance
"""
def discussions(self, **kwargs):
"""
Get discussions (linked submissions) for this live thread.
Parameters:
- limit: Number of discussions (default: 25, max: 100)
- params: Additional query parameters
Returns:
ListingGenerator of Submission instances
"""
def updates(self, **kwargs):
"""
Get live updates from this thread.
Parameters:
- limit: Number of updates (default: 25, max: 100)
- params: Additional query parameters
Returns:
ListingGenerator of LiveUpdate instances
"""
def add_update(self, body: str):
"""
Add new update to live thread (requires contributor access).
Parameters:
- body: Update content (markdown supported)
Returns:
LiveUpdate instance
"""
def close_thread(self):
"""
Close the live thread (prevent new updates).
Only thread creators and certain contributors can close threads.
"""Access live thread metadata, status, and configuration.
# Attributes available on a LiveThread instance.

# Basic properties
id: str # Live thread ID (base36)
title: str # Thread title
description: str # Thread description
description_html: str # Description as HTML
state: str # Thread state ("live", "complete", etc.)
viewer_count: int # Current number of viewers
viewer_count_fuzzed: bool # Whether viewer count is fuzzed
# Content and activity
total_views: int # Total view count
num_updates: int # Number of updates posted
# NOTE(review): `updates` is also documented as a method, `updates(**kwargs)`,
# on LiveThread -- confirm which form is correct.
updates: list # List of LiveUpdate instances
# Timestamps
created_utc: float # Creation timestamp (UTC)
updated_utc: float # Last update timestamp (UTC)
# Configuration
nsfw: bool # Whether NSFW content
websocket_url: str # WebSocket URL for real-time updates
announcement_url: str # Announcement post URL
resources: str # Resource information
resources_html: str # Resources as HTML
# Status flags
is_announcement: bool # Whether this is an announcement thread
icon: str # Thread icon URL
mobile_banner_image: str # Mobile banner image URL
banner_image: str # Banner image URL

Manage individual updates within live threads.
class LiveUpdate:
    """A single update posted to a live thread."""

    def __init__(self, live_thread, id: str = None): ...

    # Property rather than a plain method so it reads like the
    # ``contributor`` accessor used in the examples: ``update.contrib.remove()``.
    @property
    def contrib(self):
        """
        Get LiveUpdateContribution instance for this update.

        Returns:
        LiveUpdateContribution instance
        """
class LiveUpdateContribution:
"""Contributor actions for live updates."""
def remove(self):
"""Remove this live update (contributor only)."""Access individual update content and metadata.
# Attributes available on a LiveUpdate instance.

# Update content
id: str # Update ID
body: str # Update content
body_html: str # Update content as HTML
mobile_embeds: list # Mobile embed data
embeds: list # Embed data
# Author and timing
author: Redditor # Update author
created_utc: float # Creation timestamp (UTC)
name: str # Update fullname
# Status
stricken: bool # Whether update is struck through (deprecated)

Manage contributors and permissions for live threads.
class LiveThreadContribution:
"""Manage live thread contributors."""
def add(
self,
redditor,
*,
permissions: list = None
):
"""
Add contributor to live thread.
Parameters:
- redditor: Redditor instance or username
- permissions: List of permissions to grant
"""
def invite(
self,
redditor,
*,
permissions: list = None
):
"""
Invite user to contribute to live thread.
Parameters:
- redditor: Redditor instance or username
- permissions: List of permissions to grant
"""
def leave(self):
"""Leave as contributor from this live thread."""
def remove(self, redditor):
"""
Remove contributor from live thread.
Parameters:
- redditor: Redditor instance or username
"""
def remove_invite(self, redditor):
"""
Remove pending invitation.
Parameters:
- redditor: Redditor instance or username
"""
def update(
self,
redditor,
*,
permissions: list
):
"""
Update contributor permissions.
Parameters:
- redditor: Redditor instance or username
- permissions: New list of permissions
"""
def update_invite(
self,
redditor,
*,
permissions: list
):
"""
Update pending invitation permissions.
Parameters:
- redditor: Redditor instance or username
- permissions: New list of permissions
"""
def accept_invite(self):
"""Accept pending contributor invitation."""Real-time streaming of live thread updates.
class LiveThreadStream:
    """Real-time live thread update streaming."""

    def updates(self, **kwargs):
        """
        Stream new updates from live thread.

        Parameters:
        - pause_after: Pause after this many requests
        - skip_existing: Skip updates created before stream start

        Yields:
        LiveUpdate instances as they are posted
        """
stream: LiveThreadStream # Streaming instance

Manage contributor permissions for live threads.
# Permission names that can be granted to a live thread contributor
LIVE_THREAD_PERMISSIONS = [
    "update",  # may post updates
    "edit",    # may edit thread settings
    "manage",  # may manage contributors
]
# Permission usage
def add_live_contributor():
    """Grant two example accounts different permission sets on `live_thread`."""
    # `live_thread` is read from the enclosing scope; it is not a parameter.
    live_thread.contributor.add(
        "username",
        permissions=["update"],  # Can post updates only
    )
    live_thread.contributor.add(
        "moderator",
        permissions=["update", "edit", "manage"],  # Full permissions
    )


import praw
reddit = praw.Reddit(...)

# Create a new live thread
live_thread = reddit.live.create(
    title="Breaking News: Major Event Coverage",
    description="Live coverage of the ongoing situation. Updates will be posted as information becomes available.",
    nsfw=False,
    resources="Official sources: [News Site](https://example.com)",
)
print(f"Created live thread: {live_thread.title}")
print(f"Thread ID: {live_thread.id}")
print(f"URL: https://www.reddit.com/live/{live_thread.id}")

# Add initial update
live_thread.add_update("**Live coverage starting now.** We'll be providing updates as the situation develops.")

# Update thread settings.
# Assigning `live_thread.title = ...` would only change the local Python object,
# so the new title is sent to Reddit through the thread's settings helper.
live_thread.contrib.update(title="Updated: Breaking News Coverage")


# Add contributors with different permission levels
live_thread = reddit.live("abc123")

# Contributor limited to posting updates
live_thread.contributor.add("reporter1", permissions=["update"])

# Contributor holding every permission
live_thread.contributor.add("moderator1", permissions=["update", "edit", "manage"])

# Send an invitation instead of adding directly
live_thread.contributor.invite("reporter2", permissions=["update"])
print("Contributors invited successfully")

# The invited user accepts from their own session
live_thread.contributor.accept_invite()

# Drop a contributor from the thread
live_thread.contributor.remove("reporter1")


# Post regular updates
import time

updates = [
    "**10:30 AM:** Initial reports coming in...",
    "**10:35 AM:** Confirmed by official sources.",
    "**10:40 AM:** Emergency services on scene.",
    "**10:45 AM:** Traffic being redirected around area.",
]
for position, update_text in enumerate(updates):
    if position:
        # Pause only between updates, so nothing waits after the last one
        time.sleep(300)  # Wait 5 minutes between updates
    update = live_thread.add_update(update_text)
    print(f"Posted update: {update.id}")

# Post update with embedded media
media_update = live_thread.add_update(
    "**Live footage available:** [Video Link](https://example.com/video)\n\n"
    "Emergency crews working to secure the area. More updates coming soon."
)


# Get currently active live threads
print("Active live threads:")
# `reddit.live.now()` gives at most one thread: the currently featured live
# thread, or None when nothing is featured, so it must not be iterated.
thread = reddit.live.now()
if thread is not None:
    print(f"Title: {thread.title}")
    print(f"Viewers: {thread.viewer_count}")
    print(f"Updates: {thread.num_updates}")
    print(f"State: {thread.state}")
    print("---")

# Get specific thread by ID
thread = reddit.live("thread_id")
print(f"Thread: {thread.title}")
print(f"Description: {thread.description}")

# Get recent updates
print("Recent updates:")
for update in thread.updates(limit=10):
    print(f"[{update.created_utc}] {update.author}: {update.body}")


# Stream updates from specific live thread
from datetime import datetime, timezone

live_thread = reddit.live("abc123")
print(f"Streaming updates from: {live_thread.title}")
for update in live_thread.stream.updates():
    print(f"\nNew update by {update.author}:")
    # created_utc is a UTC timestamp, so render it as a timezone-aware value
    print(f"Time: {datetime.fromtimestamp(update.created_utc, tz=timezone.utc)}")
    print(f"Content: {update.body}")
    # Process the update
    if "breaking" in update.body.lower():
        print("⚠️ BREAKING NEWS UPDATE!")
    # You can add your own processing logic here
    # e.g., send notifications, update databases, etc.

# Stream from multiple live threads
# `reddit.live.now()` returns a single featured thread or None, so wrap it in a
# list rather than iterating the return value directly.
featured_thread = reddit.live.now()
active_threads = [featured_thread] if featured_thread is not None else []
for thread in active_threads:
    print(f"Monitoring: {thread.title}")
# Set up streaming for each active thread


def manage_live_event():
    """
    Comprehensive live event management.

    Creates an event thread, adds a contributor team, posts a scripted
    sequence of updates, then closes the thread.

    Returns:
    The LiveThread that was created (already closed)
    """
    # Create event thread
    event_thread = reddit.live.create(
        title="Sports Championship Live Coverage",
        description="Live updates from the championship game.",
        resources="Official scorer: [Link](https://example.com)\nStats: [Link](https://stats.example.com)",
    )

    # Set up contributor team
    contributors = {
        "main_reporter": ["update", "edit"],
        "backup_reporter": ["update"],
        "moderator": ["update", "edit", "manage"],
    }
    for username, permissions in contributors.items():
        # Best effort: one failed contributor must not abort the whole event
        try:
            event_thread.contributor.add(username, permissions=permissions)
        except Exception as e:
            print(f"Failed to add {username}: {e}")
        else:
            print(f"Added {username} with permissions: {permissions}")

    # Post scheduled updates
    game_events = [
        ("Game starting", "⚽ Kickoff! The championship game is now underway."),
        ("First goal", "🥅 GOAL! Amazing shot gives the home team a 1-0 lead!"),
        ("Halftime", "⏰ Halftime break. Score remains 1-0."),
        ("Second half", "▶️ Second half underway!"),
        ("Final whistle", "🏆 FULL TIME! What an incredible match!"),
    ]
    for event_name, update_text in game_events:
        event_thread.add_update(f"**{event_name.upper()}:** {update_text}")
        print(f"Posted: {event_name}")
        # In real usage, you'd wait for actual events
        # time.sleep(actual_time_between_events)

    # Close thread when event ends
    event_thread.close_thread()
    print("Event coverage complete - thread closed")
    return event_thread


# Run event management
championship_thread = manage_live_event()


def analyze_live_thread(thread_id):
    """
    Analyze live thread performance and engagement.

    Prints headline statistics, the average gap between the most recent
    updates, the five most active authors, and the linked discussions.

    Parameters:
    - thread_id: ID of the live thread to analyze
    """
    from collections import Counter

    thread = reddit.live(thread_id)
    print(f"Live Thread Analysis: {thread.title}")
    print(f"Total Views: {thread.total_views:,}")
    print(f"Current Viewers: {thread.viewer_count:,}")
    print(f"Total Updates: {thread.num_updates}")
    print(f"Thread State: {thread.state}")

    # Analyze update frequency
    updates = list(thread.updates(limit=100))
    update_times = [update.created_utc for update in updates]
    # Gap between each update and the one listed after it; empty when fewer
    # than two updates exist, which skips the average below.
    time_diffs = [
        current - following
        for current, following in zip(update_times, update_times[1:])
    ]
    if time_diffs:
        avg_interval = sum(time_diffs) / len(time_diffs)
        print(f"Average time between updates: {avg_interval/60:.1f} minutes")

    # Contributor analysis
    contributors = Counter(
        update.author.name if update.author else "[deleted]"
        for update in updates
    )
    print("\nTop Contributors:")
    for author, count in contributors.most_common(5):
        print(f" {author}: {count} updates")

    # Get linked discussions
    discussions = list(thread.discussions(limit=10))
    print(f"\nLinked Discussions: {len(discussions)}")
    for discussion in discussions:
        print(f" r/{discussion.subreddit.display_name}: {discussion.title}")


# Analyze specific thread
analyze_live_thread("example_thread_id")def robust_live_thread_management():
"""Live thread management with proper error handling."""
try:
# Create thread with validation
title = "Event Coverage"
if len(title) < 3:
raise ValueError("Title too short")
live_thread = reddit.live.create(
title=title,
description="Comprehensive event coverage with regular updates."
)
# Add updates with retry logic
def add_update_with_retry(content, max_retries=3):
for attempt in range(max_retries):
try:
return live_thread.add_update(content)
except Exception as e:
print(f"Attempt {attempt + 1} failed: {e}")
if attempt == max_retries - 1:
raise
time.sleep(2 ** attempt) # Exponential backoff
# Post updates safely
updates = [
"Event starting soon...",
"Updates will be posted regularly.",
"Thank you for following along!"
]
for i, update_content in enumerate(updates):
try:
update = add_update_with_retry(f"**Update {i+1}:** {update_content}")
print(f"Successfully posted update {i+1}")
except Exception as e:
print(f"Failed to post update {i+1}: {e}")
continue
return live_thread
except Exception as e:
print(f"Live thread management failed: {e}")
return None
# Use robust management
thread = robust_live_thread_management()
if thread:
print(f"Successfully created and managed thread: {thread.id}")class LiveThreadStream:
"""Real-time streaming for live threads."""
def updates(self, **kwargs):
"""Stream live updates as they are posted."""
class WebSocketException(Exception):
"""Raised when WebSocket operations fail."""Install with Tessl CLI
npx tessl i tessl/pypi-praw