CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-qiniu

Comprehensive Python SDK for Qiniu Cloud Storage services enabling file upload, download, CDN management, SMS, and real-time communication features

Pending
Overview
Eval results
Files

docs/data-processing.md

Data Processing

Persistent data processing operations for media files including audio, video, and image transformations. The processing system supports asynchronous operations with status monitoring and flexible command chaining.

Capabilities

Persistent File Operations

Asynchronous data processing with status monitoring through the PersistentFop class.

class PersistentFop:
    """
    Trigger and monitor asynchronous (persistent) processing jobs for files
    stored in a single bucket.
    """

    def __init__(self, auth: Auth, bucket: str, pipeline: str = None, notify_url: str = None):
        """
        Initialize persistent file operations manager.

        Args:
            auth: Auth instance for authentication
            bucket: Bucket that holds the source files to process
            pipeline: Processing pipeline (queue) name (optional)
            notify_url: Callback URL notified when processing completes (optional)
        """

    def execute(self, key: str, fops: list = None, force: int = None, persistent_type: int = None, workflow_template_id: str = None) -> tuple:
        """
        Submit processing operations for a file.

        Provide either ``fops`` or ``workflow_template_id``.

        Args:
            key: Key of the source file to process
            fops: List of processing command strings. The SDK joins the items
                with ';' before sending, so pass ``[cmd_a, cmd_b]`` rather
                than one pre-joined string (a bare string would be split
                character by character).
            force: Pass 1 to force processing even if the output already exists
            persistent_type: Task type. Pass 1 to run the job as an idle-time
                (low-priority) task; omit it for a normal task.
            workflow_template_id: ID of a predefined workflow template to run
                instead of explicit ``fops``

        Returns:
            (dict, ResponseInfo): Processing job info (contains
            'persistentId') and response info
        """

    def get_status(self, persistent_id: str) -> tuple:
        """
        Get processing job status.

        Args:
            persistent_id: Processing job ID from execute() response

        Returns:
            (dict, ResponseInfo): Job status info and response info. The
            status dict carries an overall 'code' (0 = success, 1 = waiting,
            2 = processing, 3 = failed, 4 = callback notification failed),
            a 'desc' message, and per-command results under 'items'.
        """

Processing Command Building

Helper functions for constructing processing command strings.

def build_op(cmd: str, first_arg: str, **kwargs) -> str:
    """
    Build a single processing operation command string.

    Only ``cmd`` and ``first_arg`` are positional; every further parameter
    must be passed by keyword, e.g.
    ``build_op('imageView2', '2', w=800, h=600, format='webp')``.

    Args:
        cmd: Processing command name (e.g., 'imageView2', 'avthumb')
        first_arg: First command argument (e.g., '2' for imageView2,
            'mp4' for avthumb)
        **kwargs: Additional command parameters as key-value pairs

    Returns:
        Formatted processing operation string, usable as input to
        pipe_cmd() and op_save()

    NOTE(review): the exact layout of the returned string (presumably
    ``cmd/first_arg/key/value/...``) is not shown here; confirm against
    the SDK implementation.
    """

def pipe_cmd(*cmds: str) -> str:
    """
    Chain multiple processing commands with the pipe operator.

    Example: ``pipe_cmd(resize_op, watermark_op)`` combines two commands
    built with build_op() into one command string.

    Args:
        *cmds: Processing command strings to chain, in the order given

    Returns:
        Piped command string
    """

def op_save(op: str, bucket: str, key: str) -> str:
    """
    Add a save operation to a processing command.

    Example: ``op_save(fops, 'output-bucket', 'processed-image.webp')``.

    Args:
        op: Base processing operation (a build_op() or pipe_cmd() result)
        bucket: Target bucket for saving result
        key: Target key for saving result

    Returns:
        Processing command with save operation appended
    """

Usage Examples

Image Processing

import time

from qiniu import Auth, PersistentFop, build_op, pipe_cmd, op_save

auth = Auth(access_key, secret_key)
pfop = PersistentFop(auth, 'source-bucket', pipeline='image-processing')

# Build image processing operations
# Resize to 800x600 and convert to WebP format
resize_op = build_op('imageView2', '2', w=800, h=600, format='webp')
watermark_op = build_op('watermark', '1',
                       image='aHR0cDovL3d3dy5xaW5pdS5jb20vaW1hZ2VzL2xvZ28ucG5n',  # Base64 encoded watermark URL
                       dissolve=50,
                       gravity='SouthEast',
                       dx=10, dy=10)

# Chain operations and save result
fops = pipe_cmd(resize_op, watermark_op)
fops = op_save(fops, 'output-bucket', 'processed-image.webp')

# Execute processing.
# execute() joins the fops sequence with ';', so pass a list of command
# strings; a bare string would be split character by character.
ret, info = pfop.execute('original-image.jpg', fops=[fops])

