# GridFS Operations

GridFS support for storing and retrieving large files in MongoDB. Motor provides comprehensive GridFS functionality with streaming operations, metadata management, and both asyncio and Tornado framework support.

## Capabilities

### GridFS Bucket

The primary interface for GridFS operations, providing file upload, download, and management functionality.

```python { .api }
# AsyncIO GridFS Bucket
class AsyncIOMotorGridFSBucket:
    def __init__(
        self,
        database: AsyncIOMotorDatabase,
        bucket_name: str = 'fs',
        chunk_size_bytes: int = 261120,
        write_concern=None,
        read_concern=None
    ):
        """
        Create a GridFS bucket.

        Parameters:
        - database: The database to use for GridFS
        - bucket_name: The bucket name (collection prefix)
        - chunk_size_bytes: Default chunk size for uploads
        - write_concern: Write concern for GridFS operations
        - read_concern: Read concern for GridFS operations
        """

    # Upload Operations
    async def upload_from_stream(
        self,
        filename: str,
        source,
        chunk_size_bytes: Optional[int] = None,
        metadata: Optional[Dict[str, Any]] = None,
        session=None
    ) -> Any:
        """Upload a file from a source stream."""

    async def upload_from_stream_with_id(
        self,
        file_id: Any,
        filename: str,
        source,
        chunk_size_bytes: Optional[int] = None,
        metadata: Optional[Dict[str, Any]] = None,
        session=None
    ) -> None:
        """Upload a file with a specific ID."""

    def open_upload_stream(
        self,
        filename: str,
        chunk_size_bytes: Optional[int] = None,
        metadata: Optional[Dict[str, Any]] = None,
        session=None
    ) -> AsyncIOMotorGridIn:
        """Open an upload stream for writing."""

    def open_upload_stream_with_id(
        self,
        file_id: Any,
        filename: str,
        chunk_size_bytes: Optional[int] = None,
        metadata: Optional[Dict[str, Any]] = None,
        session=None
    ) -> AsyncIOMotorGridIn:
        """Open an upload stream with a specific ID."""

    # Download Operations
    async def download_to_stream(
        self,
        file_id: Any,
        destination,
        session=None
    ) -> None:
        """Download a file by ID to a destination stream."""

    async def download_to_stream_by_name(
        self,
        filename: str,
        destination,
        revision: int = -1,
        session=None
    ) -> None:
        """Download a file by name to a destination stream."""

    async def open_download_stream(
        self,
        file_id: Any,
        session=None
    ) -> AsyncIOMotorGridOut:
        """Open a download stream by file ID."""

    async def open_download_stream_by_name(
        self,
        filename: str,
        revision: int = -1,
        session=None
    ) -> AsyncIOMotorGridOut:
        """Open a download stream by filename."""

    # File Management
    async def delete(self, file_id: Any, session=None) -> None:
        """Delete a file by ID."""

    async def rename(
        self,
        file_id: Any,
        new_filename: str,
        session=None
    ) -> None:
        """Rename a file."""

    def find(
        self,
        filter: Optional[Dict[str, Any]] = None,
        batch_size: int = 0,
        limit: int = 0,
        no_cursor_timeout: bool = False,
        skip: int = 0,
        sort: Optional[List[Tuple[str, int]]] = None,
        session=None
    ) -> AsyncIOMotorGridOutCursor:
        """Find files matching the filter."""

# Tornado GridFS Bucket
class MotorGridFSBucket:
    def __init__(
        self,
        database: MotorDatabase,
        bucket_name: str = 'fs',
        chunk_size_bytes: int = 261120,
        write_concern=None,
        read_concern=None
    ):
        """Create a GridFS bucket for Tornado applications."""

    # Upload Operations
    def upload_from_stream(
        self,
        filename: str,
        source,
        chunk_size_bytes: Optional[int] = None,
        metadata: Optional[Dict[str, Any]] = None,
        session=None
    ) -> tornado.concurrent.Future:
        """Upload a file from a source stream."""

    def upload_from_stream_with_id(
        self,
        file_id: Any,
        filename: str,
        source,
        chunk_size_bytes: Optional[int] = None,
        metadata: Optional[Dict[str, Any]] = None,
        session=None
    ) -> tornado.concurrent.Future:
        """Upload a file with a specific ID."""

    def open_upload_stream(
        self,
        filename: str,
        chunk_size_bytes: Optional[int] = None,
        metadata: Optional[Dict[str, Any]] = None,
        session=None
    ) -> MotorGridIn:
        """Open an upload stream for writing."""

    def open_upload_stream_with_id(
        self,
        file_id: Any,
        filename: str,
        chunk_size_bytes: Optional[int] = None,
        metadata: Optional[Dict[str, Any]] = None,
        session=None
    ) -> MotorGridIn:
        """Open an upload stream with a specific ID."""

    # Download Operations
    def download_to_stream(
        self,
        file_id: Any,
        destination,
        session=None
    ) -> tornado.concurrent.Future:
        """Download a file by ID to a destination stream."""

    def download_to_stream_by_name(
        self,
        filename: str,
        destination,
        revision: int = -1,
        session=None
    ) -> tornado.concurrent.Future:
        """Download a file by name to a destination stream."""

    def open_download_stream(
        self,
        file_id: Any,
        session=None
    ) -> tornado.concurrent.Future:
        """Open a download stream by file ID."""

    def open_download_stream_by_name(
        self,
        filename: str,
        revision: int = -1,
        session=None
    ) -> tornado.concurrent.Future:
        """Open a download stream by filename."""

    # File Management
    def delete(self, file_id: Any, session=None) -> tornado.concurrent.Future:
        """Delete a file by ID."""

    def rename(
        self,
        file_id: Any,
        new_filename: str,
        session=None
    ) -> tornado.concurrent.Future:
        """Rename a file."""

    def find(
        self,
        filter: Optional[Dict[str, Any]] = None,
        batch_size: int = 0,
        limit: int = 0,
        no_cursor_timeout: bool = False,
        skip: int = 0,
        sort: Optional[List[Tuple[str, int]]] = None,
        session=None
    ) -> MotorGridOutCursor:
        """Find files matching the filter."""
```

