or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

advanced-queuing.md, connection-pooling.md, connectivity.md, data-types.md, database-objects.md, index.md, lobs.md, pipeline.md, soda.md, sql-execution.md, subscriptions.md

docs/pipeline.md

0

# Pipeline Operations

1

2

Batch multiple database operations for improved performance using pipelining. Pipeline operations enable grouping SQL executions, fetch operations, and stored procedure calls into batches that are executed together, reducing network round-trips and improving throughput for high-volume operations.

3

4

## Capabilities

5

6

### Pipeline Class

7

8

Create and execute batched database operations with support for various operation types.

9

10

```python { .api }

11

class Pipeline:

12

"""Pipeline for batching database operations."""

13

14

def add_execute(self, statement, parameters=None) -> None:

15

"""

16

Add an execute operation to the pipeline.

17

18

Parameters:

19

- statement (str): SQL statement to execute

20

- parameters (dict|list|tuple): Bind parameters for the statement

21

"""

22

23

def add_executemany(self, statement, parameters) -> None:

24

"""

25

Add an executemany operation to the pipeline.

26

27

Parameters:

28

- statement (str): SQL statement to execute

29

- parameters (list): List of parameter sets

30

"""

31

32

def add_fetchall(self) -> None:

33

"""Add a fetchall operation to the pipeline."""

34

35

def add_fetchone(self) -> None:

36

"""Add a fetchone operation to the pipeline."""

37

38

def add_fetchmany(self, size=None) -> None:

39

"""

40

Add a fetchmany operation to the pipeline.

41

42

Parameters:

43

- size (int): Number of rows to fetch (default: cursor arraysize)

44

"""

45

46

def add_callfunc(self, name, return_type, parameters=None) -> None:

47

"""

48

Add a function call operation to the pipeline.

49

50

Parameters:

51

- name (str): Function name

52

- return_type: Expected return type

53

- parameters (list): Function parameters

54

"""

55

56

def add_callproc(self, name, parameters=None) -> None:

57

"""

58

Add a procedure call operation to the pipeline.

59

60

Parameters:

61

- name (str): Procedure name

62

- parameters (list): Procedure parameters

63

"""

64

65

def add_commit(self) -> None:

66

"""Add a commit operation to the pipeline."""

67

68

def execute(self) -> list:

69

"""

70

Execute all operations in the pipeline.

71

72

Returns:

73

list: List of PipelineOpResult objects, one for each operation

74

"""

75

```

76

77

### PipelineOp Class

78

79

Represent individual pipeline operations with metadata about the operation type.

80

81

```python { .api }

82

class PipelineOp:

83

"""Individual pipeline operation."""

84

85

# Properties

86

op_type: int # Operation type (PIPELINE_OP_TYPE_*)

87

```

88

89

### PipelineOpResult Class

90

91

Result of a pipeline operation containing the operation outcome and any returned data.

92

93

```python { .api }

94

class PipelineOpResult:

95

"""Result of a pipeline operation."""

96

97

# Properties contain operation-specific results

98

# For execute operations: affected row count

99

# For fetch operations: fetched rows

100

# For function calls: return value

101

# For procedure calls: modified parameters

102

```

103

104

### Pipeline Creation

105

106

Create pipeline instances for batching operations.

107

108

```python { .api }

109

def create_pipeline() -> Pipeline:

110

"""

111

Create a new pipeline for batching operations.

112

113

Returns:

114

Pipeline: New pipeline instance

115

"""

116

```

117

118

### Pipeline Operation Type Constants

119

120

Constants identifying different types of pipeline operations.

121

122

