Utils for streaming large files from S3, HDFS, GCS, SFTP, Azure Blob Storage, and local filesystem with transparent compression support
—
Streaming access to major cloud storage platforms with native client optimizations. Smart-open integrates directly with AWS S3, Google Cloud Storage, and Azure Blob Storage through their respective native SDKs (boto3, google-cloud-storage, and azure-storage-blob).
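Compression is inferred from the key's extension, so gzip- and bzip2-compressed objects stream transparently in both directions. A minimal sketch (bucket and key names are illustrative):

from smart_open import open

# The .gz suffix triggers transparent decompression on read;
# writing to a .gz key compresses on the fly.
with open('s3://my-bucket/logs/events.txt.gz', 'r') as f:
    first_line = f.readline()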
Comprehensive S3 integration with support for multipart uploads, parallel bucket iteration, and advanced client configurations.
# Main S3 functions
def open(bucket_id, key_id, mode, version_id=None, buffer_size=DEFAULT_BUFFER_SIZE,
         min_part_size=DEFAULT_PART_SIZE, multipart_upload=True, defer_seek=False,
         client=None, client_kwargs=None, writebuffer=None):
    """Open S3 object for reading or writing.

    Parameters:
        bucket_id: str - S3 bucket name
        key_id: str - S3 object key
        mode: str - File mode ('rb' or 'wb')
        version_id: str - Specific object version (for reading)
        buffer_size: int - I/O buffer size (default: 128KB)
        min_part_size: int - Minimum multipart size (default: 50MB)
        multipart_upload: bool - Use multipart upload API for writes
        defer_seek: bool - Defer GetObject call until first read/seek
        client: boto3.S3.Client - Custom S3 client
        client_kwargs: dict - Additional client method parameters
        writebuffer: IO[bytes] - Custom write buffer

    Returns:
        Reader, MultipartWriter, or SinglepartWriter instance
    """
def iter_bucket(bucket_name, prefix='', accept_key=None, key_limit=None,
                workers=16, retries=3, **session_kwargs):
    """Iterate over S3 bucket contents in parallel.

    Parameters:
        bucket_name: str - S3 bucket name
        prefix: str - Key prefix filter
        accept_key: callable - Function to filter keys (key) -> bool
        key_limit: int - Maximum number of keys to process
        workers: int - Number of parallel download workers
        retries: int - Number of retry attempts per download
        **session_kwargs: Additional boto3.Session() parameters

    Yields:
        tuple[str, bytes] - (key_name, content) pairs
    """
def parse_uri(uri_as_string):
    """Parse S3 URI into components.

    Returns:
        dict with keys: scheme, bucket_id, key_id, port, host,
        ordinary_calling_format, access_id, access_secret
    """

class Reader(io.BufferedIOBase):
    """S3 object reader with buffering and seeking support."""
    def to_boto3(self, resource):
        """Convert to boto3 Object for direct boto3 operations."""
class MultipartWriter(io.BufferedIOBase):
    """S3 multipart upload writer for large objects."""

    def terminate(self):
        """Terminate incomplete multipart upload."""

    def to_boto3(self, resource):
        """Convert to boto3 Object for direct boto3 operations."""
class SinglepartWriter(io.BufferedIOBase):
    """S3 single-part upload writer for smaller objects."""

    def terminate(self):
        """Cancel upload and clean up resources."""
class Retry:
    """S3 retry mechanism for handling transient errors."""

    def __init__(self, attempts=3, sleep_seconds=1.0, exceptions=(Exception,)):
        """Configure retry behavior for S3 operations."""

# Part size constraints for multipart uploads
MIN_PART_SIZE = 5 * 1024**2       # 5MB minimum part size
DEFAULT_PART_SIZE = 50 * 1024**2  # 50MB default part size
MAX_PART_SIZE = 5 * 1024**3       # 5GB maximum part size
# Buffer size for S3 operations
DEFAULT_BUFFER_SIZE = 128 * 1024 # 128KB default buffer
# Supported S3 schemes
SCHEMES = ("s3", "s3n", "s3u", "s3a")

Native GCS integration using the google-cloud-storage client library.
def open(bucket_id, blob_id, mode, buffer_size=None,
         min_part_size=50*1024**2, client=None, get_blob_kwargs=None,
         blob_properties=None, blob_open_kwargs=None):
    """Open GCS blob for reading or writing.

    Parameters:
        bucket_id: str - GCS bucket name
        blob_id: str - Blob name/path
        mode: str - File mode
        buffer_size: int - I/O buffer size
        min_part_size: int - Minimum part size for resumable uploads
        client: google.cloud.storage.Client - Custom GCS client
        get_blob_kwargs: dict - Additional kwargs for bucket.get_blob()
        blob_properties: dict - Properties to set on blob
        blob_open_kwargs: dict - Additional kwargs for blob.open()

    Returns:
        Reader or Writer instance
    """
def parse_uri(uri_as_string):
    """Parse GCS URI into components.

    Returns:
        dict with keys: scheme, bucket_id, blob_id
    """

class Reader:
    """GCS blob reader."""

class Writer:
    """GCS blob writer with resumable upload support."""

# GCS-specific configuration
SCHEME = "gs"
# Part size configuration
DEFAULT_MIN_PART_SIZE = 50 * 1024**2  # 50MB minimum part size for resumable uploads

Azure Blob Storage integration using the azure-storage-blob SDK.
def open(container_id, blob_id, mode, client=None, blob_kwargs=None,
         buffer_size=4*1024**2, min_part_size=64*1024**2, max_concurrency=1):
    """Open Azure blob for reading or writing.

    Parameters:
        container_id: str - Azure container name
        blob_id: str - Blob name/path
        mode: str - File mode ('rb' or 'wb')
        client: azure.storage.blob.BlobServiceClient - Custom Azure client
        blob_kwargs: dict - Additional parameters for BlobClient.commit_block_list
        buffer_size: int - I/O buffer size (default: 4MB)
        min_part_size: int - Minimum part size for multipart uploads (default: 64MB)
        max_concurrency: int - Number of parallel connections (default: 1)

    Returns:
        Reader or Writer instance
    """
def parse_uri(uri_as_string):
    """Parse Azure blob URI into components.

    Returns:
        dict with keys: scheme, container_id, blob_id
    """

class Reader(io.BufferedIOBase):
    """Azure blob reader."""

class Writer(io.BufferedIOBase):
    """Azure blob writer."""

# Azure-specific configuration
SCHEME = "azure"
# Buffer and part size defaults
DEFAULT_BUFFER_SIZE = 4 * 1024**2 # 4MB default buffer size
DEFAULT_MIN_PART_SIZE = 64 * 1024**2 # 64MB minimum part size for multipart uploads
DEFAULT_MAX_CONCURRENCY = 1  # Default number of parallel connections

from smart_open import open
from smart_open.s3 import iter_bucket
# Basic S3 operations
with open('s3://my-bucket/data.txt') as f:
    content = f.read()

# Write to S3 with custom parameters
transport_params = {
    'min_part_size': 100 * 1024 * 1024,  # 100MB parts
    'multipart_upload': True,
    'client_kwargs': {'region_name': 'us-west-2'}
}
with open('s3://bucket/large-file.dat', 'wb', transport_params=transport_params) as f:
    f.write(large_data)

# Iterate over bucket contents
for key, content in iter_bucket('my-bucket', prefix='data/', workers=8):
    print(f"Key: {key}, Size: {len(content)} bytes")

# Direct S3 module usage
from smart_open.s3 import open as s3_open

with s3_open('my-bucket', 'path/to/file.txt', 'rb',
             client_kwargs={'region_name': 'eu-west-1'}) as f:
    data = f.read()

# Basic GCS operations
import json

with open('gs://my-bucket/data.json') as f:
    data = json.load(f)

# Write to GCS with blob properties
transport_params = {
    'blob_properties': {
        'content_type': 'application/json',
        'metadata': {'source': 'smart-open'}
    }
}
with open('gs://bucket/output.json', 'w', transport_params=transport_params) as f:
    json.dump(data, f)

# Direct GCS module usage
from smart_open.gcs import open as gcs_open

with gcs_open('my-bucket', 'path/file.txt', 'rb') as f:
    content = f.read()

# Basic Azure operations
with open('azure://container/blob.txt') as f:
    text = f.read()

# Write to Azure with custom client
from azure.storage.blob import BlobServiceClient

client = BlobServiceClient(account_url="https://account.blob.core.windows.net",
                           credential="access_key")
transport_params = {'client': client}
with open('azure://container/output.txt', 'w', transport_params=transport_params) as f:
    f.write('Hello Azure!')

# Direct Azure module usage
from smart_open.azure import open as azure_open

with azure_open('container', 'blob-name', 'rb') as f:
    binary_data = f.read()

# Using AWS credentials (recommended)
# Set via environment variables, AWS config, or IAM roles:
# AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN

# Using transport_params with custom session
import boto3

session = boto3.Session(
    aws_access_key_id='your-key',
    aws_secret_access_key='your-secret',
    region_name='us-east-1'
)
transport_params = {'session': session}
with open('s3://bucket/file.txt', transport_params=transport_params) as f:
    data = f.read()

# URL-embedded credentials (not recommended for production)
with open('s3://access_key:secret_key@bucket/file.txt') as f:
    data = f.read()

# Using service account key file
import os
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/path/to/service-account.json'

# Using explicit client
from google.cloud import storage

client = storage.Client.from_service_account_json('/path/to/key.json')
transport_params = {'client': client}
with open('gs://bucket/file.txt', transport_params=transport_params) as f:
    data = f.read()

# Using connection string
from azure.storage.blob import BlobServiceClient

client = BlobServiceClient.from_connection_string("connection_string")
transport_params = {'client': client}

# Using account key
client = BlobServiceClient(
    account_url="https://account.blob.core.windows.net",
    credential="account_key"
)
transport_params = {'client': client}
with open('azure://container/file.txt', transport_params=transport_params) as f:
    data = f.read()

# Use multipart uploads for large files
transport_params = {
    'multipart_upload': True,
    'min_part_size': 100 * 1024 * 1024,  # 100MB parts
    'buffer_size': 1024 * 1024  # 1MB buffer
}

# Parallel bucket processing
for key, content in iter_bucket('bucket', workers=32, retries=5):
    process_content(key, content)

# Custom S3 client with connection pooling
import boto3
from botocore.config import Config

config = Config(
    max_pool_connections=50,
    retries={'max_attempts': 10}
)
client = boto3.client('s3', config=config)
transport_params = {'client': client}

# Use resumable uploads for large files
transport_params = {
    'min_part_size': 50 * 1024 * 1024,  # 50MB minimum
    'blob_open_kwargs': {'timeout': 300}
}

# Custom client with retry configuration
from google.cloud import storage
from google.api_core import retry

client = storage.Client()
transport_params = {
    'client': client,
    'blob_open_kwargs': {
        'retry': retry.Retry(deadline=300)
    }
}

from smart_open import open
from botocore.exceptions import ClientError

try:
    with open('s3://bucket/file.txt') as f:
        data = f.read()
except ClientError as e:
    error_code = e.response['Error']['Code']
    if error_code == 'NoSuchKey':
        print("File not found")
    elif error_code == 'AccessDenied':
        print("Permission denied")
    else:
        print(f"AWS error: {error_code}")
except Exception as e:
    print(f"Other error: {e}")

# Custom retry configuration for S3
from smart_open.s3 import Retry

retry_config = Retry(
    attempts=5,
    sleep_seconds=2.0,
    exceptions=(ClientError,)
)

Install with Tessl CLI
npx tessl i tessl/pypi-smart-open