CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-smart-open

Utils for streaming large files from S3, HDFS, GCS, SFTP, Azure Blob Storage, and local filesystem with transparent compression support

Pending
Overview
Eval results
Files

big-data.mddocs/

Big Data and Distributed Systems

Integration with the Hadoop ecosystem (HDFS, WebHDFS) for big data processing workflows. Smart-open provides seamless access to the distributed file systems commonly used in big data and analytics environments.

Capabilities

HDFS Operations

Direct access to the Hadoop Distributed File System (HDFS) through the Hadoop command-line tools (`hdfs dfs`).

def open(uri: str, mode: str):
    """Open an HDFS resource by delegating to the Hadoop CLI tools.

    Parameters:
        uri: str - HDFS URI (hdfs://namenode:port/path or hdfs:///path)
        mode: str - File mode ('rb', 'wb', 'r', 'w')

    Returns:
        File-like object for HDFS operations

    Notes:
        Requires hadoop CLI tools to be installed and accessible
        Uses subprocess calls to hdfs dfs commands
        NOTE(review): the text modes 'r' and 'w' may be provided by the
        top-level smart_open.open wrapper rather than by this function
        itself - confirm against the implementation.
    """

def parse_uri(uri_as_string: str):
    """Parse an HDFS URI string into its components.

    Parameters:
        uri_as_string: str - HDFS URI to split

    Returns:
        dict with scheme, namenode, port, path components
        NOTE(review): confirm the exact key names against the
        implementation before relying on them.
    """

WebHDFS Operations

HTTP-based access to HDFS through WebHDFS REST API.

def open(http_uri: str, mode: str, min_part_size: int = 50*1024**2):
    """Open a WebHDFS resource through the WebHDFS REST API.

    Parameters:
        http_uri: str - WebHDFS HTTP endpoint URI
        mode: str - File mode ('rb', 'wb', 'r', 'w')
        min_part_size: int - Minimum part size for chunked operations
            (default 50 * 1024**2, i.e. 50 MiB if the unit is bytes)

    Returns:
        File-like object for WebHDFS operations

    Notes:
        Uses HTTP requests to WebHDFS REST API
        Supports authentication via transport_params
        NOTE(review): this signature exposes only min_part_size besides
        the URI and mode; confirm which transport_params keys (if any)
        actually reach this function and carry authentication.
    """

def parse_uri(uri_as_string: str):
    """Parse a WebHDFS URI string into its components.

    Parameters:
        uri_as_string: str - WebHDFS URI to split

    Returns:
        dict with scheme, host, port, path, namenode components
        NOTE(review): confirm the exact key names against the
        implementation before relying on them.
    """

Usage Examples

HDFS Examples

from smart_open import open

# Default namenode: the URI carries no host, only a path
with open('hdfs:///user/data/input.txt', 'rb') as source:
    content = source.read()

# Explicit namenode host and port in the URI
with open('hdfs://namenode.example.com:9000/user/data/file.txt', 'rb') as source:
    data = source.read()

# Text-mode write to HDFS
with open('hdfs:///user/output/results.txt', 'w') as sink:
    sink.write('Processing results')

# Binary read of a non-text file
with open('hdfs:///user/data/binary-file.dat', 'rb') as source:
    binary_data = source.read()

# Text mode with an explicit encoding: print only the ERROR lines
with open('hdfs:///user/logs/application.log', 'r', encoding='utf-8') as log:
    error_entries = (entry for entry in log if 'ERROR' in entry)
    for entry in error_entries:
        print(entry.strip())

# ViewFS (federated HDFS) uses the viewfs:// scheme
with open('viewfs:///user/data/federated-file.txt', 'rb') as source:
    content = source.read()

WebHDFS Examples

# Basic WebHDFS read
# (`open` is the name imported with `from smart_open import open` in the
# HDFS examples; this snippet does not import it again.)
with open('webhdfs://namenode.example.com:50070/user/data/file.txt', 'rb') as f:
    content = f.read()

# WebHDFS write
with open('webhdfs://namenode.example.com:50070/user/output/data.txt', 'w') as f:
    f.write('WebHDFS content')

# With authentication (Kerberos, delegation tokens, etc.)
# NOTE(review): the documented smart_open.webhdfs.open signature accepts only
# min_part_size; confirm that 'user' and 'delegation_token' are honoured
# rather than ignored before relying on them.
transport_params = {
    'user': 'hadoop-user',
    'delegation_token': 'token-string'
}
with open('webhdfs://namenode:50070/user/data/secure-file.txt', 'rb',
          transport_params=transport_params) as f:
    content = f.read()