### GridFS Upload Stream

Stream interface for uploading files to GridFS with write operations and metadata management.

```python { .api }
# AsyncIO Upload Stream
class AsyncIOMotorGridIn:
    # Properties (read-only after upload starts)
    @property
    def _id(self) -> Any:
        """The file's unique identifier."""

    @property
    def filename(self) -> str:
        """The file's name."""

    @property
    def name(self) -> str:
        """Alias for filename."""

    @property
    def content_type(self) -> Optional[str]:
        """The file's content type."""

    @property
    def length(self) -> int:
        """The current length of the file."""

    @property
    def chunk_size(self) -> int:
        """The chunk size for this file."""

    @property
    def upload_date(self) -> Optional[datetime.datetime]:
        """The upload date (available after close)."""

    @property
    def metadata(self) -> Optional[Dict[str, Any]]:
        """The file's metadata."""

    @property
    def closed(self) -> bool:
        """Whether the file is closed."""

    # Write Operations
    async def write(self, data: bytes) -> None:
        """Write data to the file."""

    async def writelines(self, lines: List[bytes]) -> None:
        """Write multiple lines to the file."""

    # Stream Management
    async def close(self) -> None:
        """Close and finalize the file upload."""

    async def abort(self) -> None:
        """Abort the upload and delete any chunks."""

    # Metadata Management
    async def set(self, name: str, value: Any) -> None:
        """Set a metadata field."""

    # Stream Properties
    def writable(self) -> bool:
        """Whether the stream is writable."""

# Tornado Upload Stream
class MotorGridIn:
    # Properties (identical to AsyncIO version)
    @property
    def _id(self) -> Any: ...
    @property
    def filename(self) -> str: ...
    @property
    def name(self) -> str: ...
    @property
    def content_type(self) -> Optional[str]: ...
    @property
    def length(self) -> int: ...
    @property
    def chunk_size(self) -> int: ...
    @property
    def upload_date(self) -> Optional[datetime.datetime]: ...
    @property
    def metadata(self) -> Optional[Dict[str, Any]]: ...
    @property
    def closed(self) -> bool: ...

    # Write Operations (return Tornado Futures)
    def write(self, data: bytes) -> tornado.concurrent.Future: ...
    def writelines(self, lines: List[bytes]) -> tornado.concurrent.Future: ...
    def close(self) -> tornado.concurrent.Future: ...
    def abort(self) -> tornado.concurrent.Future: ...
    def set(self, name: str, value: Any) -> tornado.concurrent.Future: ...
    def writable(self) -> bool: ...
```

### GridFS Download Stream

Stream interface for downloading files from GridFS with read operations and file metadata access.

