CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-firecrawl-py

Python SDK for Firecrawl API that enables web scraping, crawling, and content extraction with LLM-optimized output formats

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/monitoring.md

Job Monitoring

Real-time job monitoring using WebSocket connections for tracking long-running operations. Provides both synchronous and asynchronous monitoring interfaces for crawls, batch operations, and extractions.

Capabilities

Synchronous Job Monitoring

Monitor job progress using the synchronous Watcher class with iterator interface for real-time updates.

class Watcher:
    """Synchronous WebSocket job monitoring"""
    
    def __init__(self, client: FirecrawlClient):
        """
        Initialize watcher with Firecrawl client.
        
        Parameters:
        - client: FirecrawlClient instance for API access
        """
    
    def watch(self, job_id: str, job_type: str) -> Iterator[dict]:
        """
        Monitor job progress via WebSocket.
        
        Parameters:
        - job_id: str, job ID to monitor
        - job_type: str, type of job ("crawl", "batch_scrape", "extract")
        
        Returns:
        - Iterator[dict]: iterator yielding progress updates
        """
    
    def close(self) -> None:
        """Close WebSocket connection"""

Asynchronous Job Monitoring

Monitor job progress using the asynchronous AsyncWatcher class with async iterator interface.

class AsyncWatcher:
    """Asynchronous WebSocket job monitoring"""
    
    def __init__(self, client: AsyncFirecrawlClient):
        """
        Initialize async watcher with AsyncFirecrawl client.
        
        Parameters:
        - client: AsyncFirecrawlClient instance for API access
        """
    
    def watch(self, job_id: str, job_type: str) -> AsyncIterator[dict]:
        """
        Monitor job progress via async WebSocket.
        
        Parameters:
        - job_id: str, job ID to monitor
        - job_type: str, type of job ("crawl", "batch_scrape", "extract")
        
        Returns:
        - AsyncIterator[dict]: async iterator yielding progress updates
        """
    
    async def close(self) -> None:
        """Close WebSocket connection"""

Usage Examples

Basic Job Monitoring

from firecrawl import Firecrawl, Watcher, CrawlOptions

app = Firecrawl(api_key="your-api-key")

# Start a crawl job
crawl_id = app.start_crawl("https://example.com", 
                          CrawlOptions(limit=100))

# Monitor with Watcher
watcher = Watcher(app._v2_client)

print(f"Monitoring crawl job: {crawl_id}")
for update in watcher.watch(crawl_id, "crawl"):
    print(f"Progress: {update.get('completed', 0)}/{update.get('total', 0)}")
    print(f"Status: {update.get('status')}")
    
    if update.get('status') in ['completed', 'failed', 'cancelled']:
        break

watcher.close()
print("Monitoring completed")

Advanced Monitoring with Error Handling

from firecrawl import Firecrawl, Watcher
import time

app = Firecrawl(api_key="your-api-key")

# Start multiple jobs
jobs = []
for i in range(3):
    crawl_id = app.start_crawl(f"https://example{i+1}.com")
    jobs.append({"id": crawl_id, "type": "crawl", "url": f"https://example{i+1}.com"})

# Monitor all jobs
watcher = Watcher(app._v2_client)

for job in jobs:
    print(f"Starting monitoring for {job['url']} (ID: {job['id']})")
    
    try:
        for update in watcher.watch(job['id'], job['type']):
            status = update.get('status')
            completed = update.get('completed', 0)
            total = update.get('total', 0)
            
            print(f"Job {job['id']}: {status} - {completed}/{total}")
            
            if status == 'completed':
                print(f"✓ Job {job['id']} completed successfully")
                break
            elif status == 'failed':
                print(f"✗ Job {job['id']} failed")
                errors = update.get('errors', [])
                for error in errors:
                    print(f"  Error: {error}")
                break
            elif status == 'cancelled':
                print(f"⚠ Job {job['id']} was cancelled")
                break
                
    except Exception as e:
        print(f"Error monitoring job {job['id']}: {e}")

watcher.close()

Batch Job Monitoring

from firecrawl import Firecrawl, Watcher
import time

app = Firecrawl(api_key="your-api-key")
watcher = Watcher(app._v2_client)

# Start batch scrape
urls = [f"https://example.com/page{i}" for i in range(1, 51)]
batch_id = app.start_batch_scrape(urls)

print(f"Monitoring batch job: {batch_id}")
start_time = time.time()

for update in watcher.watch(batch_id, "batch_scrape"):
    status = update.get('status')
    completed = update.get('completed', 0)
    total = update.get('total', 0)
    
    # Calculate progress percentage
    progress = (completed / total * 100) if total > 0 else 0
    
    # Calculate ETA
    elapsed = time.time() - start_time
    if completed > 0:
        eta = (elapsed / completed) * (total - completed)
        eta_str = f"{eta:.1f}s"
    else:
        eta_str = "calculating..."
    
    print(f"Batch Progress: {progress:.1f}% ({completed}/{total}) - ETA: {eta_str}")
    
    if status in ['completed', 'failed', 'cancelled']:
        break

