Utils for streaming large files from S3, HDFS, GCS, SFTP, Azure Blob Storage, and local filesystem with transparent compression support
—
Helper functions for URI handling, byte ranges, parallel processing, and custom transport development. These utilities provide advanced functionality and extensibility for power users and library developers.
Advanced URI parsing and manipulation functions.
def safe_urlsplit(url):
    """URL split that handles question marks in S3/GS URLs.

    Parameters:
        url: str - URL to split, may contain question marks in path

    Returns:
        urllib.parse.SplitResult - Parsed URL components

    Notes:
        Handles the special case where S3/GCS object keys contain '?'
        characters, which urlsplit would normally treat as query string
        delimiters. For schemes in WORKAROUND_SCHEMES the '?' is masked
        with QUESTION_MARK_PLACEHOLDER before splitting and restored in
        the resulting path afterwards.
    """
    import urllib.parse

    parsed = urllib.parse.urlsplit(url, allow_fragments=False)
    # Test for '?' first so URLs without one never touch the workaround path.
    if '?' in url and parsed.scheme in WORKAROUND_SCHEMES:
        masked = url.replace('?', QUESTION_MARK_PLACEHOLDER)
        parsed = urllib.parse.urlsplit(masked, allow_fragments=False)
        path = parsed.path.replace(QUESTION_MARK_PLACEHOLDER, '?')
        # Rebuild with the restored path; query/fragment are part of the key.
        parsed = urllib.parse.SplitResult(parsed.scheme, parsed.netloc, path, '', '')
    return parsed
def make_range_string(start=None, stop=None):
    """Create HTTP byte range specifier string.

    Parameters:
        start: int - Starting byte position (inclusive), or None for a
            suffix range
        stop: int - Ending byte position (inclusive), or None for an
            open-ended range

    Returns:
        str - Range string like 'bytes=0-1023' or 'bytes=1000-'

    Raises:
        ValueError - If both start and stop are None

    Notes:
        Used for HTTP Range requests and partial content retrieval
        (RFC 7233 byte-range specifier).
    """
    if start is None and stop is None:
        raise ValueError('make_range_string requires either a start or stop value')
    start_str = '' if start is None else str(start)
    stop_str = '' if stop is None else str(stop)
    return 'bytes=%s-%s' % (start_str, stop_str)
def parse_content_range(content_range):
    """Parse HTTP Content-Range header value.

    Parameters:
        content_range: str - Content-Range header value, e.g. 'bytes 0-1023/2048'

    Returns:
        tuple - (units, start, stop, length) parsed from header, with the
        numeric fields converted to int

    Example:
        parse_content_range('bytes 0-1023/2048') -> ('bytes', 0, 1023, 2048)
    """
    # Header grammar: '<units> <first>-<last>/<length>' (RFC 7233).
    units, _, range_spec = content_range.partition(' ')
    first_last, _, length = range_spec.partition('/')
    start, _, stop = first_last.partition('-')
    return units, int(start), int(stop), int(length)
# URL schemes requiring special handling
WORKAROUND_SCHEMES = ['s3', 's3n', 's3u', 's3a', 'gs']
# Placeholder for question marks in URLs
QUESTION_MARK_PLACEHOLDER = '///smart_open.utils.QUESTION_MARK_PLACEHOLDER///'
# Utilities for examining and validating function parameters.
def inspect_kwargs(callable_obj):
    """Inspect function signature for supported keyword arguments.

    Parameters:
        callable_obj: callable - Function or method to inspect

    Returns:
        dict - Mapping of keyword argument names to their default values

    Notes:
        Used internally to validate transport_params against function
        signatures. Callables without an introspectable signature (some
        builtins / C callables) yield an empty dict.
    """
    import inspect

    try:
        signature = inspect.signature(callable_obj)
    except (TypeError, ValueError):
        # Builtins and some C-implemented callables expose no signature.
        return {}
    return {
        name: param.default
        for name, param in signature.parameters.items()
        if param.default is not inspect.Parameter.empty
    }
