CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-firecrawl-py

Python SDK for Firecrawl API that enables web scraping, crawling, and content extraction with LLM-optimized output formats

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/usage.md

Usage & Statistics

Account usage monitoring including credit usage, token consumption, concurrency limits, and job queue status tracking. These functions help you monitor and optimize your Firecrawl API usage.

Capabilities

Credit Usage Tracking

Monitor your account's credit consumption and remaining balance for cost management and usage optimization.

def get_credit_usage() -> CreditUsage:
    """
    Get current credit usage statistics for the authenticated account.

    Returns:
    - CreditUsage: detailed credit usage information including consumed
      (``used``), ``remaining``, and plan ``total`` credits, plus the
      ``reset_date`` for subscription plans (see the CreditUsage type below).
    """

Token Usage Monitoring

Track token consumption for AI-powered operations like extraction and content processing.

def get_token_usage() -> TokenUsage:
    """
    Get current token usage statistics.

    Tracks token consumption for AI-powered operations such as extraction
    and content processing.

    Returns:
    - TokenUsage: detailed token usage information for AI operations,
      including ``used_this_month``, ``used_today``, the monthly ``limit``,
      and the ``reset_date`` (see the TokenUsage type below).
    """

Concurrency Management

Monitor current concurrency limits and active job counts to optimize job scheduling and resource utilization.

def get_concurrency() -> ConcurrencyInfo:
    """
    Get current concurrency limits and usage.

    Returns:
    - ConcurrencyInfo: per-operation concurrency limits (crawls, scrapes,
      extracts, batch operations) together with the currently active job
      count for each (see the ConcurrencyInfo type below).
    """

Queue Status Monitoring

Check the status of job queues to understand processing delays and system load.

def get_queue_status() -> QueueStatus:
    """
    Get current job queue status.

    Returns:
    - QueueStatus: queue length, estimated wait time in seconds, processing
      rate in jobs per minute, and priority-queue length (see the
      QueueStatus type below).
    """

Usage Examples

Basic Usage Monitoring

from firecrawl import Firecrawl

app = Firecrawl(api_key="your-api-key")

# Credits: consumed / remaining / plan total
credits = app.get_credit_usage()
print(f"Credits used: {credits.used}")
print(f"Credits remaining: {credits.remaining}")
print(f"Credits total: {credits.total}")

# Tokens consumed by AI-powered operations
tokens = app.get_token_usage()
print(f"Tokens used this month: {tokens.used_this_month}")
print(f"Token limit: {tokens.limit}")

# Concurrency: active jobs vs. plan limits
limits = app.get_concurrency()
print(f"Active crawls: {limits.active_crawls}/{limits.max_crawls}")
print(f"Active scrapes: {limits.active_scrapes}/{limits.max_scrapes}")

# Queue backlog and expected delay
status = app.get_queue_status()
print(f"Queue length: {status.queue_length}")
print(f"Estimated wait time: {status.estimated_wait_time}s")

Usage Monitoring Dashboard

from firecrawl import Firecrawl
import time
from datetime import datetime