watcher.close()

Extraction Job Monitoring

from firecrawl import Firecrawl, Watcher

app = Firecrawl(api_key="your-api-key")
watcher = Watcher(app._v2_client)

# Complex extraction schema
schema = {
    "type": "object",
    "properties": {
        "products": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "name": {"type": "string"},
                    "price": {"type": "number"},
                    "description": {"type": "string"}
                }
            }
        }
    }
}

# Start extraction job
extract_id = app.start_extract("https://store.example.com", schema)

print(f"Monitoring extraction job: {extract_id}")
for update in watcher.watch(extract_id, "extract"):
    status = update.get('status')
    print(f"Extraction Status: {status}")
    
    if status == 'completed':
        data = update.get('data', {})
        products = data.get('products', [])
        print(f"✓ Extracted {len(products)} products")
        break
    elif status in ['failed', 'cancelled']:
        print(f"✗ Extraction {status}")
        break

watcher.close()

Async Usage

Basic Async Monitoring

import asyncio
from firecrawl import AsyncFirecrawl, AsyncWatcher

async def monitor_async():
    app = AsyncFirecrawl(api_key="your-api-key")
    
    # Start crawl job
    crawl_id = await app.start_crawl("https://example.com")
    
    # Monitor with AsyncWatcher
    async_watcher = AsyncWatcher(app._v2_client)
    
    print(f"Monitoring crawl job: {crawl_id}")
    async for update in async_watcher.watch(crawl_id, "crawl"):
        print(f"Progress: {update.get('completed', 0)}/{update.get('total', 0)}")
        print(f"Status: {update.get('status')}")
        
        if update.get('status') in ['completed', 'failed', 'cancelled']:
            break
    
    await async_watcher.close()
    print("Monitoring completed")

asyncio.run(monitor_async())

Concurrent Job Monitoring

import asyncio
from firecrawl import AsyncFirecrawl, AsyncWatcher

async def monitor_job(watcher, job_id, job_type, name):
    """Monitor a single job asynchronously"""
    print(f"Starting monitoring for {name}")
    
    async for update in watcher.watch(job_id, job_type):
        status = update.get('status')
        completed = update.get('completed', 0)
        total = update.get('total', 0)
        
        print(f"{name}: {status} - {completed}/{total}")
        
        if status in ['completed', 'failed', 'cancelled']:
            break
    
    print(f"{name} monitoring completed")

async def monitor_multiple_jobs():
    app = AsyncFirecrawl(api_key="your-api-key")
    watcher = AsyncWatcher(app._v2_client)
    
    # Start multiple jobs
    crawl_id1 = await app.start_crawl("https://example1.com")
    crawl_id2 = await app.start_crawl("https://example2.com")
    batch_id = await app.start_batch_scrape([
        "https://example3.com/page1",
        "https://example3.com/page2"
    ])
    
    # Monitor all jobs concurrently
    await asyncio.gather(
        monitor_job(watcher, crawl_id1, "crawl", "Crawl 1"),
        monitor_job(watcher, crawl_id2, "crawl", "Crawl 2"),
        monitor_job(watcher, batch_id, "batch_scrape", "Batch")
    )
    
    await watcher.close()

asyncio.run(monitor_multiple_jobs())

Types

class JobUpdate:
    """Structure of job progress updates"""
    status: str  # Current job status
    job_id: str  # Job identifier
    completed: int  # Number of completed items
    total: int  # Total number of items
    data: Optional[dict]  # Job results (when completed)
    errors: Optional[List[str]]  # Error messages (when failed)
    timestamp: str  # Update timestamp

class WatcherError(Exception):
    """Exception raised during job monitoring"""
    job_id: str
    message: str

Connection Management

Manual Connection Control

from firecrawl import Firecrawl, Watcher

app = Firecrawl(api_key="your-api-key")
watcher = Watcher(app._v2_client)

try:
    crawl_id = app.start_crawl("https://example.com")
    
    # Monitor job
    for update in watcher.watch(crawl_id, "crawl"):
        print(f"Status: {update.get('status')}")
        if update.get('status') in ['completed', 'failed', 'cancelled']:
            break
            
finally:
    # Always close the connection
    watcher.close()

Context Manager Usage

from firecrawl import Firecrawl, Watcher

app = Firecrawl(api_key="your-api-key")

class WatcherContext:
    def __init__(self, client):
        self.watcher = Watcher(client)
    
    def __enter__(self):
        return self.watcher
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.watcher.close()

# Usage with context manager
with WatcherContext(app._v2_client) as watcher:
    crawl_id = app.start_crawl("https://example.com")
    
    for update in watcher.watch(crawl_id, "crawl"):
        print(f"Status: {update.get('status')}")
        if update.get('status') in ['completed', 'failed', 'cancelled']:
            break
# Connection automatically closed

Install with Tessl CLI

npx tessl i tessl/pypi-firecrawl-py

docs

batch.md

crawling.md

extraction.md

index.md

monitoring.md

scraping.md

usage.md

v1-api.md

tile.json