or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

basic-logging.mdindex.mdprogress-bars.mdworkers.md

workers.mddocs/

0

# Worker Integration

1

2

Redis Queue (RQ) worker integration that enables progress tracking in distributed job processing environments. These specialized loggers automatically persist progress state to job metadata, allowing external systems to monitor long-running background tasks.

3

4

## Capabilities

5

6

### RQ Worker Progress Logger

7

8

Basic progress logger that automatically saves state to Redis Queue job metadata.

9

10

```python { .api }

11

class RqWorkerProgressLogger:

12

def __init__(self, job):

13

"""

14

Initialize RQ worker progress logger.

15

16

Parameters:

17

- job: RQ Job instance to track progress for

18

"""

19

20

def callback(self, **kw):

21

"""

22

Automatically save progress data to job metadata.

23

24

This callback is triggered whenever the logger state is updated,

25

ensuring progress information is persistently stored in Redis

26

and accessible to external monitoring systems.

27

28

Parameters:

29

- **kw: State updates (automatically passed by logger)

30

"""

31

```

32

33

**Usage Example:**

34

35

```python

36

from rq import Worker, Queue

37

from proglog import RqWorkerProgressLogger

38

39

def long_running_task(job_id):

40

# Get the current RQ job

41

from rq import get_current_job

42

job = get_current_job()

43

44

# Create progress logger

45

logger = RqWorkerProgressLogger(job)

46

47

# Progress is automatically saved to job metadata

48

logger.state.update({"task": "starting", "progress": 0})

49

logger.callback()

50

51

for i in range(100):

52

# Update progress - automatically persisted

53

logger.state.update({"progress": i, "current_item": f"item_{i}"})

54

logger.callback(progress=i, current_item=f"item_{i}")

55

56

# Simulate work

57

time.sleep(0.1)

58

59

logger.state.update({"task": "completed", "progress": 100})

60

logger.callback()

61

62

# Queue the job

63

queue = Queue()

64

job = queue.enqueue(long_running_task, "task_123")

65

66

# Monitor progress from another process

67

print(job.meta.get("progress_data", {}))

68

```

69

70

### RQ Worker Bar Logger

71

72

Combined RQ worker and progress bar logger that provides both bar management and job metadata persistence.

73

74

```python { .api }

75

class RqWorkerBarLogger(RqWorkerProgressLogger, ProgressBarLogger):

76

def __init__(self, job, init_state=None, bars=None, ignored_bars=(),

77

logged_bars="all", min_time_interval=0):

78

"""

79

Initialize RQ worker progress bar logger.

80

81

Combines RqWorkerProgressLogger automatic persistence with

82

ProgressBarLogger bar management capabilities.

83

84

Parameters:

85

- job: RQ Job instance to track progress for

86

- init_state (dict, optional): Initial state dictionary

87

- bars: Bar configuration (None, list, tuple, or dict)

88

- ignored_bars (tuple/list): Bars to ignore (default: empty tuple)

89

- logged_bars ("all" or list): Bars to include in logs

90

- min_time_interval (float): Minimum seconds between bar updates

91

"""

92

```

93

94

**Usage Example:**

95

96

```python

97

from rq import Worker, Queue, get_current_job

98

from proglog import RqWorkerBarLogger

99

import time

100

101

def data_processing_job(data_files):

102

"""Process multiple data files with progress tracking."""

103

job = get_current_job()

104

105

# Create combined logger

106

logger = RqWorkerBarLogger(

107

job,

108

bars=["files", "records"],

109

min_time_interval=0.5 # Update every 500ms

110

)

111

112

# Process files with automatic progress persistence

113

for filename in logger.iter_bar(files=data_files):

114

logger(message=f"Processing {filename}")

115

116

# Load and process records

117

records = load_file(filename)

118

for record in logger.iter_bar(records=records):

119

process_record(record)

120

time.sleep(0.01)

121

122

logger(message=f"Completed {filename}")

123

124

logger(message="All files processed successfully")

125

126

def monitor_job_progress(job):

127

"""Monitor job progress from external process."""

128

while not job.is_finished:

129

progress = job.meta.get("progress_data", {})

130

131

if "bars" in progress:

132

bars = progress["bars"]

133

if "files" in bars:

134

files_bar = bars["files"]

135

print(f"Files: {files_bar.get('index', 0)}/{files_bar.get('total', '?')}")

136

137

if "records" in bars:

138

records_bar = bars["records"]

139

print(f"Records: {records_bar.get('index', 0)}/{records_bar.get('total', '?')}")

140

141

if "message" in progress:

142

print(f"Status: {progress['message']}")

143

144

time.sleep(1)

145

146

# Usage

147

queue = Queue()

148

files = ["data1.csv", "data2.csv", "data3.csv"]

149

job = queue.enqueue(data_processing_job, files)

150

151

# Monitor from another thread/process

152

monitor_job_progress(job)

153

```

