CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-smart-open

Utils for streaming large files from S3, HDFS, GCS, SFTP, Azure Blob Storage, and local filesystem with transparent compression support

Pending
Overview
Eval results
Files

cloud-storage.mddocs/

Cloud Storage Integration

Access to major cloud storage platforms with native client optimizations and streaming capabilities. Smart-open provides direct integration with AWS S3, Google Cloud Storage, and Azure Blob Storage through their respective native SDKs.

Capabilities

Amazon S3 Operations

Comprehensive S3 integration with support for multipart uploads, parallel bucket iteration, and advanced client configurations.

# Main S3 functions
def open(bucket_id, key_id, mode, version_id=None, buffer_size=DEFAULT_BUFFER_SIZE, 
         min_part_size=DEFAULT_PART_SIZE, multipart_upload=True, defer_seek=False, 
         client=None, client_kwargs=None, writebuffer=None):
    """Open S3 object for reading or writing.
    
    Parameters:
        bucket_id: str - S3 bucket name
        key_id: str - S3 object key
        mode: str - File mode ('rb' or 'wb')
        version_id: str - Specific object version (for reading)
        buffer_size: int - I/O buffer size (default: 128KB)
        min_part_size: int - Minimum multipart size (default: 50MB)
        multipart_upload: bool - Use multipart upload API for writes
        defer_seek: bool - Defer GetObject call until first read/seek
        client: boto3.S3.Client - Custom S3 client
        client_kwargs: dict - Additional client method parameters
        writebuffer: IO[bytes] - Custom write buffer
        
    Returns:
        Reader, MultipartWriter, or SinglepartWriter instance
    """

def iter_bucket(bucket_name, prefix='', accept_key=None, key_limit=None, 
                workers=16, retries=3, **session_kwargs):
    """Iterate over S3 bucket contents in parallel.
    
    Parameters:
        bucket_name: str - S3 bucket name
        prefix: str - Key prefix filter
        accept_key: callable - Function to filter keys (key) -> bool
        key_limit: int - Maximum number of keys to process
        workers: int - Number of parallel download workers
        retries: int - Number of retry attempts per download
        **session_kwargs: Additional boto3.Session() parameters
        
    Yields:
        tuple[str, bytes] - (key_name, content) pairs
    """

def parse_uri(uri_as_string):
    """Parse S3 URI into components.
    
    Returns:
        dict with keys: scheme, bucket_id, key_id, port, host, 
        ordinary_calling_format, access_id, access_secret
    """

S3 Classes

class Reader(io.BufferedIOBase):
    """S3 object reader with buffering and seeking support."""
    
    def to_boto3(self, resource): 
        """Convert to boto3 Object for direct boto3 operations."""

class MultipartWriter(io.BufferedIOBase):
    """S3 multipart upload writer for large objects."""
    
    def terminate(self): 
        """Terminate incomplete multipart upload."""
    
    def to_boto3(self, resource):
        """Convert to boto3 Object for direct boto3 operations."""

class SinglepartWriter(io.BufferedIOBase):
    """S3 single-part upload writer for smaller objects."""
    
    def terminate(self):
        """Cancel upload and clean up resources."""

class Retry:
    """S3 retry mechanism for handling transient errors."""
    
    def __init__(self, attempts=3, sleep_seconds=1.0, exceptions=(Exception,)):
        """Configure retry behavior for S3 operations."""

S3 Constants

# Part size constraints for multipart uploads
MIN_PART_SIZE = 5 * 1024 ** 2      # 5MB minimum part size
DEFAULT_PART_SIZE = 50 * 1024**2   # 50MB default part size
MAX_PART_SIZE = 5 * 1024 ** 3      # 5GB maximum part size

# Buffer size for S3 operations
DEFAULT_BUFFER_SIZE = 128 * 1024   # 128KB default buffer

# Supported S3 schemes
SCHEMES = ("s3", "s3n", "s3u", "s3a")

Google Cloud Storage Operations

Native GCS integration using google-cloud-storage client library.