def check_kwargs(callable_obj, kwargs):
    """Filter kwargs to only include supported parameters, log warnings for unsupported.

    Parameters:
        callable_obj: callable - Function to check against
        kwargs: dict - Keyword arguments to filter

    Returns:
        dict - Filtered kwargs containing only supported parameters

    Notes:
        Logs warnings for unsupported kwargs that are filtered out.
        Callables without an introspectable signature yield an empty dict.
    """
    import inspect
    import logging

    try:
        params = inspect.signature(callable_obj).parameters
    except (TypeError, ValueError):
        # No signature available: nothing can be validated as supported.
        return {}
    # Only parameters that declare a default are treated as supported kwargs.
    supported = {
        name for name, param in params.items()
        if param.default is not inspect.Parameter.empty
    }
    rejected = sorted(set(kwargs) - supported)
    if rejected:
        logging.getLogger(__name__).warning(
            'ignoring unsupported keyword arguments: %r', rejected,
        )
    return {key: value for key, value in kwargs.items() if key in supported}
def clamp(value, minval=0, maxval=None):
    """Clamp numeric value to specified range.

    Parameters:
        value: number - Value to clamp
        minval: number - Minimum allowed value (None for no lower limit)
        maxval: number - Maximum allowed value (None for no upper limit)

    Returns:
        number - Clamped value within [minval, maxval] range
    """
    if minval is not None:
        value = max(value, minval)
    if maxval is not None:
        value = min(value, maxval)
    return value
# Advanced file-like object wrappers with additional functionality.
class TextIOWrapper(io.TextIOWrapper):
    """Text-mode wrapper with improved exception handling.

    Layers text decoding over binary file objects produced by the various
    transport layers, reporting errors from text-mode operations more
    clearly than the stock io.TextIOWrapper.
    """
class FileLikeProxy(wrapt.ObjectProxy):
    """Proxy keeping an outer file object and its inner transport in sync.

    Coordinates compression layers with transport layers, delegating
    method calls and making sure both resources are released together.
    """
# Parallel processing support for I/O operations.
def create_pool(processes=1):
    """Build a pool object exposing imap_unordered() for parallel iteration.

    Parameters:
        processes: int - Number of worker processes/threads

    Returns:
        Pool object with an imap_unordered() method

    Notes:
        Picks between multiprocessing and threading depending on what the
        runtime supports. Falls back to the sequential DummyPool when
        multiprocessing is unavailable.
    """
class DummyPool:
    """Fallback pool implementation when multiprocessing is unavailable.

    Provides the same interface as multiprocessing.Pool but executes
    sequentially in the calling thread. Usable as a context manager so
    callers can treat it exactly like a real pool.
    """

    def imap_unordered(self, func, iterable):
        """Sequential map implementation.

        Parameters:
            func: callable - Function applied to each item
            iterable: iterable - Items to process

        Returns:
            iterator - Lazy iterator of results, in input order
        """
        return map(func, iterable)

    def terminate(self):
        """No-op, for interface compatibility with multiprocessing.Pool."""

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.terminate()
class ConcurrentFuturesPool:
    """Thread-based pool using concurrent.futures.ThreadPoolExecutor.

    Alternative to multiprocessing for I/O-bound parallel operations.
    Usable as a context manager; threads are shut down on exit.
    """

    def __init__(self, max_workers=1):
        # Local import keeps this module importable in minimal environments.
        import concurrent.futures
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)

    def imap_unordered(self, func, iterable):
        """Parallel map using thread pool; yields results as they complete.

        Parameters:
            func: callable - Function applied to each item
            iterable: iterable - Items to process

        Returns:
            generator - Results in completion order (NOT input order)
        """
        import concurrent.futures
        futures = [self.executor.submit(func, item) for item in iterable]
        for future in concurrent.futures.as_completed(futures):
            yield future.result()

    def terminate(self):
        """Wait for outstanding work, then release the worker threads."""
        self.executor.shutdown(wait=True)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.terminate()
