0
# Usage & Statistics
1
2
Account usage monitoring including credit usage, token consumption, concurrency limits, and job queue status tracking. These functions help you monitor and optimize your Firecrawl API usage.
3
4
## Capabilities
5
6
### Credit Usage Tracking
7
8
Monitor your account's credit consumption and remaining balance for cost management and usage optimization.
9
10
```python { .api }
11
def get_credit_usage() -> CreditUsage:
12
"""
13
Get current credit usage statistics.
14
15
Returns:
16
- CreditUsage: detailed credit usage information including consumed and remaining credits
17
"""
18
```
19
20
### Token Usage Monitoring
21
22
Track token consumption for AI-powered operations like extraction and content processing.
23
24
```python { .api }
25
def get_token_usage() -> TokenUsage:
26
"""
27
Get current token usage statistics.
28
29
Returns:
30
- TokenUsage: detailed token usage information for AI operations
31
"""
32
```
33
34
### Concurrency Management
35
36
Monitor current concurrency limits and active job counts to optimize job scheduling and resource utilization.
37
38
```python { .api }
39
def get_concurrency() -> ConcurrencyInfo:
40
"""
41
Get current concurrency limits and usage.
42
43
Returns:
44
- ConcurrencyInfo: concurrency limits and current active job counts
45
"""
46
```
47
48
### Queue Status Monitoring
49
50
Check the status of job queues to understand processing delays and system load.
51
52
```python { .api }
53
def get_queue_status() -> QueueStatus:
54
"""
55
Get current job queue status.
56
57
Returns:
58
- QueueStatus: information about job queues and processing delays
59
"""
60
```
61
62
## Usage Examples
63
64
### Basic Usage Monitoring
65
66
```python
67
from firecrawl import Firecrawl
68
69
app = Firecrawl(api_key="your-api-key")
70
71
# Check credit usage
72
credit_usage = app.get_credit_usage()
73
print(f"Credits used: {credit_usage.used}")
74
print(f"Credits remaining: {credit_usage.remaining}")
75
print(f"Credits total: {credit_usage.total}")
76
77
# Check token usage
78
token_usage = app.get_token_usage()
79
print(f"Tokens used this month: {token_usage.used_this_month}")
80
print(f"Token limit: {token_usage.limit}")
81
82
# Check concurrency
83
concurrency = app.get_concurrency()
84
print(f"Active crawls: {concurrency.active_crawls}/{concurrency.max_crawls}")
85
print(f"Active scrapes: {concurrency.active_scrapes}/{concurrency.max_scrapes}")
86
87
# Check queue status
88
queue_status = app.get_queue_status()
89
print(f"Queue length: {queue_status.queue_length}")
90
print(f"Estimated wait time: {queue_status.estimated_wait_time}s")
91
```
92
93
### Usage Monitoring Dashboard
94
95
```python
96
from firecrawl import Firecrawl
97
import time
98
from datetime import datetime
99
100
def print_usage_dashboard(app):
101
"""Print a comprehensive usage dashboard"""
102
print("=" * 60)
103
print(f"FIRECRAWL USAGE DASHBOARD - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
104
print("=" * 60)
105
106
# Credit usage
107
try:
108
credits = app.get_credit_usage()
109
print(f"π° CREDITS:")
110
print(f" Used: {credits.used:,}")
111
print(f" Remaining: {credits.remaining:,}")
112
print(f" Total: {credits.total:,}")
113
usage_percent = (credits.used / credits.total * 100) if credits.total > 0 else 0
114
print(f" Usage: {usage_percent:.1f}%")
115
except Exception as e:
116
print(f"π° CREDITS: Error - {e}")
117
118
print()
119
120
# Token usage
121
try:
122
tokens = app.get_token_usage()
123
print(f"π― TOKENS:")
124
print(f" Used this month: {tokens.used_this_month:,}")
125
print(f" Monthly limit: {tokens.limit:,}")
126
if tokens.limit > 0:
127
token_percent = (tokens.used_this_month / tokens.limit * 100)
128
print(f" Usage: {token_percent:.1f}%")
129
except Exception as e:
130
print(f"π― TOKENS: Error - {e}")
131
132
print()
133
134
# Concurrency
135
try:
136
concurrency = app.get_concurrency()
137
print(f"β‘ CONCURRENCY:")
138
print(f" Active crawls: {concurrency.active_crawls}/{concurrency.max_crawls}")
139
print(f" Active scrapes: {concurrency.active_scrapes}/{concurrency.max_scrapes}")
140
print(f" Active extracts: {concurrency.active_extracts}/{concurrency.max_extracts}")
141
print(f" Active batch: {concurrency.active_batch}/{concurrency.max_batch}")
142
except Exception as e:
143
print(f"β‘ CONCURRENCY: Error - {e}")
144
145
print()
146
147
# Queue status
148
try:
149
queue = app.get_queue_status()
150
print(f"π¦ QUEUE STATUS:")
151
print(f" Queue length: {queue.queue_length}")
152
print(f" Estimated wait: {queue.estimated_wait_time}s")
153
print(f" Processing rate: {queue.processing_rate} jobs/min")
154
except Exception as e:
155
print(f"π¦ QUEUE STATUS: Error - {e}")
156
157
print("=" * 60)
158
159
# Usage monitoring loop
160
app = Firecrawl(api_key="your-api-key")
161
162
while True:
163
print_usage_dashboard(app)
164
time.sleep(300) # Update every 5 minutes
165
```
166
167
### Pre-Job Usage Check
168
169
```python
170
from firecrawl import Firecrawl, CrawlOptions
171
172
def check_resources_before_job(app, estimated_pages=100):
173
"""Check if sufficient resources are available before starting a large job"""
174
175
credits = app.get_credit_usage()
176
concurrency = app.get_concurrency()
177
178
# Estimate credit cost (rough estimate)
179
estimated_cost = estimated_pages * 1 # Assume 1 credit per page
180
181
print(f"Pre-job resource check:")
182
print(f"Estimated pages: {estimated_pages}")
183
print(f"Estimated cost: {estimated_cost} credits")
184
print(f"Available credits: {credits.remaining}")
185
186
# Check credit availability
187
if credits.remaining < estimated_cost:
188
print("β Insufficient credits for this job")
189
return False
190
191
# Check concurrency availability
192
if concurrency.active_crawls >= concurrency.max_crawls:
193
print("β No crawl slots available")
194
return False
195
196
print("β Resources available, proceeding with job")
197
return True
198
199
app = Firecrawl(api_key="your-api-key")
200
201
# Check resources before large crawl
202
if check_resources_before_job(app, estimated_pages=500):
203
crawl_options = CrawlOptions(limit=500)
204
crawl_id = app.start_crawl("https://example.com", crawl_options)
205
print(f"Started crawl: {crawl_id}")
206
else:
207
print("Job not started due to insufficient resources")
208
```
209
210
### Usage Optimization
211
212
```python
213
from firecrawl import Firecrawl
214
import time
215
216
def optimize_job_scheduling(app, urls_to_process):
217
"""Schedule jobs based on current resource availability"""
218
219
concurrency = app.get_concurrency()
220
queue = app.get_queue_status()
221
222
# Calculate optimal batch size based on concurrency
223
available_slots = concurrency.max_scrapes - concurrency.active_scrapes
224
225
# Adjust batch size based on queue length
226
if queue.queue_length > 100:
227
batch_size = min(available_slots // 2, 10) # Conservative approach
228
elif queue.queue_length > 50:
229
batch_size = min(available_slots, 20) # Moderate approach
230
else:
231
batch_size = min(available_slots, 50) # Aggressive approach
232
233
print(f"Optimized batch size: {batch_size}")
234
print(f"Queue length: {queue.queue_length}")
235
print(f"Available slots: {available_slots}")
236
237
# Process URLs in optimized batches
238
for i in range(0, len(urls_to_process), batch_size):
239
batch = urls_to_process[i:i+batch_size]
240
241
# Check resources before each batch
242
concurrency = app.get_concurrency()
243
if concurrency.active_scrapes >= concurrency.max_scrapes:
244
print("Waiting for slots to become available...")
245
time.sleep(30)
246
continue
247
248
# Start batch
249
batch_id = app.start_batch_scrape(batch)
250
print(f"Started batch {i//batch_size + 1}: {batch_id}")
251
252
# Brief pause between batches
253
time.sleep(5)
254
255
app = Firecrawl(api_key="your-api-key")
256
urls = [f"https://example.com/page{i}" for i in range(1, 201)]
257
optimize_job_scheduling(app, urls)
258
```
259
260
## Types
261
262
```python { .api }
263
class CreditUsage:
264
"""Credit usage information"""
265
used: int # Credits consumed
266
remaining: int # Credits remaining
267
total: int # Total credits in plan
268
reset_date: str # When credits reset (for subscription plans)
269
270
class TokenUsage:
271
"""Token usage information"""
272
used_this_month: int # Tokens used in current month
273
limit: int # Monthly token limit
274
used_today: int # Tokens used today
275
reset_date: str # When usage resets
276
277
class ConcurrencyInfo:
278
"""Concurrency limits and current usage"""
279
max_crawls: int # Maximum concurrent crawls
280
active_crawls: int # Currently active crawls
281
max_scrapes: int # Maximum concurrent scrapes
282
active_scrapes: int # Currently active scrapes
283
max_extracts: int # Maximum concurrent extractions
284
active_extracts: int # Currently active extractions
285
max_batch: int # Maximum concurrent batch operations
286
active_batch: int # Currently active batch operations
287
288
class QueueStatus:
289
"""Job queue status information"""
290
queue_length: int # Number of jobs in queue
291
estimated_wait_time: int # Estimated wait time in seconds
292
processing_rate: float # Jobs processed per minute
293
priority_queue_length: int # Priority jobs in queue
294
```
295
296
## Rate Limiting and Best Practices
297
298
### Handling Rate Limits
299
300
```python
301
from firecrawl import Firecrawl
302
import time
303
304
def handle_rate_limits(app, operation_func, *args, **kwargs):
305
"""Execute operation with rate limit handling"""
306
max_retries = 3
307
retry_delay = 1
308
309
for attempt in range(max_retries):
310
try:
311
return operation_func(*args, **kwargs)
312
except Exception as e:
313
if "rate limit" in str(e).lower():
314
if attempt < max_retries - 1:
315
print(f"Rate limited, waiting {retry_delay}s before retry {attempt + 1}")
316
time.sleep(retry_delay)
317
retry_delay *= 2 # Exponential backoff
318
else:
319
raise e
320
else:
321
raise e
322
323
app = Firecrawl(api_key="your-api-key")
324
325
# Usage with rate limit handling
326
result = handle_rate_limits(app, app.scrape, "https://example.com")
327
```
328
329
### Usage Alerts
330
331
```python
332
from firecrawl import Firecrawl
333
334
class UsageMonitor:
335
def __init__(self, app, credit_threshold=0.8, token_threshold=0.9):
336
self.app = app
337
self.credit_threshold = credit_threshold
338
self.token_threshold = token_threshold
339
340
def check_usage_alerts(self):
341
"""Check for usage threshold alerts"""
342
alerts = []
343
344
# Check credit usage
345
credits = self.app.get_credit_usage()
346
if credits.total > 0:
347
credit_usage_ratio = credits.used / credits.total
348
if credit_usage_ratio >= self.credit_threshold:
349
alerts.append(f"β οΈ Credit usage at {credit_usage_ratio:.1%}")
350
351
# Check token usage
352
tokens = self.app.get_token_usage()
353
if tokens.limit > 0:
354
token_usage_ratio = tokens.used_this_month / tokens.limit
355
if token_usage_ratio >= self.token_threshold:
356
alerts.append(f"β οΈ Token usage at {token_usage_ratio:.1%}")
357
358
# Check concurrency
359
concurrency = self.app.get_concurrency()
360
if concurrency.active_crawls >= concurrency.max_crawls * 0.9:
361
alerts.append("β οΈ Crawl concurrency near limit")
362
363
return alerts
364
365
app = Firecrawl(api_key="your-api-key")
366
monitor = UsageMonitor(app)
367
368
# Check for alerts before major operations
369
alerts = monitor.check_usage_alerts()
370
if alerts:
371
print("Usage alerts:")
372
for alert in alerts:
373
print(f" {alert}")
374
```
375
376
## Async Usage
377
378
All usage monitoring functions have async equivalents:
379
380
```python
381
import asyncio
382
from firecrawl import AsyncFirecrawl
383
384
async def monitor_usage_async():
385
app = AsyncFirecrawl(api_key="your-api-key")
386
387
# Async usage monitoring
388
credits = await app.get_credit_usage()
389
tokens = await app.get_token_usage()
390
concurrency = await app.get_concurrency()
391
queue_status = await app.get_queue_status()
392
393
print(f"Credits remaining: {credits.remaining}")
394
print(f"Active jobs: {concurrency.active_crawls + concurrency.active_scrapes}")
395
396
asyncio.run(monitor_usage_async())
397
```