CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-qiniu

Comprehensive Python SDK for Qiniu Cloud Storage services enabling file upload, download, CDN management, SMS, and real-time communication features

Pending
Overview
Eval results
Files

file-storage.mddocs/

File Storage

Complete file upload, download, and management operations with support for resumable uploads, batch operations, and comprehensive bucket management.

Capabilities

File Upload Operations

High-level upload functions supporting various data sources with resumable upload capabilities.

def put_file_v2(up_token: str, key: str, file_path: str, params: dict = None, mime_type: str = 'application/octet-stream', check_crc: bool = False, progress_handler = None, upload_progress_recorder = None, keep_last_modified: bool = False, part_size: int = None, version: str = 'v2', bucket_name: str = None, metadata: dict = None, regions = None, accelerate_uploading: bool = False) -> tuple:
    """
    Upload file from local path (current version - recommended).
    
    Args:
        up_token: Upload token from Auth.upload_token()
        key: File key in bucket
        file_path: Local file path
        params: Custom upload parameters
        mime_type: File MIME type
        check_crc: Enable CRC32 checksum verification
        progress_handler: Callback for upload progress (func(uploaded, total))
        upload_progress_recorder: UploadProgressRecorder for resumable uploads
        keep_last_modified: Preserve original file modification time
        part_size: Chunk size for resumable upload (default: 4MB)
        version: Upload version ('v1' or 'v2')
        bucket_name: Target bucket name
        metadata: Custom metadata dictionary
        regions: Region configuration
        accelerate_uploading: Enable upload acceleration
        
    Returns:
        (dict, ResponseInfo): Upload result and response info
    """

def put_data(up_token: str, key: str, data: bytes, params: dict = None, mime_type: str = 'application/octet-stream', check_crc: bool = False, progress_handler = None, fname: str = None, hostscache_dir: str = None, metadata: dict = None, regions = None, accelerate_uploading: bool = False) -> tuple:
    """
    Upload binary data directly.
    
    Args:
        up_token: Upload token
        key: File key in bucket
        data: Binary data to upload
        params: Custom upload parameters
        mime_type: Data MIME type
        check_crc: Enable CRC32 verification
        progress_handler: Progress callback function
        fname: Original filename for metadata
        hostscache_dir: Host cache directory
        metadata: Custom metadata
        regions: Region configuration
        accelerate_uploading: Enable acceleration
        
    Returns:
        (dict, ResponseInfo): Upload result and response info
    """

def put_stream_v2(up_token: str, key: str, input_stream, file_name: str, data_size: int, params: dict = None, mime_type: str = None, progress_handler = None, upload_progress_recorder = None, modify_time: int = None, keep_last_modified: bool = False, part_size: int = None, version: str = 'v2', bucket_name: str = None, metadata: dict = None, regions = None, accelerate_uploading: bool = False) -> tuple:
    """
    Upload from input stream (current version - recommended).
    
    Args:
        up_token: Upload token
        key: File key in bucket
        input_stream: Input stream object (file-like)
        file_name: Original filename
        data_size: Total data size in bytes
        params: Custom upload parameters
        mime_type: Stream MIME type
        progress_handler: Progress callback function
        upload_progress_recorder: Progress recorder for resumable uploads
        modify_time: File modification timestamp
        keep_last_modified: Preserve modification time
        part_size: Chunk size for upload
        version: Upload version
        bucket_name: Target bucket name
        metadata: Custom metadata
        regions: Region configuration
        accelerate_uploading: Enable acceleration
        
    Returns:
        (dict, ResponseInfo): Upload result and response info
    """

def put_file(up_token: str, key: str, file_path: str, **kwargs) -> tuple:
    """
    Upload file from local path (deprecated - use put_file_v2).
    
    Args:
        up_token: Upload token
        key: File key
        file_path: Local file path
        **kwargs: Additional parameters
        
    Returns:
        (dict, ResponseInfo): Upload result and response info
    """

def put_stream(up_token: str, key: str, input_stream, file_name: str, data_size: int, **kwargs) -> tuple:
    """
    Upload from input stream (deprecated - use put_stream_v2).
    
    Args:
        up_token: Upload token
        key: File key
        input_stream: Input stream
        file_name: Original filename
        data_size: Data size
        **kwargs: Additional parameters
        
    Returns:
        (dict, ResponseInfo): Upload result and response info
    """

