Utils for streaming large files from S3, HDFS, GCS, SFTP, Azure Blob Storage, and local filesystem with transparent compression support
—
HTTP/HTTPS, FTP, and SSH-based file access with authentication and secure connection support. Smart-open provides seamless integration with various network protocols for remote file operations.
Read-only access to web resources with authentication and custom headers support.
def open(uri, mode, kerberos=False, user=None, password=None, cert=None,
headers=None, timeout=None, session=None, buffer_size=256*1024):
"""Open HTTP/HTTPS resource for reading.
Parameters:
uri: str - HTTP(S) URL
mode: str - Must be 'rb' (read binary only)
kerberos: bool - Use Kerberos authentication
user: str - Basic authentication username
password: str - Basic authentication password
cert: str - Path to client certificate file
headers: dict - Additional HTTP headers
timeout: float - Request timeout in seconds
session: requests.Session - Custom requests session
buffer_size: int - Buffer size for reading (default: 256KB)
Returns:
File-like object for reading HTTP response
"""
def parse_uri(uri_as_string):
"""Parse HTTP/HTTPS URI into components.
Returns:
dict with parsed URI components
"""

Full read/write access to FTP servers with secure connection support.
def open(path, mode="rb", host=None, user=None, password=None, port=21,
secure_connection=False, transport_params=None):
"""Open FTP resource for reading or writing.
Parameters:
path: str - Remote file path
mode: str - File mode ('rb' or 'wb')
host: str - FTP server hostname
user: str - FTP username
password: str - FTP password
port: int - FTP server port (default: 21)
secure_connection: bool - Use FTPS (secure FTP)
transport_params: dict - Additional transport parameters
Returns:
File-like object for FTP operations
"""
def parse_uri(uri_as_string):
"""Parse FTP/FTPS URI into components.
Returns:
dict with hostname, username, password, path, port, scheme
"""

Secure file access over SSH with key-based and password authentication.
def open(path, mode="r", host=None, user=None, password=None, port=None,
connect_kwargs=None, prefetch_kwargs=None, buffer_size=-1):
"""Open SSH/SFTP resource for reading or writing.
Parameters:
path: str - Remote file path
mode: str - File mode ('r', 'w', 'rb', 'wb')
host: str - SSH server hostname
user: str - SSH username
password: str - SSH password (if not using key auth)
port: int - SSH server port (None for default)
connect_kwargs: dict - Additional SSH connection parameters including:
- pkey: paramiko.PKey - Private key object
- key_filename: str - Path to private key file
- look_for_keys: bool - Search for key files
- allow_agent: bool - Use SSH agent
- timeout: float - Connection timeout
- compress: bool - Enable compression
prefetch_kwargs: dict - SFTP prefetch parameters
buffer_size: int - Buffer size for I/O (-1 for default)
Returns:
File-like object for SSH/SFTP operations
"""
def parse_uri(uri_as_string):
"""Parse SSH/SCP/SFTP URI into components.
Returns:
dict with hostname, username, password, path, port, scheme
"""

from smart_open import open
# Simple HTTP read
with open('http://example.com/data.txt', 'rb') as f:
content = f.read()
# HTTPS with custom headers
transport_params = {
'headers': {
'User-Agent': 'MyApp/1.0',
'Authorization': 'Bearer token123'
},
'timeout': 30
}
with open('https://api.example.com/data.json', 'rb',
transport_params=transport_params) as f:
data = f.read()
# Basic authentication
transport_params = {
'user': 'username',
'password': 'password'
}
with open('https://secure.example.com/file.txt', 'rb',
transport_params=transport_params) as f:
content = f.read()
# Client certificate authentication
# `cert` selects the client certificate. The HTTP open() parameters include no
# CA-bundle option, so a custom CA bundle is configured on a requests session
# and handed over through the `session` parameter.
import requests
session = requests.Session()
session.verify = '/path/to/ca-bundle.pem'
transport_params = {
    'cert': '/path/to/client.pem',
    'session': session
}
with open('https://secure.example.com/api/data', 'rb',
          transport_params=transport_params) as f:
    response = f.read()
