# Worker Integration

Redis Queue (RQ) worker integration that enables progress tracking in distributed job processing environments. These specialized loggers automatically persist progress state to job metadata, allowing external systems to monitor long-running background tasks.

## Capabilities

### RQ Worker Progress Logger

Basic progress logger that automatically saves state to Redis Queue job metadata.
```python { .api }
class RqWorkerProgressLogger:
    def __init__(self, job):
        """
        Initialize RQ worker progress logger.

        Parameters:
        - job: RQ Job instance to track progress for
        """

    def callback(self, **kw):
        """
        Automatically save progress data to job metadata.

        This callback is triggered whenever the logger state is updated,
        ensuring progress information is persistently stored in Redis
        and accessible to external monitoring systems.

        Parameters:
        - **kw: State updates (automatically passed by logger)
        """
```
**Usage Example:**
```python
import time

from rq import Worker, Queue
from proglog import RqWorkerProgressLogger

def long_running_task(job_id):
    # Get the current RQ job
    from rq import get_current_job
    job = get_current_job()

    # Create progress logger
    logger = RqWorkerProgressLogger(job)

    # Progress is automatically saved to job metadata
    logger.state.update({"task": "starting", "progress": 0})
    logger.callback()

    for i in range(100):
        # Update progress - automatically persisted
        logger.state.update({"progress": i, "current_item": f"item_{i}"})
        logger.callback(progress=i, current_item=f"item_{i}")

        # Simulate work
        time.sleep(0.1)

    logger.state.update({"task": "completed", "progress": 100})
    logger.callback()

# Queue the job
queue = Queue()
job = queue.enqueue(long_running_task, "task_123")

# Monitor progress from another process
job.refresh()  # reload job.meta from Redis
print(job.meta.get("progress_data", {}))
```
### RQ Worker Bar Logger

Combined RQ worker and progress bar logger that provides both bar management and job metadata persistence.
```python { .api }
class RqWorkerBarLogger(RqWorkerProgressLogger, ProgressBarLogger):
    def __init__(self, job, init_state=None, bars=None, ignored_bars=(),
                 logged_bars="all", min_time_interval=0):
        """
        Initialize RQ worker progress bar logger.

        Combines RqWorkerProgressLogger automatic persistence with
        ProgressBarLogger bar management capabilities.

        Parameters:
        - job: RQ Job instance to track progress for
        - init_state (dict, optional): Initial state dictionary
        - bars: Bar configuration (None, list, tuple, or dict)
        - ignored_bars (tuple/list): Bars to ignore (default: empty tuple)
        - logged_bars ("all" or list): Bars to include in logs
        - min_time_interval (float): Minimum seconds between bar updates
        """
```
**Usage Example:**
```python
from rq import Worker, Queue, get_current_job
from proglog import RqWorkerBarLogger
import time

def data_processing_job(data_files):
    """Process multiple data files with progress tracking."""
    job = get_current_job()

    # Create combined logger
    logger = RqWorkerBarLogger(
        job,
        bars=["files", "records"],
        min_time_interval=0.5  # Update every 500ms
    )

    # Process files with automatic progress persistence
    for filename in logger.iter_bar(files=data_files):
        logger(message=f"Processing {filename}")

        # Load and process records
        records = load_file(filename)
        for record in logger.iter_bar(records=records):
            process_record(record)
            time.sleep(0.01)

        logger(message=f"Completed {filename}")

    logger(message="All files processed successfully")

def monitor_job_progress(job):
    """Monitor job progress from external process."""
    while not job.is_finished:
        job.refresh()  # reload metadata from Redis
        progress = job.meta.get("progress_data", {})

        if "bars" in progress:
            bars = progress["bars"]
            if "files" in bars:
                files_bar = bars["files"]
                print(f"Files: {files_bar.get('index', 0)}/{files_bar.get('total', '?')}")

            if "records" in bars:
                records_bar = bars["records"]
                print(f"Records: {records_bar.get('index', 0)}/{records_bar.get('total', '?')}")

        if "message" in progress:
            print(f"Status: {progress['message']}")

        time.sleep(1)

# Usage
queue = Queue()
files = ["data1.csv", "data2.csv", "data3.csv"]
job = queue.enqueue(data_processing_job, files)

# Monitor from another thread/process
monitor_job_progress(job)
```
### Progress Data Structure