Bucket and File Management

Comprehensive bucket and file management operations through the BucketManager class.

class BucketManager:
    def __init__(self, auth: Auth, zone = None, regions = None, query_regions_endpoints = None, preferred_scheme: str = 'http'):
        """
        Initialize bucket manager.
        
        Args:
            auth: Auth instance for authentication
            zone: Upload zone configuration (deprecated, use regions)
            regions: Region configuration
            query_regions_endpoints: Custom region query endpoints
            preferred_scheme: Preferred URL scheme ('http' or 'https')
        """

    def list(self, bucket: str, prefix: str = None, marker: str = None, limit: int = None, delimiter: str = None) -> tuple:
        """
        List files in bucket.
        
        Args:
            bucket: Bucket name
            prefix: Key prefix filter
            marker: Pagination marker (from previous response)
            limit: Maximum number of files to return
            delimiter: Delimiter for hierarchical listing
            
        Returns:
            (list, bool, str): (files_list, has_more, next_marker)
        """

    def stat(self, bucket: str, key: str) -> tuple:
        """
        Get file information.
        
        Args:
            bucket: Bucket name
            key: File key
            
        Returns:
            (dict, ResponseInfo): File info and response info
        """

    def delete(self, bucket: str, key: str) -> tuple:
        """
        Delete file.
        
        Args:
            bucket: Bucket name
            key: File key
            
        Returns:
            (dict, ResponseInfo): Result and response info
        """

    def rename(self, bucket: str, key: str, key_to: str, force: str = 'false') -> tuple:
        """
        Rename file within bucket.
        
        Args:
            bucket: Bucket name
            key: Current file key
            key_to: New file key
            force: Force overwrite if target exists
            
        Returns:
            (dict, ResponseInfo): Result and response info
        """

    def move(self, bucket: str, key: str, bucket_to: str, key_to: str, force: str = 'false') -> tuple:
        """
        Move file to different bucket/key.
        
        Args:
            bucket: Source bucket name
            key: Source file key
            bucket_to: Destination bucket name
            key_to: Destination file key
            force: Force overwrite if target exists
            
        Returns:
            (dict, ResponseInfo): Result and response info
        """

    def copy(self, bucket: str, key: str, bucket_to: str, key_to: str, force: str = 'false') -> tuple:
        """
        Copy file to different bucket/key.
        
        Args:
            bucket: Source bucket name
            key: Source file key
            bucket_to: Destination bucket name
            key_to: Destination file key
            force: Force overwrite if target exists
            
        Returns:
            (dict, ResponseInfo): Result and response info
        """

    def fetch(self, url: str, bucket: str, key: str = None, hostscache_dir: str = None) -> tuple:
        """
        Fetch file from external URL.
        
        Args:
            url: External file URL
            bucket: Target bucket name
            key: Target file key (auto-generated if None)
            hostscache_dir: Host cache directory
            
        Returns:
            (dict, ResponseInfo): Result and response info
        """

    def prefetch(self, bucket: str, key: str, hostscache_dir: str = None) -> tuple:
        """
        Prefetch file from origin (mirror storage).
        
        Args:
            bucket: Bucket name
            key: File key
            hostscache_dir: Host cache directory
            
        Returns:
            (dict, ResponseInfo): Result and response info
        """

    def change_mime(self, bucket: str, key: str, mime: str) -> tuple:
        """
        Change file MIME type.
        
        Args:
            bucket: Bucket name
            key: File key
            mime: New MIME type
            
        Returns:
            (dict, ResponseInfo): Result and response info
        """

    def change_type(self, bucket: str, key: str, storage_type: int) -> tuple:
        """
        Change file storage type.
        
        Args:
            bucket: Bucket name
            key: File key
            storage_type: Storage type (0=Standard, 1=IA, 2=Archive)
            
        Returns:
            (dict, ResponseInfo): Result and response info
        """

    def change_status(self, bucket: str, key: str, status: int, cond: dict = None) -> tuple:
        """
        Change file status.
        
        Args:
            bucket: Bucket name
            key: File key
            status: New status
            cond: Conditional parameters
            
        Returns:
            (dict, ResponseInfo): Result and response info
        """

    def delete_after_days(self, bucket: str, key: str, days: int) -> tuple:
        """
        Set file deletion schedule.
        
        Args:
            bucket: Bucket name
            key: File key
            days: Days until deletion
            
        Returns:
            (dict, ResponseInfo): Result and response info
        """

    def restore_ar(self, bucket: str, key: str, freeze_after_days: int) -> tuple:
        """
        Restore archived file.
        
        Args:
            bucket: Bucket name
            key: File key
            freeze_after_days: Days to keep restored before re-archiving
            
        Returns:
            (dict, ResponseInfo): Result and response info
        """

    def buckets(self) -> tuple:
        """
        List all accessible buckets.
        
        Returns:
            (list, ResponseInfo): Bucket list and response info
        """

    def mkbucketv3(self, bucket_name: str, region: str) -> tuple:
        """
        Create new bucket.
        
        Args:
            bucket_name: Name for new bucket
            region: Target region identifier
            
        Returns:
            (dict, ResponseInfo): Result and response info
        """

    def bucket_info(self, bucket_name: str) -> tuple:
        """
        Get bucket information.
        
        Args:
            bucket_name: Bucket name
            
        Returns:
            (dict, ResponseInfo): Bucket info and response info
        """

    def list_domains(self, bucket: str) -> tuple:
        """
        List bucket domains.
        
        Args:
            bucket: Bucket name
            
        Returns:
            (list, ResponseInfo): Domain list and response info
        """

    def batch(self, operations: list) -> tuple:
        """
        Execute batch operations.
        
        Args:
            operations: List of operation commands
            
        Returns:
            (list, ResponseInfo): Results list and response info
        """

