# Storage
Persistent storage for crawled data, key-value records, and request queues. Crawlee provides three main storage types, each designed for a different data-persistence need in web scraping workflows.
## Capabilities
### Dataset
Storage for structured data with built-in export to various formats, including JSON, CSV, and Excel. Ideal for storing scraped data records.
```python { .api }
class Dataset:
    def __init__(self, id: str | None = None, name: str | None = None): ...

    async def push_data(
        self,
        data: dict | list[dict],
        *,
        limit: int | None = None
    ) -> None:
        """
        Store structured data records.

        Args:
            data: Dictionary or list of dictionaries to store
            limit: Maximum number of records to store (None for unlimited)
        """

    async def get_data(
        self,
        *,
        offset: int = 0,
        limit: int | None = None,
        clean: bool = False,
        desc: bool = False,
        fields: list[str] | None = None
    ) -> DatasetData:
        """
        Retrieve stored data with pagination and filtering.

        Args:
            offset: Number of records to skip
            limit: Maximum number of records to return
            clean: Remove empty records and standardize data
            desc: Return records in descending order
            fields: Specific fields to include in results

        Returns:
            DatasetData object containing items and metadata
        """

    async def export_to(
        self,
        path: str,
        *,
        format: Literal["json", "csv", "xlsx"] = "json",
        **kwargs
    ) -> None:
        """
        Export dataset to file in specified format.

        Args:
            path: File path for export
            format: Output format (json, csv, xlsx)
        """

    async def drop(self) -> None:
        """Delete the dataset and all its data."""

    async def get_info(self) -> DatasetInfo:
        """Get dataset metadata and statistics."""

    @property
    def id(self) -> str: ...

    @property
    def name(self) -> str | None: ...
```
```python { .api }
class DatasetData:
    items: list[dict]
    total: int
    offset: int
    count: int
    limit: int | None
```
```python { .api }
class DatasetInfo:
    id: str
    name: str | None
    title: str | None
    created_at: datetime
    modified_at: datetime
    accessed_at: datetime
    item_count: int
    clean_item_count: int
```
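The `offset`/`limit` parameters on `get_data` support page-by-page iteration. A minimal, self-contained sketch of that pagination pattern, assuming a short page signals the end of the data (a plain list stands in for the dataset; `fetch_all_pages`, `fetch_page`, and `page_size` are illustrative names, not Crawlee API):

```python
import asyncio

async def fetch_all_pages(fetch_page, page_size=2):
    """Collect records page by page using offset/limit, stopping on a short page."""
    pages, offset = [], 0
    while True:
        items = await fetch_page(offset=offset, limit=page_size)
        if items:
            pages.append(items)
        if len(items) < page_size:
            return pages
        offset += len(items)

records = [{"id": i} for i in range(5)]

async def fetch_page(*, offset, limit):
    # Stand-in for (await dataset.get_data(offset=..., limit=...)).items
    return records[offset:offset + limit]

print(asyncio.run(fetch_all_pages(fetch_page)))
```

The same loop works against a real dataset by replacing `fetch_page` with a call to `get_data` and reading `.items` off the result.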
### Key-Value Store
Storage for arbitrary data including binary files, configuration objects, and intermediate processing results. Supports any serializable data type.
```python { .api }
class KeyValueStore:
    def __init__(self, id: str | None = None, name: str | None = None): ...

    async def set_value(
        self,
        key: str,
        value: Any,
        *,
        content_type: str | None = None
    ) -> None:
        """
        Store a value under the specified key.

        Args:
            key: Storage key
            value: Data to store (any serializable type)
            content_type: MIME type for the stored content
        """

    async def get_value(self, key: str) -> Any:
        """
        Retrieve value by key.

        Args:
            key: Storage key

        Returns:
            Stored value or None if key doesn't exist
        """

    async def delete(self, key: str) -> None:
        """Delete a key-value pair."""

    async def list_keys(
        self,
        *,
        limit: int | None = None,
        exclusive_start_key: str | None = None
    ) -> KeyValueStoreListPage:
        """
        List stored keys with pagination.

        Args:
            limit: Maximum number of keys to return
            exclusive_start_key: Start listing after this key

        Returns:
            Page of keys with metadata
        """

    async def drop(self) -> None:
        """Delete the store and all its contents."""

    async def get_info(self) -> KeyValueStoreInfo:
        """Get store metadata and statistics."""

    @property
    def id(self) -> str: ...

    @property
    def name(self) -> str | None: ...
```
```python { .api }
class KeyValueStoreListPage:
    items: list[KeyValueStoreKey]
    total: int
    offset: int
    count: int
    limit: int | None
```
```python { .api }
class KeyValueStoreKey:
    key: str
    size: int
```
```python { .api }
class KeyValueStoreInfo:
    id: str
    name: str | None
    title: str | None
    created_at: datetime
    modified_at: datetime
    accessed_at: datetime
```
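`list_keys` paginates with a cursor rather than an offset: `exclusive_start_key` means "start strictly after this key". A toy stdlib sketch of those semantics (not Crawlee code; the assumption that keys come back in sorted order is illustrative):

```python
def list_keys_page(keys, *, limit=None, exclusive_start_key=None):
    """Cursor-style listing: return up to `limit` keys strictly after the cursor."""
    ordered = sorted(keys)
    if exclusive_start_key is not None:
        ordered = [k for k in ordered if k > exclusive_start_key]
    return ordered[:limit]

all_keys = {"config", "logo", "state", "stats"}
page1 = list_keys_page(all_keys, limit=2)
# Pass the last key of the previous page as the cursor for the next one
page2 = list_keys_page(all_keys, limit=2, exclusive_start_key=page1[-1])
print(page1, page2)
```

Cursor pagination stays stable when keys are inserted or deleted between pages, which is why key listings tend to use it instead of numeric offsets.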
### Request Queue
FIFO queue for managing crawling requests with support for request deduplication, retry logic, and distributed processing.
```python { .api }
class RequestQueue:
    def __init__(self, id: str | None = None, name: str | None = None): ...

    async def add_request(
        self,
        request: str | Request,
        *,
        forefront: bool = False
    ) -> RequestQueueOperationInfo:
        """
        Add request to the queue.

        Args:
            request: URL string or Request object
            forefront: Add to front of queue for priority processing

        Returns:
            Information about the add operation
        """

    async def add_requests_batched(
        self,
        requests: list[str | Request],
        *,
        forefront: bool = False
    ) -> BatchAddRequestsResult:
        """
        Add multiple requests efficiently in batch.

        Args:
            requests: List of URL strings or Request objects
            forefront: Add to front of queue for priority processing

        Returns:
            Batch operation results
        """

    async def fetch_next_request(self) -> Request | None:
        """
        Get next request from queue for processing.

        Returns:
            Request object or None if queue is empty
        """

    async def mark_request_as_handled(self, request: Request) -> None:
        """Mark request as successfully processed."""

    async def reclaim_request(self, request: Request) -> None:
        """Return request to queue for retry after failure."""

    async def is_empty(self) -> bool:
        """Check if queue has no pending requests."""

    async def is_finished(self) -> bool:
        """Check if all requests have been processed."""

    async def drop(self) -> None:
        """Delete the queue and all its requests."""

    async def get_info(self) -> RequestQueueInfo:
        """Get queue metadata and statistics."""

    @property
    def id(self) -> str: ...

    @property
    def name(self) -> str | None: ...
```
```python { .api }
class RequestQueueOperationInfo:
    request_id: str
    was_already_present: bool
    was_already_handled: bool
```
```python { .api }
class BatchAddRequestsResult:
    processed_requests: list[ProcessedRequest]
    unprocessed_requests: list[str | Request]
```
```python { .api }
class ProcessedRequest:
    unique_key: str
    was_already_present: bool
    was_already_handled: bool
    request_id: str
```
```python { .api }
class RequestQueueInfo:
    id: str
    name: str | None
    title: str | None
    created_at: datetime
    modified_at: datetime
    accessed_at: datetime
    total_request_count: int
    handled_request_count: int
    pending_request_count: int
```
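The deduplication semantics surfaced by `was_already_present`/`was_already_handled` and the `forefront` flag can be illustrated with a toy in-memory queue. This is a sketch of the behavior, not the Crawlee implementation; plain URL strings stand in for unique keys:

```python
from collections import deque

class DedupQueue:
    """Toy FIFO queue with deduplication by unique key."""

    def __init__(self):
        self._pending = deque()
        self._seen = set()
        self._handled = set()

    def add_request(self, url, *, forefront=False):
        already_present = url in self._seen
        if not already_present:
            self._seen.add(url)
            if forefront:
                self._pending.appendleft(url)  # priority processing
            else:
                self._pending.append(url)
        return {
            "was_already_present": already_present,
            "was_already_handled": url in self._handled,
        }

    def fetch_next_request(self):
        return self._pending.popleft() if self._pending else None

    def mark_request_as_handled(self, url):
        self._handled.add(url)

q = DedupQueue()
q.add_request("https://example.com/a")
info = q.add_request("https://example.com/a")   # duplicate is dropped
q.add_request("https://example.com/b", forefront=True)
print(info["was_already_present"], q.fetch_next_request())
```

Because duplicates are filtered on `add_request`, a crawler can enqueue every discovered link without tracking which URLs it has already seen.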
## Storage Clients
### Storage Client Interface
Abstract base class defining the interface for different storage backend implementations.
```python { .api }
class StorageClient:
    async def dataset_exists(self, id: str) -> bool: ...
    async def dataset_list(self) -> list[DatasetInfo]: ...
    async def dataset_get_data(
        self,
        id: str,
        **kwargs
    ) -> DatasetData: ...
    async def dataset_push_data(
        self,
        id: str,
        data: dict | list[dict]
    ) -> None: ...
    async def dataset_delete(self, id: str) -> None: ...

    async def key_value_store_exists(self, id: str) -> bool: ...
    async def key_value_store_list(self) -> list[KeyValueStoreInfo]: ...
    async def key_value_store_get_record(
        self,
        id: str,
        key: str
    ) -> KeyValueStoreRecord | None: ...
    async def key_value_store_set_record(
        self,
        id: str,
        key: str,
        value: Any,
        content_type: str | None = None
    ) -> None: ...
    async def key_value_store_delete_record(
        self,
        id: str,
        key: str
    ) -> None: ...
    async def key_value_store_list_keys(
        self,
        id: str,
        **kwargs
    ) -> KeyValueStoreListPage: ...
    async def key_value_store_delete(self, id: str) -> None: ...

    async def request_queue_exists(self, id: str) -> bool: ...
    async def request_queue_list(self) -> list[RequestQueueInfo]: ...
    async def request_queue_add_request(
        self,
        id: str,
        request: Request,
        forefront: bool = False
    ) -> RequestQueueOperationInfo: ...
    async def request_queue_get_request(
        self,
        id: str
    ) -> Request | None: ...
    async def request_queue_update_request(
        self,
        id: str,
        request: Request
    ) -> None: ...
    async def request_queue_delete(self, id: str) -> None: ...
```
### Memory Storage Client
In-memory storage implementation for development, testing, and temporary data storage.
```python { .api }
class MemoryStorageClient(StorageClient):
    def __init__(self): ...
```
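A memory backend typically keeps one dict per storage, keyed by storage id. A hypothetical sketch of just the key-value record methods (illustrative only; the real client implements the full `StorageClient` interface, and all names here are assumptions):

```python
class InMemoryRecordBackend:
    """Toy in-memory backend for key-value records, keyed by store id."""

    def __init__(self):
        self._stores = {}  # store id -> {key -> record}

    def set_record(self, store_id, key, value, content_type=None):
        store = self._stores.setdefault(store_id, {})
        store[key] = {"key": key, "value": value, "content_type": content_type}

    def get_record(self, store_id, key):
        # Returns None for an unknown store or a missing key
        return self._stores.get(store_id, {}).get(key)

    def delete_record(self, store_id, key):
        self._stores.get(store_id, {}).pop(key, None)

backend = InMemoryRecordBackend()
backend.set_record("default", "config", {"timeout": 30})
print(backend.get_record("default", "config")["value"])
```

Nothing here touches disk, which is exactly why an in-memory client suits tests and development: state is cheap to create and discarded with the process.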
## Usage Examples
### Dataset Usage
```python
import asyncio
from crawlee.storages import Dataset

async def main():
    # Create or get existing dataset
    dataset = await Dataset.open('my-results')

    # Store single record
    await dataset.push_data({
        'url': 'https://example.com',
        'title': 'Example Page',
        'price': 19.99
    })

    # Store multiple records
    products = [
        {'name': 'Product 1', 'price': 10.00},
        {'name': 'Product 2', 'price': 15.00}
    ]
    await dataset.push_data(products)

    # Retrieve data with pagination
    data = await dataset.get_data(limit=10, offset=0)
    print(f"Found {data.total} total records")

    # Export to file
    await dataset.export_to('results.csv', format='csv')

asyncio.run(main())
```
### Key-Value Store Usage
```python
import asyncio
from crawlee.storages import KeyValueStore

async def main():
    # Create or get existing store
    store = await KeyValueStore.open('my-store')

    # Store configuration
    config = {'timeout': 30, 'retries': 3}
    await store.set_value('config', config)

    # Store binary data
    with open('image.jpg', 'rb') as f:
        await store.set_value('logo', f.read(), content_type='image/jpeg')

    # Retrieve data
    saved_config = await store.get_value('config')
    print(f"Timeout: {saved_config['timeout']}")

    # List all keys
    keys_page = await store.list_keys(limit=100)
    for item in keys_page.items:
        print(f"Key: {item.key}, Size: {item.size} bytes")

asyncio.run(main())
```
### Request Queue Usage
```python
import asyncio
from crawlee.storages import RequestQueue
from crawlee import Request

async def main():
    # Create or get existing queue
    queue = await RequestQueue.open('my-queue')

    # Add single request
    await queue.add_request('https://example.com')

    # Add request with custom data
    request = Request('https://example.com/products', user_data={'category': 'electronics'})
    await queue.add_request(request)

    # Add multiple requests
    urls = ['https://example.com/page1', 'https://example.com/page2']
    await queue.add_requests_batched(urls)

    # Process requests
    while not await queue.is_empty():
        request = await queue.fetch_next_request()
        if request:
            print(f"Processing: {request.url}")
            try:
                # Process request here...
                await queue.mark_request_as_handled(request)
            except Exception:
                # Return to queue for retry
                await queue.reclaim_request(request)

    print("Queue processing complete!")

asyncio.run(main())
```