# Cloud Storage Integration

Access to major cloud storage platforms with native client optimizations and streaming capabilities. Smart-open provides direct integration with AWS S3, Google Cloud Storage, and Azure Blob Storage through their respective native SDKs.

## Capabilities

### Amazon S3 Operations

Comprehensive S3 integration with support for multipart uploads, parallel bucket iteration, and advanced client configurations.

```python { .api }
# Main S3 functions
def open(bucket_id, key_id, mode, version_id=None, buffer_size=DEFAULT_BUFFER_SIZE,
         min_part_size=DEFAULT_PART_SIZE, multipart_upload=True, defer_seek=False,
         client=None, client_kwargs=None, writebuffer=None):
    """Open S3 object for reading or writing.

    Parameters:
        bucket_id: str - S3 bucket name
        key_id: str - S3 object key
        mode: str - File mode ('rb' or 'wb')
        version_id: str - Specific object version (for reading)
        buffer_size: int - I/O buffer size (default: 128KB)
        min_part_size: int - Minimum multipart size (default: 50MB)
        multipart_upload: bool - Use multipart upload API for writes
        defer_seek: bool - Defer GetObject call until first read/seek
        client: boto3.S3.Client - Custom S3 client
        client_kwargs: dict - Additional client method parameters
        writebuffer: IO[bytes] - Custom write buffer

    Returns:
        Reader, MultipartWriter, or SinglepartWriter instance
    """

def iter_bucket(bucket_name, prefix='', accept_key=None, key_limit=None,
                workers=16, retries=3, **session_kwargs):
    """Iterate over S3 bucket contents in parallel.

    Parameters:
        bucket_name: str - S3 bucket name
        prefix: str - Key prefix filter
        accept_key: callable - Function to filter keys (key) -> bool
        key_limit: int - Maximum number of keys to process
        workers: int - Number of parallel download workers
        retries: int - Number of retry attempts per download
        **session_kwargs: Additional boto3.Session() parameters

    Yields:
        tuple[str, bytes] - (key_name, content) pairs
    """

def parse_uri(uri_as_string):
    """Parse S3 URI into components.

    Returns:
        dict with keys: scheme, bucket_id, key_id, port, host,
        ordinary_calling_format, access_id, access_secret
    """
```

### S3 Classes

```python { .api }
class Reader(io.BufferedIOBase):
    """S3 object reader with buffering and seeking support."""

    def to_boto3(self, resource):
        """Convert to boto3 Object for direct boto3 operations."""

class MultipartWriter(io.BufferedIOBase):
    """S3 multipart upload writer for large objects."""

    def terminate(self):
        """Terminate incomplete multipart upload."""

    def to_boto3(self, resource):
        """Convert to boto3 Object for direct boto3 operations."""

class SinglepartWriter(io.BufferedIOBase):
    """S3 single-part upload writer for smaller objects."""

    def terminate(self):
        """Cancel upload and clean up resources."""

class Retry:
    """S3 retry mechanism for handling transient errors."""

    def __init__(self, attempts=3, sleep_seconds=1.0, exceptions=(Exception,)):
        """Configure retry behavior for S3 operations."""
```

### S3 Constants

```python { .api }
# Part size constraints for multipart uploads
MIN_PART_SIZE = 5 * 1024 ** 2      # 5MB minimum part size
DEFAULT_PART_SIZE = 50 * 1024 ** 2 # 50MB default part size
MAX_PART_SIZE = 5 * 1024 ** 3      # 5GB maximum part size

# Buffer size for S3 operations
DEFAULT_BUFFER_SIZE = 128 * 1024   # 128KB default buffer

# Supported S3 schemes
SCHEMES = ("s3", "s3n", "s3u", "s3a")
```

### Google Cloud Storage Operations

Native GCS integration using google-cloud-storage client library.

