# Job Monitoring

Real-time job monitoring using WebSocket connections for tracking long-running operations. Provides both synchronous and asynchronous monitoring interfaces for crawls, batch operations, and extractions.

## Capabilities

### Synchronous Job Monitoring

Monitor job progress using the synchronous Watcher class, which exposes an iterator interface for real-time updates.

```python { .api }
class Watcher:
    """Synchronous WebSocket job monitoring"""

    def __init__(self, client: FirecrawlClient):
        """
        Initialize watcher with a Firecrawl client.

        Parameters:
        - client: FirecrawlClient instance for API access
        """

    def watch(self, job_id: str, job_type: str) -> Iterator[dict]:
        """
        Monitor job progress via WebSocket.

        Parameters:
        - job_id: str, job ID to monitor
        - job_type: str, type of job ("crawl", "batch_scrape", "extract")

        Returns:
        - Iterator[dict]: iterator yielding progress updates
        """

    def close(self) -> None:
        """Close WebSocket connection"""
```

### Asynchronous Job Monitoring

Monitor job progress using the asynchronous AsyncWatcher class, which exposes an async iterator interface.

```python { .api }
class AsyncWatcher:
    """Asynchronous WebSocket job monitoring"""

    def __init__(self, client: AsyncFirecrawlClient):
        """
        Initialize async watcher with an AsyncFirecrawl client.

        Parameters:
        - client: AsyncFirecrawlClient instance for API access
        """

    def watch(self, job_id: str, job_type: str) -> AsyncIterator[dict]:
        """
        Monitor job progress via async WebSocket.

        Parameters:
        - job_id: str, job ID to monitor
        - job_type: str, type of job ("crawl", "batch_scrape", "extract")

        Returns:
        - AsyncIterator[dict]: async iterator yielding progress updates
        """

    async def close(self) -> None:
        """Close WebSocket connection"""
```

## Usage Examples

### Basic Job Monitoring

```python
from firecrawl import Firecrawl, Watcher, CrawlOptions

app = Firecrawl(api_key="your-api-key")

# Start a crawl job
crawl_id = app.start_crawl("https://example.com", CrawlOptions(limit=100))

# Monitor with Watcher
watcher = Watcher(app._v2_client)

print(f"Monitoring crawl job: {crawl_id}")
for update in watcher.watch(crawl_id, "crawl"):
    print(f"Progress: {update.get('completed', 0)}/{update.get('total', 0)}")
    print(f"Status: {update.get('status')}")

    if update.get('status') in ['completed', 'failed', 'cancelled']:
        break

watcher.close()
print("Monitoring completed")
```

### Advanced Monitoring with Error Handling

```python
from firecrawl import Firecrawl, Watcher

app = Firecrawl(api_key="your-api-key")

# Start multiple jobs
jobs = []
for i in range(3):
    crawl_id = app.start_crawl(f"https://example{i+1}.com")
    jobs.append({"id": crawl_id, "type": "crawl", "url": f"https://example{i+1}.com"})

# Monitor all jobs
watcher = Watcher(app._v2_client)

for job in jobs:
    print(f"Starting monitoring for {job['url']} (ID: {job['id']})")

    try:
        for update in watcher.watch(job['id'], job['type']):
            status = update.get('status')
            completed = update.get('completed', 0)
            total = update.get('total', 0)

            print(f"Job {job['id']}: {status} - {completed}/{total}")

            if status == 'completed':
                print(f"✓ Job {job['id']} completed successfully")
                break
            elif status == 'failed':
                print(f"✗ Job {job['id']} failed")
                errors = update.get('errors', [])
                for error in errors:
                    print(f"  Error: {error}")
                break
            elif status == 'cancelled':
                print(f"⚠ Job {job['id']} was cancelled")
                break

    except Exception as e:
        print(f"Error monitoring job {job['id']}: {e}")

watcher.close()
```
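
If the WebSocket connection drops mid-job, the loop above raises and progress is lost from view. Below is a minimal retry sketch with exponential backoff, using a hypothetical `watch_with_retry` helper; it assumes that calling `watch()` again for the same job ID simply resumes streaming updates:

```python
import time

def watch_with_retry(watcher, job_id, job_type, max_retries=3):
    """Retry watcher.watch() on transient errors with exponential backoff."""
    for attempt in range(max_retries + 1):
        try:
            for update in watcher.watch(job_id, job_type):
                yield update
                # A terminal status means the job is done; stop retrying.
                if update.get('status') in ['completed', 'failed', 'cancelled']:
                    return
            return
        except Exception as e:
            if attempt == max_retries:
                raise
            delay = 2 ** attempt  # 1s, 2s, 4s, ...
            print(f"Connection error ({e}); retrying in {delay}s")
            time.sleep(delay)
```

Use it in place of `watcher.watch(...)` in the loop above: `for update in watch_with_retry(watcher, job['id'], job['type']): ...`.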

### Batch Job Monitoring

```python
import time

from firecrawl import Firecrawl, Watcher

app = Firecrawl(api_key="your-api-key")
watcher = Watcher(app._v2_client)

# Start batch scrape
urls = [f"https://example.com/page{i}" for i in range(1, 51)]
batch_id = app.start_batch_scrape(urls)

print(f"Monitoring batch job: {batch_id}")
start_time = time.time()

for update in watcher.watch(batch_id, "batch_scrape"):
    status = update.get('status')
    completed = update.get('completed', 0)
    total = update.get('total', 0)

    # Calculate progress percentage
    progress = (completed / total * 100) if total > 0 else 0

    # Estimate time remaining from the average time per completed item
    elapsed = time.time() - start_time
    if completed > 0:
        eta = (elapsed / completed) * (total - completed)
        eta_str = f"{eta:.1f}s"
    else:
        eta_str = "calculating..."

    print(f"Batch Progress: {progress:.1f}% ({completed}/{total}) - ETA: {eta_str}")

    if status in ['completed', 'failed', 'cancelled']:
        break

watcher.close()
```
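
The `completed`/`total` counters map directly onto a terminal progress bar. Here is a sketch of the same loop using the third-party `tqdm` package (an optional dependency, not part of this SDK):

```python
from tqdm import tqdm

bar = None
for update in watcher.watch(batch_id, "batch_scrape"):
    total = update.get('total', 0)
    completed = update.get('completed', 0)

    # Create the bar once the total is known, then move it to the
    # absolute completed count reported by each update.
    if bar is None and total > 0:
        bar = tqdm(total=total, desc="Batch")
    if bar is not None:
        bar.n = completed
        bar.refresh()

    if update.get('status') in ['completed', 'failed', 'cancelled']:
        break

if bar is not None:
    bar.close()
```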

### Extraction Job Monitoring

```python
from firecrawl import Firecrawl, Watcher

app = Firecrawl(api_key="your-api-key")
watcher = Watcher(app._v2_client)

# Complex extraction schema
schema = {
    "type": "object",
    "properties": {
        "products": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "name": {"type": "string"},
                    "price": {"type": "number"},
                    "description": {"type": "string"}
                }
            }
        }
    }
}

# Start extraction job
extract_id = app.start_extract("https://store.example.com", schema)

print(f"Monitoring extraction job: {extract_id}")
for update in watcher.watch(extract_id, "extract"):
    status = update.get('status')
    print(f"Extraction Status: {status}")

    if status == 'completed':
        data = update.get('data', {})
        products = data.get('products', [])
        print(f"✓ Extracted {len(products)} products")
        break
    elif status in ['failed', 'cancelled']:
        print(f"✗ Extraction {status}")
        break

watcher.close()
```
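
Because the extraction schema is ordinary JSON Schema, the completed payload can be validated before use. A sketch using the third-party `jsonschema` package (an optional dependency, not part of this SDK), reusing `data` and `schema` from the example above:

```python
from jsonschema import validate, ValidationError

try:
    # data is the 'data' payload from the completed update above
    validate(instance=data, schema=schema)
    print("Extracted data matches the schema")
except ValidationError as e:
    print(f"Schema mismatch: {e.message}")
```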

## Async Usage

### Basic Async Monitoring

```python
import asyncio
from firecrawl import AsyncFirecrawl, AsyncWatcher

async def monitor_async():
    app = AsyncFirecrawl(api_key="your-api-key")

    # Start crawl job
    crawl_id = await app.start_crawl("https://example.com")

    # Monitor with AsyncWatcher
    async_watcher = AsyncWatcher(app._v2_client)

    print(f"Monitoring crawl job: {crawl_id}")
    async for update in async_watcher.watch(crawl_id, "crawl"):
        print(f"Progress: {update.get('completed', 0)}/{update.get('total', 0)}")
        print(f"Status: {update.get('status')}")

        if update.get('status') in ['completed', 'failed', 'cancelled']:
            break

    await async_watcher.close()
    print("Monitoring completed")

asyncio.run(monitor_async())
```
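
A stalled job would leave this loop waiting indefinitely. Wrapping the coroutine in `asyncio.wait_for` bounds the total monitoring time; this sketch replaces the direct `asyncio.run(monitor_async())` call above, and the 600-second budget is an arbitrary example value:

```python
import asyncio

async def main():
    try:
        # Give the whole monitoring loop at most 10 minutes.
        await asyncio.wait_for(monitor_async(), timeout=600)
    except asyncio.TimeoutError:
        print("Monitoring timed out; the job may still be running server-side")

asyncio.run(main())
```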

### Concurrent Job Monitoring

```python
import asyncio
from firecrawl import AsyncFirecrawl, AsyncWatcher

async def monitor_job(watcher, job_id, job_type, name):
    """Monitor a single job asynchronously"""
    print(f"Starting monitoring for {name}")

    async for update in watcher.watch(job_id, job_type):
        status = update.get('status')
        completed = update.get('completed', 0)
        total = update.get('total', 0)

        print(f"{name}: {status} - {completed}/{total}")

        if status in ['completed', 'failed', 'cancelled']:
            break

    print(f"{name} monitoring completed")

async def monitor_multiple_jobs():
    app = AsyncFirecrawl(api_key="your-api-key")
    watcher = AsyncWatcher(app._v2_client)

    # Start multiple jobs
    crawl_id1 = await app.start_crawl("https://example1.com")
    crawl_id2 = await app.start_crawl("https://example2.com")
    batch_id = await app.start_batch_scrape([
        "https://example3.com/page1",
        "https://example3.com/page2"
    ])

    # Monitor all jobs concurrently
    await asyncio.gather(
        monitor_job(watcher, crawl_id1, "crawl", "Crawl 1"),
        monitor_job(watcher, crawl_id2, "crawl", "Crawl 2"),
        monitor_job(watcher, batch_id, "batch_scrape", "Batch")
    )

    await watcher.close()

asyncio.run(monitor_multiple_jobs())
```
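
On Python 3.11+, `asyncio.TaskGroup` is an alternative to `gather` that cancels the remaining monitors as soon as one fails. A sketch reusing `monitor_job` from above (`monitor_with_taskgroup` is an illustrative name, not part of the SDK):

```python
import asyncio

async def monitor_with_taskgroup(watcher, jobs):
    """jobs: list of (job_id, job_type, name) tuples."""
    # If any monitor raises, the TaskGroup cancels the others
    # and re-raises the error as an ExceptionGroup.
    async with asyncio.TaskGroup() as tg:
        for job_id, job_type, name in jobs:
            tg.create_task(monitor_job(watcher, job_id, job_type, name))
```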

## Types

```python { .api }
class JobUpdate:
    """Structure of job progress updates"""
    status: str                  # Current job status
    job_id: str                  # Job identifier
    completed: int               # Number of completed items
    total: int                   # Total number of items
    data: Optional[dict]         # Job results (when completed)
    errors: Optional[List[str]]  # Error messages (when failed)
    timestamp: str               # Update timestamp

class WatcherError(Exception):
    """Exception raised during job monitoring"""
    job_id: str
    message: str
```
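
The `watch()` iterators yield plain dicts with these fields. If attribute access is preferred, a small dataclass mirror can wrap each raw update; below is a sketch with illustrative names `ParsedUpdate` and `parse_update`, where the keys assume the shape documented above:

```python
from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class ParsedUpdate:
    """Typed view over a raw update dict from watcher.watch()."""
    status: str
    job_id: str
    completed: int = 0
    total: int = 0
    data: Optional[dict] = None
    errors: List[str] = field(default_factory=list)

def parse_update(raw: dict) -> ParsedUpdate:
    return ParsedUpdate(
        status=raw.get('status', 'unknown'),
        job_id=raw.get('job_id', ''),
        completed=raw.get('completed', 0),
        total=raw.get('total', 0),
        data=raw.get('data'),
        errors=raw.get('errors') or [],
    )
```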

## Connection Management

### Manual Connection Control

```python
from firecrawl import Firecrawl, Watcher

app = Firecrawl(api_key="your-api-key")
watcher = Watcher(app._v2_client)

try:
    crawl_id = app.start_crawl("https://example.com")

    # Monitor job
    for update in watcher.watch(crawl_id, "crawl"):
        print(f"Status: {update.get('status')}")
        if update.get('status') in ['completed', 'failed', 'cancelled']:
            break

finally:
    # Always close the connection
    watcher.close()
```

### Context Manager Usage

```python
from firecrawl import Firecrawl, Watcher

app = Firecrawl(api_key="your-api-key")

class WatcherContext:
    def __init__(self, client):
        self.watcher = Watcher(client)

    def __enter__(self):
        return self.watcher

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.watcher.close()

# Usage with context manager
with WatcherContext(app._v2_client) as watcher:
    crawl_id = app.start_crawl("https://example.com")

    for update in watcher.watch(crawl_id, "crawl"):
        print(f"Status: {update.get('status')}")
        if update.get('status') in ['completed', 'failed', 'cancelled']:
            break
# Connection automatically closed
```
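
Because `Watcher` exposes a `close()` method, the standard library's `contextlib.closing` provides the same guarantee without a custom wrapper class:

```python
from contextlib import closing

with closing(Watcher(app._v2_client)) as watcher:
    crawl_id = app.start_crawl("https://example.com")

    for update in watcher.watch(crawl_id, "crawl"):
        print(f"Status: {update.get('status')}")
        if update.get('status') in ['completed', 'failed', 'cancelled']:
            break
# watcher.close() is called automatically on exit
```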