def open(bucket_id, blob_id, mode, buffer_size=None, 
         min_part_size=50*1024**2, client=None, get_blob_kwargs=None,
         blob_properties=None, blob_open_kwargs=None):
    """Open GCS blob for reading or writing.
    
    Parameters:
        bucket_id: str - GCS bucket name
        blob_id: str - Blob name/path
        mode: str - File mode
        buffer_size: int - I/O buffer size
        min_part_size: int - Minimum part size for resumable uploads
        client: google.cloud.storage.Client - Custom GCS client
        get_blob_kwargs: dict - Additional kwargs for bucket.get_blob()
        blob_properties: dict - Properties to set on blob
        blob_open_kwargs: dict - Additional kwargs for blob.open()
        
    Returns:
        Reader or Writer instance
    """

def parse_uri(uri_as_string):
    """Parse GCS URI into components.
    
    Returns:
        dict with keys: scheme, bucket_id, blob_id
    """

GCS Classes

class Reader:
    """GCS blob reader."""

class Writer:
    """GCS blob writer with resumable upload support."""

GCS Constants

# GCS-specific configuration
SCHEME = "gs"

# Part size configuration
DEFAULT_MIN_PART_SIZE = 50 * 1024**2    # 50MB minimum part size for resumable uploads

Azure Blob Storage Operations

Azure Blob Storage integration using azure-storage-blob SDK.

def open(container_id, blob_id, mode, client=None, blob_kwargs=None,
         buffer_size=4*1024**2, min_part_size=64*1024**2, max_concurrency=1):
    """Open Azure blob for reading or writing.
    
    Parameters:
        container_id: str - Azure container name
        blob_id: str - Blob name/path
        mode: str - File mode ('rb' or 'wb')
        client: azure.storage.blob.BlobServiceClient - Custom Azure client
        blob_kwargs: dict - Additional parameters for BlobClient.commit_block_list
        buffer_size: int - I/O buffer size (default: 4MB)
        min_part_size: int - Minimum part size for multipart uploads (default: 64MB)
        max_concurrency: int - Number of parallel connections (default: 1)
        
    Returns:
        Reader or Writer instance
    """

def parse_uri(uri_as_string):
    """Parse Azure blob URI into components.
    
    Returns:
        dict with keys: scheme, container_id, blob_id
    """

Azure Classes

class Reader(io.BufferedIOBase):
    """Azure blob reader."""

class Writer(io.BufferedIOBase):
    """Azure blob writer."""

Azure Constants

# Azure-specific configuration
SCHEME = "azure"

# Buffer and part size defaults
DEFAULT_BUFFER_SIZE = 4 * 1024**2       # 4MB default buffer size
DEFAULT_MIN_PART_SIZE = 64 * 1024**2    # 64MB minimum part size for multipart uploads
DEFAULT_MAX_CONCURRENCY = 1             # Default number of parallel connections

Usage Examples

AWS S3 Examples

from smart_open import open
from smart_open.s3 import iter_bucket

# Basic S3 operations
with open('s3://my-bucket/data.txt') as f:
    content = f.read()

# Write to S3 with custom parameters
transport_params = {
    'min_part_size': 100 * 1024 * 1024,  # 100MB parts
    'multipart_upload': True,
    'client_kwargs': {'region_name': 'us-west-2'}
}
with open('s3://bucket/large-file.dat', 'wb', transport_params=transport_params) as f:
    f.write(large_data)

# Iterate over bucket contents
for key, content in iter_bucket('my-bucket', prefix='data/', workers=8):
    print(f"Key: {key}, Size: {len(content)} bytes")

# Direct S3 module usage
from smart_open.s3 import open as s3_open

with s3_open('my-bucket', 'path/to/file.txt', 'rb', 
            client_kwargs={'region_name': 'eu-west-1'}) as f:
    data = f.read()

Google Cloud Storage Examples

# Basic GCS operations
with open('gs://my-bucket/data.json') as f:
    data = json.load(f)

# Write to GCS with blob properties
transport_params = {
    'blob_properties': {
        'content_type': 'application/json',
        'metadata': {'source': 'smart-open'}
    }
}
with open('gs://bucket/output.json', 'w', transport_params=transport_params) as f:
    json.dump(data, f)

