Python SDK for Firecrawl API that enables web scraping, crawling, and content extraction with LLM-optimized output formats
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Account usage monitoring including credit usage, token consumption, concurrency limits, and job queue status tracking. These functions help you monitor and optimize your Firecrawl API usage.
Monitor your account's credit consumption and remaining balance for cost management and usage optimization.
def get_credit_usage() -> CreditUsage:
"""
Get current credit usage statistics.
Returns:
- CreditUsage: detailed credit usage information including consumed and remaining credits
"""Track token consumption for AI-powered operations like extraction and content processing.
def get_token_usage() -> TokenUsage:
"""
Get current token usage statistics.
Returns:
- TokenUsage: detailed token usage information for AI operations
"""Monitor current concurrency limits and active job counts to optimize job scheduling and resource utilization.
def get_concurrency() -> ConcurrencyInfo:
"""
Get current concurrency limits and usage.
Returns:
- ConcurrencyInfo: concurrency limits and current active job counts
"""Check the status of job queues to understand processing delays and system load.
def get_queue_status() -> QueueStatus:
    """
    Get current job queue status.

    Returns:
    - QueueStatus: information about job queues and processing delays
    """

from firecrawl import Firecrawl
app = Firecrawl(api_key="your-api-key")

# Inspect credit consumption.
credits = app.get_credit_usage()
print(f"Credits used: {credits.used}")
print(f"Credits remaining: {credits.remaining}")
print(f"Credits total: {credits.total}")

# Inspect token consumption.
tokens = app.get_token_usage()
print(f"Tokens used this month: {tokens.used_this_month}")
print(f"Token limit: {tokens.limit}")

# Inspect concurrency limits.
limits = app.get_concurrency()
print(f"Active crawls: {limits.active_crawls}/{limits.max_crawls}")
print(f"Active scrapes: {limits.active_scrapes}/{limits.max_scrapes}")

