or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

asyncio-operations.md · change-streams.md · client-encryption.md · cursor-operations.md · gridfs-operations.md · index.md · tornado-operations.md · web-integration.md

docs/gridfs-operations.md

# GridFS Operations

GridFS support for storing and retrieving large files in MongoDB. Motor provides comprehensive GridFS functionality with streaming operations, metadata management, and both asyncio and Tornado framework support.

## Capabilities

### GridFS Bucket

The primary interface for GridFS operations, providing file upload, download, and management functionality.

```python { .api }

11

# AsyncIO GridFS Bucket

12

class AsyncIOMotorGridFSBucket:

13

def __init__(

14

self,

15

database: AsyncIOMotorDatabase,

16

bucket_name: str = 'fs',

17

chunk_size_bytes: int = 261120,

18

write_concern=None,

19

read_concern=None

20

):

21

"""

22

Create a GridFS bucket.

23

24

Parameters:

25

- database: The database to use for GridFS

26

- bucket_name: The bucket name (collection prefix)

27

- chunk_size_bytes: Default chunk size for uploads

28

- write_concern: Write concern for GridFS operations

29

- read_concern: Read concern for GridFS operations

30

"""

31

32

# Upload Operations

33

async def upload_from_stream(

34

self,

35

filename: str,

36

source,

37

chunk_size_bytes: Optional[int] = None,

38

metadata: Optional[Dict[str, Any]] = None,

39

session=None

40

) -> Any:

41

"""Upload a file from a source stream."""

42

43

async def upload_from_stream_with_id(

44

self,

45

file_id: Any,

46

filename: str,

47

source,

48

chunk_size_bytes: Optional[int] = None,

49

metadata: Optional[Dict[str, Any]] = None,

50

session=None

51

) -> None:

52

"""Upload a file with a specific ID."""

53

54

def open_upload_stream(

55

self,

56

filename: str,

57

chunk_size_bytes: Optional[int] = None,

58

metadata: Optional[Dict[str, Any]] = None,

59

session=None

60

) -> AsyncIOMotorGridIn:

61

"""Open an upload stream for writing."""

62

63

def open_upload_stream_with_id(

64

self,

65

file_id: Any,

66

filename: str,

67

chunk_size_bytes: Optional[int] = None,

68

metadata: Optional[Dict[str, Any]] = None,

69

session=None

70

) -> AsyncIOMotorGridIn:

71

"""Open an upload stream with a specific ID."""

72

73

# Download Operations

74

async def download_to_stream(

75

self,

76

file_id: Any,

77

destination,

78

session=None

79

) -> None:

80

"""Download a file by ID to a destination stream."""

81

82

async def download_to_stream_by_name(

83

self,

84

filename: str,

85

destination,

86

revision: int = -1,

87

session=None

88

) -> None:

89

"""Download a file by name to a destination stream."""

90

91

async def open_download_stream(

92

self,

93

file_id: Any,

94

session=None

95

) -> AsyncIOMotorGridOut:

96

"""Open a download stream by file ID."""

97

98

async def open_download_stream_by_name(

99

self,

100

filename: str,

101

revision: int = -1,

102

session=None

103

) -> AsyncIOMotorGridOut:

104

"""Open a download stream by filename."""

105

106

# File Management

107

async def delete(self, file_id: Any, session=None) -> None:

108

"""Delete a file by ID."""

109

110

async def rename(

111

self,

112

file_id: Any,

113

new_filename: str,

114

session=None

115

) -> None:

116

"""Rename a file."""

117

118

def find(

119

self,

120

filter: Optional[Dict[str, Any]] = None,

121

batch_size: int = 0,

122

limit: int = 0,

123

no_cursor_timeout: bool = False,

124

skip: int = 0,

125

sort: Optional[List[Tuple[str, int]]] = None,

126

session=None

127

) -> AsyncIOMotorGridOutCursor:

128

"""Find files matching the filter."""

129

130

# Tornado GridFS Bucket

131

class MotorGridFSBucket:

132

def __init__(

133

self,

134

database: MotorDatabase,

135

bucket_name: str = 'fs',

136

chunk_size_bytes: int = 261120,

137

write_concern=None,

138

read_concern=None

139

):

140

"""Create a GridFS bucket for Tornado applications."""

141

142

# Upload Operations

143

def upload_from_stream(

144

self,

145

filename: str,

146

source,

147

chunk_size_bytes: Optional[int] = None,

148

metadata: Optional[Dict[str, Any]] = None,

149

session=None

150

) -> tornado.concurrent.Future:

151

"""Upload a file from a source stream."""

152

153

def upload_from_stream_with_id(

154

self,

155

file_id: Any,

156

filename: str,

157

source,

158

chunk_size_bytes: Optional[int] = None,

159

metadata: Optional[Dict[str, Any]] = None,

160

session=None

161

) -> tornado.concurrent.Future:

162

"""Upload a file with a specific ID."""

163

164

def open_upload_stream(

165

self,

166

filename: str,

167

chunk_size_bytes: Optional[int] = None,

168

metadata: Optional[Dict[str, Any]] = None,

169

session=None

170

) -> MotorGridIn:

171

"""Open an upload stream for writing."""

172

173

def open_upload_stream_with_id(

174

self,

175

file_id: Any,

176

filename: str,

177

chunk_size_bytes: Optional[int] = None,

178

metadata: Optional[Dict[str, Any]] = None,

179

session=None

180

) -> MotorGridIn:

181

"""Open an upload stream with a specific ID."""

182

183

# Download Operations

184

def download_to_stream(

185

self,

186

file_id: Any,

187

destination,

188

session=None

189

) -> tornado.concurrent.Future:

190

"""Download a file by ID to a destination stream."""

191

192

def download_to_stream_by_name(

193

self,

194

filename: str,

195

destination,

196

revision: int = -1,

197

session=None

198

) -> tornado.concurrent.Future:

199

"""Download a file by name to a destination stream."""

200

201

def open_download_stream(

202

self,

203

file_id: Any,

204

session=None

205

) -> tornado.concurrent.Future:

206

"""Open a download stream by file ID."""

207

208

def open_download_stream_by_name(

209

self,

210

filename: str,

211

revision: int = -1,

212

session=None

213

) -> tornado.concurrent.Future:

214

"""Open a download stream by filename."""

215

216

# File Management

217

def delete(self, file_id: Any, session=None) -> tornado.concurrent.Future:

218

"""Delete a file by ID."""

219

220

def rename(

221

self,

222

file_id: Any,

223

new_filename: str,

224

session=None

225

) -> tornado.concurrent.Future:

226

"""Rename a file."""

227

228

def find(

229

self,

230

filter: Optional[Dict[str, Any]] = None,

231

batch_size: int = 0,

232

limit: int = 0,

233

no_cursor_timeout: bool = False,

234

skip: int = 0,

235

sort: Optional[List[Tuple[str, int]]] = None,

236

session=None

237

) -> MotorGridOutCursor:

238

"""Find files matching the filter."""

239

```

