Pathlib-style classes for cloud storage services that provide seamless access to AWS S3, Google Cloud Storage, and Azure Blob Storage with familiar filesystem operations.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
HTTP and HTTPS resource access with custom authentication, directory listing parsers, and RESTful operations for web-based storage systems. This implementation provides pathlib-compatible access to HTTP/HTTPS resources with full control over HTTP operations.
HTTP-specific path implementations with RESTful operation support.
class HttpPath(CloudPath):
    """HTTP resource path implementation.

    NOTE(review): this is an API-reference stub — method bodies are elided
    and only signatures plus docstrings are shown.
    """

    @property
    def parsed_url(self) -> "urllib.parse.ParseResult":
        """
        Parsed URL components.

        Returns:
            ParseResult object with URL components
        """

    @property
    def drive(self) -> str:
        """
        Network location (netloc).

        Returns:
            Network location from URL
        """

    @property
    def anchor(self) -> str:
        """
        Full scheme + netloc.

        Returns:
            Scheme and network location
        """

    def get(self, **kwargs) -> typing.Tuple["http.client.HTTPResponse", bytes]:
        """
        Issue GET request to the URL.

        Args:
            **kwargs: Arguments passed to urllib request

        Returns:
            Tuple of (HTTPResponse, response body)
        """

    def put(self, **kwargs) -> typing.Tuple["http.client.HTTPResponse", bytes]:
        """
        Issue PUT request to the URL.

        Args:
            **kwargs: Arguments passed to urllib request

        Returns:
            Tuple of (HTTPResponse, response body)
        """

    def post(self, **kwargs) -> typing.Tuple["http.client.HTTPResponse", bytes]:
        """
        Issue POST request to the URL.

        Args:
            **kwargs: Arguments passed to urllib request

        Returns:
            Tuple of (HTTPResponse, response body)
        """

    def delete(self, **kwargs) -> typing.Tuple["http.client.HTTPResponse", bytes]:
        """
        Issue DELETE request to the URL.

        Args:
            **kwargs: Arguments passed to urllib request

        Returns:
            Tuple of (HTTPResponse, response body)
        """

    def head(self, **kwargs) -> typing.Tuple["http.client.HTTPResponse", bytes]:
        """
        Issue HEAD request to the URL.

        Args:
            **kwargs: Arguments passed to urllib request

        Returns:
            Tuple of (HTTPResponse, response body)
        """
class HttpsPath(HttpPath):
    """HTTPS resource path implementation (same API as HttpPath)."""

HTTP client with comprehensive authentication and configuration options.
class HttpClient:
    """HTTP client for web resource access.

    NOTE(review): this is an API-reference stub — method bodies are elided
    and only signatures plus docstrings are shown.
    """

    def __init__(
        self,
        file_cache_mode: typing.Optional[FileCacheMode] = None,
        local_cache_dir: typing.Optional[str] = None,
        content_type_method: typing.Optional[typing.Callable] = None,
        auth: typing.Optional[typing.Any] = None,
        custom_list_page_parser: typing.Optional[typing.Callable] = None,
        custom_dir_matcher: typing.Optional[typing.Callable] = None,
        write_file_http_method: str = 'PUT'
    ):
        """
        Initialize HTTP client.

        Args:
            file_cache_mode: Cache management strategy
            local_cache_dir: Local directory for file cache
            content_type_method: Function to determine MIME types
            auth: Authentication handler (requests auth object)
            custom_list_page_parser: Function to parse directory listings
            custom_dir_matcher: Function to identify directories
            write_file_http_method: HTTP method for file uploads
        """

    def request(
        self,
        url: str,
        method: str,
        **kwargs
    ) -> typing.Tuple["http.client.HTTPResponse", bytes]:
        """
        Make HTTP request.

        Args:
            url: Target URL
            method: HTTP method
            **kwargs: Additional request arguments

        Returns:
            Tuple of (HTTPResponse, response body)
        """

    @property
    def dir_matcher(self) -> typing.Callable:
        """Function to identify directories from HTTP responses."""

    @property
    def write_file_http_method(self) -> str:
        """HTTP method used for file uploads."""
class HttpsClient(HttpClient):
    """HTTPS client (same API as HttpClient)."""

from cloudpathlib import HttpPath, HttpsPath, HttpClient
# Create HTTP paths
http_path = HttpPath("http://example.com/api/data.json")
https_path = HttpsPath("https://api.example.com/data.json")
# Access URL properties
print(f"Netloc: {https_path.drive}") # "api.example.com"
print(f"Anchor: {https_path.anchor}") # "https://api.example.com"
print(f"Parsed: {https_path.parsed_url}")  # ParseResult object

