or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

account-management.mdalert-system.mdclient-control.mdindex.mdmedia-library.mdplaylists-collections.mdserver-connection.mdsync-management.md
tile.json

alert-system.mddocs/

Real-time Server Monitoring

WebSocket-based real-time monitoring of server events, activities, and notifications for building responsive Plex applications that react immediately to server changes.

Capabilities

AlertListener Setup

Primary class for establishing WebSocket connections and receiving real-time server notifications.

class AlertListener:
    def __init__(self, server, callback, ws_url=None, **kwargs):
        """
        Create real-time alert listener for server events.
        
        Args:
            server (PlexServer): Connected Plex server
            callback (function): Function to handle incoming alerts
            ws_url (str, optional): Custom WebSocket URL
            **kwargs: Additional WebSocket connection parameters
        """
        
    def start(self):
        """Start listening for server alerts in background thread."""
        
    def stop(self):
        """Stop listening and close WebSocket connection."""
        
    def is_running(self):
        """
        Check if alert listener is actively running.
        
        Returns:
            bool: True if listener is active
        """

Alert Event Types

Server events that can be monitored through the AlertListener system.

# Common alert event types received via callback
class AlertEvent:
    """Alert event data structure."""
    
    @property
    def type(self):
        """str: Event type identifier."""
        
    @property
    def data(self):
        """dict: Event-specific data payload."""
        
    @property
    def timestamp(self):
        """datetime: When the event occurred."""

# Event type constants
ALERT_TYPES = {
    'library.refresh.started': 'Library refresh started',
    'library.refresh.progress': 'Library refresh progress update',
    'library.refresh.finished': 'Library refresh completed',
    'media.scrobble': 'Media playback scrobble event',
    'media.play': 'Media playback started',
    'media.pause': 'Media playback paused',
    'media.resume': 'Media playback resumed',
    'media.stop': 'Media playback stopped',
    'admin.database.backup': 'Database backup event',
    'admin.database.corrupted': 'Database corruption detected',
    'transcoder.progress': 'Transcoding progress update',
    'activity': 'Background activity update'
}

Callback Function Interface

Structure for handling incoming alert events.

def alert_callback(alert_data):
    """
    Callback function for processing server alerts.
    
    Args:
        alert_data (dict): Raw alert data from server containing:
            - type (str): Alert event type
            - NotificationContainer (dict): Event details
            - size (int): Number of notifications
            - data (list): List of notification objects
    """
    
# Example callback implementation structure
def process_alert(alert_data):
    alert_type = alert_data.get('type')
    container = alert_data.get('NotificationContainer', {})
    
    if alert_type == 'playing':
        # Handle playback events
        sessions = container.get('PlaySessionStateNotification', [])
        for session in sessions:
            handle_playback_event(session)
    
    elif alert_type == 'timeline':
        # Handle timeline updates
        entries = container.get('TimelineEntry', [])
        for entry in entries:
            handle_timeline_update(entry)
    
    elif alert_type == 'activity':
        # Handle background activities
        activities = container.get('ActivityNotification', [])
        for activity in activities:
            handle_activity_update(activity)

Usage Examples

Basic Alert Monitoring

from plexapi.server import PlexServer
from plexapi.alert import AlertListener

plex = PlexServer('http://localhost:32400', token='your-token')

def my_callback(alert_data):
    """Handle server alerts."""
    alert_type = alert_data.get('type', 'unknown')
    print(f"Alert received: {alert_type}")
    
    if alert_type == 'playing':
        # Handle playback state changes
        container = alert_data.get('NotificationContainer', {})
        sessions = container.get('PlaySessionStateNotification', [])
        
        for session in sessions:
            state = session.get('state', '')
            session_key = session.get('sessionKey', '')
            print(f"Session {session_key}: {state}")

# Create and start listener
listener = AlertListener(plex, my_callback)
listener.start()

# Keep program running to receive alerts
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    listener.stop()

Playback Monitoring

def playback_monitor(alert_data):
    """Monitor media playback events."""
    if alert_data.get('type') == 'playing':
        container = alert_data.get('NotificationContainer', {})
        sessions = container.get('PlaySessionStateNotification', [])
        
        for session in sessions:
            user = session.get('username', 'Unknown')
            state = session.get('state', '')
            title = session.get('title', 'Unknown')
            
            if state == 'playing':
                print(f"{user} started playing: {title}")
            elif state == 'paused':
                print(f"{user} paused: {title}")
            elif state == 'stopped':
                print(f"{user} stopped: {title}")

listener = AlertListener(plex, playback_monitor)
listener.start()

Library Activity Monitoring

def library_monitor(alert_data):
    """Monitor library scanning and updates."""
    alert_type = alert_data.get('type', '')
    
    if alert_type == 'activity':
        container = alert_data.get('NotificationContainer', {})
        activities = container.get('ActivityNotification', [])
        
        for activity in activities:
            activity_type = activity.get('type', '')
            title = activity.get('title', '')
            progress = activity.get('progress', 0)
            
            if activity_type == 'library.refresh':
                print(f"Library scan: {title} - {progress}% complete")
            elif activity_type == 'transcoder':
                print(f"Transcoding: {title} - {progress}% complete")

listener = AlertListener(plex, library_monitor)
listener.start()

Timeline Event Monitoring