# Efficient byte buffer for network I/O operations.
class ByteBuffer:
    """Efficient byte buffer for streaming network I/O.

    Provides buffering layer between network reads and application
    consumption, optimizing for both small reads and bulk operations.
    Bytes are appended by fill() and consumed from the front by
    read()/readline().
    """

    def __init__(self):
        # All buffered-but-unread bytes.
        self._buf = b''

    def __len__(self):
        # Number of unread bytes currently buffered.
        return len(self._buf)

    def fill(self, reader):
        """Fill buffer from reader function.

        Parameters:
            reader: callable - Function that returns bytes when called

        Returns:
            int - Number of bytes appended (0 if the reader returned nothing)
        """
        data = reader()
        if not data:
            return 0
        self._buf += data
        return len(data)

    def read(self, size=-1):
        """Read bytes from buffer.

        Parameters:
            size: int - Number of bytes to read (negative for all available)

        Returns:
            bytes - Data read from buffer (may be shorter than size if the
            buffer holds fewer bytes)
        """
        if size is None or size < 0:
            data, self._buf = self._buf, b''
        else:
            data, self._buf = self._buf[:size], self._buf[size:]
        return data

    def peek(self):
        """Peek at buffer contents without consuming.

        Returns:
            bytes - Current (unread) buffer contents
        """
        return self._buf

    def readline(self, terminator=b'\n'):
        """Read line from buffer up to terminator.

        Parameters:
            terminator: bytes - Line terminator to search for

        Returns:
            bytes - Line including terminator; if no terminator is buffered,
            everything currently in the buffer is returned
        """
        end = self._buf.find(terminator)
        if end == -1:
            # No terminator yet: drain whatever is buffered.
            return self.read()
        cut = end + len(terminator)
        line, self._buf = self._buf[:cut], self._buf[cut:]
        return line

    def empty(self):
        """Empty the buffer, discarding all contents."""
        self._buf = b''
# Transport registration and management system.
def register_transport(submodule):
    """Register a transport module so its URI schemes become openable.

    Parameters:
        submodule: module or str - Transport module, or dotted module name,
            to register

    Notes:
        The module must expose a SCHEME/SCHEMES attribute plus open,
        open_uri and parse_uri callables. Import failures caused by missing
        optional dependencies are handled automatically.
    """
def get_transport(scheme):
    """Look up the transport module registered for a URI scheme.

    Parameters:
        scheme: str - URI scheme (e.g. 's3', 'http', 'ftp')

    Returns:
        module - Transport module implementing the scheme

    Raises:
        ImportError - If the scheme's required dependencies are missing
        NotImplementedError - If no transport is registered for the scheme
    """
