Python interface to Oracle Database with thin and thick connectivity modes
—
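Batch multiple database operations for improved performance using pipelining. Pipeline operations enable grouping SQL executions, fetch operations, and stored procedure calls into batches that are executed together, reducing network round-trips and improving throughput for high-volume operations. Pipelining is part of the asyncio API (Thin mode): connect with `oracledb.connect_async()` and run a pipeline with `await connection.run_pipeline(pipeline)`.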
Create and execute batched database operations with support for various operation types.
class Pipeline:
    """
    Pipeline for batching database operations.

    Operations are queued with the add_*() methods and are sent to the
    database together by ``await AsyncConnection.run_pipeline(pipeline)``,
    which returns a list of PipelineOpResult objects, one for each operation.
    The Pipeline class itself has no execute() method.
    """

    def add_execute(self, statement, parameters=None) -> None:
        """
        Add an execute operation to the pipeline.

        Parameters:
        - statement (str): SQL statement to execute
        - parameters (dict|list|tuple): Bind parameters for the statement
        """

    def add_executemany(self, statement, parameters) -> None:
        """
        Add an executemany operation to the pipeline.

        Parameters:
        - statement (str): SQL statement to execute
        - parameters (list): List of parameter sets
        """

    def add_fetchall(self, statement, parameters=None, arraysize=None,
                     rowfactory=None) -> None:
        """
        Add an operation that runs a query and fetches all of its rows.

        Parameters:
        - statement (str): Query to execute
        - parameters (dict|list|tuple): Bind parameters for the query
        - arraysize (int): Internal fetch buffer size used for the query
        - rowfactory (callable): Callable used to build each returned row
        """

    def add_fetchone(self, statement, parameters=None, rowfactory=None) -> None:
        """
        Add an operation that runs a query and fetches its first row.

        Parameters:
        - statement (str): Query to execute
        - parameters (dict|list|tuple): Bind parameters for the query
        - rowfactory (callable): Callable used to build the returned row
        """

    def add_fetchmany(self, statement, parameters=None, num_rows=None,
                      rowfactory=None) -> None:
        """
        Add an operation that runs a query and fetches up to num_rows rows.

        Parameters:
        - statement (str): Query to execute
        - parameters (dict|list|tuple): Bind parameters for the query
        - num_rows (int): Maximum number of rows to fetch
        - rowfactory (callable): Callable used to build each returned row
        """

    def add_callfunc(self, name, return_type, parameters=None,
                     keyword_parameters=None) -> None:
        """
        Add a function call operation to the pipeline.

        Parameters:
        - name (str): Function name
        - return_type: Expected return type
        - parameters (list): Positional function parameters
        - keyword_parameters (dict): Function parameters passed by name
        """

    def add_callproc(self, name, parameters=None,
                     keyword_parameters=None) -> None:
        """
        Add a procedure call operation to the pipeline.

        Parameters:
        - name (str): Procedure name
        - parameters (list): Positional procedure parameters
        - keyword_parameters (dict): Procedure parameters passed by name
        """

    def add_commit(self) -> None:
        """Add a commit operation to the pipeline."""


# Represent individual pipeline operations with metadata about the operation type.
class PipelineOp:
    """Individual pipeline operation."""

    # Properties
    op_type: int  # Operation type (PIPELINE_OP_TYPE_*)


# Result of a pipeline operation containing the operation outcome and any returned data.
class PipelineOpResult:
    """Result of a pipeline operation."""

    # Properties contain operation-specific results
    operation: "PipelineOp"  # The operation this result belongs to
    rows: "list | None"  # For fetch operations: fetched rows
    columns: "list | None"  # For fetch operations: metadata of the fetched columns
    return_value: object  # For function calls: return value
    error: object  # Error raised by the operation, or None
    warning: object  # Warning raised by the operation, or None
    # NOTE(review): the attributes above expose no affected-row count for
    # execute operations and no modified-parameter list for procedure calls;
    # confirm against the PipelineOpResult API reference before relying on either.


# Create pipeline instances for batching operations.
def create_pipeline() -> "Pipeline":
    """
    Create a new pipeline for batching operations.

    Returns:
    Pipeline: New pipeline instance
    """


# Constants identifying different types of pipeline operations.
# Pipeline Operation Types
PIPELINE_OP_TYPE_EXECUTE: int  # Execute SQL statement
PIPELINE_OP_TYPE_EXECUTE_MANY: int  # Execute SQL with multiple parameter sets
PIPELINE_OP_TYPE_FETCH_ALL: int  # Fetch all rows
PIPELINE_OP_TYPE_FETCH_ONE: int  # Fetch single row
PIPELINE_OP_TYPE_FETCH_MANY: int  # Fetch multiple rows
PIPELINE_OP_TYPE_CALL_FUNC: int  # Call stored function
PIPELINE_OP_TYPE_CALL_PROC: int  # Call stored procedure
PIPELINE_OP_TYPE_COMMIT: int  # Commit transaction

import oracledb
import asyncio
import datetime


async def main():
    """Queue three inserts and a commit, then run them in one pipeline."""
    # Pipelines are run by an asyncio connection, not by a cursor.
    connection = await oracledb.connect_async(
        user="hr", password="password", dsn="localhost/xepdb1"
    )
    try:
        # Create a pipeline
        pipeline = oracledb.create_pipeline()

        insert_sql = """
            INSERT INTO employees (employee_id, first_name, last_name, hire_date)
            VALUES (:1, :2, :3, :4)
        """
        # Bind real dates: a string only converts when it happens to match
        # the session's NLS_DATE_FORMAT.
        new_hires = [
            (1001, 'John', 'Smith', datetime.date(2024, 1, 15)),
            (1002, 'Jane', 'Doe', datetime.date(2024, 1, 16)),
            (1003, 'Bob', 'Johnson', datetime.date(2024, 1, 17)),
        ]

        # Add multiple operations to the pipeline
        for hire in new_hires:
            pipeline.add_execute(insert_sql, hire)

        # Add a commit operation
        pipeline.add_commit()

        # Execute all operations in the pipeline
        results = await connection.run_pipeline(pipeline)
        print(f"Pipeline executed {len(results)} operations")
        for i, result in enumerate(results):
            print(f"Operation {i}: {result}")
    finally:
        await connection.close()


asyncio.run(main())

import oracledb
import asyncio


async def main():
    """Insert a batch of rows with a single executemany pipeline operation."""
    connection = await oracledb.connect_async(
        user="hr", password="password", dsn="localhost/xepdb1"
    )
    try:
        # Prepare large dataset
        employee_data = [
            (2001, 'Alice', 'Johnson', 50000, 10),
            (2002, 'Bob', 'Smith', 55000, 20),
            (2003, 'Carol', 'Brown', 60000, 30),
            (2004, 'David', 'Wilson', 52000, 10),
            (2005, 'Eve', 'Davis', 58000, 20),
            # ... potentially thousands more records
        ]

        # Create pipeline for batch operations
        pipeline = oracledb.create_pipeline()

        # Add executemany operation for efficient batch insert
        pipeline.add_executemany("""
            INSERT INTO employees (employee_id, first_name, last_name, salary, department_id)
            VALUES (:1, :2, :3, :4, :5)
        """, employee_data)

        # Add commit
        pipeline.add_commit()

        # Execute pipeline
        await connection.run_pipeline(pipeline)
        print("Batch insert completed")
        # No cursor takes part in a pipeline, so there is no cursor.rowcount
        # to read; report the number of rows that were submitted instead.
        print(f"Rows submitted: {len(employee_data)}")
    finally:
        await connection.close()


asyncio.run(main())

import oracledb
import asyncio


async def main():
    """Combine a query, a function call, a procedure call and updates."""
    connection = await oracledb.connect_async(
        user="hr", password="password", dsn="localhost/xepdb1"
    )
    try:
        # Create pipeline with mixed operations
        pipeline = oracledb.create_pipeline()

        # Run the query and fetch its single row in one operation
        pipeline.add_fetchone(
            "SELECT COUNT(*) FROM employees WHERE department_id = :1", [10]
        )

        # Call stored function
        pipeline.add_callfunc("calculate_bonus", oracledb.NUMBER, [50000, 0.15])

        # Call stored procedure
        pipeline.add_callproc("update_employee_status", ['ACTIVE'])

        # Execute multiple updates
        department_updates = [
            ("Engineering", 10),
            ("Marketing", 20),
            ("Sales", 30),
        ]
        for dept_name, dept_id in department_updates:
            pipeline.add_execute("""
                UPDATE departments SET department_name = :1 WHERE department_id = :2
            """, [dept_name, dept_id])

        # Commit all changes
        pipeline.add_commit()

        # Execute pipeline
        results = await connection.run_pipeline(pipeline)
        print("Pipeline results:")
        for i, result in enumerate(results):
            # Fetch operations carry rows; function calls carry a return value.
            if result.rows is not None:
                outcome = result.rows
            else:
                outcome = result.return_value
            print(f" Operation {i}: {outcome}")
    finally:
        await connection.close()


asyncio.run(main())

import oracledb
import asyncio


async def main():
    """Run three queries in one pipeline and process each result's rows."""
    connection = await oracledb.connect_async(
        user="hr", password="password", dsn="localhost/xepdb1"
    )
    try:
        # Create pipeline combining queries and fetches; each fetch operation
        # carries its own statement, so there is one result per query.
        pipeline = oracledb.create_pipeline()

        # First query
        pipeline.add_fetchall(
            "SELECT employee_id, first_name, last_name FROM employees WHERE department_id = :1",
            [10],
        )

        # Second query
        pipeline.add_fetchall(
            "SELECT department_id, department_name FROM departments WHERE department_id IN (10, 20, 30)"
        )

        # Third query with limited fetch; columns are named explicitly so the
        # row indexes below do not depend on the table's column order.
        pipeline.add_fetchmany(
            "SELECT first_name, last_name, hire_date FROM employees ORDER BY hire_date DESC",
            num_rows=5,  # Get only first 5 rows
        )

        # Execute all operations
        results = await connection.run_pipeline(pipeline)

        # Process results
        employees_dept_10 = results[0].rows
        departments = results[1].rows
        recent_hires = results[2].rows

        print("Employees in Department 10:")
        for emp in employees_dept_10:
            print(f" {emp[0]}: {emp[1]} {emp[2]}")
        print("\nDepartments:")
        for dept in departments:
            print(f" {dept[0]}: {dept[1]}")
        print("\nRecent Hires (Top 5):")
        for emp in recent_hires:
            print(f" {emp[0]} {emp[1]} - Hired: {emp[2]}")
    finally:
        await connection.close()


asyncio.run(main())

import oracledb
import asyncio
import time


async def main():
    """Time row-by-row inserts against the same inserts sent as a pipeline."""
    connection = await oracledb.connect_async(
        user="hr", password="password", dsn="localhost/xepdb1"
    )
    try:
        # Test data
        test_data = [(i, f'Name{i}', f'Last{i}', 50000 + i) for i in range(1000, 2000)]

        # Method 1: Individual operations (one round-trip per statement)
        # perf_counter() is monotonic, unlike time.time(), so the measured
        # interval cannot be skewed by wall-clock adjustments.
        start_time = time.perf_counter()
        for data in test_data:
            await connection.execute("""
                INSERT INTO test_employees (id, first_name, last_name, salary)
                VALUES (:1, :2, :3, :4)
            """, data)
        await connection.commit()
        individual_time = time.perf_counter() - start_time
        print(f"Individual operations time: {individual_time:.2f} seconds")

        # Method 2: Pipeline operations
        start_time = time.perf_counter()
        pipeline = oracledb.create_pipeline()

        # Add all inserts to pipeline
        for data in test_data:
            pipeline.add_execute("""
                INSERT INTO test_employees2 (id, first_name, last_name, salary)
                VALUES (:1, :2, :3, :4)
            """, data)
        pipeline.add_commit()
        await connection.run_pipeline(pipeline)
        pipeline_time = time.perf_counter() - start_time
        print(f"Pipeline operations time: {pipeline_time:.2f} seconds")
        print(f"Performance improvement: {individual_time/pipeline_time:.2f}x faster")
    finally:
        await connection.close()


asyncio.run(main())

import oracledb
import asyncio


async def main():
    """Run a pipeline containing a failing insert and roll back on error."""
    connection = await oracledb.connect_async(
        user="hr", password="password", dsn="localhost/xepdb1"
    )
    try:
        # Create pipeline with potential errors
        pipeline = oracledb.create_pipeline()
        insert_sql = """
            INSERT INTO employees (employee_id, first_name, last_name)
            VALUES (:1, :2, :3)
        """

        # Valid operations
        pipeline.add_execute(insert_sql, [3001, 'Valid', 'Employee'])

        # Invalid operation (duplicate key)
        pipeline.add_execute(insert_sql, [3001, 'Duplicate', 'Employee'])  # Same employee_id

        # Another valid operation
        pipeline.add_execute(insert_sql, [3002, 'Another', 'Employee'])

        try:
            # By default the first failing operation raises. Passing
            # continue_on_error=True instead runs every operation and reports
            # failures through each result's ``error`` attribute.
            results = await connection.run_pipeline(pipeline)
        except oracledb.DatabaseError as e:
            print(f"Pipeline execution failed: {e}")
            # Handle the error appropriately
            await connection.rollback()
        else:
            print("All operations completed successfully")
            for i, result in enumerate(results):
                print(f" Operation {i}: {result}")
    finally:
        await connection.close()


asyncio.run(main())

import oracledb
import asyncio


def create_monthly_report_pipeline(month, year):
    """
    Create a pipeline for generating monthly reports.

    Parameters:
    - month (int): Report month, matched against the month of hire_date
    - year (int): Report year, matched against the year of hire_date

    Returns:
    Pipeline: Pipeline holding the delete, the three inserts and the commit
    """
    pipeline = oracledb.create_pipeline()

    # Clear previous report data
    pipeline.add_execute(
        "DELETE FROM monthly_reports WHERE report_month = :1 AND report_year = :2",
        [month, year],
    )

    # Every statement below binds four values by position, so the four
    # placeholders are numbered :1..:4 to match them one-to-one.
    period_binds = [month, year, month, year]

    # Generate employee summary
    pipeline.add_execute("""
        INSERT INTO monthly_reports (report_month, report_year, report_type, data)
        SELECT :1, :2, 'EMPLOYEE_COUNT', COUNT(*)
        FROM employees
        WHERE EXTRACT(MONTH FROM hire_date) = :3
        AND EXTRACT(YEAR FROM hire_date) = :4
    """, period_binds)

    # Generate salary summary
    pipeline.add_execute("""
        INSERT INTO monthly_reports (report_month, report_year, report_type, data)
        SELECT :1, :2, 'TOTAL_SALARY', SUM(salary)
        FROM employees
        WHERE EXTRACT(MONTH FROM hire_date) = :3
        AND EXTRACT(YEAR FROM hire_date) = :4
    """, period_binds)

    # Generate department breakdown
    pipeline.add_executemany("""
        INSERT INTO monthly_reports (report_month, report_year, report_type, data, department_id)
        SELECT :1, :2, 'DEPT_COUNT', COUNT(*), department_id
        FROM employees
        WHERE EXTRACT(MONTH FROM hire_date) = :3
        AND EXTRACT(YEAR FROM hire_date) = :4
        GROUP BY department_id
    """, [(month, year, month, year)])

    # Commit all changes
    pipeline.add_commit()
    return pipeline


async def main():
    """Generate the January 2024 report and print the stored rows."""
    connection = await oracledb.connect_async(
        user="hr", password="password", dsn="localhost/xepdb1"
    )
    try:
        # Generate reports for January 2024
        report_pipeline = create_monthly_report_pipeline(1, 2024)
        results = await connection.run_pipeline(report_pipeline)
        print(f"Monthly report pipeline completed with {len(results)} operations")

        # Verify results
        report_rows = await connection.fetchall("""
            SELECT report_type, data, department_id
            FROM monthly_reports
            WHERE report_month = 1 AND report_year = 2024
            ORDER BY report_type
        """)
        print("Report Results:")
        for row in report_rows:
            # Compare with None explicitly: a department_id of 0 is falsy
            # but is still a department.
            if row[2] is not None:
                print(f" {row[0]} (Dept {row[2]}): {row[1]}")
            else:
                print(f" {row[0]}: {row[1]}")
    finally:
        await connection.close()


asyncio.run(main())

# Install with Tessl CLI
# npx tessl i tessl/pypi-oracledb