Comprehensive Python SDK for Qiniu Cloud Storage services enabling file upload, download, CDN management, SMS, and real-time communication features
—
Complete file upload, download, and management operations with support for resumable uploads, batch operations, and comprehensive bucket management.
High-level upload functions supporting various data sources with resumable upload capabilities.
def put_file_v2(up_token: str, key: str, file_path: str, params: dict = None, mime_type: str = 'application/octet-stream', check_crc: bool = False, progress_handler=None, upload_progress_recorder=None, keep_last_modified: bool = False, part_size: int = None, version: str = 'v2', bucket_name: str = None, metadata: dict = None, regions=None, accelerate_uploading: bool = False) -> tuple:
    """
    Upload file from local path (current version - recommended).

    Args:
        up_token: Upload token from Auth.upload_token()
        key: File key in bucket
        file_path: Local file path
        params: Custom upload parameters
        mime_type: File MIME type
        check_crc: Enable CRC32 checksum verification
        progress_handler: Callback for upload progress (func(uploaded, total))
        upload_progress_recorder: UploadProgressRecorder for resumable uploads
        keep_last_modified: Preserve original file modification time
        part_size: Chunk size for resumable upload (default: 4MB)
        version: Upload version ('v1' or 'v2')
        bucket_name: Target bucket name
        metadata: Custom metadata dictionary
        regions: Region configuration
        accelerate_uploading: Enable upload acceleration

    Returns:
        (dict, ResponseInfo): Upload result and response info
    """
def put_data(up_token: str, key: str, data: bytes, params: dict = None, mime_type: str = 'application/octet-stream', check_crc: bool = False, progress_handler=None, fname: str = None, hostscache_dir: str = None, metadata: dict = None, regions=None, accelerate_uploading: bool = False) -> tuple:
    """
    Upload binary data directly.

    Args:
        up_token: Upload token
        key: File key in bucket
        data: Binary data to upload
        params: Custom upload parameters
        mime_type: Data MIME type
        check_crc: Enable CRC32 verification
        progress_handler: Progress callback function
        fname: Original filename for metadata
        hostscache_dir: Host cache directory
        metadata: Custom metadata
        regions: Region configuration
        accelerate_uploading: Enable acceleration

    Returns:
        (dict, ResponseInfo): Upload result and response info
    """
def put_stream_v2(up_token: str, key: str, input_stream, file_name: str, data_size: int, params: dict = None, mime_type: str = None, progress_handler=None, upload_progress_recorder=None, modify_time: int = None, keep_last_modified: bool = False, part_size: int = None, version: str = 'v2', bucket_name: str = None, metadata: dict = None, regions=None, accelerate_uploading: bool = False) -> tuple:
    """
    Upload from input stream (current version - recommended).

    Args:
        up_token: Upload token
        key: File key in bucket
        input_stream: Input stream object (file-like)
        file_name: Original filename
        data_size: Total data size in bytes
        params: Custom upload parameters
        mime_type: Stream MIME type
        progress_handler: Progress callback function
        upload_progress_recorder: Progress recorder for resumable uploads
        modify_time: File modification timestamp
        keep_last_modified: Preserve modification time
        part_size: Chunk size for upload
        version: Upload version
        bucket_name: Target bucket name
        metadata: Custom metadata
        regions: Region configuration
        accelerate_uploading: Enable acceleration

    Returns:
        (dict, ResponseInfo): Upload result and response info
    """
def put_file(up_token: str, key: str, file_path: str, **kwargs) -> tuple:
    """
    Upload file from local path (deprecated - use put_file_v2).

    Args:
        up_token: Upload token
        key: File key
        file_path: Local file path
        **kwargs: Additional parameters

    Returns:
        (dict, ResponseInfo): Upload result and response info
    """
def put_stream(up_token: str, key: str, input_stream, file_name: str, data_size: int, **kwargs) -> tuple:
    """
    Upload from input stream (deprecated - use put_stream_v2).

    Args:
        up_token: Upload token
        key: File key
        input_stream: Input stream
        file_name: Original filename
        data_size: Data size
        **kwargs: Additional parameters

    Returns:
        (dict, ResponseInfo): Upload result and response info
    """

