# Data Processing

Persistent data processing operations for media files including audio, video, and image transformations. The processing system supports asynchronous operations with status monitoring and flexible command chaining.

## Capabilities

### Persistent File Operations

Asynchronous data processing with status monitoring through the `PersistentFop` class.

```python { .api }
class PersistentFop:
    def __init__(self, auth: Auth, bucket: str, pipeline: str = None, notify_url: str = None):
        """
        Initialize persistent file operations manager.

        Args:
            auth: Auth instance for authentication
            bucket: Target bucket name
            pipeline: Processing pipeline name (optional)
            notify_url: Callback URL for processing completion (optional)
        """

    def execute(self, key: str, fops: str = None, force: bool = None, persistent_type: int = None, workflow_template_id: str = None) -> tuple:
        """
        Execute processing operations on a file.

        Args:
            key: File key to process
            fops: Processing operations string (semicolon-separated)
            force: Force processing even if output exists
            persistent_type: Processing type (1=normal, 2=workflow)
            workflow_template_id: Workflow template ID for batch processing

        Returns:
            (dict, ResponseInfo): Processing job info and response info
        """

    def get_status(self, persistent_id: str) -> tuple:
        """
        Get processing job status.

        Args:
            persistent_id: Processing job ID from execute() response

        Returns:
            (dict, ResponseInfo): Job status info and response info
        """
```

### Processing Command Building

Helper functions for constructing processing command strings.

```python { .api }
def build_op(cmd: str, first_arg: str, **kwargs) -> str:
    """
    Build processing operation command.

    Args:
        cmd: Processing command name (e.g., 'imageView2', 'avthumb')
        first_arg: First command argument
        **kwargs: Additional command parameters as key-value pairs

    Returns:
        Formatted processing operation string
    """

def pipe_cmd(*cmds: str) -> str:
    """
    Chain multiple processing commands with pipe operator.

    Args:
        *cmds: Processing command strings to chain

    Returns:
        Piped command string
    """

def op_save(op: str, bucket: str, key: str) -> str:
    """
    Add save operation to processing command.

    Args:
        op: Base processing operation
        bucket: Target bucket for saving result
        key: Target key for saving result

    Returns:
        Processing command with save operation
    """
```
92
93
## Usage Examples
94
95
### Image Processing
96
97
```python
98
from qiniu import Auth, PersistentFop, build_op, pipe_cmd, op_save
99
100
auth = Auth(access_key, secret_key)
101
pfop = PersistentFop(auth, 'source-bucket', pipeline='image-processing')
102
103
# Build image processing operations
104
# Resize to 800x600 and convert to WebP format
105
resize_op = build_op('imageView2', '2', w=800, h=600, format='webp')
106
watermark_op = build_op('watermark', '1',
107
image='aHR0cDovL3d3dy5xaW5pdS5jb20vaW1hZ2VzL2xvZ28ucG5n', # Base64 encoded watermark URL
108
dissolve=50,
109
gravity='SouthEast',
110
dx=10, dy=10)
111
112
# Chain operations and save result
113
fops = pipe_cmd(resize_op, watermark_op)
114
fops = op_save(fops, 'output-bucket', 'processed-image.webp')
115
116
# Execute processing
117
ret, info = pfop.execute('original-image.jpg', fops=fops)
118
119
if info.ok():
120
persistent_id = ret['persistentId']
121
print(f"Processing job started: {persistent_id}")
122
123
# Check processing status
124
import time
125
while True:
126
ret, info = pfop.get_status(persistent_id)
127
if info.ok():
128
status = ret['code']
129
if status == 0:
130
print("Processing completed successfully")
131
for item in ret['items']:
132
if item['code'] == 0:
133
print(f"Output: {item['key']}")
134
else:
135
print(f"Error: {item['error']}")
136
break
137
elif status == 1:
138
print("Processing in progress...")
139
time.sleep(5)
140
else:
141
print(f"Processing failed: {ret['desc']}")
142
break
143
else:
144
print(f"Status check failed: {info.error}")
145
break
146
```
147
148
### Video Processing
149
150
```python
151
from qiniu import Auth, PersistentFop, build_op, pipe_cmd, op_save
152
153
auth = Auth(access_key, secret_key)
154
pfop = PersistentFop(auth, 'video-bucket',
155
pipeline='video-processing',
156
notify_url='https://api.example.com/processing-callback')
157
158
# Video transcoding operations
159
# Convert to MP4 H.264 with different quality levels
160
hd_transcode = build_op('avthumb', 'mp4',
161
vcodec='libx264',
162
acodec='aac',
163
vb='2000k',
164
ab='128k',
165
r=30,
166
s='1920x1080')
167
168
sd_transcode = build_op('avthumb', 'mp4',
169
vcodec='libx264',
170
acodec='aac',
171
vb='1000k',
172
ab='96k',
173
r=30,
174
s='1280x720')
175
176
# Create thumbnail
177
thumbnail = build_op('vframe', 'jpg', offset=10, w=320, h=240)
178
179
# Save operations
180
hd_fops = op_save(hd_transcode, 'output-bucket', 'video-hd.mp4')
181
sd_fops = op_save(sd_transcode, 'output-bucket', 'video-sd.mp4')
182
thumb_fops = op_save(thumbnail, 'output-bucket', 'video-thumb.jpg')
183
184
# Execute multiple operations
185
all_fops = f"{hd_fops};{sd_fops};{thumb_fops}"
186
ret, info = pfop.execute('source-video.mov', fops=all_fops, force=True)
187
188
if info.ok():
189
print(f"Video processing started: {ret['persistentId']}")
190
```

### Audio Processing

