
tessl/pypi-smart-open

Utils for streaming large files from S3, HDFS, GCS, SFTP, Azure Blob Storage, and local filesystem with transparent compression support



Utilities and Advanced Usage

Helper functions for URI handling, HTTP byte ranges, keyword-argument validation, parallel processing, byte buffering, and custom transport development. These utilities are aimed at power users and library developers who need finer control or want to extend the library.

Capabilities

URI and URL Utilities

Advanced URI parsing and manipulation functions.

def safe_urlsplit(url):
    """URL split that handles question marks in S3/GS URLs.
    
    Parameters:
        url: str - URL to split, may contain question marks in path
        
    Returns:
        urllib.parse.SplitResult - Parsed URL components
        
    Notes:
        Handles special case where S3/GCS object keys contain '?' characters
        which would normally be interpreted as query string delimiters
    """

def make_range_string(start=None, stop=None):
    """Create HTTP byte range specifier string.
    
    Parameters:
        start: int - Starting byte position (inclusive)
        stop: int - Ending byte position (inclusive)
        
    Returns:
        str - Range string like 'bytes=0-1023' or 'bytes=1000-'
        
    Notes:
        Used for HTTP Range requests and partial content retrieval.
        Pass at least one of start or stop; with stop omitted the range is
        open-ended ('bytes=1000-').
    """

def parse_content_range(content_range):
    """Parse HTTP Content-Range header value.
    
    Parameters:
        content_range: str - Content-Range header value
        
    Returns:
        tuple - (units, start, stop, length) parsed from header
        
    Example:
        parse_content_range('bytes 0-1023/2048') -> ('bytes', 0, 1023, 2048)
    """

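To make the two byte-range helpers concrete, here is a standard-library-only sketch of the behavior described above. The names range_string and parse_range are hypothetical stand-ins written for illustration; they are not smart_open's own implementations.

```python
def range_string(start=None, stop=None):
    """Build an HTTP Range header value such as 'bytes=0-1023'."""
    if start is None and stop is None:
        raise ValueError("need at least one of start or stop")
    start_str = "" if start is None else str(start)
    stop_str = "" if stop is None else str(stop)
    return f"bytes={start_str}-{stop_str}"


def parse_range(content_range):
    """Split a Content-Range value like 'bytes 0-1023/2048' into its parts."""
    units, numbers = content_range.split(" ", 1)
    span, length = numbers.split("/", 1)
    start, stop = span.split("-", 1)
    return units, int(start), int(stop), int(length)


print(range_string(0, 1023))             # bytes=0-1023
print(range_string(1000))                # bytes=1000-
print(parse_range("bytes 0-1023/2048"))  # ('bytes', 0, 1023, 2048)
```

Both byte positions are inclusive, so a 1024-byte request starting at offset 0 ends at 1023, not 1024.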
Utility Constants

# URL schemes requiring special handling
WORKAROUND_SCHEMES = ['s3', 's3n', 's3u', 's3a', 'gs']

# Placeholder for question marks in URLs
QUESTION_MARK_PLACEHOLDER = '///smart_open.utils.QUESTION_MARK_PLACEHOLDER///'
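The placeholder constant hints at how the question-mark workaround can operate: hide each '?' before splitting, then restore it in the path. The sketch below uses only urllib.parse; PLACEHOLDER and split_keeping_question_marks are hypothetical names, and the real safe_urlsplit may differ in detail (for example, it only applies the workaround to the schemes listed in WORKAROUND_SCHEMES).

```python
from urllib.parse import SplitResult, urlsplit

PLACEHOLDER = "///PLACEHOLDER///"  # hypothetical stand-in for the library constant


def split_keeping_question_marks(url):
    """Split a URL while treating '?' as part of the path, not a query delimiter."""
    if "?" not in url:
        return urlsplit(url, allow_fragments=False)
    # Mask the question marks so urlsplit does not start a query string
    masked = urlsplit(url.replace("?", PLACEHOLDER), allow_fragments=False)
    # Restore them in the path and return an empty query and fragment
    path = masked.path.replace(PLACEHOLDER, "?")
    return SplitResult(masked.scheme, masked.netloc, path, "", "")


url = "s3://bucket/file?with?questions.txt"
print(urlsplit(url).path)                      # /file
print(split_keeping_question_marks(url).path)  # /file?with?questions.txt
```

The plain urlsplit call truncates the object key at the first '?', which is the failure the workaround exists to avoid.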

Function Introspection

Utilities for examining and validating function parameters.

def inspect_kwargs(callable_obj):
    """Inspect function signature for supported keyword arguments.
    
    Parameters:
        callable_obj: callable - Function or method to inspect
        
    Returns:
        dict - Mapping of argument names to their default values
        
    Notes:
        Used internally to validate transport_params against function signatures
    """

def check_kwargs(callable_obj, kwargs):
    """Filter kwargs to only include supported parameters, log warnings for unsupported.
    
    Parameters:
        callable_obj: callable - Function to check against
        kwargs: dict - Keyword arguments to filter
        
    Returns:
        dict - Filtered kwargs containing only supported parameters
        
    Notes:
        Logs warnings for unsupported kwargs that are filtered out
    """

def clamp(value, minval=0, maxval=None):
    """Clamp numeric value to specified range.
    
    Parameters:
        value: number - Value to clamp
        minval: number - Minimum allowed value
        maxval: number - Maximum allowed value (None for no limit)
        
    Returns:
        number - Clamped value within [minval, maxval] range
    """

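The introspection helpers can be approximated with the standard inspect module. The following is a minimal sketch under that assumption; keyword_defaults, filter_supported, clamp_value, and example_open are hypothetical names, and the library's versions additionally log a warning for each dropped argument.

```python
import inspect


def keyword_defaults(func):
    """Map each argument of func that has a default to that default value."""
    params = inspect.signature(func).parameters.values()
    return {p.name: p.default for p in params
            if p.default is not inspect.Parameter.empty}


def filter_supported(func, kwargs):
    """Keep only the kwargs that func accepts as keyword arguments."""
    supported = keyword_defaults(func)
    return {key: value for key, value in kwargs.items() if key in supported}


def clamp_value(value, minval=0, maxval=None):
    """Restrict value to [minval, maxval]; maxval=None means no upper bound."""
    if maxval is not None:
        value = min(value, maxval)
    return max(value, minval)


def example_open(uri, mode, buffer_size=8192, custom_param=None):
    """Hypothetical transport function used only for this demonstration."""


print(keyword_defaults(example_open))
# {'buffer_size': 8192, 'custom_param': None}
print(filter_supported(example_open, {"buffer_size": 4096, "typo": 1}))
# {'buffer_size': 4096}
print(clamp_value(-5), clamp_value(50, 0, 10))
# 0 10
```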
Enhanced I/O Classes

Advanced file-like object wrappers with additional functionality.

class TextIOWrapper(io.TextIOWrapper):
    """Enhanced TextIOWrapper with improved exception handling.
    
    Provides better error reporting and handling for text mode operations
    over binary file objects from various transport layers.
    """

class FileLikeProxy(wrapt.ObjectProxy):
    """Proxy that manages relationships between inner and outer file objects.
    
    Coordinates operations between compression layers and transport layers,
    ensuring proper resource management and method delegation.
    """

Concurrency Utilities

Parallel processing support for I/O operations.

def create_pool(processes=1):
    """Create process or thread pool for parallel operations.
    
    Parameters:
        processes: int - Number of worker processes/threads
        
    Returns:
        Context manager yielding a pool object with an imap_unordered() method
        for parallel iteration
        
    Notes:
        Automatically selects between multiprocessing and threading based on availability.
        Falls back to DummyPool (sequential execution) when neither is available.
    """

class DummyPool:
    """Fallback pool implementation when multiprocessing is unavailable.
    
    Provides same interface as multiprocessing.Pool but executes sequentially.
    """
    
    def imap_unordered(self, func, iterable):
        """Sequential map implementation."""

class ConcurrentFuturesPool:
    """Thread-based pool using concurrent.futures.ThreadPoolExecutor.
    
    Alternative to multiprocessing for I/O-bound parallel operations.
    """
    
    def imap_unordered(self, func, iterable):
        """Parallel map using thread pool."""

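The pool classes share one small interface: imap_unordered() to map a function over items, yielding results in completion order. The sketch below rebuilds that interface with concurrent.futures to show the idea; ThreadPoolSketch and SequentialPoolSketch are hypothetical names, and the terminate() method is an assumption about how such a pool is shut down.

```python
import concurrent.futures


class ThreadPoolSketch:
    """Thread-backed pool that yields results as each task finishes."""

    def __init__(self, max_workers):
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers)

    def imap_unordered(self, func, iterable):
        futures = [self.executor.submit(func, item) for item in iterable]
        for future in concurrent.futures.as_completed(futures):
            yield future.result()

    def terminate(self):
        self.executor.shutdown(wait=True)


class SequentialPoolSketch:
    """Same interface, but runs everything in the calling thread."""

    def imap_unordered(self, func, iterable):
        return map(func, iterable)

    def terminate(self):
        pass


def square(x):
    return x * x


pool = ThreadPoolSketch(max_workers=4)
try:
    # Completion order is not guaranteed, so sort before comparing
    threaded = sorted(pool.imap_unordered(square, range(5)))
finally:
    pool.terminate()

sequential = list(SequentialPoolSketch().imap_unordered(square, range(5)))
print(threaded, sequential)
```

Because results arrive in completion order, callers that need to match results to inputs should return the input alongside the result.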
Byte Buffer Operations

Efficient byte buffer for network I/O operations.

class ByteBuffer:
    """Efficient byte buffer for streaming network I/O.
    
    Provides buffering layer between network reads and application consumption,
    optimizing for both small reads and bulk operations.
    """
    
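    def fill(self, source, size=-1):
        """Fill buffer from a byte source.
        
        Parameters:
            source: file-like object with read(), or iterable of bytes - Where new bytes come from
            size: int - Maximum number of bytes to add (-1 for the buffer's chunk size)
            
        Returns:
            int - Number of bytes added to the buffer
        """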
    
    def read(self, size=-1):
        """Read bytes from buffer.
        
        Parameters:
            size: int - Number of bytes to read (-1 for all available)
            
        Returns:
            bytes - Data read from buffer
        """
    
    def peek(self):
        """Peek at buffer contents without consuming.
        
        Returns:
            bytes - Current buffer contents
        """
    
    def readline(self, terminator=b'\n'):
        """Read line from buffer up to terminator.
        
        Parameters:
            terminator: bytes - Line terminator to search for
            
        Returns:
            bytes - Line including terminator
        """
    
    def empty(self):
        """Empty the buffer, discarding all contents."""

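The buffer's behavior is easier to follow with a toy version. This sketch, which assumes a file-like source and uses the hypothetical class name LineBuffer, keeps a bytearray plus a read position, which is one plausible way to support fill, peek, read, and readline cheaply; it is not the library's implementation.

```python
import io


class LineBuffer:
    """Toy byte buffer: append at the end, consume from a read position."""

    def __init__(self):
        self._bytes = bytearray()
        self._pos = 0

    def __len__(self):
        return len(self._bytes) - self._pos

    def fill(self, source, size=4096):
        # Drop bytes that were already consumed, then append fresh ones
        del self._bytes[:self._pos]
        self._pos = 0
        new_bytes = source.read(size)
        self._bytes += new_bytes
        return len(new_bytes)

    def peek(self, size=-1):
        if size < 0 or size > len(self):
            size = len(self)
        return bytes(self._bytes[self._pos:self._pos + size])

    def read(self, size=-1):
        part = self.peek(size)
        self._pos += len(part)
        return part

    def readline(self, terminator=b"\n"):
        # Return up to and including the terminator, or the rest of the buffer
        index = self._bytes.find(terminator, self._pos)
        size = len(self) if index == -1 else index - self._pos + 1
        return self.read(size)


buf = LineBuffer()
buf.fill(io.BytesIO(b"alpha\nbeta\ngam"))
lines = [buf.readline(), buf.readline(), buf.readline()]
print(lines)  # [b'alpha\n', b'beta\n', b'gam']
```

Note that the last "line" has no terminator: a caller streaming from a network source would refill the buffer before treating such a fragment as complete.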
Transport System

Transport registration and management system.

def register_transport(submodule):
    """Register transport module for URI schemes.
    
    Parameters:
        submodule: module or str - Transport module or module name to register
        
    Notes:
        Module must have SCHEME/SCHEMES attribute and open, open_uri, parse_uri functions
        Automatically handles import errors for optional dependencies
    """

def get_transport(scheme):
    """Get transport module for URI scheme.
    
    Parameters:
        scheme: str - URI scheme (e.g., 's3', 'http', 'ftp')
        
    Returns:
        module - Transport module implementing the scheme
        
    Raises:
        ImportError - If required dependencies for scheme are missing
        NotImplementedError - If scheme is not supported
    """

# Transport registry constants
NO_SCHEME = ''  # Used for local file operations
SUPPORTED_SCHEMES = tuple(sorted(_REGISTRY.keys()))  # All registered schemes

Core Constants

Shared constants used throughout the library.

# Binary mode constants
READ_BINARY = 'rb'
WRITE_BINARY = 'wb' 
BINARY_MODES = (READ_BINARY, WRITE_BINARY)
BINARY_NEWLINE = b'\n'

# Seek operation constants
WHENCE_START = 0    # Seek from beginning of file
WHENCE_CURRENT = 1  # Seek from current position
WHENCE_END = 2      # Seek from end of file
WHENCE_CHOICES = (WHENCE_START, WHENCE_CURRENT, WHENCE_END)
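The three WHENCE values listed above are numerically the same as the whence values Python's own io module uses (io.SEEK_SET, io.SEEK_CUR, io.SEEK_END). As a quick illustration of what each one means, here is a standard-library example on an in-memory stream; it does not touch smart_open itself.

```python
import io

stream = io.BytesIO(b"0123456789")

stream.seek(2, io.SEEK_SET)    # whence 0: offset from the start
from_start = stream.read(2)    # b'23', position is now 4

stream.seek(1, io.SEEK_CUR)    # whence 1: skip one byte from position 4
from_current = stream.read(2)  # b'56'

stream.seek(-3, io.SEEK_END)   # whence 2: three bytes before the end
from_end = stream.read()       # b'789'

print(from_start, from_current, from_end)
```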

Usage Examples

URI Manipulation

from smart_open.utils import safe_urlsplit, make_range_string, parse_content_range

# Handle URLs with question marks in path (common in S3/GCS)
problematic_url = 's3://bucket/file?with?questions.txt'
parsed = safe_urlsplit(problematic_url)
print(f"Scheme: {parsed.scheme}, Path: {parsed.path}")

# Create HTTP range requests
range_header = make_range_string(0, 1023)  # "bytes=0-1023"
range_header_open = make_range_string(1000)  # "bytes=1000-"

# Parse Content-Range responses
content_range = "bytes 0-1023/2048"
units, start, stop, total = parse_content_range(content_range)
print(f"Retrieved bytes {start}-{stop} of {total} total")

Parameter Validation

from smart_open.utils import inspect_kwargs, check_kwargs

def my_transport_function(uri, mode, custom_param=None, buffer_size=8192):
    """Example transport function."""
    pass

# Inspect function signature
supported_params = inspect_kwargs(my_transport_function)
print(f"Supported parameters: {list(supported_params.keys())}")

# Filter user-provided parameters
user_params = {
    'custom_param': 'value',
    'buffer_size': 4096,
    'unsupported_param': 'ignored'  # This will be filtered out
}
valid_params = check_kwargs(my_transport_function, user_params)
# valid_params = {'custom_param': 'value', 'buffer_size': 4096}

Parallel Processing

from smart_open.concurrency import create_pool
from smart_open import open

def process_file(uri):
    """Process a single file and return results."""
    with open(uri, 'rb') as f:
        content = f.read()
        return len(content)  # Example: return file size

# Parallel file processing
file_uris = [
    's3://bucket/file1.txt',
    's3://bucket/file2.txt', 
    'gs://bucket/file3.txt',
    'azure://container/file4.txt'
]

# Process files in parallel
with create_pool(processes=4) as pool:
    results = list(pool.imap_unordered(process_file, file_uris))

print(f"Processed {len(results)} files, total size: {sum(results)} bytes")

# Sequential fallback when multiprocessing unavailable
# (DummyPool is assumed here not to be a context manager, so it is used directly)
from smart_open.concurrency import DummyPool

pool = DummyPool()
results = list(pool.imap_unordered(process_file, file_uris))

Byte Buffer Usage

from smart_open.bytebuffer import ByteBuffer
import socket

# Network reading with buffering
def read_from_socket(sock):
    buffer = ByteBuffer()
    
    # Read one chunk from the socket; b'' signals end of stream or timeout
    def socket_reader():
        try:
            return sock.recv(4096)
        except socket.timeout:
            return b''
    
    # fill() is assumed to take a file-like object or an iterable of bytes,
    # not a bare callable, so wrap the reader with iter(callable, sentinel)
    buffer.fill(iter(socket_reader, b''))
    
    # Read lines from buffer (the terminator is passed explicitly)
    while True:
        line = buffer.readline(b'\n')
        if not line:
            break
        yield line.decode('utf-8').strip()

# Usage with smart-open
from smart_open import open

def process_line(line):
    """Placeholder handler; replace with real processing."""
    print(line)

with open('http://example.com/stream-data.txt', 'rb') as f:
    buffer = ByteBuffer()
    
    # Fill buffer from HTTP stream, one chunk per call, until it is exhausted
    while buffer.fill(f) > 0:
        pass
    
    # Process line by line
    while True:
        line = buffer.readline(b'\n')
        if not line:
            break
        process_line(line)

Custom Transport Development

from smart_open.transport import register_transport

# Create custom transport module
class CustomTransport:
    SCHEME = 'custom'
    
    @staticmethod
    def parse_uri(uri_as_string):
        """Parse custom URI format."""
        # Implementation specific to custom scheme
        return {'scheme': 'custom', 'path': uri_as_string[9:]}  # Remove 'custom://'
    
    @staticmethod
    def open_uri(uri, mode, transport_params):
        """Open custom URI with transport parameters."""
        parsed = CustomTransport.parse_uri(uri)
        return CustomTransport.open(parsed['path'], mode)
    
    @staticmethod
    def open(path, mode):
        """Open custom resource."""
        # Custom implementation
        if 'r' in mode:
            return CustomReader(path)
        else:
            return CustomWriter(path)

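class CustomReader:
    def __init__(self, path):
        self.path = path
        self._closed = False
    
    def read(self, size=-1):
        """Read from custom source."""
        return f"Data from {self.path}".encode()
    
    def close(self):
        self._closed = True
    
    # Context-manager support so the object works in a with statement
    def __enter__(self):
        return self
    
    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

class CustomWriter:
    def __init__(self, path):
        self.path = path
        self._closed = False
    
    def write(self, data):
        """Write to custom destination."""
        print(f"Writing to {self.path}: {data}")
        return len(data)
    
    def close(self):
        self._closed = True
    
    # Context-manager support so the object works in a with statement
    def __enter__(self):
        return self
    
    def __exit__(self, exc_type, exc_value, traceback):
        self.close()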

# Register the custom transport
register_transport(CustomTransport)

# Now custom:// URLs work with smart-open
from smart_open import open

with open('custom://my-resource', 'rb') as f:
    data = f.read()

with open('custom://output-resource', 'wb') as f:
    f.write(b'Hello custom transport!')

Advanced HTTP Operations

from smart_open.utils import make_range_string
from smart_open import open
import requests

# Partial file downloads using Range requests
def download_file_range(url, start_byte, end_byte):
    """Download specific byte range from HTTP resource."""
    range_header = make_range_string(start_byte, end_byte)
    transport_params = {
        'headers': {'Range': range_header}
    }
    
    with open(url, 'rb', transport_params=transport_params) as f:
        return f.read()

# Download file in chunks
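def chunked_download(url, chunk_size=1024*1024):
    """Download a file as a series of range requests and return the joined bytes."""
    # First, get file size (assumes the server reports Content-Length on HEAD)
    response = requests.head(url, allow_redirects=True)
    response.raise_for_status()
    content_length = int(response.headers.get('Content-Length', 0))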
    
    chunks = []
    for start in range(0, content_length, chunk_size):
        end = min(start + chunk_size - 1, content_length - 1)
        chunk = download_file_range(url, start, end)
        chunks.append(chunk)
    
    return b''.join(chunks)

# Usage
large_file_data = chunked_download('https://example.com/large-file.dat')
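The chunk boundaries in the download loop are where off-by-one errors usually creep in, because HTTP byte ranges are inclusive at both ends. This small helper, with the hypothetical name byte_ranges, isolates that arithmetic so it can be checked without any network access.

```python
def byte_ranges(total_size, chunk_size):
    """Yield inclusive (start, end) byte offsets that cover total_size bytes."""
    for start in range(0, total_size, chunk_size):
        end = min(start + chunk_size - 1, total_size - 1)
        yield start, end


ranges = list(byte_ranges(2500, 1024))
print(ranges)  # [(0, 1023), (1024, 2047), (2048, 2499)]

# Every byte is covered exactly once
covered = sum(end - start + 1 for start, end in ranges)
print(covered)  # 2500
```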

Error Handling and Debugging

from smart_open.utils import check_kwargs
from smart_open.transport import get_transport
import logging

# Enable debug logging for transport operations
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('smart_open.transport')

def debug_transport_params(uri, transport_params):
    """Debug transport parameter compatibility."""
    from urllib.parse import urlparse
    
    scheme = urlparse(uri).scheme or ''
    
    try:
        transport = get_transport(scheme)
    except (ImportError, NotImplementedError) as e:
        logger.error(f"Transport parameter validation failed: {e}")
        return transport_params
    
    # Transports without an open function cannot be checked; pass params through
    if not hasattr(transport, 'open'):
        return transport_params
    
    # check_kwargs drops (and logs) anything transport.open does not accept
    valid_params = check_kwargs(transport.open, transport_params)
    invalid_params = set(transport_params) - set(valid_params)
    if invalid_params:
        logger.warning(f"Invalid transport_params for {scheme}: {invalid_params}")
    return valid_params

# Usage
uri = 's3://my-bucket/file.txt'
params = {
    'buffer_size': 1024*1024,
    'multipart_upload': True,
    'invalid_param': 'ignored'
}

valid_params = debug_transport_params(uri, params)
print(f"Valid parameters: {valid_params}")

Performance Monitoring

import time
import functools
from smart_open import open

def timing_decorator(func):
    """Decorator to measure function execution time."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.perf_counter()  # Monotonic clock suited to measuring intervals
        result = func(*args, **kwargs)
        end_time = time.perf_counter()
        print(f"{func.__name__} took {end_time - start_time:.2f} seconds")
        return result
    return wrapper

@timing_decorator
def benchmark_read(uri, chunk_size=8192):
    """Benchmark file reading performance."""
    total_bytes = 0
    with open(uri, 'rb') as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            total_bytes += len(chunk)
    return total_bytes

# Compare different chunk sizes
uris_to_test = [
    's3://test-bucket/large-file.dat',
    'gs://test-bucket/large-file.dat',
    'azure://container/large-file.dat'
]

chunk_sizes = [4096, 8192, 16384, 32768, 65536]

for uri in uris_to_test:
    print(f"\nTesting {uri}:")
    for chunk_size in chunk_sizes:
        total_bytes = benchmark_read(uri, chunk_size)
        print(f"  Chunk size {chunk_size}: {total_bytes} bytes")

Best Practices

Transport Parameter Validation

Always validate transport parameters to avoid runtime surprises:

from smart_open.utils import check_kwargs
from smart_open.s3 import open as s3_open

# Validate parameters before use
proposed_params = {
    'buffer_size': 1024*1024,
    'multipart_upload': True,
    'typo_in_parameter_name': 'ignored'
}

valid_params = check_kwargs(s3_open, proposed_params)
# Use valid_params instead of proposed_params

URI Handling

Use safe_urlsplit for cloud storage URLs whose object keys might contain question marks:

from smart_open.utils import safe_urlsplit

# Safer than urllib.parse.urlsplit for cloud storage URLs
uri = 's3://bucket/file?with?questions.txt'
parsed = safe_urlsplit(uri)

Parallel Processing

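Choose appropriate parallelism based on workload characteristics:

from multiprocessing import cpu_count
from smart_open.concurrency import create_pool, ConcurrentFuturesPool

# cpu_intensive_func, io_intensive_func and items are placeholders for your own code

# CPU-bound: Use multiprocessing
with create_pool(processes=cpu_count()) as pool:
    results = list(pool.imap_unordered(cpu_intensive_func, items))

# I/O-bound: Use threading
# (ConcurrentFuturesPool is assumed not to be a context manager; shut it down explicitly)
pool = ConcurrentFuturesPool(max_workers=20)
try:
    results = list(pool.imap_unordered(io_intensive_func, items))
finally:
    pool.terminate()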