# Direct GCS module usage
from smart_open.gcs import open as gcs_open

with gcs_open('my-bucket', 'path/file.txt', 'rb') as f:
    content = f.read()

Azure Blob Storage Examples

# Basic Azure operations
with open('azure://container/blob.txt') as f:
    text = f.read()

# Write to Azure with custom client
from azure.storage.blob import BlobServiceClient

client = BlobServiceClient(account_url="https://account.blob.core.windows.net",
                          credential="access_key")
transport_params = {'client': client}

with open('azure://container/output.txt', 'w', transport_params=transport_params) as f:
    f.write('Hello Azure!')

# Direct Azure module usage
from smart_open.azure import open as azure_open

with azure_open('container', 'blob-name', 'rb') as f:
    binary_data = f.read()

Authentication

AWS S3 Authentication

# Using AWS credentials (recommended)
# Set via environment variables, AWS config, or IAM roles
# AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN

# Using transport_params with custom session
import boto3
session = boto3.Session(
    aws_access_key_id='your-key',
    aws_secret_access_key='your-secret',
    region_name='us-east-1'
)
transport_params = {'session': session}

with open('s3://bucket/file.txt', transport_params=transport_params) as f:
    data = f.read()

# URL-embedded credentials (not recommended for production)
with open('s3://access_key:secret_key@bucket/file.txt') as f:
    data = f.read()

Google Cloud Authentication

# Using service account key file
import os
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/path/to/service-account.json'

# Using explicit client
from google.cloud import storage
client = storage.Client.from_service_account_json('/path/to/key.json')
transport_params = {'client': client}

with open('gs://bucket/file.txt', transport_params=transport_params) as f:
    data = f.read()

Azure Authentication

# Using connection string
from azure.storage.blob import BlobServiceClient
client = BlobServiceClient.from_connection_string("connection_string")
transport_params = {'client': client}

# Using account key
client = BlobServiceClient(
    account_url="https://account.blob.core.windows.net",
    credential="account_key"
)
transport_params = {'client': client}

with open('azure://container/file.txt', transport_params=transport_params) as f:
    data = f.read()

Performance Optimization

S3 Performance Tips

# Use multipart uploads for large files
transport_params = {
    'multipart_upload': True,
    'min_part_size': 100 * 1024 * 1024,  # 100MB parts
    'buffer_size': 1024 * 1024  # 1MB buffer
}

# Parallel bucket processing
for key, content in iter_bucket('bucket', workers=32, retries=5):
    process_content(key, content)

# Custom S3 client with connection pooling
import boto3
from botocore.config import Config

config = Config(
    max_pool_connections=50,
    retries={'max_attempts': 10}
)
client = boto3.client('s3', config=config)
transport_params = {'client': client}

GCS Performance Tips

# Use resumable uploads for large files
transport_params = {
    'min_part_size': 50 * 1024 * 1024,  # 50MB minimum
    'blob_open_kwargs': {'timeout': 300}
}

# Custom client with retry configuration
from google.cloud import storage
from google.api_core import retry

client = storage.Client()
transport_params = {
    'client': client,
    'blob_open_kwargs': {
        'retry': retry.Retry(deadline=300)
    }
}

Error Handling and Retries

from smart_open import open
import boto3
from botocore.exceptions import ClientError

try:
    with open('s3://bucket/file.txt') as f:
        data = f.read()
except ClientError as e:
    error_code = e.response['Error']['Code']
    if error_code == 'NoSuchKey':
        print("File not found")
    elif error_code == 'AccessDenied':
        print("Permission denied")
    else:
        print(f"AWS error: {error_code}")
except Exception as e:
    print(f"Other error: {e}")

# Custom retry configuration for S3
from smart_open.s3 import Retry
retry_config = Retry(
    attempts=5,
    sleep_seconds=2.0,
    exceptions=(ClientError,)
)

Install with Tessl CLI

npx tessl i tessl/pypi-smart-open

docs

big-data.md

cloud-storage.md

compression.md

core-operations.md

index.md

network-access.md

utilities.md

tile.json