if info.ok():
    persistent_id = ret['persistentId']
    print(f"Processing job started: {persistent_id}")

    # Poll until the job reaches a terminal state.
    # Overall status codes: 0 = success, 1 = waiting, 2 = processing,
    # 3 = failed, 4 = callback notification failed.
    while True:
        ret, info = pfop.get_status(persistent_id)
        if not info.ok():
            print(f"Status check failed: {info.error}")
            break

        status = ret['code']
        if status in (1, 2):
            # Queued or running: keep polling.
            print("Processing in progress...")
            time.sleep(5)
            continue

        if status == 0:
            print("Processing completed successfully")
            # Each submitted command reports its own result.
            for item in ret['items']:
                if item['code'] == 0:
                    print(f"Output: {item['key']}")
                else:
                    print(f"Error: {item['error']}")
        else:
            print(f"Processing failed: {ret['desc']}")
        break
else:
    print(f"Failed to start processing: {info.error}")

Video Processing

from qiniu import Auth, PersistentFop, build_op, pipe_cmd, op_save

auth = Auth(access_key, secret_key)
pfop = PersistentFop(auth, 'video-bucket',
                    pipeline='video-processing',
                    notify_url='https://api.example.com/processing-callback')

# Video transcoding operations
# Convert to MP4 H.264 with different quality levels
hd_transcode = build_op('avthumb', 'mp4',
                       vcodec='libx264',
                       acodec='aac',
                       vb='2000k',
                       ab='128k',
                       r=30,
                       s='1920x1080')

sd_transcode = build_op('avthumb', 'mp4',
                       vcodec='libx264',
                       acodec='aac',
                       vb='1000k',
                       ab='96k',
                       r=30,
                       s='1280x720')

# Create thumbnail from the frame at the 10-second offset
thumbnail = build_op('vframe', 'jpg', offset=10, w=320, h=240)

# Save operations
hd_fops = op_save(hd_transcode, 'output-bucket', 'video-hd.mp4')
sd_fops = op_save(sd_transcode, 'output-bucket', 'video-sd.mp4')
thumb_fops = op_save(thumbnail, 'output-bucket', 'video-thumb.jpg')

# Execute multiple operations in one job.
# execute() joins the list with ';' itself, so the commands are passed as
# separate items instead of a pre-joined string.
# force=1 re-runs processing even if the output already exists.
all_fops = [hd_fops, sd_fops, thumb_fops]
ret, info = pfop.execute('source-video.mov', fops=all_fops, force=1)

if info.ok():
    print(f"Video processing started: {ret['persistentId']}")
else:
    print(f"Failed to start video processing: {info.error}")

Audio Processing

from qiniu import Auth, PersistentFop, build_op, op_save

auth = Auth(access_key, secret_key)
pfop = PersistentFop(auth, 'audio-bucket')

# Audio format conversion and quality adjustment
mp3_convert = build_op('avthumb', 'mp3', ab='192k', ar=44100)
aac_convert = build_op('avthumb', 'aac', ab='128k', ar=44100)

# Audio effects (shown for illustration; they are not submitted below)
volume_adjust = build_op('avthumb', 'mp3', ab='192k', af='volume=1.5')
fade_effect = build_op('avthumb', 'mp3', ab='192k', af='afade=t=in:ss=0:d=3,afade=t=out:st=57:d=3')

# Execute conversions
mp3_fops = op_save(mp3_convert, 'output-bucket', 'audio.mp3')
aac_fops = op_save(aac_convert, 'output-bucket', 'audio.aac')

# execute() joins the list with ';' itself, so pass the commands as
# separate items instead of a pre-joined string.
fops = [mp3_fops, aac_fops]
ret, info = pfop.execute('source-audio.wav', fops=fops)

Document Processing

from qiniu import Auth, PersistentFop, build_op, op_save

auth = Auth(access_key, secret_key)
pfop = PersistentFop(auth, 'document-bucket')

# PDF to image conversion
pdf_to_image = build_op('yifangyun_preview', 'v2',
                       type='pdf',
                       dpi=150,
                       page=1,
                       format='jpg')

# Document preview generation (shown for illustration; not submitted below)
doc_preview = build_op('yifangyun_preview', 'v2',
                      type='doc',
                      page=1,
                      format='png',
                      quality=85)

# Execute document processing.
# execute() joins the fops sequence with ';', so even a single command is
# passed as a one-item list.
pdf_fops = op_save(pdf_to_image, 'output-bucket', 'document-page1.jpg')
ret, info = pfop.execute('document.pdf', fops=[pdf_fops])

Workflow Template Processing

from qiniu import Auth, PersistentFop

auth = Auth(access_key, secret_key)
pfop = PersistentFop(auth, 'workflow-bucket')