```python
from qiniu import Auth, PersistentFop, build_op, op_save

auth = Auth(access_key, secret_key)
pfop = PersistentFop(auth, 'audio-bucket')

# Audio format conversion and quality adjustment
mp3_convert = build_op('avthumb', 'mp3', ab='192k', ar=44100)
aac_convert = build_op('avthumb', 'aac', ab='128k', ar=44100)

# Audio effects
volume_adjust = build_op('avthumb', 'mp3', ab='192k', af='volume=1.5')
fade_effect = build_op('avthumb', 'mp3', ab='192k', af='afade=t=in:ss=0:d=3,afade=t=out:st=57:d=3')

# Execute conversions
mp3_fops = op_save(mp3_convert, 'output-bucket', 'audio.mp3')
aac_fops = op_save(aac_convert, 'output-bucket', 'audio.aac')

fops = f"{mp3_fops};{aac_fops}"
ret, info = pfop.execute('source-audio.wav', fops=fops)
```

### Document Processing

```python
from qiniu import Auth, PersistentFop, build_op, op_save

auth = Auth(access_key, secret_key)
pfop = PersistentFop(auth, 'document-bucket')

# PDF to image conversion
pdf_to_image = build_op('yifangyun_preview', 'v2',
                        type='pdf',
                        dpi=150,
                        page=1,
                        format='jpg')

# Document preview generation
doc_preview = build_op('yifangyun_preview', 'v2',
                       type='doc',
                       page=1,
                       format='png',
                       quality=85)

# Execute document processing
pdf_fops = op_save(pdf_to_image, 'output-bucket', 'document-page1.jpg')
ret, info = pfop.execute('document.pdf', fops=pdf_fops)
```

### Workflow Template Processing

```python
from qiniu import Auth, PersistentFop

auth = Auth(access_key, secret_key)
pfop = PersistentFop(auth, 'workflow-bucket')

# Use predefined workflow template
ret, info = pfop.execute(
    key='input-file.jpg',
    workflow_template_id='workflow-template-123',
    persistent_type=2  # Workflow processing type
)

if info.ok():
    persistent_id = ret['persistentId']
    print(f"Workflow processing started: {persistent_id}")
```
262
263
### Batch Processing with Status Monitoring
264
265
```python
266
from qiniu import Auth, PersistentFop, build_op, op_save
267
import time
268
import threading
269
270
auth = Auth(access_key, secret_key)
271
pfop = PersistentFop(auth, 'batch-bucket')
272
273
def monitor_processing(persistent_id, file_name):
274
"""Monitor processing status in separate thread"""
275
while True:
276
ret, info = pfop.get_status(persistent_id)
277
if info.ok():
278
status = ret['code']
279
if status == 0:
280
print(f"✓ {file_name} processing completed")
281
break
282
elif status == 1:
283
print(f"⏳ {file_name} processing in progress...")
284
time.sleep(10)
285
else:
286
print(f"✗ {file_name} processing failed: {ret['desc']}")
287
break
288
else:
289
print(f"✗ {file_name} status check failed: {info.error}")
290
break
291
292
# Process multiple files
293
files_to_process = ['image1.jpg', 'image2.png', 'image3.gif']
294
resize_op = build_op('imageView2', '2', w=400, h=300, format='webp')
295
296
for file_name in files_to_process:
297
output_key = file_name.rsplit('.', 1)[0] + '_resized.webp'
298
fops = op_save(resize_op, 'output-bucket', output_key)
299
300
ret, info = pfop.execute(file_name, fops=fops)
301
if info.ok():
302
persistent_id = ret['persistentId']
303
print(f"Started processing {file_name}: {persistent_id}")
304
305
# Start monitoring in background thread
306
thread = threading.Thread(target=monitor_processing,
307
args=(persistent_id, file_name))
308
thread.daemon = True
309
thread.start()
310
else:
311
print(f"Failed to start processing {file_name}: {info.error}")
312
313
# Wait for all processing to complete
314
input("Press Enter to exit...")
315
```

### Custom Processing Pipeline

```python
from qiniu import Auth, PersistentFop, build_op, pipe_cmd, op_save

auth = Auth(access_key, secret_key)
pfop = PersistentFop(auth, 'pipeline-bucket', pipeline='custom-pipeline')

def create_image_variants(source_key, base_name):
    """Create multiple image variants from source"""

    # Define different sizes and formats
    variants = [
        {'suffix': '_thumb', 'w': 150, 'h': 150, 'format': 'jpg'},
        {'suffix': '_medium', 'w': 800, 'h': 600, 'format': 'webp'},
        {'suffix': '_large', 'w': 1920, 'h': 1440, 'format': 'webp'},
        {'suffix': '_avatar', 'w': 64, 'h': 64, 'format': 'png'}
    ]

    fops_list = []

    for variant in variants:
        # Build resize operation
        resize_op = build_op('imageView2', '2',
                             w=variant['w'],
                             h=variant['h'],
                             format=variant['format'])

        # Add quality optimization for WebP
        if variant['format'] == 'webp':
            # build_op takes only keyword parameters after first_arg,
            # so spell out the imageMogr2 quality command directly
            quality_op = 'imageMogr2/quality/75'
            combined_op = pipe_cmd(resize_op, quality_op)
        else:
            combined_op = resize_op

        # Create output key
        output_key = f"{base_name}{variant['suffix']}.{variant['format']}"

        # Add save operation
        fops = op_save(combined_op, 'variants-bucket', output_key)
        fops_list.append(fops)

    # Combine all operations
    all_fops = ';'.join(fops_list)

    # Execute processing
    ret, info = pfop.execute(source_key, fops=all_fops)
    return ret, info

# Process image with multiple variants
ret, info = create_image_variants('original-photo.jpg', 'photo')
if info.ok():
    print(f"Variant processing started: {ret['persistentId']}")
```