# Comprehensive Python SDK for Qiniu Cloud Storage services enabling file
# upload, download, CDN management, SMS, and real-time communication features.
#
# Global configuration management, region/zone settings, and utility functions
# for encoding, data transformation, and SDK customization.
# SDK-wide configuration management for default settings and service endpoints.
def set_default(**kwargs) -> None:
    """
    Set global SDK configuration defaults.

    Only the keys passed as keyword arguments are overridden; all other
    settings keep their built-in defaults.

    Args:
        default_zone: Default upload zone.
        connection_retries: Number of connection retries (default: 3).
        connection_pool: Connection pool size.
        connection_timeout: Connection timeout in seconds (default: 30).
        default_rs_host: Default resource management host.
        default_uc_host: Default space info host.
        default_rsf_host: Default list operations host.
        default_api_host: Default data processing host.
        default_upload_threshold: File size threshold above which resumable
            upload is used (default: 4MB).
        default_chunk_size: Chunk size for resumable uploads (default: 4MB).
        default_upload_recorder_root_directory: Root directory for upload
            progress records.
        connection_pool_size: HTTP connection pool size.
        connection_pool_max_size: Maximum connection pool size.
        preferred_scheme: Preferred URL scheme ('http' or 'https').
    """
def get_default(key: str):
    """
    Get a global configuration value.

    Args:
        key: Configuration key name.

    Returns:
        The configuration value, or None if the key is not set.
    """
def is_customized_default(key: str) -> bool:
    """
    Check whether a configuration value has been customized.

    Args:
        key: Configuration key name.

    Returns:
        True if the value was changed from its built-in default.
    """

# Service Host Constants
RS_HOST = 'http://rs.qiniu.com'  # Resource management operations
RSF_HOST = 'http://rsf.qbox.me'  # List operations
API_HOST = 'http://api.qiniuapi.com'  # Data processing operations
QUERY_REGION_HOST = 'https://uc.qiniuapi.com'  # Region query
UC_HOST = QUERY_REGION_HOST  # Space info operations (same endpoint as region query)

# Upload Configuration
_BLOCK_SIZE = 4 * 1024 * 1024  # Resumable upload block size (4MB)

