or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

authentication.md, cdn-management.md, cloud-computing.md, communication-services.md, configuration-utilities.md, data-processing.md, file-storage.md, index.md

docs/data-processing.md

# Data Processing

Persistent data processing operations for media files including audio, video, and image transformations. The processing system supports asynchronous operations with status monitoring and flexible command chaining.

## Capabilities

### Persistent File Operations

Asynchronous data processing with status monitoring through the `PersistentFop` class.

```python { .api }
class PersistentFop:
    def __init__(self, auth: Auth, bucket: str, pipeline: str = None, notify_url: str = None):
        """
        Initialize persistent file operations manager.

        Args:
            auth: Auth instance for authentication
            bucket: Target bucket name
            pipeline: Processing pipeline name (optional)
            notify_url: Callback URL for processing completion (optional)
        """

    def execute(self, key: str, fops: str = None, force: bool = None, persistent_type: int = None, workflow_template_id: str = None) -> tuple:
        """
        Execute processing operations on a file.

        Args:
            key: File key to process
            fops: Processing operations string (semicolon-separated)
            force: Force processing even if output exists
            persistent_type: Processing type (1=normal, 2=workflow)
            workflow_template_id: Workflow template ID for batch processing

        Returns:
            (dict, ResponseInfo): Processing job info and response info
        """

    def get_status(self, persistent_id: str) -> tuple:
        """
        Get processing job status.

        Args:
            persistent_id: Processing job ID from execute() response

        Returns:
            (dict, ResponseInfo): Job status info and response info
        """
```

### Processing Command Building

Helper functions for constructing processing command strings.

```python { .api }
def build_op(cmd: str, first_arg: str, **kwargs) -> str:
    """
    Build processing operation command.

    Args:
        cmd: Processing command name (e.g., 'imageView2', 'avthumb')
        first_arg: First command argument
        **kwargs: Additional command parameters as key-value pairs

    Returns:
        Formatted processing operation string
    """

def pipe_cmd(*cmds: str) -> str:
    """
    Chain multiple processing commands with pipe operator.

    Args:
        *cmds: Processing command strings to chain

    Returns:
        Piped command string
    """

def op_save(op: str, bucket: str, key: str) -> str:
    """
    Add save operation to processing command.

    Args:
        op: Base processing operation
        bucket: Target bucket for saving result
        key: Target key for saving result

    Returns:
        Processing command with save operation
    """
```

## Usage Examples

### Image Processing

```python
from qiniu import Auth, PersistentFop, build_op, pipe_cmd, op_save

auth = Auth(access_key, secret_key)
pfop = PersistentFop(auth, 'source-bucket', pipeline='image-processing')

# Build image processing operations
# Resize to 800x600 and convert to WebP format
resize_op = build_op('imageView2', '2', w=800, h=600, format='webp')
watermark_op = build_op('watermark', '1',
                        image='aHR0cDovL3d3dy5xaW5pdS5jb20vaW1hZ2VzL2xvZ28ucG5n',  # Base64 encoded watermark URL
                        dissolve=50,
                        gravity='SouthEast',
                        dx=10, dy=10)

# Chain operations and save result
fops = pipe_cmd(resize_op, watermark_op)
fops = op_save(fops, 'output-bucket', 'processed-image.webp')

# Execute processing
ret, info = pfop.execute('original-image.jpg', fops=fops)

if info.ok():
    persistent_id = ret['persistentId']
    print(f"Processing job started: {persistent_id}")

    # Check processing status
    import time
    while True:
        ret, info = pfop.get_status(persistent_id)
        if info.ok():
            status = ret['code']
            if status == 0:
                print("Processing completed successfully")
                for item in ret['items']:
                    if item['code'] == 0:
                        print(f"Output: {item['key']}")
                    else:
                        print(f"Error: {item['error']}")
                break
            elif status == 1:
                print("Processing in progress...")
                time.sleep(5)
            else:
                print(f"Processing failed: {ret['desc']}")
                break
        else:
            print(f"Status check failed: {info.error}")
            break
```

### Video Processing

```python
from qiniu import Auth, PersistentFop, build_op, pipe_cmd, op_save

auth = Auth(access_key, secret_key)
pfop = PersistentFop(auth, 'video-bucket',
                     pipeline='video-processing',
                     notify_url='https://api.example.com/processing-callback')

# Video transcoding operations
# Convert to MP4 H.264 with different quality levels
hd_transcode = build_op('avthumb', 'mp4',
                        vcodec='libx264',
                        acodec='aac',
                        vb='2000k',
                        ab='128k',
                        r=30,
                        s='1920x1080')

sd_transcode = build_op('avthumb', 'mp4',
                        vcodec='libx264',
                        acodec='aac',
                        vb='1000k',
                        ab='96k',
                        r=30,
                        s='1280x720')

# Create thumbnail
thumbnail = build_op('vframe', 'jpg', offset=10, w=320, h=240)

# Save operations
hd_fops = op_save(hd_transcode, 'output-bucket', 'video-hd.mp4')
sd_fops = op_save(sd_transcode, 'output-bucket', 'video-sd.mp4')
thumb_fops = op_save(thumbnail, 'output-bucket', 'video-thumb.jpg')

# Execute multiple operations
all_fops = f"{hd_fops};{sd_fops};{thumb_fops}"
ret, info = pfop.execute('source-video.mov', fops=all_fops, force=True)

if info.ok():
    print(f"Video processing started: {ret['persistentId']}")
```

### Audio Processing