```python { .api }
# AsyncIO Download Stream
class AsyncIOMotorGridOut:
    # Properties (available after open())
    @property
    def _id(self) -> Any:
        """The file's unique identifier."""

    @property
    def filename(self) -> str:
        """The file's name."""

    @property
    def name(self) -> str:
        """Alias for filename."""

    @property
    def content_type(self) -> Optional[str]:
        """The file's content type."""

    @property
    def length(self) -> int:
        """The file's length in bytes."""

    @property
    def chunk_size(self) -> int:
        """The chunk size for this file."""

    @property
    def upload_date(self) -> datetime.datetime:
        """When the file was uploaded."""

    @property
    def metadata(self) -> Optional[Dict[str, Any]]:
        """The file's metadata."""

    @property
    def aliases(self) -> Optional[List[str]]:
        """The file's aliases (deprecated)."""

    # Read Operations
    async def open(self) -> AsyncIOMotorGridOut:
        """Open the file for reading (must be called first)."""

    async def read(self, size: int = -1) -> bytes:
        """Read up to size bytes from the file."""

    async def readchunk(self) -> bytes:
        """Read one chunk from the file."""

    async def readline(self, size: int = -1) -> bytes:
        """Read one line from the file."""

    # Stream Navigation
    def seek(self, pos: int, whence: int = 0) -> int:
        """Seek to a position in the file."""

    def tell(self) -> int:
        """Get the current position in the file."""

    def close(self) -> None:
        """Close the file."""

    # Stream Properties
    def readable(self) -> bool:
        """Whether the stream is readable."""

    def seekable(self) -> bool:
        """Whether the stream supports seeking."""

# Tornado Download Stream
class MotorGridOut:
    # Properties (identical to AsyncIO version)
    @property
    def _id(self) -> Any: ...
    @property
    def filename(self) -> str: ...
    @property
    def name(self) -> str: ...
    @property
    def content_type(self) -> Optional[str]: ...
    @property
    def length(self) -> int: ...
    @property
    def chunk_size(self) -> int: ...
    @property
    def upload_date(self) -> datetime.datetime: ...
    @property
    def metadata(self) -> Optional[Dict[str, Any]]: ...
    @property
    def aliases(self) -> Optional[List[str]]: ...

    # Read Operations (return Tornado Futures)
    def open(self) -> tornado.concurrent.Future: ...
    def read(self, size: int = -1) -> tornado.concurrent.Future: ...
    def readchunk(self) -> tornado.concurrent.Future: ...
    def readline(self, size: int = -1) -> tornado.concurrent.Future: ...

    # Stream Navigation (synchronous)
    def seek(self, pos: int, whence: int = 0) -> int: ...
    def tell(self) -> int: ...
    def close(self) -> None: ...
    def readable(self) -> bool: ...
    def seekable(self) -> bool: ...
```

### GridFS Cursor

Cursor for iterating over GridFS file metadata and accessing file information.

```python { .api }
# AsyncIO GridFS Cursor
class AsyncIOMotorGridOutCursor:
    def limit(self, limit: int) -> AsyncIOMotorGridOutCursor:
        """Limit the number of results."""

    def skip(self, skip: int) -> AsyncIOMotorGridOutCursor:
        """Skip a number of results."""

    def sort(
        self,
        key_or_list: Union[str, List[Tuple[str, int]]],
        direction: Optional[int] = None
    ) -> AsyncIOMotorGridOutCursor:
        """Sort the results."""

    def batch_size(self, batch_size: int) -> AsyncIOMotorGridOutCursor:
        """Set the batch size."""

    async def to_list(self, length: Optional[int] = None) -> List[AsyncIOMotorGridOut]:
        """Convert cursor to a list."""

    def __aiter__(self) -> AsyncIOMotorGridOutCursor:
        """Async iterator protocol."""

    async def __anext__(self) -> AsyncIOMotorGridOut:
        """Get next file."""

# Tornado GridFS Cursor
class MotorGridOutCursor:
    def limit(self, limit: int) -> MotorGridOutCursor: ...
    def skip(self, skip: int) -> MotorGridOutCursor: ...
    def sort(
        self,
        key_or_list: Union[str, List[Tuple[str, int]]],
        direction: Optional[int] = None
    ) -> MotorGridOutCursor: ...
    def batch_size(self, batch_size: int) -> MotorGridOutCursor: ...

    def to_list(self, length: Optional[int] = None) -> tornado.concurrent.Future: ...
    def next_object(self) -> tornado.concurrent.Future: ...
```

## Usage Examples

### AsyncIO File Upload and Download