# Upload region configuration and endpoint discovery for optimal performance.
class Region:
    """
    Upload region configuration and endpoint discovery.

    Resolves the service hosts (upload, IO, RS, RSF, API) to use for a
    bucket, optionally consulting a host cache and a backup upload host.
    """

    def __init__(self, up_host: str = None, up_host_backup: str = None, io_host: str = None, host_cache = None, home_dir: str = None, scheme: str = "http", rs_host: str = None, rsf_host: str = None, api_host: str = None, accelerate_uploading: bool = False):
        """
        Initialize region configuration.

        Args:
            up_host: Primary upload host.
            up_host_backup: Backup upload host.
            io_host: IO operations host.
            host_cache: Host cache instance.
            home_dir: Home directory for the host cache.
            scheme: URL scheme ('http' or 'https').
            rs_host: Resource management host.
            rsf_host: List operations host.
            api_host: API operations host.
            accelerate_uploading: Enable upload acceleration.
        """

    def get_up_host_by_token(self, up_token: str, home_dir: str) -> str:
        """
        Get the primary upload host for an upload token.

        Args:
            up_token: Upload token.
            home_dir: Home directory for the host cache.

        Returns:
            Primary upload host URL.
        """

    def get_up_host_backup_by_token(self, up_token: str, home_dir: str) -> str:
        """
        Get the backup upload host for an upload token.

        Args:
            up_token: Upload token.
            home_dir: Home directory for the host cache.

        Returns:
            Backup upload host URL.
        """

    def get_io_host(self, ak: str, bucket: str, home_dir: str = None) -> str:
        """
        Get the IO operations host for a bucket.

        Args:
            ak: Access key.
            bucket: Bucket name.
            home_dir: Home directory for the host cache.

        Returns:
            IO host URL.
        """

    def get_rs_host(self, ak: str, bucket: str, home_dir: str = None) -> str:
        """
        Get the resource management host for a bucket.

        Args:
            ak: Access key.
            bucket: Bucket name.
            home_dir: Home directory for the host cache.

        Returns:
            Resource management host URL.
        """

    def get_rsf_host(self, ak: str, bucket: str, home_dir: str = None) -> str:
        """
        Get the listing operations host for a bucket.

        Args:
            ak: Access key.
            bucket: Bucket name.
            home_dir: Home directory for the host cache.

        Returns:
            Listing host URL.
        """

    def get_api_host(self, ak: str, bucket: str, home_dir: str = None) -> str:
        """
        Get the API operations host for a bucket.

        Args:
            ak: Access key.
            bucket: Bucket name.
            home_dir: Home directory for the host cache.

        Returns:
            API host URL.
        """

    def get_up_host(self, ak: str, bucket: str, home_dir: str) -> tuple:
        """
        Get both upload hosts for a bucket.

        Args:
            ak: Access key.
            bucket: Bucket name.
            home_dir: Home directory for the host cache.

        Returns:
            (primary_host, backup_host): Upload host URLs.
        """

    def unmarshal_up_token(self, up_token: str) -> dict:
        """
        Parse an upload token and extract its policy information.

        Args:
            up_token: Upload token to parse.

        Returns:
            Dictionary containing the token policy data.
        """

    def get_bucket_hosts(self, ak: str, bucket: str, home_dir: str = None, force: bool = False) -> dict:
        """
        Get all service hosts for a bucket.

        Args:
            ak: Access key.
            bucket: Bucket name.
            home_dir: Home directory for the host cache.
            force: Force a refresh from the server, bypassing the cache.

        Returns:
            Dictionary containing all service host URLs.
        """

    def bucket_hosts(self, ak: str, bucket: str) -> dict:
        """
        Get raw bucket service host information.

        Args:
            ak: Access key.
            bucket: Bucket name.

        Returns:
            Dictionary containing bucket hosts info.
        """
class Zone:
    """Alias for the Region class, kept for backward compatibility."""
    # Inherits all methods from Region.

# Utility functions for data encoding, decoding, and format transformation.
def urlsafe_base64_encode(data: bytes) -> str:
    """
    URL-safe base64 encoding ('-'/'_' alphabet).

    Args:
        data: Binary data to encode.

    Returns:
        URL-safe base64 encoded string.
    """
def urlsafe_base64_decode(data: str) -> bytes:
    """
    URL-safe base64 decoding.

    Args:
        data: URL-safe base64 string to decode.

    Returns:
        Decoded binary data.
    """
def entry(bucket: str, key: str) -> str:
    """
    Create the encoded entry identifier used by bucket API operations.

    Args:
        bucket: Bucket name.
        key: File key.

    Returns:
        Base64 encoded entry string.
    """
def decode_entry(e: str) -> tuple:
    """
    Decode an entry string back into its bucket and key.

    Args:
        e: Encoded entry string.

    Returns:
        (bucket, key): Decoded bucket name and file key.
    """
def canonical_mime_header_key(field_name: str) -> str:
    """
    Canonicalize an HTTP header field name.

    Args:
        field_name: Header field name (e.g. 'content-type').

    Returns:
        Canonicalized header key (e.g. 'Content-Type').
    """

# Data integrity verification and hash calculation utilities.
def file_crc32(file_path: str) -> int:
    """
    Calculate the CRC32 checksum of a file.

    Args:
        file_path: Path to the file.

    Returns:
        CRC32 checksum value.
    """
def io_crc32(io_data) -> int:
    """
    Calculate the CRC32 checksum of IO data.

    Args:
        io_data: IO object or binary data.

    Returns:
        CRC32 checksum value.
    """
def io_md5(io_data) -> str:
    """
    Calculate the MD5 hash of IO data.

    Args:
        io_data: IO object or binary data.

    Returns:
        MD5 hash string.
    """