```python { .api }

123

# Pipeline Operation Types

124

PIPELINE_OP_TYPE_EXECUTE: int # Execute SQL statement

125

PIPELINE_OP_TYPE_EXECUTE_MANY: int # Execute SQL with multiple parameter sets

126

PIPELINE_OP_TYPE_FETCH_ALL: int # Fetch all rows

127

PIPELINE_OP_TYPE_FETCH_ONE: int # Fetch single row

128

PIPELINE_OP_TYPE_FETCH_MANY: int # Fetch multiple rows

129

PIPELINE_OP_TYPE_CALL_FUNC: int # Call stored function

130

PIPELINE_OP_TYPE_CALL_PROC: int # Call stored procedure

131

PIPELINE_OP_TYPE_COMMIT: int # Commit transaction

132

```

133

134

## Usage Examples

135

136

### Basic Pipeline Operations

137

138

```python

139

import oracledb

140

141

connection = oracledb.connect(user="hr", password="password", dsn="localhost/xepdb1")

142

143

# Create a pipeline

144

pipeline = oracledb.create_pipeline()

145

146

# Add multiple operations to the pipeline

147

pipeline.add_execute("""

148

INSERT INTO employees (employee_id, first_name, last_name, hire_date)

149

VALUES (:1, :2, :3, :4)

150

""", [1001, 'John', 'Smith', '2024-01-15'])

151

152

pipeline.add_execute("""

153

INSERT INTO employees (employee_id, first_name, last_name, hire_date)

154

VALUES (:1, :2, :3, :4)

155

""", [1002, 'Jane', 'Doe', '2024-01-16'])

156

157

pipeline.add_execute("""

158

INSERT INTO employees (employee_id, first_name, last_name, hire_date)

159

VALUES (:1, :2, :3, :4)

160

""", [1003, 'Bob', 'Johnson', '2024-01-17'])

161

162

# Add a commit operation

163

pipeline.add_commit()

164

165

# Execute all operations in the pipeline

166

with connection.cursor() as cursor:

167

results = pipeline.execute()

168

169

print(f"Pipeline executed {len(results)} operations")

170

for i, result in enumerate(results):

171

print(f"Operation {i}: {result}")

172

173

connection.close()

174

```

175

176

### Batch Insert with Pipeline

177

178

```python

179

import oracledb

180

181

connection = oracledb.connect(user="hr", password="password", dsn="localhost/xepdb1")

182

183

# Prepare large dataset

184

employee_data = [

185

(2001, 'Alice', 'Johnson', 50000, 10),

186

(2002, 'Bob', 'Smith', 55000, 20),

187

(2003, 'Carol', 'Brown', 60000, 30),

188

(2004, 'David', 'Wilson', 52000, 10),

189

(2005, 'Eve', 'Davis', 58000, 20),

190

# ... potentially thousands more records

191

]

192

193

# Create pipeline for batch operations

194

pipeline = oracledb.create_pipeline()

195

196

# Add executemany operation for efficient batch insert

197

pipeline.add_executemany("""

198

INSERT INTO employees (employee_id, first_name, last_name, salary, department_id)

199

VALUES (:1, :2, :3, :4, :5)

200

""", employee_data)

201

202

# Add commit

203

pipeline.add_commit()

204

205

# Execute pipeline

206

with connection.cursor() as cursor:

207

results = pipeline.execute()

208

209

print(f"Batch insert completed")

210

print(f"Rows affected: {cursor.rowcount}")

211

212

connection.close()

213

```

214

215

### Complex Pipeline with Mixed Operations

216

217

```python

218

import oracledb

219

220

connection = oracledb.connect(user="hr", password="password", dsn="localhost/xepdb1")

221

222

# Create pipeline with mixed operations

223

pipeline = oracledb.create_pipeline()

224

225

# Execute query to prepare data

226

pipeline.add_execute("SELECT COUNT(*) FROM employees WHERE department_id = :1", [10])

227

pipeline.add_fetchone()

228

229

# Call stored function

230

pipeline.add_callfunc("calculate_bonus", oracledb.NUMBER, [50000, 0.15])

231

232

# Call stored procedure

233

pipeline.add_callproc("update_employee_status", ['ACTIVE'])

234

235

# Execute multiple updates

236

department_updates = [

237

("Engineering", 10),

238

("Marketing", 20),

239

("Sales", 30)

240

]

241

242

for dept_name, dept_id in department_updates:

243

pipeline.add_execute("""

244

UPDATE departments SET department_name = :1 WHERE department_id = :2

245

""", [dept_name, dept_id])

246

247

# Commit all changes

248

pipeline.add_commit()

249

250

# Execute pipeline

251

with connection.cursor() as cursor:

252

results = pipeline.execute()

253

254

print("Pipeline results:")

255

for i, result in enumerate(results):

256

print(f" Operation {i}: {result}")

257

258

connection.close()

259

```