```python { .api }
def open(bucket_id, blob_id, mode, buffer_size=None,
         min_part_size=50*1024**2, client=None, get_blob_kwargs=None,
         blob_properties=None, blob_open_kwargs=None):
    """Open GCS blob for reading or writing.

    Parameters:
        bucket_id: str - GCS bucket name
        blob_id: str - Blob name/path
        mode: str - File mode
        buffer_size: int - I/O buffer size
        min_part_size: int - Minimum part size for resumable uploads
        client: google.cloud.storage.Client - Custom GCS client
        get_blob_kwargs: dict - Additional kwargs for bucket.get_blob()
        blob_properties: dict - Properties to set on blob
        blob_open_kwargs: dict - Additional kwargs for blob.open()

    Returns:
        Reader or Writer instance
    """

def parse_uri(uri_as_string):
    """Parse GCS URI into components.

    Returns:
        dict with keys: scheme, bucket_id, blob_id
    """
```

### GCS Classes

```python { .api }
class Reader:
    """GCS blob reader."""

class Writer:
    """GCS blob writer with resumable upload support."""
```

### GCS Constants

```python { .api }
# GCS-specific configuration
SCHEME = "gs"

# Part size configuration
DEFAULT_MIN_PART_SIZE = 50 * 1024**2  # 50MB minimum part size for resumable uploads
```

### Azure Blob Storage Operations

Azure Blob Storage integration using azure-storage-blob SDK.

```python { .api }
def open(container_id, blob_id, mode, client=None, blob_kwargs=None,
         buffer_size=4*1024**2, min_part_size=64*1024**2, max_concurrency=1):
    """Open Azure blob for reading or writing.

    Parameters:
        container_id: str - Azure container name
        blob_id: str - Blob name/path
        mode: str - File mode ('rb' or 'wb')
        client: azure.storage.blob.BlobServiceClient - Custom Azure client
        blob_kwargs: dict - Additional parameters for BlobClient.commit_block_list
        buffer_size: int - I/O buffer size (default: 4MB)
        min_part_size: int - Minimum part size for multipart uploads (default: 64MB)
        max_concurrency: int - Number of parallel connections (default: 1)

    Returns:
        Reader or Writer instance
    """

def parse_uri(uri_as_string):
    """Parse Azure blob URI into components.

    Returns:
        dict with keys: scheme, container_id, blob_id
    """
```

### Azure Classes

```python { .api }
class Reader(io.BufferedIOBase):
    """Azure blob reader."""

class Writer(io.BufferedIOBase):
    """Azure blob writer."""
```

### Azure Constants

```python { .api }
# Azure-specific configuration
SCHEME = "azure"

# Buffer and part size defaults
DEFAULT_BUFFER_SIZE = 4 * 1024**2        # 4MB default buffer size
DEFAULT_MIN_PART_SIZE = 64 * 1024**2     # 64MB minimum part size for multipart uploads
DEFAULT_MAX_CONCURRENCY = 1              # Default number of parallel connections
```

## Usage Examples

### AWS S3 Examples

```python
from smart_open import open
from smart_open.s3 import iter_bucket

# Basic S3 operations
with open('s3://my-bucket/data.txt') as f:
    content = f.read()

# Write to S3 with custom parameters
transport_params = {
    'min_part_size': 100 * 1024 * 1024,  # 100MB parts
    'multipart_upload': True,
    'client_kwargs': {'region_name': 'us-west-2'}
}
with open('s3://bucket/large-file.dat', 'wb', transport_params=transport_params) as f:
    f.write(large_data)

# Iterate over bucket contents
for key, content in iter_bucket('my-bucket', prefix='data/', workers=8):
    print(f"Key: {key}, Size: {len(content)} bytes")

# Direct S3 module usage
from smart_open.s3 import open as s3_open

with s3_open('my-bucket', 'path/to/file.txt', 'rb',
             client_kwargs={'region_name': 'eu-west-1'}) as f:
    data = f.read()
```

### Google Cloud Storage Examples

```python
# Basic GCS operations
with open('gs://my-bucket/data.json') as f:
    data = json.load(f)

# Write to GCS with blob properties
transport_params = {
    'blob_properties': {
        'content_type': 'application/json',
        'metadata': {'source': 'smart-open'}
    }
}
with open('gs://bucket/output.json', 'w', transport_params=transport_params) as f:
    json.dump(data, f)

# Direct GCS module usage
from smart_open.gcs import open as gcs_open

with gcs_open('my-bucket', 'path/file.txt', 'rb') as f:
    content = f.read()
```

