WebSocket-based real-time monitoring of server events, activities, and notifications for building responsive Plex applications that react immediately to server changes.
Primary class for establishing WebSocket connections and receiving real-time server notifications.
class AlertListener:
def __init__(self, server, callback, ws_url=None, **kwargs):
"""
Create real-time alert listener for server events.
Args:
server (PlexServer): Connected Plex server
callback (function): Function to handle incoming alerts
ws_url (str, optional): Custom WebSocket URL
**kwargs: Additional WebSocket connection parameters
"""
def start(self):
"""Start listening for server alerts in background thread."""
def stop(self):
"""Stop listening and close WebSocket connection."""
def is_running(self):
"""
Check if alert listener is actively running.
Returns:
bool: True if listener is active
"""Server events that can be monitored through the AlertListener system.
# Common alert event types received via callback
class AlertEvent:
    """Alert event data structure.

    Outline only: each property body is just its docstring, so the
    docstrings describe the intended value types rather than computed
    results.
    """

    @property
    def type(self):
        """str: Event type identifier."""

    @property
    def data(self):
        """dict: Event-specific data payload."""

    @property
    def timestamp(self):
        """datetime: When the event occurred."""
# Event type constants
ALERT_TYPES = {
'library.refresh.started': 'Library refresh started',
'library.refresh.progress': 'Library refresh progress update',
'library.refresh.finished': 'Library refresh completed',
'media.scrobble': 'Media playback scrobble event',
'media.play': 'Media playback started',
'media.pause': 'Media playback paused',
'media.resume': 'Media playback resumed',
'media.stop': 'Media playback stopped',
'admin.database.backup': 'Database backup event',
'admin.database.corrupted': 'Database corruption detected',
'transcoder.progress': 'Transcoding progress update',
'activity': 'Background activity update'
}Structure for handling incoming alert events.
def alert_callback(alert_data):
    """
    Callback function for processing server alerts.

    Signature outline only: the body consists of this docstring.

    Args:
        alert_data (dict): Raw alert data from server containing:
            - type (str): Alert event type
            - NotificationContainer (dict): Event details
            - size (int): Number of notifications
            - data (list): List of notification objects

    NOTE(review): confirm against the installed PlexAPI version whether
    the callback receives the whole message or only the contents of its
    'NotificationContainer' entry.
    """
# Example callback implementation structure
def process_alert(alert_data):
    """Dispatch one alert to the handler matching its 'type'.

    Reads 'type' and 'NotificationContainer' from ``alert_data`` and
    forwards every notification in the relevant list to
    handle_playback_event, handle_timeline_update or
    handle_activity_update. Other alert types are ignored.

    Args:
        alert_data (dict): Alert payload passed to the listener callback.
    """
    alert_type = alert_data.get('type')
    container = alert_data.get('NotificationContainer', {})

    if alert_type == 'playing':
        # Handle playback events
        sessions = container.get('PlaySessionStateNotification', [])
        for session in sessions:
            handle_playback_event(session)

    elif alert_type == 'timeline':
        # Handle timeline updates
        entries = container.get('TimelineEntry', [])
        for entry in entries:
            handle_timeline_update(entry)

    elif alert_type == 'activity':
        # Handle background activities
        activities = container.get('ActivityNotification', [])
        for activity in activities:
            handle_activity_update(activity)


from plexapi.server import PlexServer
from plexapi.alert import AlertListener
# Connect to the server; replace the URL and the 'your-token' placeholder
# with the values for your own installation.
plex = PlexServer('http://localhost:32400', token='your-token')
def my_callback(alert_data):
    """Handle server alerts.

    Prints the alert type (``'unknown'`` when the key is absent) and, for
    'playing' alerts, one line per entry in
    ``NotificationContainer['PlaySessionStateNotification']`` showing the
    session key and its state.

    Args:
        alert_data (dict): Alert payload passed to the listener callback.
    """
    alert_type = alert_data.get('type', 'unknown')
    print(f"Alert received: {alert_type}")

    if alert_type == 'playing':
        # Handle playback state changes
        container = alert_data.get('NotificationContainer', {})
        sessions = container.get('PlaySessionStateNotification', [])

        for session in sessions:
            state = session.get('state', '')
            session_key = session.get('sessionKey', '')
            print(f"Session {session_key}: {state}")
import time  # needed by the keep-alive loop below

# Create and start listener
listener = AlertListener(plex, my_callback)
listener.start()

# Keep program running to receive alerts
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    listener.stop()