```python
import asyncio
import motor.motor_asyncio
from io import BytesIO

async def gridfs_example():
    client = motor.motor_asyncio.AsyncIOMotorClient()
    db = client.test_database

    # Create GridFS bucket
    bucket = motor.motor_asyncio.AsyncIOMotorGridFSBucket(db)

    # Upload from bytes
    file_data = b"Hello, GridFS world!"
    source = BytesIO(file_data)

    file_id = await bucket.upload_from_stream(
        "hello.txt",
        source,
        metadata={"type": "greeting", "author": "Motor"}
    )
    print(f"Uploaded file with ID: {file_id}")

    # Download to bytes
    destination = BytesIO()
    await bucket.download_to_stream(file_id, destination)

    downloaded_data = destination.getvalue()
    print(f"Downloaded: {downloaded_data.decode()}")

    # Stream upload
    upload_stream = bucket.open_upload_stream(
        "large_file.dat",
        metadata={"description": "Large file example"}
    )

    # Write data in chunks
    for i in range(10):
        chunk = f"Chunk {i}\n".encode()
        await upload_stream.write(chunk)

    await upload_stream.close()
    print(f"Uploaded large file with ID: {upload_stream._id}")

    # Stream download
    download_stream = await bucket.open_download_stream(upload_stream._id)

    # Read file information
    print(f"File: {download_stream.filename}")
    print(f"Size: {download_stream.length} bytes")
    print(f"Content Type: {download_stream.content_type}")
    print(f"Upload Date: {download_stream.upload_date}")
    print(f"Metadata: {download_stream.metadata}")

    # Read data
    data = await download_stream.read()
    print(f"Downloaded data: {data.decode()}")

    download_stream.close()
    client.close()

asyncio.run(gridfs_example())
```

### File Management Operations

```python
import asyncio
import motor.motor_asyncio

async def file_management_example():
    client = motor.motor_asyncio.AsyncIOMotorClient()
    db = client.test_database
    bucket = motor.motor_asyncio.AsyncIOMotorGridFSBucket(db)

    # Find files
    cursor = bucket.find({"metadata.type": "image"})
    async for file_doc in cursor:
        print(f"Found file: {file_doc.filename} (ID: {file_doc._id})")

    # Find files with sorting and limiting
    cursor = bucket.find().sort("uploadDate", -1).limit(5)
    recent_files = await cursor.to_list(5)

    for file_doc in recent_files:
        print(f"Recent file: {file_doc.filename}")

    # Rename a file
    if recent_files:
        file_id = recent_files[0]._id
        await bucket.rename(file_id, "renamed_file.txt")
        print(f"Renamed file {file_id}")

    # Delete a file
    if len(recent_files) > 1:
        file_id = recent_files[1]._id
        await bucket.delete(file_id)
        print(f"Deleted file {file_id}")

    client.close()

asyncio.run(file_management_example())
```

### Large File Streaming

```python
import asyncio
import motor.motor_asyncio

async def large_file_streaming():
    client = motor.motor_asyncio.AsyncIOMotorClient()
    db = client.test_database
    bucket = motor.motor_asyncio.AsyncIOMotorGridFSBucket(db)

    # Upload large file in chunks
    upload_stream = bucket.open_upload_stream(
        "video.mp4",
        chunk_size_bytes=1024*1024,  # 1MB chunks
        metadata={"type": "video", "codec": "h264"}
    )

    # Simulate large file upload
    total_size = 0
    for i in range(100):  # 100MB file simulation
        chunk = b"X" * 1024 * 1024  # 1MB of data
        await upload_stream.write(chunk)
        total_size += len(chunk)

        if i % 10 == 0:
            print(f"Uploaded {total_size / (1024*1024):.1f}MB")

    await upload_stream.close()
    print(f"Upload complete. File ID: {upload_stream._id}")

    # Stream download with progress
    download_stream = await bucket.open_download_stream(upload_stream._id)
    print(f"Downloading {download_stream.length / (1024*1024):.1f}MB file")

    downloaded_size = 0
    while downloaded_size < download_stream.length:
        chunk = await download_stream.readchunk()
        if not chunk:
            break

        downloaded_size += len(chunk)
        progress = (downloaded_size / download_stream.length) * 100

        if downloaded_size % (10 * 1024 * 1024) == 0:  # Every 10MB
            print(f"Downloaded {progress:.1f}%")

    download_stream.close()
    print("Download complete")
    client.close()

asyncio.run(large_file_streaming())
```

## Types

```python { .api }
from typing import Any, Optional, Union, Dict, List, Tuple
from datetime import datetime
import io

# GridFS-specific types
GridFSFile = Dict[str, Any]  # GridFS file document
ChunkData = bytes  # File chunk data
FileId = Any  # GridFS file identifier (usually ObjectId)
```