# Comprehensive bucket and file management operations through the BucketManager class.
class BucketManager:
    """Bucket and file management operations (list, stat, copy, move, batch, ...)."""

    # NOTE(review): "Auth" is declared elsewhere in the SDK; quoted as a forward
    # reference so this interface stub stays importable on its own.
    def __init__(self, auth: "Auth", zone=None, regions=None, query_regions_endpoints=None, preferred_scheme: str = 'http'):
        """
        Initialize bucket manager.

        Args:
            auth: Auth instance for authentication
            zone: Upload zone configuration (deprecated, use regions)
            regions: Region configuration
            query_regions_endpoints: Custom region query endpoints
            preferred_scheme: Preferred URL scheme ('http' or 'https')
        """

    def list(self, bucket: str, prefix: str = None, marker: str = None, limit: int = None, delimiter: str = None) -> tuple:
        """
        List files in bucket.

        Args:
            bucket: Bucket name
            prefix: Key prefix filter
            marker: Pagination marker (from previous response)
            limit: Maximum number of files to return
            delimiter: Delimiter for hierarchical listing

        Returns:
            (list, bool, str): (files_list, has_more, next_marker)
        """

    def stat(self, bucket: str, key: str) -> tuple:
        """
        Get file information.

        Args:
            bucket: Bucket name
            key: File key

        Returns:
            (dict, ResponseInfo): File info and response info
        """

    def delete(self, bucket: str, key: str) -> tuple:
        """
        Delete file.

        Args:
            bucket: Bucket name
            key: File key

        Returns:
            (dict, ResponseInfo): Result and response info
        """

    def rename(self, bucket: str, key: str, key_to: str, force: str = 'false') -> tuple:
        """
        Rename file within bucket.

        Args:
            bucket: Bucket name
            key: Current file key
            key_to: New file key
            force: Force overwrite if target exists

        Returns:
            (dict, ResponseInfo): Result and response info
        """

    def move(self, bucket: str, key: str, bucket_to: str, key_to: str, force: str = 'false') -> tuple:
        """
        Move file to different bucket/key.

        Args:
            bucket: Source bucket name
            key: Source file key
            bucket_to: Destination bucket name
            key_to: Destination file key
            force: Force overwrite if target exists

        Returns:
            (dict, ResponseInfo): Result and response info
        """

    def copy(self, bucket: str, key: str, bucket_to: str, key_to: str, force: str = 'false') -> tuple:
        """
        Copy file to different bucket/key.

        Args:
            bucket: Source bucket name
            key: Source file key
            bucket_to: Destination bucket name
            key_to: Destination file key
            force: Force overwrite if target exists

        Returns:
            (dict, ResponseInfo): Result and response info
        """

    def fetch(self, url: str, bucket: str, key: str = None, hostscache_dir: str = None) -> tuple:
        """
        Fetch file from external URL.

        Args:
            url: External file URL
            bucket: Target bucket name
            key: Target file key (auto-generated if None)
            hostscache_dir: Host cache directory

        Returns:
            (dict, ResponseInfo): Result and response info
        """

    def prefetch(self, bucket: str, key: str, hostscache_dir: str = None) -> tuple:
        """
        Prefetch file from origin (mirror storage).

        Args:
            bucket: Bucket name
            key: File key
            hostscache_dir: Host cache directory

        Returns:
            (dict, ResponseInfo): Result and response info
        """

    def change_mime(self, bucket: str, key: str, mime: str) -> tuple:
        """
        Change file MIME type.

        Args:
            bucket: Bucket name
            key: File key
            mime: New MIME type

        Returns:
            (dict, ResponseInfo): Result and response info
        """

    def change_type(self, bucket: str, key: str, storage_type: int) -> tuple:
        """
        Change file storage type.

        Args:
            bucket: Bucket name
            key: File key
            storage_type: Storage type (0=Standard, 1=IA, 2=Archive)

        Returns:
            (dict, ResponseInfo): Result and response info
        """

    def change_status(self, bucket: str, key: str, status: int, cond: dict = None) -> tuple:
        """
        Change file status.

        Args:
            bucket: Bucket name
            key: File key
            status: New status
            cond: Conditional parameters

        Returns:
            (dict, ResponseInfo): Result and response info
        """

    def delete_after_days(self, bucket: str, key: str, days: int) -> tuple:
        """
        Set file deletion schedule.

        Args:
            bucket: Bucket name
            key: File key
            days: Days until deletion

        Returns:
            (dict, ResponseInfo): Result and response info
        """

    def restore_ar(self, bucket: str, key: str, freeze_after_days: int) -> tuple:
        """
        Restore archived file.

        Args:
            bucket: Bucket name
            key: File key
            freeze_after_days: Days to keep restored before re-archiving

        Returns:
            (dict, ResponseInfo): Result and response info
        """

    def buckets(self) -> tuple:
        """
        List all accessible buckets.

        Returns:
            (list, ResponseInfo): Bucket list and response info
        """

    def mkbucketv3(self, bucket_name: str, region: str) -> tuple:
        """
        Create new bucket.

        Args:
            bucket_name: Name for new bucket
            region: Target region identifier

        Returns:
            (dict, ResponseInfo): Result and response info
        """

    def bucket_info(self, bucket_name: str) -> tuple:
        """
        Get bucket information.

        Args:
            bucket_name: Bucket name

        Returns:
            (dict, ResponseInfo): Bucket info and response info
        """

    def list_domains(self, bucket: str) -> tuple:
        """
        List bucket domains.

        Args:
            bucket: Bucket name

        Returns:
            (list, ResponseInfo): Domain list and response info
        """

    def batch(self, operations: list) -> tuple:
        """
        Execute batch operations.

        Args:
            operations: List of operation commands

        Returns:
            (list, ResponseInfo): Results list and response info
        """