# Create HTTPS path for API endpoint
api_path = HttpsPath("https://api.example.com/users/123")
# GET request
response = api_path.get()
if response.status_code == 200:
user_data = response.json()
print(f"User: {user_data}")
# POST request with data
create_path = HttpsPath("https://api.example.com/users")
response = create_path.post(json={
"name": "John Doe",
"email": "john@example.com"
})
# PUT request to update
update_data = {"name": "Jane Doe"}
response = api_path.put(json=update_data)
# DELETE request
response = api_path.delete()
print(f"Delete status: {response.status_code}")
# HEAD request for metadata
response = api_path.head()
print(f"Content-Length: {response.headers.get('Content-Length')}")

from requests.auth import HTTPBasicAuth, HTTPDigestAuth
# Basic authentication
auth = HTTPBasicAuth('username', 'password')
client = HttpClient(auth=auth)
authenticated_path = HttpsPath(
"https://protected.example.com/data.json",
client=client
)
# API key authentication
class APIKeyAuth:
    """requests-style auth callable that stamps a Bearer token on requests.

    Instances are passed to ``HttpClient(auth=...)``; the client invokes
    them with each outgoing request object.
    """

    def __init__(self, api_key):
        # Token is kept on the instance so one auth object can be reused
        # across many requests.
        self.api_key = api_key

    def __call__(self, request):
        """Attach the Authorization header and hand the request back."""
        request.headers['Authorization'] = f'Bearer {self.api_key}'
        return request
api_auth = APIKeyAuth('your-api-key')
client = HttpClient(auth=api_auth)
# OAuth token authentication
def oauth_auth(request):
    """Auth hook that injects an OAuth bearer token into *request*.

    NOTE(review): reads a module-level ``oauth_token`` defined elsewhere
    in the example — confirm it is in scope before use.
    """
    request.headers['Authorization'] = f'Bearer {oauth_token}'
    return request
client = HttpClient(auth=oauth_auth)

# Download file from HTTP
file_url = HttpsPath("https://example.com/files/document.pdf")
# Download to local file
local_path = file_url.download_to("downloaded_document.pdf")
print(f"Downloaded to: {local_path}")
# Read content directly
content = file_url.read_bytes()
# Upload file via PUT (default)
upload_url = HttpsPath("https://upload.example.com/files/new_document.pdf")
upload_url.upload_from("local_document.pdf")
# Upload via POST
client = HttpClient(write_file_http_method='POST')
upload_url = HttpsPath("https://upload.example.com/files/", client=client)
upload_url.upload_from("local_document.pdf")

import re
from bs4 import BeautifulSoup
def parse_apache_directory_listing(response_text):
    """Parse an Apache-style HTML directory listing into entry names.

    Args:
        response_text: HTML body of the directory index page.

    Returns:
        List of entry names with any trailing slash stripped.
    """
    soup = BeautifulSoup(response_text, 'html.parser')
    entries = []
    for link in soup.find_all('a'):
        href = link.get('href')
        # Skip parent- and current-directory navigation links.
        # Fix: the original exclusion list was ['../', '../'] — a duplicated
        # parent entry that never filtered the './' self link.
        if href and href not in ('../', './'):
            entries.append(href.rstrip('/'))
    return entries
def is_directory(name):
    """Heuristically classify a listing entry as a directory.

    An entry counts as a directory when it ends with a slash, or when its
    final path segment contains no '.' (i.e. no file extension).
    """
    if name.endswith('/'):
        return True
    basename = name.split('/')[-1]
    return '.' not in basename
# Configure client with custom parsers
client = HttpClient(
custom_list_page_parser=parse_apache_directory_listing,
custom_dir_matcher=is_directory
)
# List directory contents
dir_path = HttpPath("http://files.example.com/data/", client=client)
for item in dir_path.iterdir():
    print(f"{'Dir' if item.is_dir() else 'File'}: {item.name}")

# REST API interaction
api_base = HttpsPath("https://jsonplaceholder.typicode.com")
# Get all posts
posts_path = api_base / "posts"
response = posts_path.get()
posts = response.json()
print(f"Found {len(posts)} posts")
# Get specific post
post_path = api_base / "posts" / "1"
response = post_path.get()
post = response.json()
print(f"Post title: {post['title']}")
# Create new post
new_post = {
"title": "New Post",
"body": "This is a new post",
"userId": 1
}
response = posts_path.post(json=new_post)
created_post = response.json()
print(f"Created post ID: {created_post['id']}")
# Update post
updated_data = {"title": "Updated Title"}
response = post_path.put(json=updated_data)
# Delete post
response = post_path.delete()
print(f"Delete status: {response.status_code}")

# Work with file servers
file_server = HttpsPath("https://files.example.com")
# List files in directory
data_dir = file_server / "data"
for file_path in data_dir.glob("*.csv"):
print(f"CSV file: {file_path}")
# Download and process
local_file = file_path.download_to(f"local_{file_path.name}")
process_csv_file(local_file)
# Upload files to server
local_files = Path("uploads/").glob("*.txt")
upload_dir = file_server / "uploads"
for local_file in local_files:
remote_path = upload_dir / local_file.name
remote_path.upload_from(local_file)
    print(f"Uploaded: {remote_path}")

