Python SDK for Firecrawl API that enables web scraping, crawling, and content extraction with LLM-optimized output formats
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Real-time job monitoring using WebSocket connections for tracking long-running operations. Provides both synchronous and asynchronous monitoring interfaces for crawls, batch operations, and extractions.
Monitor job progress using the synchronous Watcher class with iterator interface for real-time updates.
class Watcher:
"""Synchronous WebSocket job monitoring"""
def __init__(self, client: FirecrawlClient):
"""
Initialize watcher with Firecrawl client.
Parameters:
- client: FirecrawlClient instance for API access
"""
def watch(self, job_id: str, job_type: str) -> Iterator[dict]:
"""
Monitor job progress via WebSocket.
Parameters:
- job_id: str, job ID to monitor
- job_type: str, type of job ("crawl", "batch_scrape", "extract")
Returns:
- Iterator[dict]: iterator yielding progress updates
"""
def close(self) -> None:
"""Close WebSocket connection"""Monitor job progress using the asynchronous AsyncWatcher class with async iterator interface.
class AsyncWatcher:
    """Asynchronous WebSocket job monitoring"""

    def __init__(self, client: AsyncFirecrawlClient):
        """
        Initialize async watcher with AsyncFirecrawl client.

        Parameters:
        - client: AsyncFirecrawlClient instance for API access
        """

    def watch(self, job_id: str, job_type: str) -> AsyncIterator[dict]:
        """
        Monitor job progress via async WebSocket.

        Parameters:
        - job_id: str, job ID to monitor
        - job_type: str, type of job ("crawl", "batch_scrape", "extract")

        Returns:
        - AsyncIterator[dict]: async iterator yielding progress updates
        """

    async def close(self) -> None:
        """Close WebSocket connection"""

from firecrawl import Firecrawl, Watcher, CrawlOptions
app = Firecrawl(api_key="your-api-key")

# Start a crawl job
crawl_id = app.start_crawl("https://example.com",
                           CrawlOptions(limit=100))

# Monitor with Watcher (uses the SDK's internal v2 client handle)
watcher = Watcher(app._v2_client)
print(f"Monitoring crawl job: {crawl_id}")
for update in watcher.watch(crawl_id, "crawl"):
    print(f"Progress: {update.get('completed', 0)}/{update.get('total', 0)}")
    print(f"Status: {update.get('status')}")
    # Stop iterating once the job reaches a terminal state
    if update.get('status') in ['completed', 'failed', 'cancelled']:
        break
watcher.close()
print("Monitoring completed")

from firecrawl import Firecrawl, Watcher
import time

app = Firecrawl(api_key="your-api-key")

# Start multiple jobs
jobs = []
for i in range(3):
    crawl_id = app.start_crawl(f"https://example{i+1}.com")
    jobs.append({"id": crawl_id, "type": "crawl", "url": f"https://example{i+1}.com"})

# Monitor all jobs (sequentially; see the async example for concurrent monitoring)
watcher = Watcher(app._v2_client)
for job in jobs:
    print(f"Starting monitoring for {job['url']} (ID: {job['id']})")
    try:
        for update in watcher.watch(job['id'], job['type']):
            status = update.get('status')
            completed = update.get('completed', 0)
            total = update.get('total', 0)
            print(f"Job {job['id']}: {status} - {completed}/{total}")
            if status == 'completed':
                print(f"✓ Job {job['id']} completed successfully")
                break
            elif status == 'failed':
                print(f"✗ Job {job['id']} failed")
                errors = update.get('errors', [])
                for error in errors:
                    print(f"  Error: {error}")
                break
            elif status == 'cancelled':
                print(f"⚠ Job {job['id']} was cancelled")
                break
    except Exception as e:
        # Broad catch is deliberate in this example: keep monitoring the
        # remaining jobs even if one watch stream errors out.
        print(f"Error monitoring job {job['id']}: {e}")
watcher.close()

from firecrawl import Firecrawl, Watcher
import time  # required for the ETA calculation below

app = Firecrawl(api_key="your-api-key")
watcher = Watcher(app._v2_client)

# Start batch scrape
urls = [f"https://example.com/page{i}" for i in range(1, 51)]
batch_id = app.start_batch_scrape(urls)

print(f"Monitoring batch job: {batch_id}")
start_time = time.time()
for update in watcher.watch(batch_id, "batch_scrape"):
    status = update.get('status')
    completed = update.get('completed', 0)
    total = update.get('total', 0)

    # Calculate progress percentage (guard against division by zero)
    progress = (completed / total * 100) if total > 0 else 0

    # Calculate ETA from the average time per completed item
    elapsed = time.time() - start_time
    if completed > 0:
        eta = (elapsed / completed) * (total - completed)
        eta_str = f"{eta:.1f}s"
    else:
        eta_str = "calculating..."

    print(f"Batch Progress: {progress:.1f}% ({completed}/{total}) - ETA: {eta_str}")
    if status in ['completed', 'failed', 'cancelled']:
        break