# Use predefined workflow template.
# The template ID alone selects the workflow and replaces explicit fops.
# persistent_type is a separate option (1 = idle-time task) and does not
# need to be set to run a workflow.
ret, info = pfop.execute(
    key='input-file.jpg',
    workflow_template_id='workflow-template-123',
)

if info.ok():
    persistent_id = ret['persistentId']
    print(f"Workflow processing started: {persistent_id}")
else:
    print(f"Failed to start workflow processing: {info.error}")

Batch Processing with Status Monitoring

from qiniu import Auth, PersistentFop, build_op, op_save
import time
import threading

auth = Auth(access_key, secret_key)
pfop = PersistentFop(auth, 'batch-bucket')

def monitor_processing(persistent_id, file_name):
    """Poll one job until it reaches a terminal state, printing progress.

    Overall status codes: 0 = success, 1 = waiting, 2 = processing,
    3 = failed, 4 = callback notification failed.
    """
    while True:
        ret, info = pfop.get_status(persistent_id)
        if not info.ok():
            print(f"✗ {file_name} status check failed: {info.error}")
            return

        status = ret['code']
        if status in (1, 2):
            # Queued or running: keep polling.
            print(f"⏳ {file_name} processing in progress...")
            time.sleep(10)
            continue

        if status == 0:
            print(f"✓ {file_name} processing completed")
        else:
            print(f"✗ {file_name} processing failed: {ret['desc']}")
        return

# Process multiple files
files_to_process = ['image1.jpg', 'image2.png', 'image3.gif']
resize_op = build_op('imageView2', '2', w=400, h=300, format='webp')

monitors = []

for file_name in files_to_process:
    output_key = file_name.rsplit('.', 1)[0] + '_resized.webp'
    fops = op_save(resize_op, 'output-bucket', output_key)

    # execute() joins the fops sequence with ';', so pass a list.
    ret, info = pfop.execute(file_name, fops=[fops])
    if not info.ok():
        print(f"Failed to start processing {file_name}: {info.error}")
        continue

    persistent_id = ret['persistentId']
    print(f"Started processing {file_name}: {persistent_id}")

    # Start monitoring in background thread
    monitor = threading.Thread(target=monitor_processing,
                               args=(persistent_id, file_name))
    monitor.daemon = True
    monitor.start()
    monitors.append(monitor)

# Wait for all processing to complete. Joining the monitor threads keeps the
# script alive until every job has reported a terminal state; daemon threads
# would otherwise be killed as soon as the main thread exits.
for monitor in monitors:
    monitor.join()

Custom Processing Pipeline

from qiniu import Auth, PersistentFop, build_op, pipe_cmd, op_save

auth = Auth(access_key, secret_key)
pfop = PersistentFop(auth, 'pipeline-bucket', pipeline='custom-pipeline')

def create_image_variants(source_key, base_name):
    """Create multiple image variants from one source image in a single job.

    Args:
        source_key: Key of the source image in the pipeline bucket
        base_name: Prefix used to build each variant's output key

    Returns:
        (dict, ResponseInfo): Result of pfop.execute() for the combined job
    """

    # Define different sizes and formats
    variants = [
        {'suffix': '_thumb', 'w': 150, 'h': 150, 'format': 'jpg'},
        {'suffix': '_medium', 'w': 800, 'h': 600, 'format': 'webp'},
        {'suffix': '_large', 'w': 1920, 'h': 1440, 'format': 'webp'},
        {'suffix': '_avatar', 'w': 64, 'h': 64, 'format': 'png'},
    ]

    fops_list = []

    for variant in variants:
        # Build resize operation
        resize_op = build_op('imageView2', '2',
                            w=variant['w'],
                            h=variant['h'],
                            format=variant['format'])

        # Add quality optimization for WebP.
        # build_op() accepts only cmd and first_arg positionally, so the
        # quality value is folded into first_arg; a third positional
        # argument would raise TypeError.
        if variant['format'] == 'webp':
            quality_op = build_op('imageMogr2', 'quality/75')
            combined_op = pipe_cmd(resize_op, quality_op)
        else:
            combined_op = resize_op

        # Create output key
        output_key = f"{base_name}{variant['suffix']}.{variant['format']}"

        # Add save operation
        fops_list.append(op_save(combined_op, 'variants-bucket', output_key))

    # Execute processing.
    # execute() joins the list with ';' itself, so the commands are passed
    # as separate items instead of a pre-joined string.
    ret, info = pfop.execute(source_key, fops=fops_list)
    return ret, info

# Process image with multiple variants
ret, info = create_image_variants('original-photo.jpg', 'photo')
if info.ok():
    print(f"Variant processing started: {ret['persistentId']}")
else:
    print(f"Failed to start variant processing: {info.error}")

Install with Tessl CLI

npx tessl i tessl/pypi-qiniu

docs

authentication.md

cdn-management.md

cloud-computing.md

communication-services.md

configuration-utilities.md

data-processing.md

file-storage.md

index.md

tile.json