### GridFS Upload Stream

Stream interface for uploading files to GridFS with write operations and metadata management.

```python { .api }

246

# AsyncIO Upload Stream

247

class AsyncIOMotorGridIn:

248

# Properties (read-only after upload starts)

249

@property

250

def _id(self) -> Any:

251

"""The file's unique identifier."""

252

253

@property

254

def filename(self) -> str:

255

"""The file's name."""

256

257

@property

258

def name(self) -> str:

259

"""Alias for filename."""

260

261

@property

262

def content_type(self) -> Optional[str]:

263

"""The file's content type."""

264

265

@property

266

def length(self) -> int:

267

"""The current length of the file."""

268

269

@property

270

def chunk_size(self) -> int:

271

"""The chunk size for this file."""

272

273

@property

274

def upload_date(self) -> Optional[datetime.datetime]:

275

"""The upload date (available after close)."""

276

277

@property

278

def metadata(self) -> Optional[Dict[str, Any]]:

279

"""The file's metadata."""

280

281

@property

282

def closed(self) -> bool:

283

"""Whether the file is closed."""

284

285

# Write Operations

286

async def write(self, data: bytes) -> None:

287

"""Write data to the file."""

288

289

async def writelines(self, lines: List[bytes]) -> None:

290

"""Write multiple lines to the file."""

291

292

# Stream Management

293

async def close(self) -> None:

294

"""Close and finalize the file upload."""

295

296

async def abort(self) -> None:

297

"""Abort the upload and delete any chunks."""

298

299

# Metadata Management

300

async def set(self, name: str, value: Any) -> None:

301

"""Set a metadata field."""

302

303

# Stream Properties

304

def writable(self) -> bool:

305

"""Whether the stream is writable."""

306

307

# Tornado Upload Stream

308

class MotorGridIn:

309

# Properties (identical to AsyncIO version)

310

@property

311

def _id(self) -> Any: ...

312

@property

313

def filename(self) -> str: ...

314

@property

315

def name(self) -> str: ...

316

@property

317

def content_type(self) -> Optional[str]: ...

318

@property

319

def length(self) -> int: ...

320

@property

321

def chunk_size(self) -> int: ...

322

@property

323

def upload_date(self) -> Optional[datetime.datetime]: ...

324

@property

325

def metadata(self) -> Optional[Dict[str, Any]]: ...

326

@property

327

def closed(self) -> bool: ...

328

329

# Write Operations (return Tornado Futures)

330

def write(self, data: bytes) -> tornado.concurrent.Future: ...

331

def writelines(self, lines: List[bytes]) -> tornado.concurrent.Future: ...

332

def close(self) -> tornado.concurrent.Future: ...

333

def abort(self) -> tornado.concurrent.Future: ...

334

def set(self, name: str, value: Any) -> tornado.concurrent.Future: ...

335

def writable(self) -> bool: ...

336

```