watcher.close()

from firecrawl import Firecrawl, Watcher
app = Firecrawl(api_key="your-api-key")
watcher = Watcher(app._v2_client)

# Complex extraction schema (JSON Schema format)
schema = {
    "type": "object",
    "properties": {
        "products": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "name": {"type": "string"},
                    "price": {"type": "number"},
                    "description": {"type": "string"}
                }
            }
        }
    }
}

# Start extraction job
extract_id = app.start_extract("https://store.example.com", schema)

print(f"Monitoring extraction job: {extract_id}")
for update in watcher.watch(extract_id, "extract"):
    status = update.get('status')
    print(f"Extraction Status: {status}")
    if status == 'completed':
        # Extraction results arrive on the final update's 'data' field
        data = update.get('data', {})
        products = data.get('products', [])
        print(f"✓ Extracted {len(products)} products")
        break
    elif status in ['failed', 'cancelled']:
        print(f"✗ Extraction {status}")
        break
watcher.close()

import asyncio
from firecrawl import AsyncFirecrawl, AsyncWatcher


async def monitor_async():
    """Monitor a single crawl job using the async watcher."""
    app = AsyncFirecrawl(api_key="your-api-key")

    # Start crawl job
    crawl_id = await app.start_crawl("https://example.com")

    # Monitor with AsyncWatcher
    async_watcher = AsyncWatcher(app._v2_client)
    print(f"Monitoring crawl job: {crawl_id}")
    async for update in async_watcher.watch(crawl_id, "crawl"):
        print(f"Progress: {update.get('completed', 0)}/{update.get('total', 0)}")
        print(f"Status: {update.get('status')}")
        if update.get('status') in ['completed', 'failed', 'cancelled']:
            break
    await async_watcher.close()
    print("Monitoring completed")

asyncio.run(monitor_async())

import asyncio
from firecrawl import AsyncFirecrawl, AsyncWatcher


async def monitor_job(watcher, job_id, job_type, name):
    """Monitor a single job asynchronously"""
    print(f"Starting monitoring for {name}")
    async for update in watcher.watch(job_id, job_type):
        status = update.get('status')
        completed = update.get('completed', 0)
        total = update.get('total', 0)
        print(f"{name}: {status} - {completed}/{total}")
        if status in ['completed', 'failed', 'cancelled']:
            break
    print(f"{name} monitoring completed")


async def monitor_multiple_jobs():
    """Start several jobs and monitor them concurrently with one watcher."""
    app = AsyncFirecrawl(api_key="your-api-key")
    watcher = AsyncWatcher(app._v2_client)

    # Start multiple jobs
    crawl_id1 = await app.start_crawl("https://example1.com")
    crawl_id2 = await app.start_crawl("https://example2.com")
    batch_id = await app.start_batch_scrape([
        "https://example3.com/page1",
        "https://example3.com/page2"
    ])

    # Monitor all jobs concurrently
    await asyncio.gather(
        monitor_job(watcher, crawl_id1, "crawl", "Crawl 1"),
        monitor_job(watcher, crawl_id2, "crawl", "Crawl 2"),
        monitor_job(watcher, batch_id, "batch_scrape", "Batch")
    )
    await watcher.close()
asyncio.run(monitor_multiple_jobs())

from typing import List, Optional  # required by the annotations below


class JobUpdate:
    """Structure of job progress updates"""
    status: str                  # Current job status
    job_id: str                  # Job identifier
    completed: int               # Number of completed items
    total: int                   # Total number of items
    data: Optional[dict]         # Job results (when completed)
    errors: Optional[List[str]]  # Error messages (when failed)
    timestamp: str               # Update timestamp


class WatcherError(Exception):
    """Exception raised during job monitoring"""
    job_id: str
    message: str

from firecrawl import Firecrawl, Watcher
app = Firecrawl(api_key="your-api-key")
watcher = Watcher(app._v2_client)

try:
    crawl_id = app.start_crawl("https://example.com")

    # Monitor job
    for update in watcher.watch(crawl_id, "crawl"):
        print(f"Status: {update.get('status')}")
        if update.get('status') in ['completed', 'failed', 'cancelled']:
            break
finally:
    # Always close the connection
    watcher.close()

from firecrawl import Firecrawl, Watcher
app = Firecrawl(api_key="your-api-key")
class WatcherContext:
def __init__(self, client):
self.watcher = Watcher(client)
def __enter__(self):
return self.watcher
def __exit__(self, exc_type, exc_val, exc_tb):
self.watcher.close()
# Usage with context manager
with WatcherContext(app._v2_client) as watcher:
crawl_id = app.start_crawl("https://example.com")
for update in watcher.watch(crawl_id, "crawl"):
print(f"Status: {update.get('status')}")
if update.get('status') in ['completed', 'failed', 'cancelled']:
break
# Connection automatically closedInstall with Tessl CLI
npx tessl i tessl/pypi-firecrawl-py