def crc32(data: bytes) -> int:
    """
    Calculate the CRC32 checksum of binary data.

    Args:
        data: Binary data.

    Returns:
        CRC32 checksum value.
    """
def etag(file_path: str) -> str:
    """
    Calculate a file's Qiniu etag (deprecated for v2 uploads).

    Args:
        file_path: Path to the file.

    Returns:
        Etag string.
    """
def etag_stream(input_stream) -> str:
    """
    Calculate a stream's Qiniu etag (deprecated for v2 uploads).

    Args:
        input_stream: Input stream object.

    Returns:
        Etag string.
    """

# Time format conversion and timestamp utilities.
def rfc_from_timestamp(timestamp: int) -> str:
    """
    Convert a Unix timestamp to the HTTP RFC date format.

    Args:
        timestamp: Unix timestamp.

    Returns:
        RFC formatted date string.
    """
def dt2ts(dt) -> int:
    """
    Convert a datetime object to a Unix timestamp.

    Args:
        dt: datetime object.

    Returns:
        Unix timestamp.
    """

# Helper decorators and development tools.
def deprecated(reason: str):
    """
    Decorator factory that marks a function as deprecated.

    Args:
        reason: Deprecation reason message emitted with the warning.

    Returns:
        Decorator function.
    """

# Response information wrapper for standardized error handling.
class ResponseInfo:
    """Wrapper around an HTTP response providing standardized error handling."""

    def __init__(self, response, exception: Exception = None):
        """
        Initialize the response info wrapper.

        Args:
            response: HTTP response object.
            exception: Exception raised during the request, if any.
        """

    @property
    def status_code(self) -> int:
        """HTTP status code."""

    @property
    def text_body(self) -> str:
        """Response body as text."""

    @property
    def req_id(self) -> str:
        """Qiniu request ID."""

    @property
    def x_log(self) -> str:
        """Qiniu debug log information."""

    @property
    def error(self) -> str:
        """Error message."""

    @property
    def url(self) -> str:
        """Request URL."""

    @property
    def exception(self) -> Exception:
        """Exception object."""

    def ok(self) -> bool:
        """
        Check if the response indicates success (2xx status).

        Returns:
            True if the response is successful.
        """

    def need_retry(self) -> bool:
        """
        Check if the request should be retried.

        Returns:
            True if the request should be retried.
        """

    def connect_failed(self) -> bool:
        """
        Check if the connection failed.

        Returns:
            True if the connection failed.
        """

    def json(self) -> dict:
        """
        Parse the response body as JSON.

        Returns:
            Parsed JSON dictionary.
        """

from qiniu import set_default, get_default
# Example: configuring SDK defaults.
set_default(
    connection_retries=5,
    connection_timeout=60,
    default_upload_threshold=8 * 1024 * 1024,  # 8MB threshold for resumable upload
    preferred_scheme='https',
    connection_pool_size=20
)

# Check current configuration
print(f"Connection retries: {get_default('connection_retries')}")
print(f"Upload threshold: {get_default('default_upload_threshold')}")
print(f"Preferred scheme: {get_default('preferred_scheme')}")

# Custom host configuration for enterprise deployment
set_default(
    default_rs_host='https://rs.qiniu-enterprise.com',
    default_uc_host='https://uc.qiniu-enterprise.com',
    default_api_host='https://api.qiniu-enterprise.com'
)

from qiniu import Region, Auth, BucketManager
# Example: custom region configuration and endpoint discovery.
custom_region = Region(
    up_host='https://upload-z0.qiniup.com',
    # NOTE(review): the backup host must be a domain distinct from the
    # primary, otherwise failover retries hit the same endpoint.
    up_host_backup='https://up-z0.qiniup.com',
    io_host='https://iovip.qbox.me',
    scheme='https',
    accelerate_uploading=True
)

# Use custom region with bucket manager
auth = Auth(access_key, secret_key)
bucket_manager = BucketManager(auth, regions=[custom_region])