### GridFS Download Stream

Stream interface for downloading files from GridFS with read operations and file metadata access.

```python { .api }

343

# AsyncIO Download Stream

344

class AsyncIOMotorGridOut:

345

# Properties (available after open())

346

@property

347

def _id(self) -> Any:

348

"""The file's unique identifier."""

349

350

@property

351

def filename(self) -> str:

352

"""The file's name."""

353

354

@property

355

def name(self) -> str:

356

"""Alias for filename."""

357

358

@property

359

def content_type(self) -> Optional[str]:

360

"""The file's content type."""

361

362

@property

363

def length(self) -> int:

364

"""The file's length in bytes."""

365

366

@property

367

def chunk_size(self) -> int:

368

"""The chunk size for this file."""

369

370

@property

371

def upload_date(self) -> datetime.datetime:

372

"""When the file was uploaded."""

373

374

@property

375

def metadata(self) -> Optional[Dict[str, Any]]:

376

"""The file's metadata."""

377

378

@property

379

def aliases(self) -> Optional[List[str]]:

380

"""The file's aliases (deprecated)."""

381

382

# Read Operations

383

async def open(self) -> AsyncIOMotorGridOut:

384

"""Open the file for reading (must be called first)."""

385

386

async def read(self, size: int = -1) -> bytes:

387

"""Read up to size bytes from the file."""

388

389

async def readchunk(self) -> bytes:

390

"""Read one chunk from the file."""

391

392

async def readline(self, size: int = -1) -> bytes:

393

"""Read one line from the file."""

394

395

# Stream Navigation

396

def seek(self, pos: int, whence: int = 0) -> int:

397

"""Seek to a position in the file."""

398

399

def tell(self) -> int:

400

"""Get the current position in the file."""

401

402

def close(self) -> None:

403

"""Close the file."""

404

405

# Stream Properties

406

def readable(self) -> bool:

407

"""Whether the stream is readable."""

408

409

def seekable(self) -> bool:

410

"""Whether the stream supports seeking."""

411

412

# Tornado Download Stream

413

class MotorGridOut:

414

# Properties (identical to AsyncIO version)

415

@property

416

def _id(self) -> Any: ...

417

@property

418

def filename(self) -> str: ...

419

@property

420

def name(self) -> str: ...

421

@property

422

def content_type(self) -> Optional[str]: ...

423

@property

424

def length(self) -> int: ...

425

@property

426

def chunk_size(self) -> int: ...

427

@property

428

def upload_date(self) -> datetime.datetime: ...

429

@property

430

def metadata(self) -> Optional[Dict[str, Any]]: ...

431

@property

432

def aliases(self) -> Optional[List[str]]: ...

433

434

# Read Operations (return Tornado Futures)

435

def open(self) -> tornado.concurrent.Future: ...

436

def read(self, size: int = -1) -> tornado.concurrent.Future: ...

437

def readchunk(self) -> tornado.concurrent.Future: ...

438

def readline(self, size: int = -1) -> tornado.concurrent.Future: ...

439

440

# Stream Navigation (synchronous)

441

def seek(self, pos: int, whence: int = 0) -> int: ...

442

def tell(self) -> int: ...

443

def close(self) -> None: ...

444

def readable(self) -> bool: ...

445

def seekable(self) -> bool: ...

446

```

