CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-proglog

Log and progress bar manager for console, notebooks, web applications with unified APIs and nested progress tracking.

Pending
Overview
Eval results
Files

docs/workers.md

Worker Integration

Redis Queue (RQ) worker integration that enables progress tracking in distributed job processing environments. These specialized loggers automatically persist progress state to job metadata, allowing external systems to monitor long-running background tasks.

Capabilities

RQ Worker Progress Logger

Basic progress logger that automatically saves state to Redis Queue job metadata.

class RqWorkerProgressLogger:
    # API reference stub: method bodies are omitted here; the real
    # implementation lives in the proglog package.
    def __init__(self, job):
        """
        Initialize RQ worker progress logger.
        
        Parameters:
        - job: RQ Job instance to track progress for
        """
        
    def callback(self, **kw):
        """
        Automatically save progress data to job metadata.
        
        This callback is triggered whenever the logger state is updated,
        ensuring progress information is persistently stored in Redis
        (under job.meta["progress_data"]) and accessible to external
        monitoring systems.
        
        Parameters:
        - **kw: State updates (automatically passed by logger)
        """

Usage Example:

import time

from rq import Worker, Queue, get_current_job
from proglog import RqWorkerProgressLogger

def long_running_task(job_id):
    """Example RQ task whose progress is mirrored into job metadata.

    Parameters:
    - job_id: opaque identifier passed by the enqueuer; used by the task
      itself, not by the logger.
    """
    # Get the RQ job currently being executed by this worker.
    job = get_current_job()
    
    # Create progress logger; every callback() persists state to Redis.
    logger = RqWorkerProgressLogger(job)
    
    # Progress is automatically saved to job metadata
    logger.state.update({"task": "starting", "progress": 0})
    logger.callback()
    
    for i in range(100):
        # Update progress - automatically persisted
        logger.state.update({"progress": i, "current_item": f"item_{i}"})
        logger.callback(progress=i, current_item=f"item_{i}")
        
        # Simulate work
        time.sleep(0.1)
    
    logger.state.update({"task": "completed", "progress": 100})
    logger.callback()

# Queue the job
queue = Queue()
job = queue.enqueue(long_running_task, "task_123")

# Monitor progress from another process. refresh() reloads job.meta
# from Redis; without it the local copy would be stale.
job.refresh()
print(job.meta.get("progress_data", {}))

RQ Worker Bar Logger

Combined RQ worker and progress bar logger that provides both bar management and job metadata persistence.

class RqWorkerBarLogger(RqWorkerProgressLogger, ProgressBarLogger):
    # API reference stub: the constructor body is omitted here; the real
    # implementation lives in the proglog package.
    def __init__(self, job, init_state=None, bars=None, ignored_bars=(), 
                 logged_bars="all", min_time_interval=0):
        """
        Initialize RQ worker progress bar logger.
        
        Combines RqWorkerProgressLogger automatic persistence with
        ProgressBarLogger bar management capabilities.
        
        Parameters:
        - job: RQ Job instance to track progress for
        - init_state (dict, optional): Initial state dictionary
        - bars: Bar configuration (None, list, tuple, or dict)
        - ignored_bars (tuple/list): Bars to ignore (default: empty tuple)
        - logged_bars ("all" or list): Bars to include in logs
        - min_time_interval (float): Minimum seconds between bar updates
        """

Usage Example:

from rq import Worker, Queue, get_current_job
from proglog import RqWorkerBarLogger
import time

def data_processing_job(data_files):
    """Process multiple data files with nested progress tracking.

    Parameters:
    - data_files: iterable of file names; each is loaded with load_file()
      and its records processed one by one.
    """
    job = get_current_job()
    
    # Combined logger: progress bars + automatic persistence to job.meta.
    logger = RqWorkerBarLogger(
        job,
        bars=["files", "records"],
        min_time_interval=0.5  # Update every 500ms
    )
    
    # Outer bar tracks files; inner bar tracks records within each file.
    for filename in logger.iter_bar(files=data_files):
        logger(message=f"Processing {filename}")
        
        # Load and process records
        records = load_file(filename)
        for record in logger.iter_bar(records=records):
            process_record(record)
            time.sleep(0.01)
        
        logger(message=f"Completed {filename}")
    
    logger(message="All files processed successfully")

def monitor_job_progress(job):
    """Poll an RQ job from an external process and print its progress."""
    while not job.is_finished:
        # Reload metadata from Redis; job.meta is otherwise a stale
        # local snapshot and would never change during the poll loop.
        job.refresh()
        progress = job.meta.get("progress_data", {})
        
        if "bars" in progress:
            bars = progress["bars"]
            if "files" in bars:
                files_bar = bars["files"]
                print(f"Files: {files_bar.get('index', 0)}/{files_bar.get('total', '?')}")
            
            if "records" in bars:
                records_bar = bars["records"]
                print(f"Records: {records_bar.get('index', 0)}/{records_bar.get('total', '?')}")
        
        if "message" in progress:
            print(f"Status: {progress['message']}")
        
        time.sleep(1)