Batch Operations

Helper functions to build batch operation commands for efficient bulk operations.

def build_batch_copy(source_bucket: str, key_pairs: list, target_bucket: str, force: str = 'false') -> list:
    """
    Build batch copy operations.
    
    Args:
        source_bucket: Source bucket name
        key_pairs: List of (source_key, target_key) tuples
        target_bucket: Target bucket name
        force: Force overwrite existing files
        
    Returns:
        List of batch operation commands
    """

def build_batch_rename(bucket: str, key_pairs: list, force: str = 'false') -> list:
    """
    Build batch rename operations.
    
    Args:
        bucket: Bucket name
        key_pairs: List of (old_key, new_key) tuples
        force: Force overwrite existing files
        
    Returns:
        List of batch operation commands
    """

def build_batch_move(source_bucket: str, key_pairs: list, target_bucket: str, force: str = 'false') -> list:
    """
    Build batch move operations.
    
    Args:
        source_bucket: Source bucket name
        key_pairs: List of (source_key, target_key) tuples
        target_bucket: Target bucket name
        force: Force overwrite existing files
        
    Returns:
        List of batch operation commands
    """

def build_batch_stat(bucket: str, keys: list) -> list:
    """
    Build batch stat operations.
    
    Args:
        bucket: Bucket name
        keys: List of file keys
        
    Returns:
        List of batch operation commands
    """

def build_batch_delete(bucket: str, keys: list) -> list:
    """
    Build batch delete operations.
    
    Args:
        bucket: Bucket name
        keys: List of file keys to delete
        
    Returns:
        List of batch operation commands
    """

def build_batch_restore_ar(bucket: str, keys: list) -> list:
    """
    Build batch restore archive operations.
    
    Args:
        bucket: Bucket name
        keys: List of archived file keys
        
    Returns:
        List of batch operation commands
    """

Upload Progress Recording

Persistent progress tracking for resumable uploads.

class UploadProgressRecorder:
    def __init__(self, record_folder: str = None):
        """
        Initialize upload progress recorder.
        
        Args:
            record_folder: Directory to store progress records (default: temp dir)
        """

    def has_upload_record(self, file_name: str, key: str) -> bool:
        """
        Check if upload record exists.
        
        Args:
            file_name: Local file name
            key: Upload key
            
        Returns:
            True if record exists
        """

    def get_upload_record(self, file_name: str, key: str) -> dict:
        """
        Get upload progress record.
        
        Args:
            file_name: Local file name
            key: Upload key
            
        Returns:
            Progress record dictionary
        """

    def set_upload_record(self, file_name: str, key: str, data: dict):
        """
        Save upload progress record.
        
        Args:
            file_name: Local file name
            key: Upload key
            data: Progress data to save
        """

    def delete_upload_record(self, file_name: str, key: str):
        """
        Delete upload progress record.
        
        Args:
            file_name: Local file name
            key: Upload key
        """

