Comprehensive Python SDK for Qiniu Cloud Storage services enabling file upload, download, CDN management, SMS, and real-time communication features
—
Persistent data processing operations for media files including audio, video, and image transformations. The processing system supports asynchronous operations with status monitoring and flexible command chaining.
Asynchronous data processing with status monitoring through the PersistentFop class.
class PersistentFop:
def __init__(self, auth: Auth, bucket: str, pipeline: str = None, notify_url: str = None):
"""
Initialize persistent file operations manager.
Args:
auth: Auth instance for authentication
bucket: Target bucket name
pipeline: Processing pipeline name (optional)
notify_url: Callback URL for processing completion (optional)
"""
def execute(self, key: str, fops: str = None, force: bool = None, persistent_type: int = None, workflow_template_id: str = None) -> tuple:
"""
Execute processing operations on a file.
Args:
key: File key to process
fops: Processing operations string (semicolon-separated)
force: Force processing even if output exists
persistent_type: Processing type (1=normal, 2=workflow)
workflow_template_id: Workflow template ID for batch processing
Returns:
(dict, ResponseInfo): Processing job info and response info
"""
def get_status(self, persistent_id: str) -> tuple:
"""
Get processing job status.
Args:
persistent_id: Processing job ID from execute() response
Returns:
(dict, ResponseInfo): Job status info and response info
"""

Helper functions for constructing processing command strings.
def build_op(cmd: str, first_arg: str, **kwargs) -> str:
"""
Build processing operation command.
Args:
cmd: Processing command name (e.g., 'imageView2', 'avthumb')
first_arg: First command argument
**kwargs: Additional command parameters as key-value pairs
Returns:
Formatted processing operation string
"""
def pipe_cmd(*cmds: str) -> str:
"""
Chain multiple processing commands with pipe operator.
Args:
*cmds: Processing command strings to chain
Returns:
Piped command string
"""
def op_save(op: str, bucket: str, key: str) -> str:
"""
Add save operation to processing command.
Args:
op: Base processing operation
bucket: Target bucket for saving result
key: Target key for saving result
Returns:
Processing command with save operation
"""

from qiniu import Auth, PersistentFop, build_op, pipe_cmd, op_save
# Example: resize + watermark an image, then poll the job until it finishes.
auth = Auth(access_key, secret_key)
pfop = PersistentFop(auth, 'source-bucket', pipeline='image-processing')
# Build image processing operations
# Resize to 800x600 and convert to WebP format
resize_op = build_op('imageView2', '2', w=800, h=600, format='webp')
# NOTE(review): dissolve/gravity/dx/dy presumably control watermark opacity,
# anchor corner and pixel offsets — confirm against the Qiniu watermark docs.
watermark_op = build_op('watermark', '1',
image='aHR0cDovL3d3dy5xaW5pdS5jb20vaW1hZ2VzL2xvZ28ucG5n', # Base64 encoded watermark URL
dissolve=50,
gravity='SouthEast',
dx=10, dy=10)
# Chain operations and save result
fops = pipe_cmd(resize_op, watermark_op)
fops = op_save(fops, 'output-bucket', 'processed-image.webp')
# Execute processing
ret, info = pfop.execute('original-image.jpg', fops=fops)
if info.ok():
persistent_id = ret['persistentId']
print(f"Processing job started: {persistent_id}")
# Check processing status
import time
# Poll every 5 seconds; ret['code'] is treated as 0 = done, 1 = running,
# anything else = failure (see the branches below).
while True:
ret, info = pfop.get_status(persistent_id)
if info.ok():
status = ret['code']
if status == 0:
print("Processing completed successfully")
# Each item reports the outcome of one operation in the job.
for item in ret['items']:
if item['code'] == 0:
print(f"Output: {item['key']}")
else:
print(f"Error: {item['error']}")
break
elif status == 1:
print("Processing in progress...")
time.sleep(5)
else:
print(f"Processing failed: {ret['desc']}")
break
else:
print(f"Status check failed: {info.error}")
break