# Usage
queue = Queue()
files = ["data1.csv", "data2.csv", "data3.csv"]
job = queue.enqueue(data_processing_job, files)

# Monitor from another thread/process
monitor_job_progress(job)

Progress Data Structure

The progress data automatically stored in job metadata follows this structure:

# Job metadata structure (job.meta["progress_data"])
# NOTE: the values below are type placeholders, not real data.
progress_data = {
    # All current logger state
    "task": str,
    "progress": int,
    "message": str,
    # ... any other state fields
    
    # Progress bar states (if using RqWorkerBarLogger)
    "bars": {
        "bar_name": {
            "title": str,     # bar label shown to the user
            "index": int,     # current position within the bar
            "total": int,     # total number of steps
            "message": str,   # last message attached to this bar
            "indent": int     # presumably nesting depth for display — see proglog docs
        }
        # ... additional bars
    }
}

Accessing Progress Data:

from rq import Queue

# Get job by ID. Queue has no .job() method; fetch_job() looks the job
# up by ID on the queue's Redis connection (returns None if unknown).
queue = Queue()
job = queue.fetch_job("job-id-here")

# Access progress data
progress = job.meta.get("progress_data", {})

# Check overall progress
current_progress = progress.get("progress", 0)
status_message = progress.get("message", "No status")

# Check bar progress
if "bars" in progress:
    for bar_name, bar_info in progress["bars"].items():
        current = bar_info.get("index", 0)
        total = bar_info.get("total", 0)
        title = bar_info.get("title", bar_name)
        print(f"{title}: {current}/{total}")

Integration Patterns

Background Job with Progress

import time

from rq import Queue, Worker, get_current_job
from proglog import RqWorkerBarLogger

def batch_processor(items, batch_size=10):
    """Process items in fixed-size batches with two-level progress bars.

    Parameters:
    - items: sequence of work items (must support slicing and len())
    - batch_size (int): number of items per batch (default: 10)
    """
    job = get_current_job()
    logger = RqWorkerBarLogger(job, bars=["batches", "items"])
    
    # Slice the input into consecutive batches of batch_size items.
    batches = [items[i:i+batch_size] for i in range(0, len(items), batch_size)]
    
    for batch in logger.iter_bar(batches=batches):
        logger(message=f"Processing batch of {len(batch)} items")
        
        for item in logger.iter_bar(items=batch):
            result = process_item(item)
            time.sleep(0.1)  # Simulate processing time
        
        logger(message=f"Batch completed")
    
    logger(message="All processing complete")

# Queue and monitor
queue = Queue()
job = queue.enqueue(batch_processor, list(range(100)))

Web Dashboard Integration

from flask import Flask, jsonify
from rq import Queue

app = Flask(__name__)
queue = Queue()

@app.route('/job/<job_id>/progress')
def get_job_progress(job_id):
    """API endpoint returning the live progress of an RQ job as JSON."""
    try:
        # Queue has no .job() method; fetch_job() returns the Job for
        # the given ID, or None when the ID is unknown.
        job = queue.fetch_job(job_id)
        if job is None:
            return jsonify({"error": "job not found"}), 404

        progress_data = job.meta.get("progress_data", {})
        
        return jsonify({
            "status": job.get_status(),
            "progress": progress_data.get("progress", 0),
            "message": progress_data.get("message", ""),
            "bars": progress_data.get("bars", {}),
            "is_finished": job.is_finished
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 404

# Frontend can poll this endpoint for live progress updates

Cleanup and Error Handling

def robust_worker_task(data):
    """Worker task with proper error handling and cleanup.

    Processes every item in *data*, logging per-item failures without
    aborting the whole task, and always persists the final logger state.
    """
    job = get_current_job()
    logger = RqWorkerBarLogger(job)

    try:
        logger(message="Task started", progress=0)

        total = len(data)
        for index, element in enumerate(logger.iter_bar(items=data)):
            try:
                outcome = process_item(element)
                percent = int((index + 1) / total * 100)
                logger(progress=percent, message=f"Processed {index+1}/{total}")
            except Exception as item_error:
                # Record the failure but keep going with the next item.
                logger(message=f"Error processing item {index}: {item_error}")

        logger(message="Task completed successfully", progress=100)

    except Exception as e:
        # Persist the failure reason before re-raising so monitors see it.
        logger(message=f"Task failed: {str(e)}", progress=-1)
        raise

    finally:
        # Ensure final state is saved
        logger.callback()

Install with Tessl CLI

npx tessl i tessl/pypi-proglog

docs

basic-logging.md

index.md

progress-bars.md

workers.md

tile.json