154

155

### Progress Data Structure

156

157

The progress data automatically stored in job metadata follows this structure:

158

159

```python { .api }

160

# Job metadata structure (job.meta["progress_data"])

161

progress_data = {

162

# All current logger state

163

"task": str,

164

"progress": int,

165

"message": str,

166

# ... any other state fields

167

168

# Progress bar states (if using RqWorkerBarLogger)

169

"bars": {

170

"bar_name": {

171

"title": str,

172

"index": int,

173

"total": int,

174

"message": str,

175

"indent": int

176

}

177

# ... additional bars

178

}

179

}

180

```

181

182

**Accessing Progress Data:**

183

184

```python

185

from rq import Queue

186

187

# Get job by ID

188

queue = Queue()

189

job = queue.job("job-id-here")

190

191

# Access progress data

192

progress = job.meta.get("progress_data", {})

193

194

# Check overall progress

195

current_progress = progress.get("progress", 0)

196

status_message = progress.get("message", "No status")

197

198

# Check bar progress

199

if "bars" in progress:

200

for bar_name, bar_info in progress["bars"].items():

201

current = bar_info.get("index", 0)

202

total = bar_info.get("total", 0)

203

title = bar_info.get("title", bar_name)

204

print(f"{title}: {current}/{total}")

205

```

206

207

## Integration Patterns

208

209

### Background Job with Progress

210

211

```python

212

from rq import Queue, Worker

213

from proglog import RqWorkerBarLogger

214

215

def batch_processor(items, batch_size=10):

216

"""Process items in batches with progress tracking."""

217

job = get_current_job()

218

logger = RqWorkerBarLogger(job, bars=["batches", "items"])

219

220

batches = [items[i:i+batch_size] for i in range(0, len(items), batch_size)]

221

222

for batch in logger.iter_bar(batches=batches):

223

logger(message=f"Processing batch of {len(batch)} items")

224

225

for item in logger.iter_bar(items=batch):

226

result = process_item(item)

227

time.sleep(0.1) # Simulate processing time

228

229

logger(message=f"Batch completed")

230

231

logger(message="All processing complete")

232

233

# Queue and monitor

234

queue = Queue()

235

job = queue.enqueue(batch_processor, list(range(100)))

236

```

237

238

### Web Dashboard Integration

239

240

```python

241

from flask import Flask, jsonify

242

from rq import Queue

243

244

app = Flask(__name__)

245

queue = Queue()

246

247

@app.route('/job/<job_id>/progress')

248

def get_job_progress(job_id):

249

"""API endpoint to get job progress."""

250

try:

251

job = queue.job(job_id)

252

progress_data = job.meta.get("progress_data", {})

253

254

return jsonify({

255

"status": job.get_status(),

256

"progress": progress_data.get("progress", 0),

257

"message": progress_data.get("message", ""),

258

"bars": progress_data.get("bars", {}),

259

"is_finished": job.is_finished

260

})

261

except Exception as e:

262

return jsonify({"error": str(e)}), 404

263

264

# Frontend can poll this endpoint for live progress updates

265

```

266

267

### Cleanup and Error Handling

268

269

```python

270

def robust_worker_task(data):

271

"""Worker task with proper error handling and cleanup."""

272

job = get_current_job()

273

logger = RqWorkerBarLogger(job)

274

275

try:

276

logger(message="Task started", progress=0)

277

278

for i, item in enumerate(logger.iter_bar(items=data)):

279

try:

280

result = process_item(item)

281

progress = int((i + 1) / len(data) * 100)

282

logger(progress=progress, message=f"Processed {i+1}/{len(data)}")

283

284

except Exception as item_error:

285

logger(message=f"Error processing item {i}: {item_error}")

286

continue

287

288

logger(message="Task completed successfully", progress=100)

289

290

except Exception as e:

291

logger(message=f"Task failed: {str(e)}", progress=-1)

292

raise

293

294

finally:

295

# Ensure final state is saved

296

logger.callback()

297

```