# Custom WebHDFS parameters
# NOTE(review): same caveat as above for 'replication', 'blocksize' and
# 'permission' - verify they are supported transport_params keys.
transport_params = {
    'replication': 3,
    'blocksize': 134217728,  # 128MB (128 * 1024**2)
    'permission': '755'
}
# `large_data` is not defined in this snippet; the caller must supply the
# bytes to upload.
with open('webhdfs://namenode:50070/user/output/large-file.dat', 'wb',
          transport_params=transport_params) as f:
    f.write(large_data)

# Direct WebHDFS module usage
# Here the URI is the plain HTTP REST endpoint (/webhdfs/v1/...), not a
# webhdfs:// URI.
from smart_open.webhdfs import open as webhdfs_open

with webhdfs_open('http://namenode:50070/webhdfs/v1/user/data/file.txt', 'rb') as f:
    content = f.read()

Configuration and Setup

HDFS CLI Setup

# Ensure Hadoop CLI tools are installed and configured:
# put the Hadoop bin/ and sbin/ directories on PATH
export HADOOP_HOME=/opt/hadoop
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin

# Test HDFS connectivity by listing the filesystem root
hdfs dfs -ls /

# Point Hadoop at the directory holding core-site.xml and hdfs-site.xml
# NOTE(review): this export comes after the connectivity test above; if the
# test depends on this configuration directory, export it first.
export HADOOP_CONF_DIR=/opt/hadoop/etc/hadoop

WebHDFS Configuration

# WebHDFS endpoint configuration
# (defined for reference; the snippets in this section do not use it)
WEBHDFS_BASE_URL = 'http://namenode.example.com:50070'

# The three transport_params dicts below are alternatives: each assignment
# replaces the previous one, so keep only the variant you need or merge them.
# NOTE(review): the documented smart_open.webhdfs.open signature accepts only
# min_part_size; confirm that the keys below are actually supported.

# Authentication configuration
transport_params = {
    'user': 'your-username',
    'timeout': 30,
    'headers': {
        'User-Agent': 'smart-open-client/1.0'
    }
}

# Kerberos authentication (if enabled)
transport_params = {
    'auth': 'kerberos',  # Requires requests-kerberos
    'principal': 'user@REALM.COM'
}

# Custom SSL configuration for secure WebHDFS
# 'cert' is a (client certificate, private key) pair of file paths
transport_params = {
    'verify': '/path/to/ca-cert.pem',
    'cert': ('/path/to/client-cert.pem', '/path/to/client-key.pem')
}

Integration with Big Data Frameworks

Apache Spark Integration

# Reading HDFS data in a PySpark application
import json  # needed by json.load below

from pyspark.sql import SparkSession
from smart_open import open

spark = SparkSession.builder.appName("HDFSReader").getOrCreate()

# Read metadata or small config files directly with smart-open
with open('hdfs:///user/config/spark-config.json') as f:
    config = json.load(f)

# Process large datasets with Spark itself rather than through smart-open
df = spark.read.parquet('hdfs:///user/data/large-dataset.parquet')

# Write a small summary back to HDFS
with open('hdfs:///user/output/summary.txt', 'w') as f:
    f.write(f"Processed {df.count()} records")

MapReduce Integration

# Hadoop Streaming job with smart-open
import sys
from smart_open import open

# Open the HDFS output once, in write mode. The documented HDFS modes are
# 'rb', 'wb', 'r' and 'w' only (no append), and reopening the file for every
# input line would start a new HDFS transfer per record.
# NOTE: every task that runs this script writes the same path; give each task
# its own output file if several run concurrently.
with open('hdfs:///user/temp/intermediate-results.txt', 'w') as f:
    # Hadoop Streaming feeds the input records on standard input
    for line in sys.stdin:
        # process_line is the user-supplied per-record function
        result = process_line(line.strip())

        # Write intermediate results to HDFS
        f.write(f"{result}\n")

Apache Kafka Integration

# Read HDFS data for Kafka producers
from kafka import KafkaProducer
from smart_open import open

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