from qiniu import Auth, PersistentFop, build_op, pipe_cmd, op_save
# Example: transcode a video to HD and SD MP4 renditions plus a thumbnail.
auth = Auth(access_key, secret_key)
pfop = PersistentFop(auth, 'video-bucket',
pipeline='video-processing',
notify_url='https://api.example.com/processing-callback')
# Video transcoding operations
# Convert to MP4 H.264 with different quality levels
hd_transcode = build_op('avthumb', 'mp4',
vcodec='libx264',
acodec='aac',
vb='2000k',
ab='128k',
r=30,
s='1920x1080')
sd_transcode = build_op('avthumb', 'mp4',
vcodec='libx264',
acodec='aac',
vb='1000k',
ab='96k',
r=30,
s='1280x720')
# Create thumbnail
# vframe grabs a single frame; offset=10 presumably means 10 seconds in — verify.
thumbnail = build_op('vframe', 'jpg', offset=10, w=320, h=240)
# Save operations
hd_fops = op_save(hd_transcode, 'output-bucket', 'video-hd.mp4')
sd_fops = op_save(sd_transcode, 'output-bucket', 'video-sd.mp4')
thumb_fops = op_save(thumbnail, 'output-bucket', 'video-thumb.jpg')
# Execute multiple operations
# Semicolon-separated fops run as independent operations in one job (see the
# execute() docs above); force=True reprocesses even if outputs already exist.
all_fops = f"{hd_fops};{sd_fops};{thumb_fops}"
ret, info = pfop.execute('source-video.mov', fops=all_fops, force=True)
if info.ok():
print(f"Video processing started: {ret['persistentId']}")

from qiniu import Auth, PersistentFop, build_op, op_save
# Example: audio format conversion and filter commands.
auth = Auth(access_key, secret_key)
pfop = PersistentFop(auth, 'audio-bucket')
# Audio format conversion and quality adjustment
mp3_convert = build_op('avthumb', 'mp3', ab='192k', ar=44100)
aac_convert = build_op('avthumb', 'aac', ab='128k', ar=44100)
# Audio effects
# NOTE(review): volume_adjust and fade_effect are built but never submitted
# below — they only illustrate the 'af' (audio filter) parameter.
volume_adjust = build_op('avthumb', 'mp3', ab='192k', af='volume=1.5')
fade_effect = build_op('avthumb', 'mp3', ab='192k', af='afade=t=in:ss=0:d=3,afade=t=out:st=57:d=3')
# Execute conversions
mp3_fops = op_save(mp3_convert, 'output-bucket', 'audio.mp3')
aac_fops = op_save(aac_convert, 'output-bucket', 'audio.aac')
fops = f"{mp3_fops};{aac_fops}"
ret, info = pfop.execute('source-audio.wav', fops=fops)

from qiniu import Auth, PersistentFop, build_op, op_save
# Example: document (PDF/Office) preview generation.
auth = Auth(access_key, secret_key)
pfop = PersistentFop(auth, 'document-bucket')
# PDF to image conversion
pdf_to_image = build_op('yifangyun_preview', 'v2',
type='pdf',
dpi=150,
page=1,
format='jpg')
# Document preview generation
# NOTE(review): doc_preview is built but never executed below; only the
# PDF conversion is submitted.
doc_preview = build_op('yifangyun_preview', 'v2',
type='doc',
page=1,
format='png',
quality=85)
# Execute document processing
pdf_fops = op_save(pdf_to_image, 'output-bucket', 'document-page1.jpg')
ret, info = pfop.execute('document.pdf', fops=pdf_fops)

from qiniu import Auth, PersistentFop
# Example: run a predefined workflow template instead of an explicit fops string.
auth = Auth(access_key, secret_key)
pfop = PersistentFop(auth, 'workflow-bucket')
# Use predefined workflow template
# persistent_type=2 selects workflow processing (see execute() docs above).
ret, info = pfop.execute(
key='input-file.jpg',
workflow_template_id='workflow-template-123',
persistent_type=2 # Workflow processing type
)
if info.ok():
persistent_id = ret['persistentId']
print(f"Workflow processing started: {persistent_id}")