from requests.auth import HTTPDigestAuth
# WebDAV server access
webdav_auth = HTTPDigestAuth('username', 'password')
client = HttpClient(
auth=webdav_auth,
write_file_http_method='PUT'
)
webdav_path = HttpsPath("https://webdav.example.com/files/", client=client)
# WebDAV operations
document = webdav_path / "document.txt"
document.write_text("WebDAV content")
# Create directory (MKCOL method via custom request)
new_dir = webdav_path / "new_folder"
response = client.request(str(new_dir), 'MKCOL')
# List directory contents
for item in webdav_path.iterdir():
    print(f"WebDAV item: {item}")

# Stream large files
large_file_url = HttpsPath("https://download.example.com/large-dataset.zip")
# Stream download
with large_file_url.open('rb') as remote_file:
with open('local-dataset.zip', 'wb') as local_file:
for chunk in remote_file:
local_file.write(chunk)
print(f"Downloaded chunk: {len(chunk)} bytes")
# Stream processing
csv_url = HttpsPath("https://data.example.com/big-data.csv")
with csv_url.open('r') as f:
import csv
reader = csv.DictReader(f)
for row_num, row in enumerate(reader):
process_row(row)
if row_num % 1000 == 0:
        print(f"Processed {row_num} rows")

# Configure client with custom headers
class CustomHeadersAuth:
    """Auth callable that stamps API-key, user-agent and accept headers.

    Intended for ``HttpClient(auth=...)``; invoked once per outgoing
    request object.
    """

    def __init__(self, api_key, user_agent):
        self.api_key = api_key
        self.user_agent = user_agent

    def __call__(self, request):
        """Set the three custom headers and return the request."""
        headers = request.headers
        headers['X-API-Key'] = self.api_key
        headers['User-Agent'] = self.user_agent
        headers['Accept'] = 'application/json'
        return request
client = HttpClient(auth=CustomHeadersAuth('key123', 'MyApp/1.0'))
# Make requests with custom headers
api_path = HttpsPath("https://api.example.com/data", client=client)
response = api_path.get(params={'format': 'json', 'limit': 100})

import requests
# Use persistent session
session = requests.Session()
session.headers.update({'User-Agent': 'CloudPathLib/1.0'})
session.auth = HTTPBasicAuth('user', 'pass')
# Configure client to use session
class SessionClient(HttpClient):
    """HttpClient variant that routes all traffic through one requests.Session.

    Sharing a session gives connection reuse plus session-level headers
    and auth on every request.
    """

    def __init__(self, session, **kwargs):
        # Let the base client handle caching/auth config, then pin the session.
        super().__init__(**kwargs)
        self.session = session

    def request(self, url, method, **kwargs):
        """Delegate to the shared session (note the swapped arg order)."""
        return self.session.request(method, url, **kwargs)
client = SessionClient(session)
# All requests use the same session
path1 = HttpsPath("https://api.example.com/resource1", client=client)
path2 = HttpsPath("https://api.example.com/resource2", client=client)
response1 = path1.get() # Uses session
response2 = path2.get()  # Reuses session connection

from cloudpathlib import CloudPathFileNotFoundError
import requests
try:
http_path = HttpsPath("https://api.example.com/nonexistent")
content = http_path.read_text()
except CloudPathFileNotFoundError:
print("HTTP resource not found")
except requests.exceptions.ConnectionError:
print("Connection failed")
except requests.exceptions.Timeout:
print("Request timed out")
except requests.exceptions.HTTPError as e:
print(f"HTTP error: {e}")
except requests.exceptions.RequestException as e:
print(f"Request error: {e}")
# Check response status
http_path = HttpsPath("https://api.example.com/data")
response = http_path.get()
if response.status_code == 200:
data = response.json()
elif response.status_code == 404:
print("Resource not found")
elif response.status_code == 401:
print("Authentication required")
else:
    print(f"HTTP {response.status_code}: {response.reason}")

# Configure timeouts and retries
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
session = requests.Session()
# Configure retry strategy
retry_strategy = Retry(
total=3,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)
# Set timeouts
session.timeout = (10, 30) # (connect, read) timeout
client = SessionClient(session)
# Concurrent downloads
import concurrent.futures
def download_file(url_str):
    """Download *url_str* into the downloads/ folder; return the local path.

    Uses the module-level ``client`` so concurrent calls share one session.
    """
    url = HttpsPath(url_str, client=client)
    target = f"downloads/{url.name}"
    return url.download_to(target)
urls = [
"https://example.com/file1.txt",
"https://example.com/file2.txt",
"https://example.com/file3.txt"
]
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
futures = [executor.submit(download_file, url) for url in urls]
for future in concurrent.futures.as_completed(futures):
try:
result = future.result()
print(f"Downloaded: {result}")
except Exception as e:
            print(f"Download failed: {e}")

Install with Tessl CLI
npx tessl i tessl/pypi-cloudpathlib