def print_usage_dashboard(app):
    """Print a four-section usage report: credits, tokens, concurrency, queue.

    Each section fetches its data inside its own try/except, so an API error
    in one section is reported inline without suppressing the others.
    """
    divider = "=" * 60
    print(divider)
    print(f"FIRECRAWL USAGE DASHBOARD - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(divider)

    # -- Credits --
    try:
        credit_info = app.get_credit_usage()
        print("💰 CREDITS:")
        print(f"   Used: {credit_info.used:,}")
        print(f"   Remaining: {credit_info.remaining:,}")
        print(f"   Total: {credit_info.total:,}")
        # Guard against zero-total plans before computing the percentage.
        credit_pct = credit_info.used / credit_info.total * 100 if credit_info.total > 0 else 0
        print(f"   Usage: {credit_pct:.1f}%")
    except Exception as err:
        print(f"💰 CREDITS: Error - {err}")

    print()

    # -- Tokens --
    try:
        token_info = app.get_token_usage()
        print("🎯 TOKENS:")
        print(f"   Used this month: {token_info.used_this_month:,}")
        print(f"   Monthly limit: {token_info.limit:,}")
        if token_info.limit > 0:
            print(f"   Usage: {token_info.used_this_month / token_info.limit * 100:.1f}%")
    except Exception as err:
        print(f"🎯 TOKENS: Error - {err}")

    print()

    # -- Concurrency --
    try:
        slots = app.get_concurrency()
        print("⚡ CONCURRENCY:")
        print(f"   Active crawls: {slots.active_crawls}/{slots.max_crawls}")
        print(f"   Active scrapes: {slots.active_scrapes}/{slots.max_scrapes}")
        print(f"   Active extracts: {slots.active_extracts}/{slots.max_extracts}")
        print(f"   Active batch: {slots.active_batch}/{slots.max_batch}")
    except Exception as err:
        print(f"⚡ CONCURRENCY: Error - {err}")

    print()

    # -- Queue --
    try:
        queue_info = app.get_queue_status()
        print("🚦 QUEUE STATUS:")
        print(f"   Queue length: {queue_info.queue_length}")
        print(f"   Estimated wait: {queue_info.estimated_wait_time}s")
        print(f"   Processing rate: {queue_info.processing_rate} jobs/min")
    except Exception as err:
        print(f"🚦 QUEUE STATUS: Error - {err}")

    print(divider)

# Usage monitoring loop
app = Firecrawl(api_key="your-api-key")

# NOTE: runs forever — interrupt with Ctrl+C to stop.
while True:
    print_usage_dashboard(app)
    time.sleep(300)  # Update every 5 minutes

Pre-Job Usage Check

from firecrawl import Firecrawl, CrawlOptions

def check_resources_before_job(app, estimated_pages=100):
    """Verify that credits and crawl slots are available before a large job.

    Prints a short resource report and returns True when the job can
    proceed, False when credits or crawl concurrency are insufficient.
    """
    credit_info = app.get_credit_usage()
    slots = app.get_concurrency()

    # Rough cost model: one credit per page.
    projected_cost = estimated_pages * 1

    print("Pre-job resource check:")
    print(f"Estimated pages: {estimated_pages}")
    print(f"Estimated cost: {projected_cost} credits")
    print(f"Available credits: {credit_info.remaining}")

    # Guard clause: not enough credits left for the projected spend.
    if credit_info.remaining < projected_cost:
        print("❌ Insufficient credits for this job")
        return False

    # Guard clause: every crawl slot is already in use.
    if slots.active_crawls >= slots.max_crawls:
        print("❌ No crawl slots available")
        return False

    print("✅ Resources available, proceeding with job")
    return True

app = Firecrawl(api_key="your-api-key")

# Check resources before large crawl
# start_crawl is only invoked when the pre-flight check passes.
if check_resources_before_job(app, estimated_pages=500):
    crawl_options = CrawlOptions(limit=500)
    crawl_id = app.start_crawl("https://example.com", crawl_options)
    print(f"Started crawl: {crawl_id}")
else:
    print("Job not started due to insufficient resources")

Usage Optimization

from firecrawl import Firecrawl
import time

def optimize_job_scheduling(app, urls_to_process):
    """Submit URLs for batch scraping in batches sized to current capacity.

    The batch size is derived from free scrape slots and throttled when the
    job queue is long.  Fixes over the naive version:
    - the batch size is clamped to >= 1, so the slicing loop can never be
      given a zero step (which would raise ValueError)
    - when no slots are free, the same batch is retried after a wait instead
      of being silently dropped
    - there is no trailing pause after the final batch

    Parameters:
    - app: Firecrawl client exposing get_concurrency, get_queue_status,
      and start_batch_scrape.
    - urls_to_process: sequence of URL strings to scrape.
    """
    concurrency = app.get_concurrency()
    queue = app.get_queue_status()

    available_slots = concurrency.max_scrapes - concurrency.active_scrapes

    # Throttle by backlog: the longer the queue, the smaller the batch.
    if queue.queue_length > 100:
        batch_size = min(available_slots // 2, 10)  # Conservative approach
    elif queue.queue_length > 50:
        batch_size = min(available_slots, 20)  # Moderate approach
    else:
        batch_size = min(available_slots, 50)  # Aggressive approach

    # Clamp: with zero (or negative) free slots at planning time the range()
    # step below would be invalid; fall back to one URL at a time.
    batch_size = max(batch_size, 1)

    print(f"Optimized batch size: {batch_size}")
    print(f"Queue length: {queue.queue_length}")
    print(f"Available slots: {available_slots}")

    # Process URLs in optimized batches
    for i in range(0, len(urls_to_process), batch_size):
        batch = urls_to_process[i:i + batch_size]

        # Wait for capacity instead of dropping the batch.
        while True:
            concurrency = app.get_concurrency()
            if concurrency.active_scrapes < concurrency.max_scrapes:
                break
            print("Waiting for slots to become available...")
            time.sleep(30)

        # Start batch
        batch_id = app.start_batch_scrape(batch)
        print(f"Started batch {i//batch_size + 1}: {batch_id}")

        # Brief pause between batches (skipped after the final one)
        if i + batch_size < len(urls_to_process):
            time.sleep(5)

app = Firecrawl(api_key="your-api-key")
# 200 example URLs, submitted in capacity-aware batches.
urls = [f"https://example.com/page{i}" for i in range(1, 201)]
optimize_job_scheduling(app, urls)

Types

class CreditUsage:
    """Credit usage information, as returned by get_credit_usage()."""
    used: int  # Credits consumed
    remaining: int  # Credits remaining
    total: int  # Total credits in plan
    reset_date: str  # When credits reset (for subscription plans); date format not specified here — confirm against the API
    
class TokenUsage:
    """Token usage information, as returned by get_token_usage()."""
    used_this_month: int  # Tokens used in current month
    limit: int  # Monthly token limit
    used_today: int  # Tokens used today
    reset_date: str  # When usage resets; date format not specified here — confirm against the API
    
class ConcurrencyInfo:
    """Concurrency limits and current usage, as returned by get_concurrency().

    Each operation type (crawl, scrape, extract, batch) has a plan-defined
    maximum alongside the number of jobs currently running.
    """
    max_crawls: int  # Maximum concurrent crawls
    active_crawls: int  # Currently active crawls
    max_scrapes: int  # Maximum concurrent scrapes
    active_scrapes: int  # Currently active scrapes
    max_extracts: int  # Maximum concurrent extractions
    active_extracts: int  # Currently active extractions
    max_batch: int  # Maximum concurrent batch operations
    active_batch: int  # Currently active batch operations
    
class QueueStatus:
    """Job queue status information, as returned by get_queue_status()."""
    queue_length: int  # Number of jobs in queue
    estimated_wait_time: int  # Estimated wait time in seconds
    processing_rate: float  # Jobs processed per minute
    priority_queue_length: int  # Priority jobs in queue

Rate Limiting and Best Practices

Handling Rate Limits

from firecrawl import Firecrawl
import time

def handle_rate_limits(app, operation_func, *args, **kwargs):
    """Execute operation_func(*args, **kwargs), retrying on rate-limit errors.

    Retries up to 3 times with exponential backoff (1s, then 2s).  An error
    whose message does not mention a rate limit — or a rate-limit error on
    the final attempt — is re-raised unchanged.  The ``app`` parameter is
    unused here but kept for interface compatibility with existing callers.

    Returns:
    - whatever operation_func returns on success.

    Raises:
    - whatever operation_func raises, once retries are exhausted or the
      error is not a rate limit.
    """
    max_retries = 3
    retry_delay = 1

    for attempt in range(max_retries):
        try:
            return operation_func(*args, **kwargs)
        except Exception as e:
            last_attempt = attempt == max_retries - 1
            # Only retry rate-limit errors, and only while attempts remain.
            if "rate limit" not in str(e).lower() or last_attempt:
                # Bare raise preserves the original traceback intact.
                raise
            print(f"Rate limited, waiting {retry_delay}s before retry {attempt + 1}")
            time.sleep(retry_delay)
            retry_delay *= 2  # Exponential backoff

app = Firecrawl(api_key="your-api-key")

# Usage with rate limit handling
# Wraps a single scrape call; transient rate-limit errors are retried.
result = handle_rate_limits(app, app.scrape, "https://example.com")

Usage Alerts

from firecrawl import Firecrawl

class UsageMonitor:
    """Threshold-based usage alerting for a Firecrawl client.

    Parameters:
    - app: client exposing get_credit_usage, get_token_usage, and
      get_concurrency.
    - credit_threshold: alert when used/total credits reaches this ratio.
    - token_threshold: alert when monthly token usage reaches this ratio.
    - concurrency_threshold: alert when active crawls reach this fraction
      of the crawl limit (previously hard-coded to 0.9; the default keeps
      the old behavior).
    """

    def __init__(self, app, credit_threshold=0.8, token_threshold=0.9,
                 concurrency_threshold=0.9):
        self.app = app
        self.credit_threshold = credit_threshold
        self.token_threshold = token_threshold
        self.concurrency_threshold = concurrency_threshold

    def check_usage_alerts(self):
        """Return a list of human-readable alert strings (empty when healthy)."""
        alerts = []

        # Credit usage — guard against zero-total plans before dividing.
        credits = self.app.get_credit_usage()
        if credits.total > 0:
            credit_usage_ratio = credits.used / credits.total
            if credit_usage_ratio >= self.credit_threshold:
                alerts.append(f"⚠️ Credit usage at {credit_usage_ratio:.1%}")

        # Token usage — same zero-limit guard.
        tokens = self.app.get_token_usage()
        if tokens.limit > 0:
            token_usage_ratio = tokens.used_this_month / tokens.limit
            if token_usage_ratio >= self.token_threshold:
                alerts.append(f"⚠️ Token usage at {token_usage_ratio:.1%}")

        # Crawl concurrency approaching its limit.
        concurrency = self.app.get_concurrency()
        if concurrency.active_crawls >= concurrency.max_crawls * self.concurrency_threshold:
            alerts.append("⚠️ Crawl concurrency near limit")

        return alerts

app = Firecrawl(api_key="your-api-key")
monitor = UsageMonitor(app)

# Check for alerts before major operations
# check_usage_alerts returns an empty list when everything is under threshold.
alerts = monitor.check_usage_alerts()
if alerts:
    print("Usage alerts:")
    for alert in alerts:
        print(f"  {alert}")

Async Usage

All usage monitoring functions have async equivalents:

import asyncio
from firecrawl import AsyncFirecrawl

async def monitor_usage_async():
    """Fetch all four usage metrics via the async client and print a summary.

    The awaits run sequentially; wrap them in asyncio.gather if the four
    requests should be issued concurrently.
    """
    app = AsyncFirecrawl(api_key="your-api-key")
    
    # Async usage monitoring
    credits = await app.get_credit_usage()
    tokens = await app.get_token_usage()
    concurrency = await app.get_concurrency()
    queue_status = await app.get_queue_status()
    
    print(f"Credits remaining: {credits.remaining}")
    print(f"Active jobs: {concurrency.active_crawls + concurrency.active_scrapes}")

# Entry point: run the one-shot async monitor.
asyncio.run(monitor_usage_async())

Install with Tessl CLI

npx tessl i tessl/pypi-firecrawl-py

docs

batch.md

crawling.md

extraction.md

index.md

monitoring.md

scraping.md

usage.md

v1-api.md

tile.json