Log and progress bar manager for console, notebooks, web applications with unified APIs and nested progress tracking.
—
Redis Queue (RQ) worker integration that enables progress tracking in distributed job processing environments. These specialized loggers automatically persist progress state to job metadata, allowing external systems to monitor long-running background tasks.
Basic progress logger that automatically saves state to Redis Queue job metadata.
class RqWorkerProgressLogger:
def __init__(self, job):
"""
Initialize RQ worker progress logger.
Parameters:
- job: RQ Job instance to track progress for
"""
def callback(self, **kw):
"""
Automatically save progress data to job metadata.
This callback is triggered whenever the logger state is updated,
ensuring progress information is persistently stored in Redis
and accessible to external monitoring systems.
Parameters:
- **kw: State updates (automatically passed by logger)
"""Usage Example:
from rq import Worker, Queue
from proglog import RqWorkerProgressLogger
def long_running_task(job_id):
# Get the current RQ job
from rq import get_current_job
job = get_current_job()
# Create progress logger
logger = RqWorkerProgressLogger(job)
# Progress is automatically saved to job metadata
logger.state.update({"task": "starting", "progress": 0})
logger.callback()
for i in range(100):
# Update progress - automatically persisted
logger.state.update({"progress": i, "current_item": f"item_{i}"})
logger.callback(progress=i, current_item=f"item_{i}")
# Simulate work
time.sleep(0.1)
logger.state.update({"task": "completed", "progress": 100})
logger.callback()
# Queue the job
queue = Queue()
job = queue.enqueue(long_running_task, "task_123")
# Monitor progress from another process
print(job.meta.get("progress_data", {}))Combined RQ worker and progress bar logger that provides both bar management and job metadata persistence.
class RqWorkerBarLogger(RqWorkerProgressLogger, ProgressBarLogger):
def __init__(self, job, init_state=None, bars=None, ignored_bars=(),
logged_bars="all", min_time_interval=0):
"""
Initialize RQ worker progress bar logger.
Combines RqWorkerProgressLogger automatic persistence with
ProgressBarLogger bar management capabilities.
Parameters:
- job: RQ Job instance to track progress for
- init_state (dict, optional): Initial state dictionary
- bars: Bar configuration (None, list, tuple, or dict)
- ignored_bars (tuple/list): Bars to ignore (default: empty tuple)
- logged_bars ("all" or list): Bars to include in logs
- min_time_interval (float): Minimum seconds between bar updates
"""Usage Example:
from rq import Worker, Queue, get_current_job
from proglog import RqWorkerBarLogger
import time
def data_processing_job(data_files):
"""Process multiple data files with progress tracking."""
job = get_current_job()
# Create combined logger
logger = RqWorkerBarLogger(
job,
bars=["files", "records"],
min_time_interval=0.5 # Update every 500ms
)
# Process files with automatic progress persistence
for filename in logger.iter_bar(files=data_files):
logger(message=f"Processing {filename}")
# Load and process records
records = load_file(filename)
for record in logger.iter_bar(records=records):
process_record(record)
time.sleep(0.01)
logger(message=f"Completed {filename}")
logger(message="All files processed successfully")
def monitor_job_progress(job):
"""Monitor job progress from external process."""
while not job.is_finished:
progress = job.meta.get("progress_data", {})
if "bars" in progress:
bars = progress["bars"]
if "files" in bars:
files_bar = bars["files"]
print(f"Files: {files_bar.get('index', 0)}/{files_bar.get('total', '?')}")
if "records" in bars:
records_bar = bars["records"]
print(f"Records: {records_bar.get('index', 0)}/{records_bar.get('total', '?')}")
if "message" in progress:
print(f"Status: {progress['message']}")
time.sleep(1)
# Usage
queue = Queue()
files = ["data1.csv", "data2.csv", "data3.csv"]
job = queue.enqueue(data_processing_job, files)
# Monitor from another thread/process
monitor_job_progress(job)The progress data automatically stored in job metadata follows this structure:
# Job metadata structure (job.meta["progress_data"])
progress_data = {
# All current logger state
"task": str,
"progress": int,
"message": str,
# ... any other state fields
# Progress bar states (if using RqWorkerBarLogger)
"bars": {
"bar_name": {
"title": str,
"index": int,
"total": int,
"message": str,
"indent": int
}
# ... additional bars
}
}Accessing Progress Data:
from rq import Queue
# Get job by ID
queue = Queue()
job = queue.job("job-id-here")
# Access progress data
progress = job.meta.get("progress_data", {})
# Check overall progress
current_progress = progress.get("progress", 0)
status_message = progress.get("message", "No status")
# Check bar progress
if "bars" in progress:
for bar_name, bar_info in progress["bars"].items():
current = bar_info.get("index", 0)
total = bar_info.get("total", 0)
title = bar_info.get("title", bar_name)
print(f"{title}: {current}/{total}")from rq import Queue, Worker
from proglog import RqWorkerBarLogger
def batch_processor(items, batch_size=10):
"""Process items in batches with progress tracking."""
job = get_current_job()
logger = RqWorkerBarLogger(job, bars=["batches", "items"])
batches = [items[i:i+batch_size] for i in range(0, len(items), batch_size)]
for batch in logger.iter_bar(batches=batches):
logger(message=f"Processing batch of {len(batch)} items")
for item in logger.iter_bar(items=batch):
result = process_item(item)
time.sleep(0.1) # Simulate processing time
logger(message=f"Batch completed")
logger(message="All processing complete")
# Queue and monitor
queue = Queue()
job = queue.enqueue(batch_processor, list(range(100)))from flask import Flask, jsonify
from rq import Queue
app = Flask(__name__)
queue = Queue()
@app.route('/job/<job_id>/progress')
def get_job_progress(job_id):
"""API endpoint to get job progress."""
try:
job = queue.job(job_id)
progress_data = job.meta.get("progress_data", {})
return jsonify({
"status": job.get_status(),
"progress": progress_data.get("progress", 0),
"message": progress_data.get("message", ""),
"bars": progress_data.get("bars", {}),
"is_finished": job.is_finished
})
except Exception as e:
return jsonify({"error": str(e)}), 404
# Frontend can poll this endpoint for live progress updatesdef robust_worker_task(data):
"""Worker task with proper error handling and cleanup."""
job = get_current_job()
logger = RqWorkerBarLogger(job)
try:
logger(message="Task started", progress=0)
for i, item in enumerate(logger.iter_bar(items=data)):
try:
result = process_item(item)
progress = int((i + 1) / len(data) * 100)
logger(progress=progress, message=f"Processed {i+1}/{len(data)}")
except Exception as item_error:
logger(message=f"Error processing item {i}: {item_error}")
continue
logger(message="Task completed successfully", progress=100)
except Exception as e:
logger(message=f"Task failed: {str(e)}", progress=-1)
raise
finally:
# Ensure final state is saved
logger.callback()Install with Tessl CLI
npx tessl i tessl/pypi-proglog