The progress data automatically stored in job metadata follows this structure:
```python { .api }
# Job metadata structure (job.meta["progress_data"])
progress_data = {
    # All current logger state
    "task": str,
    "progress": int,
    "message": str,
    # ... any other state fields

    # Progress bar states (if using RqWorkerBarLogger)
    "bars": {
        "bar_name": {
            "title": str,
            "index": int,
            "total": int,
            "message": str,
            "indent": int
        }
        # ... additional bars
    }
}
```
**Accessing Progress Data:**
```python
from rq import Queue

# Get job by ID
queue = Queue()
job = queue.fetch_job("job-id-here")

# Access progress data (refresh first to pull the latest meta from Redis)
job.refresh()
progress = job.meta.get("progress_data", {})

# Check overall progress
current_progress = progress.get("progress", 0)
status_message = progress.get("message", "No status")

# Check bar progress
if "bars" in progress:
    for bar_name, bar_info in progress["bars"].items():
        current = bar_info.get("index", 0)
        total = bar_info.get("total", 0)
        title = bar_info.get("title", bar_name)
        print(f"{title}: {current}/{total}")
```
## Integration Patterns

### Background Job with Progress
```python
from rq import Queue, Worker, get_current_job
from proglog import RqWorkerBarLogger
import time

def batch_processor(items, batch_size=10):
    """Process items in batches with progress tracking."""
    job = get_current_job()
    logger = RqWorkerBarLogger(job, bars=["batches", "items"])

    batches = [items[i:i+batch_size] for i in range(0, len(items), batch_size)]

    for batch in logger.iter_bar(batches=batches):
        logger(message=f"Processing batch of {len(batch)} items")

        for item in logger.iter_bar(items=batch):
            result = process_item(item)
            time.sleep(0.1)  # Simulate processing time

        logger(message="Batch completed")

    logger(message="All processing complete")

# Queue and monitor
queue = Queue()
job = queue.enqueue(batch_processor, list(range(100)))
```
### Web Dashboard Integration
```python
from flask import Flask, jsonify
from rq import Queue

app = Flask(__name__)
queue = Queue()

@app.route('/job/<job_id>/progress')
def get_job_progress(job_id):
    """API endpoint to get job progress."""
    try:
        job = queue.fetch_job(job_id)
        if job is None:
            return jsonify({"error": "job not found"}), 404
        job.refresh()  # pull the latest meta from Redis
        progress_data = job.meta.get("progress_data", {})

        return jsonify({
            "status": job.get_status(),
            "progress": progress_data.get("progress", 0),
            "message": progress_data.get("message", ""),
            "bars": progress_data.get("bars", {}),
            "is_finished": job.is_finished
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 404

# Frontend can poll this endpoint for live progress updates
```
### Cleanup and Error Handling
```python
def robust_worker_task(data):
    """Worker task with proper error handling and cleanup."""
    job = get_current_job()
    logger = RqWorkerBarLogger(job)

    try:
        logger(message="Task started", progress=0)

        for i, item in enumerate(logger.iter_bar(items=data)):
            try:
                result = process_item(item)
                progress = int((i + 1) / len(data) * 100)
                logger(progress=progress, message=f"Processed {i+1}/{len(data)}")

            except Exception as item_error:
                logger(message=f"Error processing item {i}: {item_error}")
                continue

        logger(message="Task completed successfully", progress=100)

    except Exception as e:
        logger(message=f"Task failed: {str(e)}", progress=-1)
        raise

    finally:
        # Ensure final state is saved
        logger.callback()
```