### GridFS Cursor

Cursor for iterating over GridFS file metadata and accessing file information.

```python { .api }
# AsyncIO GridFS Cursor
class AsyncIOMotorGridOutCursor:
    def limit(self, limit: int) -> AsyncIOMotorGridOutCursor:
        """Limit the number of results."""

    def skip(self, skip: int) -> AsyncIOMotorGridOutCursor:
        """Skip a number of results."""

    def sort(
        self,
        key_or_list: Union[str, List[Tuple[str, int]]],
        direction: Optional[int] = None
    ) -> AsyncIOMotorGridOutCursor:
        """Sort the results."""

    def batch_size(self, batch_size: int) -> AsyncIOMotorGridOutCursor:
        """Set the batch size."""

    async def to_list(self, length: Optional[int] = None) -> List[AsyncIOMotorGridOut]:
        """Convert cursor to a list."""

    def __aiter__(self) -> AsyncIOMotorGridOutCursor:
        """Async iterator protocol."""

    async def __anext__(self) -> AsyncIOMotorGridOut:
        """Get next file."""

# Tornado GridFS Cursor
class MotorGridOutCursor:
    def limit(self, limit: int) -> MotorGridOutCursor: ...
    def skip(self, skip: int) -> MotorGridOutCursor: ...
    def sort(
        self,
        key_or_list: Union[str, List[Tuple[str, int]]],
        direction: Optional[int] = None
    ) -> MotorGridOutCursor: ...
    def batch_size(self, batch_size: int) -> MotorGridOutCursor: ...

    def to_list(self, length: Optional[int] = None) -> tornado.concurrent.Future: ...
    def next_object(self) -> tornado.concurrent.Future: ...
```

## Usage Examples

### AsyncIO File Upload and Download

```python

500

import asyncio

501

import motor.motor_asyncio

502

from io import BytesIO

503

504

async def gridfs_example():

505

client = motor.motor_asyncio.AsyncIOMotorClient()

506

db = client.test_database

507

508

# Create GridFS bucket

509

bucket = motor.motor_asyncio.AsyncIOMotorGridFSBucket(db)

510

511

# Upload from bytes

512

file_data = b"Hello, GridFS world!"

513

source = BytesIO(file_data)

514

515

file_id = await bucket.upload_from_stream(

516

"hello.txt",

517

source,

518

metadata={"type": "greeting", "author": "Motor"}

519

)

520

print(f"Uploaded file with ID: {file_id}")

521

522

# Download to bytes

523

destination = BytesIO()

524

await bucket.download_to_stream(file_id, destination)

525

526

downloaded_data = destination.getvalue()

527

print(f"Downloaded: {downloaded_data.decode()}")

528

529

# Stream upload

530

upload_stream = bucket.open_upload_stream(

531

"large_file.dat",

532

metadata={"description": "Large file example"}

533

)

534

535

# Write data in chunks

536

for i in range(10):

537

chunk = f"Chunk {i}\n".encode()

538

await upload_stream.write(chunk)

539

540

await upload_stream.close()

541

print(f"Uploaded large file with ID: {upload_stream._id}")

542

543

# Stream download

544

download_stream = await bucket.open_download_stream(upload_stream._id)

545

546

# Read file information

547

print(f"File: {download_stream.filename}")

548

print(f"Size: {download_stream.length} bytes")

549

print(f"Content Type: {download_stream.content_type}")

550

print(f"Upload Date: {download_stream.upload_date}")

551

print(f"Metadata: {download_stream.metadata}")

552

553

# Read data

554

data = await download_stream.read()

555

print(f"Downloaded data: {data.decode()}")

556

557

download_stream.close()

558

client.close()

559

560

asyncio.run(gridfs_example())

561

```