```python
from qiniu import Auth, PersistentFop, build_op, op_save

auth = Auth(access_key, secret_key)
pfop = PersistentFop(auth, 'audio-bucket')

# Audio format conversion and quality adjustment
mp3_convert = build_op('avthumb', 'mp3', ab='192k', ar=44100)
aac_convert = build_op('avthumb', 'aac', ab='128k', ar=44100)

# Audio effects
volume_adjust = build_op('avthumb', 'mp3', ab='192k', af='volume=1.5')
fade_effect = build_op('avthumb', 'mp3', ab='192k', af='afade=t=in:ss=0:d=3,afade=t=out:st=57:d=3')

# Execute conversions
mp3_fops = op_save(mp3_convert, 'output-bucket', 'audio.mp3')
aac_fops = op_save(aac_convert, 'output-bucket', 'audio.aac')

fops = f"{mp3_fops};{aac_fops}"
ret, info = pfop.execute('source-audio.wav', fops=fops)
```

### Document Processing

```python
from qiniu import Auth, PersistentFop, build_op, op_save

auth = Auth(access_key, secret_key)
pfop = PersistentFop(auth, 'document-bucket')

# PDF to image conversion
pdf_to_image = build_op('yifangyun_preview', 'v2',
                        type='pdf',
                        dpi=150,
                        page=1,
                        format='jpg')

# Document preview generation
doc_preview = build_op('yifangyun_preview', 'v2',
                       type='doc',
                       page=1,
                       format='png',
                       quality=85)

# Execute document processing
pdf_fops = op_save(pdf_to_image, 'output-bucket', 'document-page1.jpg')
ret, info = pfop.execute('document.pdf', fops=pdf_fops)
```

### Workflow Template Processing

```python
from qiniu import Auth, PersistentFop

auth = Auth(access_key, secret_key)
pfop = PersistentFop(auth, 'workflow-bucket')

# Use predefined workflow template
ret, info = pfop.execute(
    key='input-file.jpg',
    workflow_template_id='workflow-template-123',
    persistent_type=2  # Workflow processing type
)

if info.ok():
    persistent_id = ret['persistentId']
    print(f"Workflow processing started: {persistent_id}")
```

### Batch Processing with Status Monitoring

```python
from qiniu import Auth, PersistentFop, build_op, op_save
import time
import threading

auth = Auth(access_key, secret_key)
pfop = PersistentFop(auth, 'batch-bucket')

def monitor_processing(persistent_id, file_name):
    """Monitor processing status in separate thread"""
    while True:
        ret, info = pfop.get_status(persistent_id)
        if info.ok():
            status = ret['code']
            if status == 0:
                print(f"✓ {file_name} processing completed")
                break
            elif status == 1:
                print(f"⏳ {file_name} processing in progress...")
                time.sleep(10)
            else:
                print(f"✗ {file_name} processing failed: {ret['desc']}")
                break
        else:
            print(f"✗ {file_name} status check failed: {info.error}")
            break

# Process multiple files
files_to_process = ['image1.jpg', 'image2.png', 'image3.gif']
resize_op = build_op('imageView2', '2', w=400, h=300, format='webp')

for file_name in files_to_process:
    output_key = file_name.rsplit('.', 1)[0] + '_resized.webp'
    fops = op_save(resize_op, 'output-bucket', output_key)

    ret, info = pfop.execute(file_name, fops=fops)
    if info.ok():
        persistent_id = ret['persistentId']
        print(f"Started processing {file_name}: {persistent_id}")

        # Start monitoring in background thread
        thread = threading.Thread(target=monitor_processing,
                                  args=(persistent_id, file_name))
        thread.daemon = True
        thread.start()
    else:
        print(f"Failed to start processing {file_name}: {info.error}")

# Wait for all processing to complete
input("Press Enter to exit...")
```

### Custom Processing Pipeline

```python
from qiniu import Auth, PersistentFop, build_op, pipe_cmd, op_save

auth = Auth(access_key, secret_key)
pfop = PersistentFop(auth, 'pipeline-bucket', pipeline='custom-pipeline')

def create_image_variants(source_key, base_name):
    """Create multiple image variants from source"""

    # Define different sizes and formats
    variants = [
        {'suffix': '_thumb', 'w': 150, 'h': 150, 'format': 'jpg'},
        {'suffix': '_medium', 'w': 800, 'h': 600, 'format': 'webp'},
        {'suffix': '_large', 'w': 1920, 'h': 1440, 'format': 'webp'},
        {'suffix': '_avatar', 'w': 64, 'h': 64, 'format': 'png'}
    ]

    fops_list = []

    for variant in variants:
        # Build resize operation
        resize_op = build_op('imageView2', '2',
                             w=variant['w'],
                             h=variant['h'],
                             format=variant['format'])

        # Add quality optimization for WebP
        if variant['format'] == 'webp':
            quality_op = build_op('imageMogr2', 'quality', 75)
            combined_op = pipe_cmd(resize_op, quality_op)
        else:
            combined_op = resize_op

        # Create output key
        output_key = f"{base_name}{variant['suffix']}.{variant['format']}"

        # Add save operation
        fops = op_save(combined_op, 'variants-bucket', output_key)
        fops_list.append(fops)

    # Combine all operations
    all_fops = ';'.join(fops_list)

    # Execute processing
    ret, info = pfop.execute(source_key, fops=all_fops)
    return ret, info

# Process image with multiple variants
ret, info = create_image_variants('original-photo.jpg', 'photo')
if info.ok():
    print(f"Variant processing started: {ret['persistentId']}")
```