def playback_monitor(alert_data):
    """Monitor media playback events.

    For 'playing' alerts, prints one line per session notification whose
    state is 'playing', 'paused' or 'stopped'; any other state or alert
    type prints nothing.

    Args:
        alert_data (dict): Alert payload passed to the listener callback.
    """
    if alert_data.get('type') != 'playing':
        return

    container = alert_data.get('NotificationContainer', {})
    sessions = container.get('PlaySessionStateNotification', [])

    for session in sessions:
        user = session.get('username', 'Unknown')
        state = session.get('state', '')
        title = session.get('title', 'Unknown')

        if state == 'playing':
            print(f"{user} started playing: {title}")
        elif state == 'paused':
            print(f"{user} paused: {title}")
        elif state == 'stopped':
            print(f"{user} stopped: {title}")
listener = AlertListener(plex, playback_monitor)
listener.start()


def library_monitor(alert_data):
    """Monitor library scanning and updates.

    For 'activity' alerts, prints a progress line for every activity
    notification whose type is 'library.refresh' or 'transcoder'; other
    activity types and other alert types print nothing.

    Args:
        alert_data (dict): Alert payload passed to the listener callback.
    """
    alert_type = alert_data.get('type', '')
    if alert_type != 'activity':
        return

    container = alert_data.get('NotificationContainer', {})
    activities = container.get('ActivityNotification', [])

    for activity in activities:
        activity_type = activity.get('type', '')
        title = activity.get('title', '')
        progress = activity.get('progress', 0)

        if activity_type == 'library.refresh':
            print(f"Library scan: {title} - {progress}% complete")
        elif activity_type == 'transcoder':
            print(f"Transcoding: {title} - {progress}% complete")
listener = AlertListener(plex, library_monitor)
listener.start()


def timeline_monitor(alert_data):
    """Monitor media timeline updates.

    For 'timeline' alerts, prints one line per timeline entry whose state
    is 'created', 'updated' or 'deleted'; any other state or alert type
    prints nothing.

    Args:
        alert_data (dict): Alert payload passed to the listener callback.
    """
    if alert_data.get('type') != 'timeline':
        return

    container = alert_data.get('NotificationContainer', {})
    entries = container.get('TimelineEntry', [])

    for entry in entries:
        item_type = entry.get('type', '')
        state = entry.get('state', '')
        title = entry.get('title', 'Unknown')

        if state == 'created':
            print(f"New {item_type} added: {title}")
        elif state == 'updated':
            print(f"{item_type} updated: {title}")
        elif state == 'deleted':
            print(f"{item_type} deleted: {title}")
listener = AlertListener(plex, timeline_monitor)
listener.start()


class PlexEventHandler:
    """Comprehensive event handler for multiple alert types.

    Owns an AlertListener whose callback is ``handle_alert``; that method
    routes each alert to a per-type handler, which in turn calls a
    ``log_*`` method for every notification in the alert.
    """

    def __init__(self, plex_server):
        """Store the server and build (but do not start) the listener.

        Args:
            plex_server: Server object handed to AlertListener.
        """
        self.plex = plex_server
        self.listener = AlertListener(plex_server, self.handle_alert)

    def handle_alert(self, alert_data):
        """Route alerts to specific handlers.

        Alert types other than 'playing', 'timeline' and 'activity' are
        reported with a printed message.
        """
        alert_type = alert_data.get('type', '')
        routes = {
            'playing': self.handle_playback,
            'timeline': self.handle_timeline,
            'activity': self.handle_activity,
        }

        route = routes.get(alert_type)
        if route is None:
            print(f"Unhandled alert type: {alert_type}")
            return
        route(alert_data)

    def handle_playback(self, alert_data):
        """Handle playback state changes."""
        container = alert_data.get('NotificationContainer', {})
        sessions = container.get('PlaySessionStateNotification', [])

        for session in sessions:
            self.log_playback_event(session)

    def handle_timeline(self, alert_data):
        """Handle library timeline updates."""
        container = alert_data.get('NotificationContainer', {})
        entries = container.get('TimelineEntry', [])

        for entry in entries:
            self.log_timeline_event(entry)

    def handle_activity(self, alert_data):
        """Handle background activity updates."""
        container = alert_data.get('NotificationContainer', {})
        activities = container.get('ActivityNotification', [])

        for activity in activities:
            self.log_activity_event(activity)

    def log_playback_event(self, session):
        """Log playback events to file or database.

        Currently prints a line of the form
        ``<timestamp>: <username> <state> <title>``.
        """
        # Local import: this example uses datetime without importing it.
        from datetime import datetime

        timestamp = datetime.now()
        user = session.get('username', 'Unknown')
        state = session.get('state', '')
        title = session.get('title', 'Unknown')

        log_entry = f"{timestamp}: {user} {state} {title}"
        print(log_entry)
        # Save to database or log file

    def log_timeline_event(self, entry):
        """Log timeline events."""
        # Implementation for timeline logging
        pass

    def log_activity_event(self, activity):
        """Log activity events."""
        # Implementation for activity logging
        pass

    def start(self):
        """Start monitoring."""
        self.listener.start()
        print("Event monitoring started")

    def stop(self):
        """Stop monitoring."""
        self.listener.stop()
        print("Event monitoring stopped")