# Get region-specific hosts for a bucket
hosts = custom_region.get_bucket_hosts(access_key, 'my-bucket', force=True)
print(f"Upload hosts: {hosts['up']}")
print(f"IO host: {hosts['io']}")
print(f"RS host: {hosts['rs']}")

# Parse upload token to determine region
upload_token = auth.upload_token('my-bucket')
token_info = custom_region.unmarshal_up_token(upload_token)
print(f"Token scope: {token_info['scope']}")
print(f"Token deadline: {token_info['deadline']}")

from qiniu import urlsafe_base64_encode, urlsafe_base64_decode, entry, decode_entry
# Example: base64 encoding and entry helpers for API parameters.
data = b"Hello, Qiniu Cloud Storage!"
encoded = urlsafe_base64_encode(data)
print(f"Encoded: {encoded}")
decoded = urlsafe_base64_decode(encoded)
print(f"Decoded: {decoded.decode('utf-8')}")

# Entry format for batch operations
bucket = 'my-bucket'
key = 'path/to/file.jpg'
entry_str = entry(bucket, key)
print(f"Entry: {entry_str}")

# Decode entry back to bucket and key
decoded_bucket, decoded_key = decode_entry(entry_str)
print(f"Bucket: {decoded_bucket}, Key: {decoded_key}")

# Header canonicalization
from qiniu import canonical_mime_header_key
header_key = canonical_mime_header_key('content-type')
print(f"Canonical header: {header_key}")  # Content-Type

from qiniu import file_crc32, io_crc32, io_md5, crc32
import io

# Example: checksum calculation and upload integrity verification.
file_path = '/path/to/file.jpg'
file_checksum = file_crc32(file_path)
print(f"File CRC32: {file_checksum}")

# Data checksum calculation
data = b"Sample data for checksum"
data_checksum = crc32(data)
print(f"Data CRC32: {data_checksum}")

# IO stream checksum
stream = io.BytesIO(data)
stream_crc32 = io_crc32(stream)
# Rewind: io_crc32 consumed the stream; without seek(0) io_md5 would hash
# an exhausted stream (i.e. empty input) instead of the data.
stream.seek(0)
stream_md5 = io_md5(stream)
print(f"Stream CRC32: {stream_crc32}")
print(f"Stream MD5: {stream_md5}")

# Verify upload integrity
from qiniu import Auth, put_data
auth = Auth(access_key, secret_key)
token = auth.upload_token('my-bucket', 'data.bin')

# Calculate checksum before upload
expected_crc32 = crc32(data)

# Upload with CRC verification
ret, info = put_data(token, 'data.bin', data, check_crc=True)
if info.ok():
    # Verify uploaded file CRC matches
    uploaded_crc32 = ret.get('crc32')
    if uploaded_crc32 == expected_crc32:
        print("Upload integrity verified")
    else:
        print("Upload integrity check failed")

from qiniu import rfc_from_timestamp, dt2ts
import datetime
import time

# Example: timestamp conversion utilities.
timestamp = int(time.time())
http_date = rfc_from_timestamp(timestamp)
print(f"HTTP Date: {http_date}")

# Convert datetime to timestamp
dt = datetime.datetime.now()
ts = dt2ts(dt)
print(f"Datetime: {dt}")
print(f"Timestamp: {ts}")

# Use in upload token expiration
from qiniu import Auth
auth = Auth(access_key, secret_key)

# Token expires in 2 hours
expire_time = datetime.datetime.now() + datetime.timedelta(hours=2)
expire_timestamp = dt2ts(expire_time)

# Create token with specific expiration.
# NOTE(review): Auth.upload_token takes `expires` as a TTL in seconds,
# not an absolute timestamp, so the deadline must be passed as a duration.
token = auth.upload_token('my-bucket', expires=int(expire_timestamp - time.time()))
print(f"Token expires at: {expire_time}")

from qiniu import Auth, BucketManager, ResponseInfo
auth = Auth(access_key, secret_key)
bucket_manager = BucketManager(auth)