# Helper functions to build batch operation commands for efficient bulk operations.
def build_batch_copy(source_bucket: str, key_pairs: list, target_bucket: str, force: str = 'false') -> list:
    """
    Build batch copy operations.

    Args:
        source_bucket: Source bucket name
        key_pairs: List of (source_key, target_key) tuples
        target_bucket: Target bucket name
        force: Force overwrite existing files

    Returns:
        List of batch operation commands
    """
def build_batch_rename(bucket: str, key_pairs: list, force: str = 'false') -> list:
    """
    Build batch rename operations.

    Args:
        bucket: Bucket name
        key_pairs: List of (old_key, new_key) tuples
        force: Force overwrite existing files

    Returns:
        List of batch operation commands
    """
def build_batch_move(source_bucket: str, key_pairs: list, target_bucket: str, force: str = 'false') -> list:
    """
    Build batch move operations.

    Args:
        source_bucket: Source bucket name
        key_pairs: List of (source_key, target_key) tuples
        target_bucket: Target bucket name
        force: Force overwrite existing files

    Returns:
        List of batch operation commands
    """
def build_batch_stat(bucket: str, keys: list) -> list:
    """
    Build batch stat operations.

    Args:
        bucket: Bucket name
        keys: List of file keys

    Returns:
        List of batch operation commands
    """
def build_batch_delete(bucket: str, keys: list) -> list:
    """
    Build batch delete operations.

    Args:
        bucket: Bucket name
        keys: List of file keys to delete

    Returns:
        List of batch operation commands
    """
def build_batch_restore_ar(bucket: str, keys: list) -> list:
    """
    Build batch restore archive operations.

    Args:
        bucket: Bucket name
        keys: List of archived file keys

    Returns:
        List of batch operation commands
    """

# Persistent progress tracking for resumable uploads.
class UploadProgressRecorder:
    """Persistent progress tracking for resumable uploads."""

    def __init__(self, record_folder: str = None):
        """
        Initialize upload progress recorder.

        Args:
            record_folder: Directory to store progress records (default: temp dir)
        """

    def has_upload_record(self, file_name: str, key: str) -> bool:
        """
        Check if upload record exists.

        Args:
            file_name: Local file name
            key: Upload key

        Returns:
            True if record exists
        """

    def get_upload_record(self, file_name: str, key: str) -> dict:
        """
        Get upload progress record.

        Args:
            file_name: Local file name
            key: Upload key

        Returns:
            Progress record dictionary
        """

    def set_upload_record(self, file_name: str, key: str, data: dict):
        """
        Save upload progress record.

        Args:
            file_name: Local file name
            key: Upload key
            data: Progress data to save
        """

    def delete_upload_record(self, file_name: str, key: str):
        """
        Delete upload progress record.

        Args:
            file_name: Local file name
            key: Upload key
        """