### Azure Blob Storage Examples

```python
# Basic Azure operations
with open('azure://container/blob.txt') as f:
    text = f.read()

# Write to Azure with custom client
from azure.storage.blob import BlobServiceClient

client = BlobServiceClient(account_url="https://account.blob.core.windows.net",
                           credential="access_key")
transport_params = {'client': client}

with open('azure://container/output.txt', 'w', transport_params=transport_params) as f:
    f.write('Hello Azure!')

# Direct Azure module usage
from smart_open.azure import open as azure_open

with azure_open('container', 'blob-name', 'rb') as f:
    binary_data = f.read()
```

## Authentication

### AWS S3 Authentication

```python
# Using AWS credentials (recommended)
# Set via environment variables, AWS config, or IAM roles
# AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN

# Using transport_params with custom session
import boto3
session = boto3.Session(
    aws_access_key_id='your-key',
    aws_secret_access_key='your-secret',
    region_name='us-east-1'
)
transport_params = {'session': session}

with open('s3://bucket/file.txt', transport_params=transport_params) as f:
    data = f.read()

# URL-embedded credentials (not recommended for production)
with open('s3://access_key:secret_key@bucket/file.txt') as f:
    data = f.read()
```

### Google Cloud Authentication

```python
# Using service account key file
import os
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/path/to/service-account.json'

# Using explicit client
from google.cloud import storage
client = storage.Client.from_service_account_json('/path/to/key.json')
transport_params = {'client': client}

with open('gs://bucket/file.txt', transport_params=transport_params) as f:
    data = f.read()
```

### Azure Authentication

```python
# Using connection string
from azure.storage.blob import BlobServiceClient
client = BlobServiceClient.from_connection_string("connection_string")
transport_params = {'client': client}

# Using account key
client = BlobServiceClient(
    account_url="https://account.blob.core.windows.net",
    credential="account_key"
)
transport_params = {'client': client}

with open('azure://container/file.txt', transport_params=transport_params) as f:
    data = f.read()
```

## Performance Optimization

### S3 Performance Tips

```python
# Use multipart uploads for large files
transport_params = {
    'multipart_upload': True,
    'min_part_size': 100 * 1024 * 1024,  # 100MB parts
    'buffer_size': 1024 * 1024           # 1MB buffer
}

# Parallel bucket processing
for key, content in iter_bucket('bucket', workers=32, retries=5):
    process_content(key, content)

# Custom S3 client with connection pooling
import boto3
from botocore.config import Config

config = Config(
    max_pool_connections=50,
    retries={'max_attempts': 10}
)
client = boto3.client('s3', config=config)
transport_params = {'client': client}
```

### GCS Performance Tips

```python
# Use resumable uploads for large files
transport_params = {
    'min_part_size': 50 * 1024 * 1024,  # 50MB minimum
    'blob_open_kwargs': {'timeout': 300}
}

# Custom client with retry configuration
from google.cloud import storage
from google.api_core import retry

client = storage.Client()
transport_params = {
    'client': client,
    'blob_open_kwargs': {
        'retry': retry.Retry(deadline=300)
    }
}
```

## Error Handling and Retries

```python
from smart_open import open
import boto3
from botocore.exceptions import ClientError

try:
    with open('s3://bucket/file.txt') as f:
        data = f.read()
except ClientError as e:
    error_code = e.response['Error']['Code']
    if error_code == 'NoSuchKey':
        print("File not found")
    elif error_code == 'AccessDenied':
        print("Permission denied")
    else:
        print(f"AWS error: {error_code}")
except Exception as e:
    print(f"Other error: {e}")

# Custom retry configuration for S3
from smart_open.s3 import Retry
retry_config = Retry(
    attempts=5,
    sleep_seconds=2.0,
    exceptions=(ClientError,)
)
```