### File Management Operations

```python

566

import asyncio

567

import motor.motor_asyncio

568

569

async def file_management_example():

570

client = motor.motor_asyncio.AsyncIOMotorClient()

571

db = client.test_database

572

bucket = motor.motor_asyncio.AsyncIOMotorGridFSBucket(db)

573

574

# Find files

575

cursor = bucket.find({"metadata.type": "image"})

576

async for file_doc in cursor:

577

print(f"Found file: {file_doc.filename} (ID: {file_doc._id})")

578

579

# Find files with sorting and limiting

580

cursor = bucket.find().sort("uploadDate", -1).limit(5)

581

recent_files = await cursor.to_list(5)

582

583

for file_doc in recent_files:

584

print(f"Recent file: {file_doc.filename}")

585

586

# Rename a file

587

if recent_files:

588

file_id = recent_files[0]._id

589

await bucket.rename(file_id, "renamed_file.txt")

590

print(f"Renamed file {file_id}")

591

592

# Delete a file

593

if len(recent_files) > 1:

594

file_id = recent_files[1]._id

595

await bucket.delete(file_id)

596

print(f"Deleted file {file_id}")

597

598

client.close()

599

600

asyncio.run(file_management_example())

601

```

### Large File Streaming

```python

606

import asyncio

607

import motor.motor_asyncio

608

609

async def large_file_streaming():

610

client = motor.motor_asyncio.AsyncIOMotorClient()

611

db = client.test_database

612

bucket = motor.motor_asyncio.AsyncIOMotorGridFSBucket(db)

613

614

# Upload large file in chunks

615

upload_stream = bucket.open_upload_stream(

616

"video.mp4",

617

chunk_size_bytes=1024*1024, # 1MB chunks

618

metadata={"type": "video", "codec": "h264"}

619

)

620

621

# Simulate large file upload

622

total_size = 0

623

for i in range(100): # 100MB file simulation

624

chunk = b"X" * 1024 * 1024 # 1MB of data

625

await upload_stream.write(chunk)

626

total_size += len(chunk)

627

628

if i % 10 == 0:

629

print(f"Uploaded {total_size / (1024*1024):.1f}MB")

630

631

await upload_stream.close()

632

print(f"Upload complete. File ID: {upload_stream._id}")

633

634

# Stream download with progress

635

download_stream = await bucket.open_download_stream(upload_stream._id)

636

print(f"Downloading {download_stream.length / (1024*1024):.1f}MB file")

637

638

downloaded_size = 0

639

while downloaded_size < download_stream.length:

640

chunk = await download_stream.readchunk()

641

if not chunk:

642

break

643

644

downloaded_size += len(chunk)

645

progress = (downloaded_size / download_stream.length) * 100

646

647

if downloaded_size % (10 * 1024 * 1024) == 0: # Every 10MB

648

print(f"Downloaded {progress:.1f}%")

649

650

download_stream.close()

651

print("Download complete")

652

client.close()

653

654

asyncio.run(large_file_streaming())

655

```

## Types

```python { .api }
from typing import Any, Optional, Union, Dict, List, Tuple
from datetime import datetime
import io

# GridFS-specific types
GridFSFile = Dict[str, Any]  # GridFS file document
ChunkData = bytes  # File chunk data
FileId = Any  # GridFS file identifier (usually ObjectId)
```