# Transport registry constants
NO_SCHEME = '' # Used for local file operations
SUPPORTED_SCHEMES = tuple(sorted(_REGISTRY.keys())) # All registered schemes
# Shared constants used throughout the library.
# Binary mode constants
READ_BINARY = 'rb'
WRITE_BINARY = 'wb'
BINARY_MODES = (READ_BINARY, WRITE_BINARY)
BINARY_NEWLINE = b'\n'
# Seek operation constants
WHENCE_START = 0 # Seek from beginning of file
WHENCE_CURRENT = 1 # Seek from current position
WHENCE_END = 2 # Seek from end of file
WHENCE_CHOICES = (WHENCE_START, WHENCE_CURRENT, WHENCE_END)
from smart_open.utils import safe_urlsplit, make_range_string, parse_content_range
# Handle URLs with question marks in path (common in S3/GCS)
problematic_url = 's3://bucket/file?with?questions.txt'
parsed = safe_urlsplit(problematic_url)
print(f"Scheme: {parsed.scheme}, Path: {parsed.path}")
# Create HTTP range requests
range_header = make_range_string(0, 1023) # "bytes=0-1023"
range_header_open = make_range_string(1000) # "bytes=1000-"
# Parse Content-Range responses
content_range = "bytes 0-1023/2048"
units, start, stop, total = parse_content_range(content_range)
print(f"Retrieved bytes {start}-{stop} of {total} total")
from smart_open.utils import inspect_kwargs, check_kwargs
def my_transport_function(uri, mode, custom_param=None, buffer_size=8192):
"""Example transport function."""
pass
# Inspect function signature
supported_params = inspect_kwargs(my_transport_function)
print(f"Supported parameters: {list(supported_params.keys())}")
# Filter user-provided parameters
user_params = {
'custom_param': 'value',
'buffer_size': 4096,
'unsupported_param': 'ignored' # This will be filtered out
}
valid_params = check_kwargs(my_transport_function, user_params)
# valid_params = {'custom_param': 'value', 'buffer_size': 4096}
from smart_open.concurrency import create_pool
from smart_open import open
def process_file(uri):
"""Process a single file and return results."""
with open(uri, 'rb') as f:
content = f.read()
return len(content) # Example: return file size
# Parallel file processing
file_uris = [
's3://bucket/file1.txt',
's3://bucket/file2.txt',
'gs://bucket/file3.txt',
'azure://container/file4.txt'
]
# Process files in parallel
with create_pool(processes=4) as pool:
results = list(pool.imap_unordered(process_file, file_uris))
print(f"Processed {len(results)} files, total size: {sum(results)} bytes")
# Sequential fallback when multiprocessing unavailable
from smart_open.concurrency import DummyPool
with DummyPool() as pool:
results = list(pool.imap_unordered(process_file, file_uris))
from smart_open.bytebuffer import ByteBuffer
import socket
# Network reading with buffering
def read_from_socket(sock):
buffer = ByteBuffer()
# Fill buffer from socket
def socket_reader():
try:
return sock.recv(4096)
except socket.timeout:
return b''
buffer.fill(socket_reader)
# Read lines from buffer
while True:
try:
line = buffer.readline()
if not line:
break
yield line.decode('utf-8').strip()
except Exception:
break
# Usage with smart-open
from smart_open import open
with open('http://example.com/stream-data.txt', 'rb') as f:
buffer = ByteBuffer()
# Fill buffer from HTTP stream
buffer.fill(lambda: f.read(8192))
# Process line by line
while True:
line = buffer.readline()
if not line:
break
process_line(line)
from smart_open.transport import register_transport
# Create custom transport module
class CustomTransport:
SCHEME = 'custom'
@staticmethod
def parse_uri(uri_as_string):
"""Parse custom URI format."""
# Implementation specific to custom scheme
return {'scheme': 'custom', 'path': uri_as_string[9:]} # Remove 'custom://'
@staticmethod
def open_uri(uri, mode, transport_params):
"""Open custom URI with transport parameters."""
parsed = CustomTransport.parse_uri(uri)
return CustomTransport.open(parsed['path'], mode)
@staticmethod
def open(path, mode):
"""Open custom resource."""
# Custom implementation
if 'r' in mode:
return CustomReader(path)
else:
return CustomWriter(path)
class CustomReader:
    """Minimal read-only file-like object for the custom:// example scheme."""

    def __init__(self, path):
        # Remember the resource path and start in the open state.
        self.path = path
        self._closed = False

    def read(self, size=-1):
        """Read from custom source."""
        payload = f"Data from {self.path}"
        return payload.encode()

    def close(self):
        """Mark this reader as closed."""
        self._closed = True
class CustomWriter:
    """Minimal write-only file-like object for the custom:// example scheme."""

    def __init__(self, path):
        # Remember the destination path and start in the open state.
        self.path = path
        self._closed = False

    def write(self, data):
        """Write to custom destination."""
        print(f"Writing to {self.path}: {data}")
        return len(data)

    def close(self):
        """Mark this writer as closed."""
        self._closed = True