# Inspect the job queue.
queue = app.get_queue_status()
print(f"Queue length: {queue.queue_length}")
print(f"Estimated wait time: {queue.estimated_wait_time}s")
from firecrawl import Firecrawl
import time
from datetime import datetime
def print_usage_dashboard(app):
    """Print a comprehensive usage dashboard for the given Firecrawl client.

    Each section (credits, tokens, concurrency, queue) is fetched and printed
    independently, so one failing API call does not hide the other sections.

    Parameters:
    - app: Firecrawl client exposing get_credit_usage/get_token_usage/
      get_concurrency/get_queue_status.
    """
    print("=" * 60)
    print(f"FIRECRAWL USAGE DASHBOARD - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print("=" * 60)
    _print_credit_section(app)
    print()
    _print_token_section(app)
    print()
    _print_concurrency_section(app)
    print()
    _print_queue_section(app)
    print("=" * 60)

def _print_credit_section(app):
    """Print the credit-usage section; API errors are reported inline."""
    try:
        credits = app.get_credit_usage()
        print(f"💰 CREDITS:")
        print(f" Used: {credits.used:,}")
        print(f" Remaining: {credits.remaining:,}")
        print(f" Total: {credits.total:,}")
        # Guard against zero-credit plans to avoid division by zero.
        usage_percent = (credits.used / credits.total * 100) if credits.total > 0 else 0
        print(f" Usage: {usage_percent:.1f}%")
    except Exception as e:
        print(f"💰 CREDITS: Error - {e}")

def _print_token_section(app):
    """Print the token-usage section; API errors are reported inline."""
    try:
        tokens = app.get_token_usage()
        print(f"🎯 TOKENS:")
        print(f" Used this month: {tokens.used_this_month:,}")
        print(f" Monthly limit: {tokens.limit:,}")
        # Only show a percentage when a monthly limit is actually set.
        if tokens.limit > 0:
            token_percent = (tokens.used_this_month / tokens.limit * 100)
            print(f" Usage: {token_percent:.1f}%")
    except Exception as e:
        print(f"🎯 TOKENS: Error - {e}")

def _print_concurrency_section(app):
    """Print the concurrency section; API errors are reported inline."""
    try:
        concurrency = app.get_concurrency()
        print(f"⚡ CONCURRENCY:")
        print(f" Active crawls: {concurrency.active_crawls}/{concurrency.max_crawls}")
        print(f" Active scrapes: {concurrency.active_scrapes}/{concurrency.max_scrapes}")
        print(f" Active extracts: {concurrency.active_extracts}/{concurrency.max_extracts}")
        print(f" Active batch: {concurrency.active_batch}/{concurrency.max_batch}")
    except Exception as e:
        print(f"⚡ CONCURRENCY: Error - {e}")

def _print_queue_section(app):
    """Print the queue-status section; API errors are reported inline."""
    try:
        queue = app.get_queue_status()
        print(f"🚦 QUEUE STATUS:")
        print(f" Queue length: {queue.queue_length}")
        print(f" Estimated wait: {queue.estimated_wait_time}s")
        print(f" Processing rate: {queue.processing_rate} jobs/min")
    except Exception as e:
        print(f"🚦 QUEUE STATUS: Error - {e}")
# Poll the dashboard forever at a fixed interval.
app = Firecrawl(api_key="your-api-key")
while True:
    print_usage_dashboard(app)
    time.sleep(300)  # refresh every 5 minutes
from firecrawl import Firecrawl, CrawlOptions
def check_resources_before_job(app, estimated_pages=100):
    """Verify that credits and crawl slots suffice before launching a large job.

    Parameters:
    - app: Firecrawl client exposing get_credit_usage/get_concurrency
    - estimated_pages: expected number of pages for the job

    Returns True when both the credit balance and a free crawl slot are
    available, False otherwise (with a reason printed).
    """
    usage = app.get_credit_usage()
    limits = app.get_concurrency()
    # Rough cost model: one credit per page.
    projected_cost = estimated_pages * 1
    print(f"Pre-job resource check:")
    print(f"Estimated pages: {estimated_pages}")
    print(f"Estimated cost: {projected_cost} credits")
    print(f"Available credits: {usage.remaining}")
    # Guard clause: not enough credits left.
    if usage.remaining < projected_cost:
        print("❌ Insufficient credits for this job")
        return False
    # Guard clause: every crawl slot is already occupied.
    if limits.active_crawls >= limits.max_crawls:
        print("❌ No crawl slots available")
        return False
    print("✅ Resources available, proceeding with job")
    return True
app = Firecrawl(api_key="your-api-key")

# Gate the large crawl on the pre-flight resource check.
if check_resources_before_job(app, estimated_pages=500):
    crawl_options = CrawlOptions(limit=500)
    crawl_id = app.start_crawl("https://example.com", crawl_options)
    print(f"Started crawl: {crawl_id}")
else:
    print("Job not started due to insufficient resources")
from firecrawl import Firecrawl
import time
def optimize_job_scheduling(app, urls_to_process):
    """Process URLs in batches sized to current resource availability.

    Parameters:
    - app: Firecrawl client exposing get_concurrency/get_queue_status/start_batch_scrape
    - urls_to_process: list of URLs to scrape

    Fixes over the naive version:
    - a batch is retried (not silently dropped) while no scrape slots are free;
    - batch_size is clamped to at least 1 so an exhausted slot pool cannot
      produce a zero step (which would make range()/slicing misbehave).
    """
    concurrency = app.get_concurrency()
    queue = app.get_queue_status()
    available_slots = concurrency.max_scrapes - concurrency.active_scrapes
    # Scale batch size down as the shared queue gets longer.
    if queue.queue_length > 100:
        batch_size = min(available_slots // 2, 10)  # conservative
    elif queue.queue_length > 50:
        batch_size = min(available_slots, 20)  # moderate
    else:
        batch_size = min(available_slots, 50)  # aggressive
    # Never allow a zero/negative batch size: range(..., 0) raises ValueError.
    batch_size = max(batch_size, 1)
    print(f"Optimized batch size: {batch_size}")
    print(f"Queue length: {queue.queue_length}")
    print(f"Available slots: {available_slots}")
    i = 0
    while i < len(urls_to_process):
        batch = urls_to_process[i:i+batch_size]
        # Re-check capacity before each batch.
        concurrency = app.get_concurrency()
        if concurrency.active_scrapes >= concurrency.max_scrapes:
            print("Waiting for slots to become available...")
            time.sleep(30)
            continue  # retry the SAME batch instead of dropping it
        batch_id = app.start_batch_scrape(batch)
        print(f"Started batch {i//batch_size + 1}: {batch_id}")
        i += batch_size
        # Brief pause between batches.
        time.sleep(5)
app = Firecrawl(api_key="your-api-key")
urls = [f"https://example.com/page{i}" for i in range(1, 201)]
optimize_job_scheduling(app, urls)class CreditUsage:
"""Credit usage information"""
used: int # Credits consumed
remaining: int # Credits remaining
total: int # Total credits in plan
reset_date: str # When credits reset (for subscription plans)
class TokenUsage:
"""Token usage information"""
used_this_month: int # Tokens used in current month
limit: int # Monthly token limit
used_today: int # Tokens used today
reset_date: str # When usage resets
class ConcurrencyInfo:
"""Concurrency limits and current usage"""
max_crawls: int # Maximum concurrent crawls
active_crawls: int # Currently active crawls
max_scrapes: int # Maximum concurrent scrapes
active_scrapes: int # Currently active scrapes
max_extracts: int # Maximum concurrent extractions
active_extracts: int # Currently active extractions
max_batch: int # Maximum concurrent batch operations
active_batch: int # Currently active batch operations
class QueueStatus:
"""Job queue status information"""
queue_length: int # Number of jobs in queue
estimated_wait_time: int # Estimated wait time in seconds
processing_rate: float # Jobs processed per minute
priority_queue_length: int # Priority jobs in queuefrom firecrawl import Firecrawl
import time
def handle_rate_limits(app, operation_func, *args, **kwargs):
    """Execute an operation, retrying with exponential backoff on rate limits.

    Parameters:
    - app: Firecrawl client (unused directly; kept for interface compatibility)
    - operation_func: callable to invoke
    - *args/**kwargs: forwarded to operation_func

    Returns the operation's result. Non-rate-limit errors are re-raised
    immediately; a rate-limit error is re-raised after max_retries attempts.
    """
    max_retries = 3
    retry_delay = 1
    for attempt in range(max_retries):
        try:
            return operation_func(*args, **kwargs)
        except Exception as e:
            # Only rate-limit errors are retryable; everything else propagates.
            if "rate limit" not in str(e).lower():
                raise  # bare raise keeps the original traceback intact
            if attempt >= max_retries - 1:
                raise
            print(f"Rate limited, waiting {retry_delay}s before retry {attempt + 1}")
            time.sleep(retry_delay)
            retry_delay *= 2  # exponential backoff
app = Firecrawl(api_key="your-api-key")

# Wrap a scrape call with automatic rate-limit retries.
result = handle_rate_limits(app, app.scrape, "https://example.com")
from firecrawl import Firecrawl
class UsageMonitor:
    """Watches account usage and reports threshold alerts."""

    def __init__(self, app, credit_threshold=0.8, token_threshold=0.9):
        # Thresholds are fractions of the plan limit (0.8 == 80%).
        self.app = app
        self.credit_threshold = credit_threshold
        self.token_threshold = token_threshold

    def check_usage_alerts(self):
        """Return a list of warning strings for any usage near its limit."""
        triggered = []
        # Credit consumption vs. plan total (skip plans with no total).
        credits = self.app.get_credit_usage()
        if credits.total > 0:
            credit_ratio = credits.used / credits.total
            if credit_ratio >= self.credit_threshold:
                triggered.append(f"⚠️ Credit usage at {credit_ratio:.1%}")
        # Token consumption vs. monthly limit (skip unlimited plans).
        tokens = self.app.get_token_usage()
        if tokens.limit > 0:
            token_ratio = tokens.used_this_month / tokens.limit
            if token_ratio >= self.token_threshold:
                triggered.append(f"⚠️ Token usage at {token_ratio:.1%}")
        # Crawl concurrency within 10% of the ceiling.
        concurrency = self.app.get_concurrency()
        if concurrency.active_crawls >= concurrency.max_crawls * 0.9:
            triggered.append("⚠️ Crawl concurrency near limit")
        return triggered
app = Firecrawl(api_key="your-api-key")
monitor = UsageMonitor(app)
# Check for alerts before major operations
alerts = monitor.check_usage_alerts()
if alerts:
print("Usage alerts:")
for alert in alerts:
print(f" {alert}")All usage monitoring functions have async equivalents:
import asyncio
from firecrawl import AsyncFirecrawl
async def monitor_usage_async():
app = AsyncFirecrawl(api_key="your-api-key")
# Async usage monitoring
credits = await app.get_credit_usage()
tokens = await app.get_token_usage()
concurrency = await app.get_concurrency()
queue_status = await app.get_queue_status()
print(f"Credits remaining: {credits.remaining}")
print(f"Active jobs: {concurrency.active_crawls + concurrency.active_scrapes}")
asyncio.run(monitor_usage_async())Install with Tessl CLI
npx tessl i tessl/pypi-firecrawl-py