# Kerberos authentication (requires requests-kerberos)
transport_params = {'kerberos': True}
with open('https://intranet.company.com/file.txt', 'rb',
transport_params=transport_params) as f:
data = f.read()

# Basic FTP access
with open('ftp://user:pass@ftp.example.com/path/file.txt', 'rb') as f:
content = f.read()
# FTP write operation
with open('ftp://user:pass@ftp.example.com/upload/data.txt', 'w') as f:
f.write('Upload this content')
# FTPS (secure FTP)
with open('ftps://user:pass@secure-ftp.example.com/file.txt', 'rb') as f:
content = f.read()
# Custom FTP port
with open('ftp://user:pass@ftp.example.com:2121/file.txt', 'rb') as f:
data = f.read()
# Direct FTP module usage
# The remote path and mode are the positional arguments; host and credentials
# are passed by keyword, matching the FTP open() signature.
from smart_open.ftp import open as ftp_open
with ftp_open('/remote/path/file.txt', 'rb', host='ftp.example.com',
              user='username', password='password', port=21) as f:
    content = f.read()

# Password authentication
with open('ssh://user:password@server.com/path/file.txt', 'rb') as f:
content = f.read()
# Key-based authentication (using SSH agent or default keys)
with open('ssh://user@server.com/path/file.txt', 'rb') as f:
content = f.read()
# Explicit private key file
# SSH connection options (key_filename, timeout, compress, ...) are not
# top-level transport parameters: they are grouped under `connect_kwargs`.
transport_params = {
    'connect_kwargs': {
        'key_filename': '/home/user/.ssh/id_rsa'
    }
}
with open('ssh://user@server.com/path/file.txt', 'rb',
          transport_params=transport_params) as f:
    content = f.read()
# Custom SSH port and connection options
# `port` is a direct open() parameter; the remaining options go in
# `connect_kwargs`.
transport_params = {
    'port': 2222,
    'connect_kwargs': {
        'timeout': 10,
        'compress': True,
        'look_for_keys': True
    }
}
with open('ssh://user@server.com/path/file.txt', 'rb',
          transport_params=transport_params) as f:
    content = f.read()
# SFTP write operation
with open('sftp://user@server.com/upload/data.txt', 'w') as f:
f.write('Remote file content')
# SCP-style URLs (same as SSH/SFTP)
with open('scp://user@server.com/path/file.txt', 'rb') as f:
content = f.read()
# Direct SSH module usage
# The remote path and mode are the positional arguments; host and credentials
# are passed by keyword, matching the SSH open() signature.
from smart_open.ssh import open as ssh_open
with ssh_open('/remote/path/file.txt', 'rb', host='server.com',
              user='username', password='password', port=22) as f:
    content = f.read()

# Basic authentication
transport_params = {
'user': 'username',
'password': 'password'
}
# Bearer token
transport_params = {
'headers': {'Authorization': 'Bearer your-token'}
}
# API key header
transport_params = {
'headers': {'X-API-Key': 'your-api-key'}
}
# Digest authentication
# `user`/`password` are Basic authentication credentials. For Digest, attach
# requests' HTTPDigestAuth to a session and pass that session instead.
import requests
from requests.auth import HTTPDigestAuth
session = requests.Session()
session.auth = HTTPDigestAuth('username', 'password')
transport_params = {
    'session': session
}
# Custom authentication header
transport_params = {
'headers': {'Authorization': 'Custom your-auth-string'}
}

# Using specific private key
# Key options are SSH connection parameters, so they are nested under
# `connect_kwargs` rather than placed at the top level of transport_params.
transport_params = {
    'connect_kwargs': {
        'key_filename': '/path/to/private_key'
    }
}
# Using multiple key files
transport_params = {
    'connect_kwargs': {
        'key_filename': ['/path/to/key1', '/path/to/key2']
    }
}
# Using paramiko PKey object
import paramiko
private_key = paramiko.RSAKey.from_private_key_file('/path/to/key')
transport_params = {
    'connect_kwargs': {
        'pkey': private_key
    }
}
# Disable automatic key search
transport_params = {
    'connect_kwargs': {
        'look_for_keys': False,
        'allow_agent': False,
        'key_filename': '/specific/key/only'
    }
}

# Default behavior: verify certificates
with open('https://secure.example.com/file.txt', 'rb') as f:
    content = f.read()