from qiniu import Auth, PersistentFop, build_op, op_save
import time
import threading
# Shared client used by the batch-submission loop and the monitor threads below.
auth = Auth(access_key, secret_key)
pfop = PersistentFop(auth, 'batch-bucket')
def monitor_processing(persistent_id, file_name):
    """Poll one processing job until it leaves the running state.

    Intended as a thread target: loops on pfop.get_status(), printing a
    progress line for code 1, a success line for code 0, and a failure
    line otherwise, then returns.
    """
    while True:
        result, resp = pfop.get_status(persistent_id)
        if not resp.ok():
            # Transport/API failure while checking status — give up.
            print(f"✗ {file_name} status check failed: {resp.error}")
            break
        code = result['code']
        if code == 1:
            # Still running; poll again after a short delay.
            print(f"⏳ {file_name} processing in progress...")
            time.sleep(10)
            continue
        if code == 0:
            print(f"✓ {file_name} processing completed")
        else:
            print(f"✗ {file_name} processing failed: {result['desc']}")
        break
# Process multiple files
files_to_process = ['image1.jpg', 'image2.png', 'image3.gif']
# One shared resize operation; only the output key differs per file.
resize_op = build_op('imageView2', '2', w=400, h=300, format='webp')
for file_name in files_to_process:
output_key = file_name.rsplit('.', 1)[0] + '_resized.webp'
fops = op_save(resize_op, 'output-bucket', output_key)
ret, info = pfop.execute(file_name, fops=fops)
if info.ok():
persistent_id = ret['persistentId']
print(f"Started processing {file_name}: {persistent_id}")
# Start monitoring in background thread
# Daemon threads so the monitors never block interpreter exit.
thread = threading.Thread(target=monitor_processing,
args=(persistent_id, file_name))
thread.daemon = True
thread.start()
else:
print(f"Failed to start processing {file_name}: {info.error}")
# Wait for all processing to complete
input("Press Enter to exit...")

from qiniu import Auth, PersistentFop, build_op, pipe_cmd, op_save
auth = Auth(access_key, secret_key)
# Client for the variant-generation example below, bound to a custom pipeline.
pfop = PersistentFop(auth, 'pipeline-bucket', pipeline='custom-pipeline')
def create_image_variants(source_key, base_name):
    """Create multiple image variants from a single source image.

    Args:
        source_key: Key of the source image in the pipeline bucket.
        base_name: Base name used to build each variant's output key.

    Returns:
        (dict, ResponseInfo): result of PersistentFop.execute for the
        combined variant operations.
    """
    # Each entry describes one output rendition.
    variants = [
        {'suffix': '_thumb', 'w': 150, 'h': 150, 'format': 'jpg'},
        {'suffix': '_medium', 'w': 800, 'h': 600, 'format': 'webp'},
        {'suffix': '_large', 'w': 1920, 'h': 1440, 'format': 'webp'},
        {'suffix': '_avatar', 'w': 64, 'h': 64, 'format': 'png'}
    ]
    fops_list = []
    for variant in variants:
        # Resize/convert operation for this rendition.
        resize_op = build_op('imageView2', '2',
                             w=variant['w'],
                             h=variant['h'],
                             format=variant['format'])
        # Add quality optimization for WebP
        if variant['format'] == 'webp':
            # BUG FIX: build_op(cmd, first_arg, **kwargs) accepts no extra
            # positional args, so build_op('imageMogr2', 'quality', 75)
            # raised TypeError. Encode the value in the argument instead,
            # producing the command string 'imageMogr2/quality/75'.
            quality_op = build_op('imageMogr2', 'quality/75')
            combined_op = pipe_cmd(resize_op, quality_op)
        else:
            combined_op = resize_op
        # Create output key, e.g. 'photo_thumb.jpg'.
        output_key = f"{base_name}{variant['suffix']}.{variant['format']}"
        # Persist this rendition to the variants bucket.
        fops_list.append(op_save(combined_op, 'variants-bucket', output_key))
    # Semicolon-separated fops run as independent operations in one job.
    all_fops = ';'.join(fops_list)
    # Execute processing
    return pfop.execute(source_key, fops=all_fops)
# Process image with multiple variants
ret, info = create_image_variants('original-photo.jpg', 'photo')
if info.ok():
print(f"Variant processing started: {ret['persistentId']}")

Install with Tessl CLI
npx tessl i tessl/pypi-qiniu