try:
    # Stream data from HDFS to Kafka: the file is read in binary mode and
    # each line is sent as one message, exactly as read
    with open('hdfs:///user/streaming/data.jsonl', 'rb') as f:
        for line in f:
            producer.send('data-topic', line)

    # Block until the buffered messages have been handed to the brokers
    producer.flush()
finally:
    # Release the producer even if reading from HDFS or sending fails
    producer.close()

Performance Optimization

HDFS Performance Tips

# Use binary mode for better performance
with open('hdfs:///user/data/large-file.dat', 'rb') as f:
    # Process in chunks so the whole file never has to be held in memory
    while True:
        chunk = f.read(1024 * 1024)  # 1MB chunks
        if not chunk:
            break
        process_chunk(chunk)  # user-supplied chunk handler

# Parallel processing with multiple HDFS files
import concurrent.futures
import subprocess


def list_hdfs_files(pattern):
    """Return the HDFS paths matching *pattern*.

    glob.glob() only inspects the local filesystem, so the listing is
    delegated to the Hadoop CLI; `-C` makes `hdfs dfs -ls` print paths only.

    Raises:
        subprocess.CalledProcessError: if the hdfs command exits non-zero.
        FileNotFoundError: if the hdfs executable is not on PATH.
    """
    listing = subprocess.run(
        ['hdfs', 'dfs', '-ls', '-C', pattern],
        capture_output=True, text=True, check=True,
    )
    return [path for path in listing.stdout.splitlines() if path]


def process_hdfs_file(filepath):
    """Read one HDFS file fully and return process_data() applied to its bytes.

    *filepath* is an absolute HDFS path (leading slash), so the resulting URI
    has the form hdfs:///user/... and uses the default namenode.
    """
    with open(f'hdfs://{filepath}', 'rb') as f:
        return process_data(f.read())  # user-supplied transformation

# Process multiple files in parallel
hdfs_files = list_hdfs_files('/user/data/part-*')
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(process_hdfs_file, hdfs_files))

WebHDFS Performance Tips

# Connection pooling for WebHDFS
import requests
session = requests.Session()
# One adapter instance is shared by both URL schemes
adapter = requests.adapters.HTTPAdapter(
    pool_connections=10,
    pool_maxsize=20
)
session.mount('http://', adapter)
session.mount('https://', adapter)

# NOTE(review): the documented smart_open.webhdfs.open signature accepts only
# min_part_size; confirm that a 'session' key is honoured before relying on
# the pooled session being used.
transport_params = {'session': session}

# Batch operations: the files are read one after another with the same
# transport_params
files_to_process = [
    'webhdfs://namenode:50070/user/data/file1.txt',
    'webhdfs://namenode:50070/user/data/file2.txt',
    'webhdfs://namenode:50070/user/data/file3.txt'
]

for file_uri in files_to_process:
    with open(file_uri, 'rb', transport_params=transport_params) as f:
        process_file(f.read())  # process_file is supplied by the caller

# Chunked uploads for large files
# 'min_part_size' matches the parameter of the documented WebHDFS open();
# NOTE(review): 'timeout' is not part of that signature - verify support.
transport_params = {
    'min_part_size': 64 * 1024 * 1024,  # 64MB chunks
    'timeout': 300
}
with open('webhdfs://namenode:50070/user/output/huge-file.dat', 'wb',
          transport_params=transport_params) as f:
    # generate_large_data is a caller-supplied iterable of chunks
    for chunk in generate_large_data():
        f.write(chunk)

Error Handling and Reliability

HDFS Error Handling

import subprocess
from smart_open import open

try:
    with open('hdfs:///user/data/file.txt', 'rb') as f:
        content = f.read()
except subprocess.CalledProcessError as e:
    # str(e) reports only the command line and exit status; the CLI's own
    # diagnostic text is in the captured stderr/output (when it was
    # captured), so include those in the text that is searched.
    # NOTE(review): confirm that the HDFS transport surfaces CLI failures as
    # CalledProcessError in the smart-open version you use.
    captured = [
        part.decode(errors='replace') if isinstance(part, bytes) else part
        for part in (e.stderr, e.output)
        if part
    ]
    details = ' '.join([str(e), *captured])
    if 'No such file or directory' in details:
        print("HDFS file not found")
    elif 'Permission denied' in details:
        print("HDFS permission denied")
    else:
        print(f"HDFS command failed: {e}")
except FileNotFoundError:
    # Raised when the hdfs executable itself cannot be launched
    print("Hadoop CLI tools not found - ensure HADOOP_HOME is set")