260

261

### Pipeline with Fetch Operations

262

263

```python

264

import oracledb

265

266

connection = oracledb.connect(user="hr", password="password", dsn="localhost/xepdb1")

267

268

# Create pipeline combining queries and fetches

269

pipeline = oracledb.create_pipeline()

270

271

# First query

272

pipeline.add_execute("SELECT employee_id, first_name, last_name FROM employees WHERE department_id = :1", [10])

273

pipeline.add_fetchall()

274

275

# Second query

276

pipeline.add_execute("SELECT department_id, department_name FROM departments WHERE department_id IN (10, 20, 30)")

277

pipeline.add_fetchall()

278

279

# Third query with limited fetch

280

pipeline.add_execute("SELECT * FROM employees ORDER BY hire_date DESC")

281

pipeline.add_fetchmany(5) # Get only first 5 rows

282

283

# Execute all operations

284

with connection.cursor() as cursor:

285

results = pipeline.execute()

286

287

# Process results

288

employees_dept_10 = results[1] # fetchall result from first query

289

departments = results[3] # fetchall result from second query

290

recent_hires = results[5] # fetchmany result from third query

291

292

print("Employees in Department 10:")

293

for emp in employees_dept_10:

294

print(f" {emp[0]}: {emp[1]} {emp[2]}")

295

296

print("\nDepartments:")

297

for dept in departments:

298

print(f" {dept[0]}: {dept[1]}")

299

300

print("\nRecent Hires (Top 5):")

301

for emp in recent_hires:

302

print(f" {emp[1]} {emp[2]} - Hired: {emp[5]}")

303

304

connection.close()

305

```

306

307

### Performance Comparison: Pipeline vs Individual Operations

308

309

```python

310

import oracledb

311

import time

312

313

connection = oracledb.connect(user="hr", password="password", dsn="localhost/xepdb1")

314

315

# Test data

316

test_data = [(i, f'Name{i}', f'Last{i}', 50000 + i) for i in range(1000, 2000)]

317

318

# Method 1: Individual operations

319

start_time = time.time()

320

with connection.cursor() as cursor:

321

for data in test_data:

322

cursor.execute("""

323

INSERT INTO test_employees (id, first_name, last_name, salary)

324

VALUES (:1, :2, :3, :4)

325

""", data)

326

connection.commit()

327

328

individual_time = time.time() - start_time

329

print(f"Individual operations time: {individual_time:.2f} seconds")

330

331

# Method 2: Pipeline operations

332

start_time = time.time()

333

pipeline = oracledb.create_pipeline()

334

335

# Add all inserts to pipeline

336

for data in test_data:

337

pipeline.add_execute("""

338

INSERT INTO test_employees2 (id, first_name, last_name, salary)

339

VALUES (:1, :2, :3, :4)

340

""", data)

341

342

pipeline.add_commit()

343

344

with connection.cursor() as cursor:

345

results = pipeline.execute()

346

347

pipeline_time = time.time() - start_time

348

print(f"Pipeline operations time: {pipeline_time:.2f} seconds")

349

print(f"Performance improvement: {individual_time/pipeline_time:.2f}x faster")

350

351

connection.close()

352

```

353

354

### Error Handling in Pipelines

355

356

