0
# Pipeline Operations
1
2
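Batch multiple database operations for improved performance using pipelining. A pipeline groups SQL executions, queries (together with their fetches), and stored function and procedure calls into a single batch that is sent to the database together, reducing network round-trips and improving throughput for high-volume operations. Pipelines are run with `AsyncConnection.run_pipeline()`, so pipelining is used from python-oracledb Thin mode with asyncio. The round-trip savings require Oracle Database 23ai or later; with earlier database versions the operations still run, but without the round-trip reduction.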
3
4
## Capabilities
5
6
### Pipeline Class
7
8
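Build a batch of database operations. Each `add_*()` method appends one operation to the pipeline and returns a `PipelineOp` describing it; queries are added together with their fetch (`add_fetchone()`, `add_fetchmany()`, `add_fetchall()`).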
9
10
```python { .api }
11
class Pipeline:
    """Pipeline for batching database operations.

    Each ``add_*()`` method appends one operation to the pipeline and returns
    the ``PipelineOp`` that describes it. A pipeline has no ``execute()``
    method: run it with ``await connection.run_pipeline(pipeline)`` on an
    asyncio connection (``AsyncConnection``). That call returns one
    ``PipelineOpResult`` per operation, in the order the operations were added.
    """

    # Properties
    operations: list  # PipelineOp objects, in the order they were added

    def add_execute(self, statement, parameters=None) -> "PipelineOp":
        """
        Add an execute operation to the pipeline.

        Parameters:
        - statement (str): SQL or PL/SQL statement to execute
        - parameters (dict|list|tuple): Bind parameters for the statement
        """

    def add_executemany(self, statement, parameters) -> "PipelineOp":
        """
        Add an executemany operation to the pipeline.

        Parameters:
        - statement (str): SQL statement to execute
        - parameters (list): List of parameter sets, one per execution
        """

    def add_fetchall(self, statement, parameters=None, arraysize=None,
                     rowfactory=None) -> "PipelineOp":
        """
        Add an operation that executes a query and fetches all of its rows.

        Parameters:
        - statement (str): SQL query to execute
        - parameters (dict|list|tuple): Bind parameters for the query
        - arraysize (int): Number of rows fetched per internal fetch
          (default: oracledb.defaults.arraysize)
        - rowfactory (callable): Optional callable applied to each fetched row
        """

    def add_fetchone(self, statement, parameters=None,
                     rowfactory=None) -> "PipelineOp":
        """
        Add an operation that executes a query and fetches at most one row.

        Parameters:
        - statement (str): SQL query to execute
        - parameters (dict|list|tuple): Bind parameters for the query
        - rowfactory (callable): Optional callable applied to the fetched row
        """

    def add_fetchmany(self, statement, parameters=None, num_rows=None,
                      rowfactory=None) -> "PipelineOp":
        """
        Add an operation that executes a query and fetches up to num_rows rows.

        Parameters:
        - statement (str): SQL query to execute
        - parameters (dict|list|tuple): Bind parameters for the query
        - num_rows (int): Maximum number of rows to fetch
          (default: oracledb.defaults.arraysize)
        - rowfactory (callable): Optional callable applied to each fetched row
        """

    def add_callfunc(self, name, return_type, parameters=None,
                     keyword_parameters=None) -> "PipelineOp":
        """
        Add a stored function call operation to the pipeline.

        Parameters:
        - name (str): Function name
        - return_type: Expected return type (e.g. oracledb.DB_TYPE_NUMBER)
        - parameters (list): Positional function arguments
        - keyword_parameters (dict): Keyword function arguments
        """

    def add_callproc(self, name, parameters=None,
                     keyword_parameters=None) -> "PipelineOp":
        """
        Add a stored procedure call operation to the pipeline.

        Parameters:
        - name (str): Procedure name
        - parameters (list): Positional procedure arguments
        - keyword_parameters (dict): Keyword procedure arguments
        """

    def add_commit(self) -> "PipelineOp":
        """Add a commit operation to the pipeline."""
75
```
76
77
### PipelineOp Class
78
79
Represents a single operation queued in a pipeline: its type, plus the statement, name, and parameters it was added with.
80
81
```python { .api }
82
class PipelineOp:
    """Individual pipeline operation, as returned by the Pipeline.add_*() methods.

    All attributes are read-only. Attributes that do not apply to an
    operation's type are not meaningful for that operation.
    """

    # Properties
    op_type: "PipelineOpType"  # Operation type (one of oracledb.PIPELINE_OP_TYPE_*)
    statement: str  # SQL statement (execute, executemany and fetch operations)
    parameters: object  # Bind parameters, or positional call arguments
    keyword_parameters: dict  # Keyword call arguments (callfunc / callproc)
    name: str  # Function or procedure name (callfunc / callproc)
    return_type: object  # Declared return type (callfunc)
    num_rows: int  # Maximum rows to fetch (fetchmany)
    arraysize: int  # Rows fetched per internal fetch (fetchall)
    rowfactory: object  # Row factory callable (fetch operations)
87
```
88
89
### PipelineOpResult Class
90
91
The result of one pipeline operation: the operation itself plus any fetched rows, function return value, error, or warning.
92
93
```python { .api }
94
class PipelineOpResult:
    """Result of one pipeline operation, as returned by run_pipeline().

    Results of execute, executemany, callproc and commit operations carry no
    rows or return value; no affected-row count is exposed.
    """

    # Properties
    operation: "PipelineOp"  # The operation this result belongs to
    rows: list  # Fetched rows (fetch operations), otherwise None
    columns: list  # Column metadata for fetched rows (fetch operations), otherwise None
    return_value: object  # Function return value (callfunc operations), otherwise None
    error: object  # Error raised by the operation when run with continue_on_error=True, else None
    warning: object  # Warning raised by the operation, else None
102
```
103
104
### Pipeline Creation
105
106
Create pipeline instances for batching operations, and run them on an asyncio connection with `AsyncConnection.run_pipeline()`.
107
108
```python { .api }
109
def create_pipeline() -> "Pipeline":
    """
    Create a new, empty pipeline for batching operations.

    Returns:
    Pipeline: New pipeline instance
    """


class AsyncConnection:
    """Asyncio connection returned by ``await oracledb.connect_async(...)``.

    Only the pipeline-related method is shown here.
    """

    async def run_pipeline(self, pipeline, continue_on_error=False) -> list:
        """
        Send all operations in the pipeline to the database and return their results.

        Parameters:
        - pipeline (Pipeline): Pipeline whose operations are run, in order
        - continue_on_error (bool): If False (default), the first failing
          operation raises an exception. If True, a failure is recorded in
          that operation's PipelineOpResult.error instead.

        Returns:
        list: One PipelineOpResult per operation, in the order they were added
        """
116
```
117
118
### Pipeline Operation Type Constants
119
120
Constants identifying the type of a pipeline operation (the value of `PipelineOp.op_type`). They are members of the `oracledb.PipelineOpType` enumeration.
121
122
```python { .api }
123
# Pipeline Operation Types
# Each constant is a member of the oracledb.PipelineOpType enumeration and can
# also be written as, for example, oracledb.PipelineOpType.EXECUTE.
PIPELINE_OP_TYPE_EXECUTE: "PipelineOpType"  # Execute SQL statement
PIPELINE_OP_TYPE_EXECUTE_MANY: "PipelineOpType"  # Execute SQL with multiple parameter sets
PIPELINE_OP_TYPE_FETCH_ALL: "PipelineOpType"  # Execute a query and fetch all rows
PIPELINE_OP_TYPE_FETCH_ONE: "PipelineOpType"  # Execute a query and fetch a single row
PIPELINE_OP_TYPE_FETCH_MANY: "PipelineOpType"  # Execute a query and fetch multiple rows
PIPELINE_OP_TYPE_CALL_FUNC: "PipelineOpType"  # Call stored function
PIPELINE_OP_TYPE_CALL_PROC: "PipelineOpType"  # Call stored procedure
PIPELINE_OP_TYPE_COMMIT: "PipelineOpType"  # Commit transaction
132
```
133
134
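## Usage Examples

The examples run pipelines on an asyncio connection (`oracledb.connect_async()`). They use simplified tables; adapt the column lists to your schema (for example, the HR sample `employees` table has additional NOT NULL columns such as `email` and `job_id`).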
135
136
### Basic Pipeline Operations
137
138
```python
139
import asyncio
import datetime

import oracledb

# Pipelines run on an asyncio connection via AsyncConnection.run_pipeline();
# there is no Pipeline.execute() method.
INSERT_EMPLOYEE = """
    INSERT INTO employees (employee_id, first_name, last_name, hire_date)
    VALUES (:1, :2, :3, :4)
"""


async def main():
    connection = await oracledb.connect_async(
        user="hr", password="password", dsn="localhost/xepdb1"
    )
    try:
        # Create a pipeline
        pipeline = oracledb.create_pipeline()

        # Add multiple operations to the pipeline. Hire dates are bound as
        # datetime.date values so the inserts do not depend on the session's
        # NLS_DATE_FORMAT setting.
        pipeline.add_execute(
            INSERT_EMPLOYEE, [1001, "John", "Smith", datetime.date(2024, 1, 15)]
        )
        pipeline.add_execute(
            INSERT_EMPLOYEE, [1002, "Jane", "Doe", datetime.date(2024, 1, 16)]
        )
        pipeline.add_execute(
            INSERT_EMPLOYEE, [1003, "Bob", "Johnson", datetime.date(2024, 1, 17)]
        )

        # Add a commit operation
        pipeline.add_commit()

        # Send all operations to the database together; one result is
        # returned per operation, in the order the operations were added.
        results = await connection.run_pipeline(pipeline)

        print(f"Pipeline executed {len(results)} operations")
        for i, result in enumerate(results):
            print(f"Operation {i}: {result.operation.op_type}")
    finally:
        await connection.close()


asyncio.run(main())
174
```
175
176
### Batch Insert with Pipeline
177
178
```python
179
import asyncio

import oracledb

# Prepare large dataset
employee_data = [
    (2001, "Alice", "Johnson", 50000, 10),
    (2002, "Bob", "Smith", 55000, 20),
    (2003, "Carol", "Brown", 60000, 30),
    (2004, "David", "Wilson", 52000, 10),
    (2005, "Eve", "Davis", 58000, 20),
    # ... potentially thousands more records
]


async def main():
    connection = await oracledb.connect_async(
        user="hr", password="password", dsn="localhost/xepdb1"
    )
    try:
        # Create pipeline for batch operations
        pipeline = oracledb.create_pipeline()

        # A single executemany operation inserts every row with one statement
        pipeline.add_executemany("""
            INSERT INTO employees (employee_id, first_name, last_name, salary, department_id)
            VALUES (:1, :2, :3, :4, :5)
        """, employee_data)

        # Add commit
        pipeline.add_commit()

        # Execute pipeline; by default a failing row raises oracledb.DatabaseError
        await connection.run_pipeline(pipeline)

        # PipelineOpResult exposes no row count (and no cursor is involved),
        # so report how many rows were sent; reaching this line means all of
        # them were inserted.
        print("Batch insert completed")
        print(f"Rows inserted: {len(employee_data)}")
    finally:
        await connection.close()


asyncio.run(main())
213
```
214
215
### Complex Pipeline with Mixed Operations
216
217
```python
218
import asyncio

import oracledb


async def main():
    connection = await oracledb.connect_async(
        user="hr", password="password", dsn="localhost/xepdb1"
    )
    try:
        # Create pipeline with mixed operations
        pipeline = oracledb.create_pipeline()

        # A query and its fetch are one operation: the statement is passed
        # to add_fetchone() itself.
        pipeline.add_fetchone(
            "SELECT COUNT(*) FROM employees WHERE department_id = :1", [10]
        )

        # Call stored function
        pipeline.add_callfunc(
            "calculate_bonus", oracledb.DB_TYPE_NUMBER, [50000, 0.15]
        )

        # Call stored procedure
        pipeline.add_callproc("update_employee_status", ["ACTIVE"])

        # Execute multiple updates
        department_updates = [
            ("Engineering", 10),
            ("Marketing", 20),
            ("Sales", 30),
        ]
        for dept_name, dept_id in department_updates:
            pipeline.add_execute(
                "UPDATE departments SET department_name = :1 WHERE department_id = :2",
                [dept_name, dept_id],
            )

        # Commit all changes
        pipeline.add_commit()

        # Execute pipeline; results are in the order operations were added
        results = await connection.run_pipeline(pipeline)

        print("Pipeline results:")
        print(f"  Employees in department 10: {results[0].rows[0][0]}")
        print(f"  Calculated bonus: {results[1].return_value}")
        for i, result in enumerate(results):
            print(f"  Operation {i}: {result.operation.op_type}")
    finally:
        await connection.close()


asyncio.run(main())
259
```
260
261
### Pipeline with Fetch Operations
262
263
```python
264
import asyncio

import oracledb


async def main():
    connection = await oracledb.connect_async(
        user="hr", password="password", dsn="localhost/xepdb1"
    )
    try:
        # Create pipeline of queries; each fetch operation runs its own query
        pipeline = oracledb.create_pipeline()

        # First query
        pipeline.add_fetchall(
            "SELECT employee_id, first_name, last_name FROM employees "
            "WHERE department_id = :1",
            [10],
        )

        # Second query
        pipeline.add_fetchall(
            "SELECT department_id, department_name FROM departments "
            "WHERE department_id IN (10, 20, 30)"
        )

        # Third query with limited fetch. Columns are named explicitly so the
        # row indexes used below do not depend on the table's column order.
        pipeline.add_fetchmany(
            "SELECT employee_id, first_name, last_name, hire_date "
            "FROM employees ORDER BY hire_date DESC",
            num_rows=5,  # Get only first 5 rows
        )

        # Execute all operations: exactly one result per operation
        emp_result, dept_result, recent_result = await connection.run_pipeline(
            pipeline
        )

        # Process results; fetched rows are in each result's .rows
        employees_dept_10 = emp_result.rows
        departments = dept_result.rows
        recent_hires = recent_result.rows

        print("Employees in Department 10:")
        for emp in employees_dept_10:
            print(f"  {emp[0]}: {emp[1]} {emp[2]}")

        print("\nDepartments:")
        for dept in departments:
            print(f"  {dept[0]}: {dept[1]}")

        print("\nRecent Hires (Top 5):")
        for emp in recent_hires:
            print(f"  {emp[1]} {emp[2]} - Hired: {emp[3]}")
    finally:
        await connection.close()


asyncio.run(main())
305
```
306
307
### Performance Comparison: Pipeline vs Individual Operations
308
309
```python
310
import asyncio
import time

import oracledb

# Test data
test_data = [(i, f"Name{i}", f"Last{i}", 50000 + i) for i in range(1000, 2000)]


async def main():
    connection = await oracledb.connect_async(
        user="hr", password="password", dsn="localhost/xepdb1"
    )
    try:
        # Method 1: Individual operations (one round-trip per statement).
        # perf_counter() is a monotonic clock intended for timing intervals.
        start_time = time.perf_counter()
        with connection.cursor() as cursor:
            for data in test_data:
                await cursor.execute("""
                    INSERT INTO test_employees (id, first_name, last_name, salary)
                    VALUES (:1, :2, :3, :4)
                """, data)
        await connection.commit()

        individual_time = time.perf_counter() - start_time
        print(f"Individual operations time: {individual_time:.2f} seconds")

        # Method 2: Pipeline operations (a separate table avoids key clashes
        # with Method 1)
        start_time = time.perf_counter()
        pipeline = oracledb.create_pipeline()

        # Add all inserts to pipeline
        for data in test_data:
            pipeline.add_execute("""
                INSERT INTO test_employees2 (id, first_name, last_name, salary)
                VALUES (:1, :2, :3, :4)
            """, data)

        pipeline.add_commit()

        await connection.run_pipeline(pipeline)

        pipeline_time = time.perf_counter() - start_time
        print(f"Pipeline operations time: {pipeline_time:.2f} seconds")
        print(f"Performance improvement: {individual_time / pipeline_time:.2f}x faster")

        # Note: for inserting many rows with one statement, a single
        # executemany operation is usually faster than either approach above.
    finally:
        await connection.close()


asyncio.run(main())
352
```
353
354
### Error Handling in Pipelines
355
356
```python
357
import asyncio

import oracledb

INSERT_EMPLOYEE = """
    INSERT INTO employees (employee_id, first_name, last_name)
    VALUES (:1, :2, :3)
"""


async def main():
    connection = await oracledb.connect_async(
        user="hr", password="password", dsn="localhost/xepdb1"
    )
    try:
        # Create pipeline with potential errors
        pipeline = oracledb.create_pipeline()

        # Valid operation
        pipeline.add_execute(INSERT_EMPLOYEE, [3001, "Valid", "Employee"])

        # Invalid operation (duplicate key)
        pipeline.add_execute(INSERT_EMPLOYEE, [3001, "Duplicate", "Employee"])  # Same employee_id

        # Another valid operation
        pipeline.add_execute(INSERT_EMPLOYEE, [3002, "Another", "Employee"])

        # With continue_on_error=True, each failure is recorded in its
        # result's .error instead of being raised. Without it, run_pipeline()
        # raises oracledb.DatabaseError at the first failing operation.
        results = await connection.run_pipeline(pipeline, continue_on_error=True)

        failed = [(i, r.error) for i, r in enumerate(results) if r.error is not None]
        if failed:
            print("Pipeline execution failed:")
            for i, error in failed:
                print(f"  Operation {i}: {error}")
            # Handle the error appropriately: undo the operations that succeeded
            await connection.rollback()
        else:
            print("All operations completed successfully")
            # No commit operation was added to the pipeline, so commit here;
            # otherwise closing the connection would discard the inserts.
            await connection.commit()
    finally:
        await connection.close()


asyncio.run(main())
396
```
397
398
### Advanced Pipeline Usage
399
400
```python
401
import asyncio

import oracledb


def create_monthly_report_pipeline(month, year):
    """Create a pipeline for generating monthly reports.

    Parameters:
    - month (int): Report month (1-12), compared with EXTRACT(MONTH FROM hire_date)
    - year (int): Report year, compared with EXTRACT(YEAR FROM hire_date)

    Returns:
    Pipeline: Pipeline that replaces the month's rows in monthly_reports and commits
    """
    pipeline = oracledb.create_pipeline()

    # Named binds: each value is supplied once even when a name is reused
    params = {"month": month, "year": year}

    # Clear previous report data
    pipeline.add_execute(
        "DELETE FROM monthly_reports "
        "WHERE report_month = :month AND report_year = :year",
        params,
    )

    # Generate employee summary
    pipeline.add_execute("""
        INSERT INTO monthly_reports (report_month, report_year, report_type, data)
        SELECT :month, :year, 'EMPLOYEE_COUNT', COUNT(*)
        FROM employees
        WHERE EXTRACT(MONTH FROM hire_date) = :month
          AND EXTRACT(YEAR FROM hire_date) = :year
    """, params)

    # Generate salary summary
    pipeline.add_execute("""
        INSERT INTO monthly_reports (report_month, report_year, report_type, data)
        SELECT :month, :year, 'TOTAL_SALARY', SUM(salary)
        FROM employees
        WHERE EXTRACT(MONTH FROM hire_date) = :month
          AND EXTRACT(YEAR FROM hire_date) = :year
    """, params)

    # Generate department breakdown: one INSERT ... SELECT with GROUP BY
    # produces a row per department, so a single execute is sufficient.
    pipeline.add_execute("""
        INSERT INTO monthly_reports (report_month, report_year, report_type, data, department_id)
        SELECT :month, :year, 'DEPT_COUNT', COUNT(*), department_id
        FROM employees
        WHERE EXTRACT(MONTH FROM hire_date) = :month
          AND EXTRACT(YEAR FROM hire_date) = :year
        GROUP BY department_id
    """, params)

    # Commit all changes
    pipeline.add_commit()

    return pipeline


async def main():
    connection = await oracledb.connect_async(
        user="hr", password="password", dsn="localhost/xepdb1"
    )
    try:
        # Generate reports for January 2024
        report_pipeline = create_monthly_report_pipeline(1, 2024)
        results = await connection.run_pipeline(report_pipeline)
        print(f"Monthly report pipeline completed with {len(results)} operations")

        # Verify results
        with connection.cursor() as cursor:
            await cursor.execute("""
                SELECT report_type, data, department_id
                FROM monthly_reports
                WHERE report_month = 1 AND report_year = 2024
                ORDER BY report_type
            """)

            print("Report Results:")
            async for report_type, data, department_id in cursor:
                # Explicit None check so a department_id of 0 is still shown
                if department_id is not None:
                    print(f"  {report_type} (Dept {department_id}): {data}")
                else:
                    print(f"  {report_type}: {data}")
    finally:
        await connection.close()


asyncio.run(main())
470
```