WebHDFS Error Handling

import requests
from smart_open import open

try:
    with open('webhdfs://namenode:50070/user/data/file.txt', 'rb') as f:
        content = f.read()
except requests.exceptions.HTTPError as e:
    # Map the well-known status codes to a message; anything else falls back
    # to a generic report that includes the code
    status_code = e.response.status_code
    known_failures = {
        404: "WebHDFS file not found",
        403: "WebHDFS access forbidden",
        401: "WebHDFS authentication required",
    }
    print(known_failures.get(status_code, f"WebHDFS HTTP error: {status_code}"))
except requests.exceptions.ConnectionError:
    print("WebHDFS connection failed - check namenode availability")
except requests.exceptions.Timeout:
    print("WebHDFS request timed out")

Security and Authentication

HDFS Security

# Kerberos authentication for HDFS: obtain a ticket for the principal
kinit user@REALM.COM

# Check current Kerberos ticket
klist

# Set Hadoop security configuration
# NOTE(review): confirm that the Hadoop CLI reads these environment
# variables; these settings may need to live in the Hadoop configuration
# files instead.
export HADOOP_SECURITY_AUTHENTICATION=kerberos
export HADOOP_SECURITY_AUTHORIZATION=true

WebHDFS Security

# The four transport_params dicts below are alternatives: each assignment
# replaces the previous one.
# NOTE(review): the documented smart_open.webhdfs.open signature accepts only
# min_part_size; confirm that these keys are actually supported.

# Simple authentication
transport_params = {
    'user': 'hadoop-user'
}

# Delegation token authentication
# NOTE(review): the key is 'delegation' here but 'delegation_token' in the
# WebHDFS usage example - confirm which spelling is correct.
transport_params = {
    'delegation': 'delegation-token-string'
}

# HTTPS WebHDFS with client certificates
# 'cert' is a single file path here, whereas the configuration example passes
# a (certificate, key) tuple.
transport_params = {
    'cert': '/path/to/client-cert.pem',
    'verify': '/path/to/ca-cert.pem'
}

# Custom authentication headers
transport_params = {
    'headers': {
        'Authorization': 'Bearer jwt-token-here'
    }
}

Monitoring and Debugging

HDFS Debugging

import logging
import subprocess
from smart_open import open

# Enable debug logging
logging.basicConfig(level=logging.DEBUG)

# Check HDFS file status before opening.
# '%b' is the `hdfs dfs -stat` format specifier for the file size in bytes;
# '%s' is not a size specifier, so int() could not parse its output.
try:
    result = subprocess.run(['hdfs', 'dfs', '-stat', '%b', '/user/data/file.txt'],
                            capture_output=True, text=True, check=True)
    file_size = int(result.stdout.strip())
    print(f"HDFS file size: {file_size} bytes")
except subprocess.CalledProcessError:
    # Any non-zero exit status of the stat command is reported as a missing file
    print("HDFS file does not exist")

WebHDFS Debugging

# Enable HTTP request debugging
import logging

import requests

# Enable debug logging for requests.
# Current requests releases use the standalone urllib3 package, whose logger
# is named "urllib3"; the old vendored name is configured as well so the
# snippet keeps working with old requests versions.
logging.basicConfig(level=logging.DEBUG)
for logger_name in ("urllib3", "requests.packages.urllib3"):
    requests_log = logging.getLogger(logger_name)
    requests_log.setLevel(logging.DEBUG)
    requests_log.propagate = True

# Check WebHDFS file status
transport_params = {'timeout': 10}
try:
    response = requests.get(
        'http://namenode:50070/webhdfs/v1/user/data/file.txt?op=GETFILESTATUS',
        **transport_params
    )
    # Turn 4xx/5xx answers into exceptions instead of parsing an error body
    response.raise_for_status()
    file_info = response.json()
    print(f"WebHDFS file info: {file_info}")
except (requests.exceptions.RequestException, ValueError) as e:
    # RequestException covers connection, timeout and HTTP status errors;
    # ValueError covers a response body that is not valid JSON
    print(f"WebHDFS status check failed: {e}")

Install with Tessl CLI

# Install the tessl/pypi-smart-open tile with the Tessl CLI (run through npx)
npx tessl i tessl/pypi-smart-open

docs

big-data.md

cloud-storage.md

compression.md

core-operations.md

index.md

network-access.md

utilities.md

tile.json