```python

357

import oracledb

358

359

connection = oracledb.connect(user="hr", password="password", dsn="localhost/xepdb1")

360

361

# Create pipeline with potential errors

362

pipeline = oracledb.create_pipeline()

363

364

# Valid operations

365

pipeline.add_execute("""

366

INSERT INTO employees (employee_id, first_name, last_name)

367

VALUES (:1, :2, :3)

368

""", [3001, 'Valid', 'Employee'])

369

370

# Invalid operation (duplicate key)

371

pipeline.add_execute("""

372

INSERT INTO employees (employee_id, first_name, last_name)

373

VALUES (:1, :2, :3)

374

""", [3001, 'Duplicate', 'Employee']) # Same employee_id

375

376

# Another valid operation

377

pipeline.add_execute("""

378

INSERT INTO employees (employee_id, first_name, last_name)

379

VALUES (:1, :2, :3)

380

""", [3002, 'Another', 'Employee'])

381

382

try:

383

with connection.cursor() as cursor:

384

results = pipeline.execute()

385

386

print("All operations completed successfully")

387

for i, result in enumerate(results):

388

print(f" Operation {i}: {result}")

389

390

except oracledb.DatabaseError as e:

391

print(f"Pipeline execution failed: {e}")

392

# Handle the error appropriately

393

connection.rollback()

394

395

connection.close()

396

```

397

398

### Advanced Pipeline Usage

399

400

```python

401

import oracledb

402

403

connection = oracledb.connect(user="hr", password="password", dsn="localhost/xepdb1")

404

405

def create_monthly_report_pipeline(month, year):

406

"""Create a pipeline for generating monthly reports."""

407

408

pipeline = oracledb.create_pipeline()

409

410

# Clear previous report data

411

pipeline.add_execute("DELETE FROM monthly_reports WHERE report_month = :1 AND report_year = :2", [month, year])

412

413

# Generate employee summary

414

pipeline.add_execute("""

415

INSERT INTO monthly_reports (report_month, report_year, report_type, data)

416

SELECT :1, :2, 'EMPLOYEE_COUNT', COUNT(*)

417

FROM employees

418

WHERE EXTRACT(MONTH FROM hire_date) = :1

419

AND EXTRACT(YEAR FROM hire_date) = :2

420

""", [month, year, month, year])

421

422

# Generate salary summary

423

pipeline.add_execute("""

424

INSERT INTO monthly_reports (report_month, report_year, report_type, data)

425

SELECT :1, :2, 'TOTAL_SALARY', SUM(salary)

426

FROM employees

427

WHERE EXTRACT(MONTH FROM hire_date) = :1

428

AND EXTRACT(YEAR FROM hire_date) = :2

429

""", [month, year, month, year])

430

431

# Generate department breakdown

432

pipeline.add_executemany("""

433

INSERT INTO monthly_reports (report_month, report_year, report_type, data, department_id)

434

SELECT :1, :2, 'DEPT_COUNT', COUNT(*), department_id

435

FROM employees

436

WHERE EXTRACT(MONTH FROM hire_date) = :3

437

AND EXTRACT(YEAR FROM hire_date) = :4

438

GROUP BY department_id

439

""", [(month, year, month, year)])

440

441

# Commit all changes

442

pipeline.add_commit()

443

444

return pipeline

445

446

# Generate reports for January 2024

447

report_pipeline = create_monthly_report_pipeline(1, 2024)

448

449

with connection.cursor() as cursor:

450

results = report_pipeline.execute()

451

print(f"Monthly report pipeline completed with {len(results)} operations")

452

453

# Verify results

454

with connection.cursor() as cursor:

455

cursor.execute("""

456

SELECT report_type, data, department_id

457

FROM monthly_reports

458

WHERE report_month = 1 AND report_year = 2024

459

ORDER BY report_type

460

""")

461

462

print("Report Results:")

463

for row in cursor:

464

if row[2]: # department_id is not None

465

print(f" {row[0]} (Dept {row[2]}): {row[1]}")

466

else:

467

print(f" {row[0]}: {row[1]}")

468

469

connection.close()

470

```