import time  # needed by the keep-alive loop below

# Usage
handler = PlexEventHandler(plex)
handler.start()

# Run until interrupted
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    handler.stop()

import time
import logging
from plexapi.alert import AlertListener
from plexapi.exceptions import PlexApiException
class RobustAlertListener:
    """Alert listener with automatic reconnection.

    Wraps creation and start-up of an AlertListener in a bounded retry
    loop with exponential backoff.

    NOTE(review): a retry only happens when constructing or starting the
    AlertListener raises PlexApiException in the calling thread; confirm
    that connection failures actually surface that way.
    """

    def __init__(self, server, callback, max_retries=5):
        """
        Args:
            server: Server object handed to AlertListener.
            callback: Function handed to AlertListener as its callback.
            max_retries (int): Maximum number of consecutive failed
                start attempts before giving up.
        """
        self.server = server
        self.callback = callback
        self.max_retries = max_retries
        self.retry_count = 0
        self.listener = None
        self.running = False

    def start(self):
        """Start listening with automatic retry on failure.

        Blocks, sleeping between attempts, until a listener has started,
        ``running`` is cleared, or ``max_retries`` attempts have failed.
        If no listener could be started, ``running`` is set back to False.
        """
        self.running = True
        # Each start() call gets a fresh retry budget, so a listener that
        # previously exhausted its retries can be started again.
        self.retry_count = 0

        while self.running and self.retry_count < self.max_retries:
            try:
                self.listener = AlertListener(self.server, self.callback)
                self.listener.start()
            except PlexApiException as e:
                self.retry_count += 1
                if self.retry_count >= self.max_retries:
                    # No attempt left: report and skip the pointless sleep.
                    logging.error(
                        "Alert listener failed: %s. Giving up after %d attempts.",
                        e, self.retry_count,
                    )
                    break
                wait_time = min(2 ** self.retry_count, 60)  # Exponential backoff
                logging.error(
                    "Alert listener failed: %s. Retrying in %ss...", e, wait_time
                )
                time.sleep(wait_time)
            else:
                self.retry_count = 0  # Reset on successful connection
                return

        # Reaching this point means no listener was started.
        self.running = False

    def stop(self):
        """Stop listening."""
        self.running = False
        if self.listener:
            self.listener.stop()
# Usage with robust error handling
robust_listener = RobustAlertListener(plex, my_callback)
robust_listener.start()


def efficient_callback(alert_data):
    """Efficiently handle high-volume alerts.

    For 'playing' alerts, prints the number of entries in
    ``NotificationContainer['PlaySessionStateNotification']``; other
    alert types print nothing.

    Args:
        alert_data (dict): Alert payload passed to the listener callback.
    """
    # Process alerts quickly to avoid blocking
    alert_type = alert_data.get('type', '')

    # Queue heavy processing for background thread
    if alert_type == 'playing':
        # Quick processing only
        container = alert_data.get('NotificationContainer', {})
        session_count = len(container.get('PlaySessionStateNotification', []))
        print(f"Active sessions: {session_count}")

    # For complex processing, consider using a queue
    # alert_queue.put(alert_data)
# Optional: Use threading for heavy processing
import queue
import threading

alert_queue = queue.Queue()


def background_processor():
    """Process alerts in background thread.

    Loops forever: waits up to one second for an item on ``alert_queue``,
    passes it to ``process_complex_alert`` and marks it done. An empty
    queue just triggers another wait.
    """
    while True:
        try:
            alert_data = alert_queue.get(timeout=1)
        except queue.Empty:
            continue

        try:
            # Perform heavy processing here
            process_complex_alert(alert_data)
        finally:
            # Always balance the get(); otherwise alert_queue.join() would
            # wait forever after a processing failure.
            alert_queue.task_done()
# Start background processor
processor_thread = threading.Thread(target=background_processor, daemon=True)
processor_thread.start()

The AlertListener functionality requires the optional WebSocket dependency:

pip install PlexAPI[alert]

This installs the websocket-client package required for real-time WebSocket connections to the Plex server.