Usage Examples

Basic File Upload

from qiniu import Auth, put_file_v2

# Initialize authentication
auth = Auth(access_key, secret_key)
token = auth.upload_token('my-bucket', 'my-file.jpg')

# Upload file
ret, info = put_file_v2(token, 'my-file.jpg', './local/path/image.jpg')

if info.ok():
    print(f"Upload successful: {ret['key']} -> {ret['hash']}")
else:
    print(f"Upload failed: {info.error}")

Resumable Upload with Progress

from qiniu import Auth, put_file_v2, UploadProgressRecorder

def progress_handler(uploaded_bytes, total_bytes):
    progress = uploaded_bytes / total_bytes * 100
    print(f"Upload progress: {progress:.2f}%")

auth = Auth(access_key, secret_key)
token = auth.upload_token('my-bucket')

# Enable resumable upload with progress tracking
recorder = UploadProgressRecorder()
ret, info = put_file_v2(
    token, 
    'large-file.zip', 
    './large-file.zip',
    progress_handler=progress_handler,
    upload_progress_recorder=recorder,
    part_size=8 * 1024 * 1024  # 8MB chunks
)

Upload Binary Data

from qiniu import Auth, put_data

auth = Auth(access_key, secret_key)
token = auth.upload_token('my-bucket', 'data.json')

# Upload JSON data
import json
data = json.dumps({"message": "Hello Qiniu"}).encode('utf-8')
ret, info = put_data(token, 'data.json', data, mime_type='application/json')

File Management Operations

from qiniu import Auth, BucketManager

auth = Auth(access_key, secret_key)
bucket_manager = BucketManager(auth)

# Get file info
ret, info = bucket_manager.stat('my-bucket', 'my-file.jpg')
if info.ok():
    print(f"File size: {ret['fsize']} bytes")
    print(f"MIME type: {ret['mimeType']}")

# List files with prefix
files, has_more, marker = bucket_manager.list('my-bucket', prefix='images/', limit=100)
for file in files['items']:
    print(f"File: {file['key']}, Size: {file['fsize']}")

# Copy file
ret, info = bucket_manager.copy('my-bucket', 'original.jpg', 'my-bucket', 'backup.jpg')

# Delete file
ret, info = bucket_manager.delete('my-bucket', 'old-file.jpg')

Batch Operations

from qiniu import Auth, BucketManager, build_batch_delete, build_batch_copy

auth = Auth(access_key, secret_key)
bucket_manager = BucketManager(auth)

# Batch delete multiple files
keys_to_delete = ['file1.jpg', 'file2.png', 'file3.pdf']
delete_ops = build_batch_delete('my-bucket', keys_to_delete)
results, info = bucket_manager.batch(delete_ops)

# Batch copy files
key_pairs = [('src1.jpg', 'dst1.jpg'), ('src2.png', 'dst2.png')]
copy_ops = build_batch_copy('source-bucket', key_pairs, 'target-bucket')
results, info = bucket_manager.batch(copy_ops)

# Check results
for i, result in enumerate(results):
    if result['code'] == 200:
        print(f"Operation {i} succeeded")
    else:
        print(f"Operation {i} failed: {result}")

Fetch External Files

from qiniu import Auth, BucketManager

auth = Auth(access_key, secret_key)
bucket_manager = BucketManager(auth)

# Fetch file from external URL
external_url = 'https://example.com/image.jpg'
ret, info = bucket_manager.fetch(external_url, 'my-bucket', 'fetched-image.jpg')

if info.ok():
    print(f"Fetched file: {ret['key']}")
    print(f"File size: {ret['fsize']} bytes")

Install with Tessl CLI

npx tessl i tessl/pypi-qiniu

docs

authentication.md

cdn-management.md

cloud-computing.md

communication-services.md

configuration-utilities.md

data-processing.md

file-storage.md

index.md

tile.json