# Example: basic file upload
#   from qiniu import Auth, put_file_v2
# Initialize authentication
auth = Auth(access_key, secret_key)
token = auth.upload_token('my-bucket', 'my-file.jpg')
# Upload file
ret, info = put_file_v2(token, 'my-file.jpg', './local/path/image.jpg')
if info.ok():
    print(f"Upload successful: {ret['key']} -> {ret['hash']}")
else:
    print(f"Upload failed: {info.error}")

from qiniu import Auth, put_file_v2, UploadProgressRecorder
def progress_handler(uploaded_bytes, total_bytes):
    progress = uploaded_bytes / total_bytes * 100
    print(f"Upload progress: {progress:.2f}%")
auth = Auth(access_key, secret_key)
token = auth.upload_token('my-bucket')
# Enable resumable upload with progress tracking
recorder = UploadProgressRecorder()
ret, info = put_file_v2(
    token,
    'large-file.zip',
    './large-file.zip',
    progress_handler=progress_handler,
    upload_progress_recorder=recorder,
    part_size=8 * 1024 * 1024  # 8MB chunks
)

from qiniu import Auth, put_data
auth = Auth(access_key, secret_key)
token = auth.upload_token('my-bucket', 'data.json')
# Upload JSON data
import json
data = json.dumps({"message": "Hello Qiniu"}).encode('utf-8')
ret, info = put_data(token, 'data.json', data, mime_type='application/json')

from qiniu import Auth, BucketManager
auth = Auth(access_key, secret_key)
bucket_manager = BucketManager(auth)
# Get file info
ret, info = bucket_manager.stat('my-bucket', 'my-file.jpg')
if info.ok():
    print(f"File size: {ret['fsize']} bytes")
    print(f"MIME type: {ret['mimeType']}")
# List files with prefix
files, has_more, marker = bucket_manager.list('my-bucket', prefix='images/', limit=100)
for file in files['items']:
    print(f"File: {file['key']}, Size: {file['fsize']}")
# Copy file
ret, info = bucket_manager.copy('my-bucket', 'original.jpg', 'my-bucket', 'backup.jpg')
# Delete file
ret, info = bucket_manager.delete('my-bucket', 'old-file.jpg')

from qiniu import Auth, BucketManager, build_batch_delete, build_batch_copy
auth = Auth(access_key, secret_key)
bucket_manager = BucketManager(auth)
# Batch delete multiple files
keys_to_delete = ['file1.jpg', 'file2.png', 'file3.pdf']
delete_ops = build_batch_delete('my-bucket', keys_to_delete)
results, info = bucket_manager.batch(delete_ops)
# Batch copy files
key_pairs = [('src1.jpg', 'dst1.jpg'), ('src2.png', 'dst2.png')]
copy_ops = build_batch_copy('source-bucket', key_pairs, 'target-bucket')
results, info = bucket_manager.batch(copy_ops)
# Check results
for i, result in enumerate(results):
    if result['code'] == 200:
        print(f"Operation {i} succeeded")
    else:
        print(f"Operation {i} failed: {result}")

from qiniu import Auth, BucketManager
auth = Auth(access_key, secret_key)
bucket_manager = BucketManager(auth)
# Fetch file from external URL
external_url = 'https://example.com/image.jpg'
ret, info = bucket_manager.fetch(external_url, 'my-bucket', 'fetched-image.jpg')
if info.ok():
    print(f"Fetched file: {ret['key']}")
    print(f"File size: {ret['fsize']} bytes")

# Install with the Tessl CLI:
#   npx tessl i tessl/pypi-qiniu