def handle_response(operation_name, ret, info):
    """Print a standard success/failure report for a (ret, info) pair.

    Returns:
        True when info reports success, False otherwise.
    """
    print(f"\n=== {operation_name} ===")
    if info.ok():
        print("✓ Operation successful")
        print(f"Status: {info.status_code}")
        print(f"Request ID: {info.req_id}")
        if ret:
            print(f"Result: {ret}")
        return True
    else:
        print("✗ Operation failed")
        print(f"Status: {info.status_code}")
        print(f"Error: {info.error}")
        if info.exception:
            print(f"Exception: {info.exception}")
        if info.need_retry():
            print("⏳ Operation can be retried")
        if info.connect_failed():
            print("🔌 Connection failed")
        return False

# Example operations with error handling
ret, info = bucket_manager.stat('my-bucket', 'non-existent-file.jpg')
success = handle_response("File Stat", ret, info)

if not success and info.need_retry():
    print("Retrying operation...")
    ret, info = bucket_manager.stat('my-bucket', 'non-existent-file.jpg')
    handle_response("File Stat Retry", ret, info)

# JSON response parsing
try:
    json_data = info.json()
    print(f"JSON Response: {json_data}")
except ValueError as e:
    print(f"Failed to parse JSON: {e}")

from qiniu import deprecated
import warnings

# Example: marking and exercising a deprecated function.
@deprecated("Use new_function() instead")
def old_function():
    """This function is deprecated"""
    return "old result"

def new_function():
    """New improved function"""
    return "new result"

# Using deprecated function shows warning
with warnings.catch_warnings(record=True) as w:
    warnings.simplefilter("always")
    result = old_function()
    if w:
        print(f"Warning: {w[0].message}")

# Use new function instead
result = new_function()
print(f"Result: {result}")

from qiniu import set_default, get_default, is_customized_default
import os

class QiniuConfig:
    """Advanced configuration management driven by environment variables."""

    def __init__(self):
        self.load_from_environment()

    def load_from_environment(self):
        """Load configuration overrides from QINIU_* environment variables."""
        env_config = {}
        # Map environment variables to config keys
        env_mapping = {
            'QINIU_CONNECTION_RETRIES': 'connection_retries',
            'QINIU_CONNECTION_TIMEOUT': 'connection_timeout',
            'QINIU_PREFERRED_SCHEME': 'preferred_scheme',
            'QINIU_UPLOAD_THRESHOLD': 'default_upload_threshold',
            'QINIU_RS_HOST': 'default_rs_host',
            'QINIU_UC_HOST': 'default_uc_host'
        }
        numeric_keys = {'connection_retries', 'connection_timeout', 'default_upload_threshold'}
        for env_var, config_key in env_mapping.items():
            value = os.getenv(env_var)
            if value:
                if config_key in numeric_keys:
                    # Skip malformed numeric settings instead of raising
                    # ValueError during import-time configuration.
                    try:
                        value = int(value)
                    except ValueError:
                        print(f"Ignoring invalid value for {env_var}: {value!r}")
                        continue
                env_config[config_key] = value
        if env_config:
            set_default(**env_config)
            print(f"Loaded {len(env_config)} settings from environment")

    def show_current_config(self):
        """Display current configuration values and whether each is customized."""
        config_keys = [
            'connection_retries',
            'connection_timeout',
            'preferred_scheme',
            'default_upload_threshold',
            'default_rs_host',
            'default_uc_host'
        ]
        print("\n=== Qiniu SDK Configuration ===")
        for key in config_keys:
            value = get_default(key)
            customized = is_customized_default(key)
            status = "CUSTOM" if customized else "DEFAULT"
            print(f"{key}: {value} ({status})")

    def reset_to_defaults(self):
        """Reset configuration to defaults (not directly supported by the SDK)."""
        # This would require SDK support for resetting
        print("Configuration reset not directly supported")

# Usage
config = QiniuConfig()
config.show_current_config()
# Install with Tessl CLI
# npx tessl i tessl/pypi-qiniu