def timeline_monitor(alert_data):
    """Monitor media timeline updates."""
    if alert_data.get('type') == 'timeline':
        container = alert_data.get('NotificationContainer', {})
        entries = container.get('TimelineEntry', [])
        
        for entry in entries:
            item_type = entry.get('type', '')
            state = entry.get('state', '')
            title = entry.get('title', 'Unknown')
            
            if state == 'created':
                print(f"New {item_type} added: {title}")
            elif state == 'updated':
                print(f"{item_type} updated: {title}")
            elif state == 'deleted':
                print(f"{item_type} deleted: {title}")

listener = AlertListener(plex, timeline_monitor)
listener.start()

Multi-Event Handler

class PlexEventHandler:
    """Comprehensive event handler for multiple alert types."""
    
    def __init__(self, plex_server):
        self.plex = plex_server
        self.listener = AlertListener(plex_server, self.handle_alert)
        
    def handle_alert(self, alert_data):
        """Route alerts to specific handlers."""
        alert_type = alert_data.get('type', '')
        
        if alert_type == 'playing':
            self.handle_playback(alert_data)
        elif alert_type == 'timeline':
            self.handle_timeline(alert_data)
        elif alert_type == 'activity':
            self.handle_activity(alert_data)
        else:
            print(f"Unhandled alert type: {alert_type}")
    
    def handle_playback(self, alert_data):
        """Handle playback state changes."""
        container = alert_data.get('NotificationContainer', {})
        sessions = container.get('PlaySessionStateNotification', [])
        
        for session in sessions:
            self.log_playback_event(session)
    
    def handle_timeline(self, alert_data):
        """Handle library timeline updates."""
        container = alert_data.get('NotificationContainer', {})
        entries = container.get('TimelineEntry', [])
        
        for entry in entries:
            self.log_timeline_event(entry)
    
    def handle_activity(self, alert_data):
        """Handle background activity updates."""
        container = alert_data.get('NotificationContainer', {})
        activities = container.get('ActivityNotification', [])
        
        for activity in activities:
            self.log_activity_event(activity)
    
    def log_playback_event(self, session):
        """Log playback events to file or database."""
        timestamp = datetime.now()
        user = session.get('username', 'Unknown')
        state = session.get('state', '')
        title = session.get('title', 'Unknown')
        
        log_entry = f"{timestamp}: {user} {state} {title}"
        print(log_entry)
        # Save to database or log file
    
    def log_timeline_event(self, entry):
        """Log timeline events."""
        # Implementation for timeline logging
        pass
    
    def log_activity_event(self, activity):
        """Log activity events."""
        # Implementation for activity logging
        pass
    
    def start(self):
        """Start monitoring."""
        self.listener.start()
        print("Event monitoring started")
    
    def stop(self):
        """Stop monitoring."""
        self.listener.stop()
        print("Event monitoring stopped")

# Usage
handler = PlexEventHandler(plex)
handler.start()

# Run until interrupted
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    handler.stop()

Error Handling and Reconnection

import time
import logging
from plexapi.alert import AlertListener
from plexapi.exceptions import PlexApiException

class RobustAlertListener:
    """Alert listener with automatic reconnection."""
    
    def __init__(self, server, callback, max_retries=5):
        self.server = server
        self.callback = callback
        self.max_retries = max_retries
        self.retry_count = 0
        self.listener = None
        self.running = False
    
    def start(self):
        """Start listening with automatic retry on failure."""
        self.running = True
        while self.running and self.retry_count < self.max_retries:
            try:
                self.listener = AlertListener(self.server, self.callback)
                self.listener.start()
                self.retry_count = 0  # Reset on successful connection
                break
                
            except PlexApiException as e:
                self.retry_count += 1
                wait_time = min(2 ** self.retry_count, 60)  # Exponential backoff
                logging.error(f"Alert listener failed: {e}. Retrying in {wait_time}s...")
                time.sleep(wait_time)
    
    def stop(self):
        """Stop listening."""
        self.running = False
        if self.listener:
            self.listener.stop()

# Usage with robust error handling
robust_listener = RobustAlertListener(plex, my_callback)
robust_listener.start()

Performance Considerations

def efficient_callback(alert_data):
    """Efficiently handle high-volume alerts."""
    # Process alerts quickly to avoid blocking
    alert_type = alert_data.get('type', '')
    
    # Queue heavy processing for background thread
    if alert_type == 'playing':
        # Quick processing only
        session_count = len(alert_data.get('NotificationContainer', {}).get('PlaySessionStateNotification', []))
        print(f"Active sessions: {session_count}")
    
    # For complex processing, consider using a queue
    # alert_queue.put(alert_data)

# Optional: Use threading for heavy processing
import queue
import threading

alert_queue = queue.Queue()

def background_processor():
    """Process alerts in background thread."""
    while True:
        try:
            alert_data = alert_queue.get(timeout=1)
            # Perform heavy processing here
            process_complex_alert(alert_data)
            alert_queue.task_done()
        except queue.Empty:
            continue

# Start background processor
processor_thread = threading.Thread(target=background_processor, daemon=True)
processor_thread.start()

Installation Requirements

The AlertListener functionality requires the optional WebSocket dependency:

pip install PlexAPI[alert]

This installs the websocket-client package required for real-time WebSocket connections to the Plex server.