# Custom CA certificates
# The HTTP open() parameters include no CA-bundle option; set the bundle on a
# requests session and pass it through the `session` parameter.
import requests
session = requests.Session()
session.verify = '/path/to/custom-ca-bundle.pem'
transport_params = {
    'session': session
}
# Client certificate authentication
transport_params = {
    'cert': '/path/to/client-cert.pem'  # Can include key
}
# Separate cert and key files
transport_params = {
    'cert': ('/path/to/client-cert.pem', '/path/to/client-key.pem')
}

# Restrict to specific host key
import paramiko
# NOTE(review): `host_key_policy` is not among the SSH open() parameters
# listed in this document -- confirm the transport honours it before relying
# on it.
transport_params = {
    'host_key_policy': paramiko.RejectPolicy()  # Reject unknown hosts
}
# Custom host key verification
class CustomHostKeyPolicy(paramiko.MissingHostKeyPolicy):
    def missing_host_key(self, client, hostname, key):
        # Custom host key verification logic goes here: return normally to
        # accept `key`. Reject by default so that an unverified host is never
        # trusted silently.
        raise paramiko.SSHException(f'Unknown host key for {hostname}')
transport_params = {
    'host_key_policy': CustomHostKeyPolicy()
}
# Connection, banner, and authentication timeouts
# These are SSH connection options, so they are nested under `connect_kwargs`.
transport_params = {
    'connect_kwargs': {
        'timeout': 10,
        'banner_timeout': 30,
        'auth_timeout': 30
    }
}

import requests
from smart_open import open
try:
    with open('https://api.example.com/data.json', 'rb') as f:
        data = f.read()
except requests.exceptions.HTTPError as e:
    status_code = e.response.status_code
    if status_code == 404:
        print("Resource not found")
    elif status_code == 401:
        print("Authentication required")
    elif status_code == 403:
        print("Access forbidden")
    else:
        print(f"HTTP error: {status_code}")
except requests.exceptions.Timeout:
    # Checked before ConnectionError: a connect timeout is both a Timeout and
    # a ConnectionError, and should be reported as a timeout.
    print("Request timed out")
except requests.exceptions.ConnectionError:
    print("Connection failed")

import paramiko
from smart_open import open
try:
with open('ssh://user@server.com/file.txt', 'rb') as f:
content = f.read()
except paramiko.AuthenticationException:
print("SSH authentication failed")
except paramiko.SSHException as e:
print(f"SSH connection error: {e}")
except FileNotFoundError:
print("Remote file not found")
except PermissionError:
print("Permission denied")

import ftplib
from smart_open import open
try:
with open('ftp://user:pass@server.com/file.txt', 'rb') as f:
content = f.read()
except ftplib.error_perm as e:
error_code = str(e)[:3]
if error_code == '530':
print("FTP authentication failed")
elif error_code == '550':
print("File not found or no permission")
else:
print(f"FTP permission error: {e}")
except ftplib.error_temp as e:
print(f"Temporary FTP error: {e}")
except ConnectionError:
print("FTP connection failed")

# Connection pooling and keep-alive
import requests
session = requests.Session()
adapter = requests.adapters.HTTPAdapter(
    pool_connections=10,
    pool_maxsize=20,
    max_retries=3
)
session.mount('http://', adapter)
session.mount('https://', adapter)
transport_params = {'session': session}
# Streaming large files
# Pass the pooled session configured above; `stream` is not one of the HTTP
# open() parameters, and the file object is read in chunks below.
with open('https://example.com/large-file.dat', 'rb',
          transport_params=transport_params) as f:
    for chunk in iter(lambda: f.read(8192), b''):
        process_chunk(chunk)

import paramiko
# Reuse SSH client for multiple files
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect('server.com', username='user', password='pass')
transport_params = {'client': client}
# Use same client for multiple operations
with open('ssh://server.com/file1.txt', 'rb',
transport_params=transport_params) as f:
content1 = f.read()
with open('ssh://server.com/file2.txt', 'rb',
transport_params=transport_params) as f:
content2 = f.read()
client.close()

Install with Tessl CLI
npx tessl i tessl/pypi-smart-open