# Register the custom transport
register_transport(CustomTransport)
# Now custom:// URLs work with smart-open
from smart_open import open
with open('custom://my-resource', 'rb') as f:
data = f.read()
with open('custom://output-resource', 'wb') as f:
f.write(b'Hello custom transport!')
from smart_open.utils import make_range_string
from smart_open import open
import requests
# Partial file downloads using Range requests
def download_file_range(url, start_byte, end_byte):
"""Download specific byte range from HTTP resource."""
range_header = make_range_string(start_byte, end_byte)
transport_params = {
'headers': {'Range': range_header}
}
with open(url, 'rb', transport_params=transport_params) as f:
return f.read()
# Download file in chunks
def chunked_download(url, chunk_size=1024*1024):
"""Download large file in chunks to avoid memory issues."""
# First, get file size
response = requests.head(url)
content_length = int(response.headers.get('Content-Length', 0))
chunks = []
for start in range(0, content_length, chunk_size):
end = min(start + chunk_size - 1, content_length - 1)
chunk = download_file_range(url, start, end)
chunks.append(chunk)
return b''.join(chunks)
# Usage
large_file_data = chunked_download('https://example.com/large-file.dat')
from smart_open.utils import check_kwargs
from smart_open.transport import get_transport
import logging
# Enable debug logging for transport operations
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('smart_open.transport')
def debug_transport_params(uri, transport_params):
"""Debug transport parameter compatibility."""
from urllib.parse import urlparse
scheme = urlparse(uri).scheme or ''
try:
transport = get_transport(scheme)
# Check if transport_params are compatible with transport.open
if hasattr(transport, 'open'):
valid_params = check_kwargs(transport.open, transport_params)
invalid_params = set(transport_params.keys()) - set(valid_params.keys())
if invalid_params:
logger.warning(f"Invalid transport_params for {scheme}: {invalid_params}")
return valid_params
except Exception as e:
logger.error(f"Transport parameter validation failed: {e}")
return transport_params
# Usage
uri = 's3://my-bucket/file.txt'
params = {
'buffer_size': 1024*1024,
'multipart_upload': True,
'invalid_param': 'ignored'
}
valid_params = debug_transport_params(uri, params)
print(f"Valid parameters: {valid_params}")
import time
import functools
from smart_open import open
def timing_decorator(func):
    """Wrap *func* so each call prints its wall-clock duration."""
    @functools.wraps(func)
    def timed(*args, **kwargs):
        began = time.time()
        outcome = func(*args, **kwargs)
        elapsed = time.time() - began
        print(f"{func.__name__} took {elapsed:.2f} seconds")
        return outcome
    return timed
@timing_decorator
def benchmark_read(uri, chunk_size=8192):
"""Benchmark file reading performance."""
total_bytes = 0
with open(uri, 'rb') as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
total_bytes += len(chunk)
return total_bytes
# Compare different chunk sizes
uris_to_test = [
's3://test-bucket/large-file.dat',
'gs://test-bucket/large-file.dat',
'azure://container/large-file.dat'
]
chunk_sizes = [4096, 8192, 16384, 32768, 65536]
for uri in uris_to_test:
print(f"\nTesting {uri}:")
for chunk_size in chunk_sizes:
total_bytes = benchmark_read(uri, chunk_size)
print(f" Chunk size {chunk_size}: {total_bytes} bytes")
Always validate transport parameters to avoid runtime surprises:
from smart_open.utils import check_kwargs
from smart_open.s3 import open as s3_open
# Validate parameters before use
proposed_params = {
'buffer_size': 1024*1024,
'multipart_upload': True,
'typo_in_parameter_name': 'ignored'
}
valid_params = check_kwargs(s3_open, proposed_params)
# Use valid_params instead of proposed_params
Use safe_urlsplit for URLs that might contain special characters:
from smart_open.utils import safe_urlsplit
# Safer than urllib.parse.urlsplit for cloud storage URLs
uri = 's3://bucket/file?with?questions.txt'
parsed = safe_urlsplit(uri)
Choose appropriate parallelism based on I/O characteristics:
# CPU-bound: Use multiprocessing
with create_pool(processes=cpu_count()) as pool:
results = list(pool.imap_unordered(cpu_intensive_func, items))
# I/O-bound: Use threading
from smart_open.concurrency import ConcurrentFuturesPool
with ConcurrentFuturesPool(max_workers=20) as pool:
results = list(pool.imap_unordered(io_intensive_func, items))
Install with Tessl CLI
npx tessl i tessl/pypi-smart-open