CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-oracledb

Python interface to Oracle Database with thin and thick connectivity modes

Pending
Overview
Eval results
Files

docs/pipeline.md

Pipeline Operations

Batch multiple database operations for improved performance using pipelining. Pipeline operations enable grouping SQL executions, fetch operations, and stored procedure calls into batches that are executed together, reducing network round-trips and improving throughput for high-volume operations.

Capabilities

Pipeline Class

Create and execute batched database operations with support for various operation types.

class Pipeline:
    """
    Batch of database operations that is submitted as one unit.

    Each ``add_*`` method adds one operation to the batch; ``execute()``
    runs the batch and reports one result per added operation.

    NOTE(review): confirm these signatures against the python-oracledb API
    reference. Upstream appears to run a pipeline with
    ``await AsyncConnection.run_pipeline(pipeline)`` and to pass the SQL
    statement directly to the ``add_fetch*`` methods.
    """

    def add_execute(self, statement, parameters=None) -> None:
        """
        Add the execution of a single SQL statement.

        Parameters:
        - statement (str): SQL text to run
        - parameters (dict|list|tuple): Bind values for the statement (optional)
        """

    def add_executemany(self, statement, parameters) -> None:
        """
        Add the execution of one SQL statement against many parameter sets.

        Parameters:
        - statement (str): SQL text to run
        - parameters (list): One entry per parameter set
        """

    def add_fetchall(self) -> None:
        """Add an operation that fetches all rows."""

    def add_fetchone(self) -> None:
        """Add an operation that fetches a single row."""

    def add_fetchmany(self, size=None) -> None:
        """
        Add an operation that fetches a limited number of rows.

        Parameters:
        - size (int): How many rows to fetch (default: cursor arraysize)
        """

    def add_callfunc(self, name, return_type, parameters=None) -> None:
        """
        Add a call to a stored function.

        Parameters:
        - name (str): Name of the function
        - return_type: Type the function is expected to return
        - parameters (list): Arguments passed to the function (optional)
        """

    def add_callproc(self, name, parameters=None) -> None:
        """
        Add a call to a stored procedure.

        Parameters:
        - name (str): Name of the procedure
        - parameters (list): Arguments passed to the procedure (optional)
        """

    def add_commit(self) -> None:
        """Add an operation that commits the transaction."""

    def execute(self) -> list:
        """
        Run every operation that was added to the pipeline.

        Returns:
        list: PipelineOpResult objects, one per operation
        """

PipelineOp Class

Represent individual pipeline operations with metadata about the operation type.

class PipelineOp:
    """One operation belonging to a pipeline."""

    # Attributes
    op_type: int  # Kind of operation; one of the PIPELINE_OP_TYPE_* constants

PipelineOpResult Class

Result of a pipeline operation containing the operation outcome and any returned data.

class PipelineOpResult:
    """Outcome of a single pipeline operation."""

    # What a result carries depends on the kind of operation:
    #   execute operations -> affected row count
    #   fetch operations   -> fetched rows
    #   function calls     -> return value
    #   procedure calls    -> modified parameters
    # NOTE(review): the attribute names are not listed here; confirm them
    # against the python-oracledb reference before relying on them.

Pipeline Creation

Create pipeline instances for batching operations.

def create_pipeline() -> Pipeline:
    """
    Return a new pipeline to which operations can be added.

    Returns:
    Pipeline: A new pipeline instance
    """

Pipeline Operation Type Constants

Constants identifying different types of pipeline operations.

# Pipeline operation types: the integer values found in PipelineOp.op_type
PIPELINE_OP_TYPE_EXECUTE: int       # Execution of a single SQL statement
PIPELINE_OP_TYPE_EXECUTE_MANY: int  # Execution of one SQL statement with multiple parameter sets
PIPELINE_OP_TYPE_FETCH_ALL: int     # Fetch of all rows
PIPELINE_OP_TYPE_FETCH_ONE: int     # Fetch of a single row
PIPELINE_OP_TYPE_FETCH_MANY: int    # Fetch of multiple rows
PIPELINE_OP_TYPE_CALL_FUNC: int     # Call of a stored function
PIPELINE_OP_TYPE_CALL_PROC: int     # Call of a stored procedure
PIPELINE_OP_TYPE_COMMIT: int        # Commit of the transaction

Usage Examples

Basic Pipeline Operations

import datetime

import oracledb

# One statement shared by every insert in this example
INSERT_EMPLOYEE_SQL = """
    INSERT INTO employees (employee_id, first_name, last_name, hire_date)
    VALUES (:1, :2, :3, :4)
"""

# hire_date is bound as a datetime.date rather than a string: a string would
# be converted by the database using the session's NLS_DATE_FORMAT, so the
# insert would succeed or fail depending on session settings.
new_employees = [
    [1001, 'John', 'Smith', datetime.date(2024, 1, 15)],
    [1002, 'Jane', 'Doe', datetime.date(2024, 1, 16)],
    [1003, 'Bob', 'Johnson', datetime.date(2024, 1, 17)],
]

# Using the connection as a context manager closes it even if a step fails
with oracledb.connect(user="hr", password="password", dsn="localhost/xepdb1") as connection:
    # Create a pipeline
    pipeline = oracledb.create_pipeline()

    # Add one insert operation per employee
    for employee in new_employees:
        pipeline.add_execute(INSERT_EMPLOYEE_SQL, employee)

    # Add a commit operation
    pipeline.add_commit()

    # Execute all operations in the pipeline
    # NOTE(review): nothing here ties the pipeline to this connection; the
    # python-oracledb documentation appears to run pipelines with
    # `await connection.run_pipeline(pipeline)` on an async connection - confirm.
    with connection.cursor() as cursor:
        results = pipeline.execute()

    print(f"Pipeline executed {len(results)} operations")
    for i, result in enumerate(results):
        print(f"Operation {i}: {result}")

Batch Insert with Pipeline

import oracledb

# Prepare large dataset
employee_data = [
    (2001, 'Alice', 'Johnson', 50000, 10),
    (2002, 'Bob', 'Smith', 55000, 20),
    (2003, 'Carol', 'Brown', 60000, 30),
    (2004, 'David', 'Wilson', 52000, 10),
    (2005, 'Eve', 'Davis', 58000, 20),
    # ... potentially thousands more records
]

# Using the connection as a context manager closes it even if a step fails
with oracledb.connect(user="hr", password="password", dsn="localhost/xepdb1") as connection:
    # Create pipeline for batch operations
    pipeline = oracledb.create_pipeline()

    # Add executemany operation for efficient batch insert
    pipeline.add_executemany("""
    INSERT INTO employees (employee_id, first_name, last_name, salary, department_id)
    VALUES (:1, :2, :3, :4, :5)
""", employee_data)

    # Add commit
    pipeline.add_commit()

    # Execute pipeline
    with connection.cursor() as cursor:
        results = pipeline.execute()

    print("Batch insert completed")
    # The cursor above never executed a statement itself, so its rowcount says
    # nothing about the insert. Each parameter set inserts exactly one row, so
    # a successful run affected len(employee_data) rows.
    print(f"Rows affected: {len(employee_data)}")

Complex Pipeline with Mixed Operations

import oracledb

connection = oracledb.connect(user="hr", password="password", dsn="localhost/xepdb1")

# One pipeline that mixes a query, PL/SQL calls, and DML
pipeline = oracledb.create_pipeline()

# Count the employees of department 10 and fetch that single row
count_sql = "SELECT COUNT(*) FROM employees WHERE department_id = :1"
pipeline.add_execute(count_sql, [10])
pipeline.add_fetchone()

# Stored function call followed by a stored procedure call
bonus_args = [50000, 0.15]
pipeline.add_callfunc("calculate_bonus", oracledb.NUMBER, bonus_args)
pipeline.add_callproc("update_employee_status", ['ACTIVE'])

# One UPDATE per (new name, department id) pair
department_renames = (
    ("Engineering", 10),
    ("Marketing", 20),
    ("Sales", 30),
)
for rename in department_renames:
    pipeline.add_execute("""
        UPDATE departments SET department_name = :1 WHERE department_id = :2
    """, list(rename))

# Finish with a commit so the changes are persisted
pipeline.add_commit()

# Run the pipeline and show one line per operation
with connection.cursor() as cursor:
    results = pipeline.execute()

    print("Pipeline results:")
    for position, outcome in enumerate(results):
        print(f"  Operation {position}: {outcome}")

connection.close()

Pipeline with Fetch Operations

import oracledb

# Using the connection as a context manager closes it even if a step fails
with oracledb.connect(user="hr", password="password", dsn="localhost/xepdb1") as connection:
    # Create pipeline combining queries and fetches
    pipeline = oracledb.create_pipeline()

    # First query
    pipeline.add_execute("SELECT employee_id, first_name, last_name FROM employees WHERE department_id = :1", [10])
    pipeline.add_fetchall()

    # Second query
    pipeline.add_execute("SELECT department_id, department_name FROM departments WHERE department_id IN (10, 20, 30)")
    pipeline.add_fetchall()

    # Third query with limited fetch.
    # The columns are named explicitly (not SELECT *) so that the positions
    # unpacked below are fixed by the query, not by the table's column order.
    pipeline.add_execute("SELECT first_name, last_name, hire_date FROM employees ORDER BY hire_date DESC")
    pipeline.add_fetchmany(5)  # Get only first 5 rows

    # Execute all operations
    with connection.cursor() as cursor:
        results = pipeline.execute()

    # There is one result per added operation, so each fetch result sits
    # directly after the result of the execute it belongs to.
    employees_dept_10 = results[1]  # fetchall result from first query
    departments = results[3]        # fetchall result from second query
    recent_hires = results[5]       # fetchmany result from third query

    print("Employees in Department 10:")
    for employee_id, first_name, last_name in employees_dept_10:
        print(f"  {employee_id}: {first_name} {last_name}")

    print("\nDepartments:")
    for department_id, department_name in departments:
        print(f"  {department_id}: {department_name}")

    print("\nRecent Hires (Top 5):")
    for first_name, last_name, hire_date in recent_hires:
        print(f"  {first_name} {last_name} - Hired: {hire_date}")

Performance Comparison: Pipeline vs Individual Operations

import oracledb
import time

# Test data
test_data = [(i, f'Name{i}', f'Last{i}', 50000 + i) for i in range(1000, 2000)]

# Using the connection as a context manager closes it even if a step fails
with oracledb.connect(user="hr", password="password", dsn="localhost/xepdb1") as connection:
    # Method 1: Individual operations
    # time.perf_counter() is used for both measurements: it is a monotonic,
    # high-resolution clock meant for intervals, whereas time.time() can jump
    # when the system clock is adjusted.
    start_time = time.perf_counter()
    with connection.cursor() as cursor:
        for data in test_data:
            cursor.execute("""
            INSERT INTO test_employees (id, first_name, last_name, salary)
            VALUES (:1, :2, :3, :4)
        """, data)
        connection.commit()

    individual_time = time.perf_counter() - start_time
    print(f"Individual operations time: {individual_time:.2f} seconds")

    # Method 2: Pipeline operations (building the pipeline is part of the timing)
    start_time = time.perf_counter()
    pipeline = oracledb.create_pipeline()

    # Add all inserts to pipeline; a second table is used so the rows do not
    # collide with the ones inserted by method 1
    for data in test_data:
        pipeline.add_execute("""
        INSERT INTO test_employees2 (id, first_name, last_name, salary)
        VALUES (:1, :2, :3, :4)
    """, data)

    pipeline.add_commit()

    with connection.cursor() as cursor:
        results = pipeline.execute()

    pipeline_time = time.perf_counter() - start_time
    print(f"Pipeline operations time: {pipeline_time:.2f} seconds")
    print(f"Performance improvement: {individual_time/pipeline_time:.2f}x faster")

Error Handling in Pipelines

import oracledb

INSERT_EMPLOYEE_SQL = """
    INSERT INTO employees (employee_id, first_name, last_name)
    VALUES (:1, :2, :3)
"""

# Using the connection as a context manager closes it on every path,
# including exceptions other than DatabaseError
with oracledb.connect(user="hr", password="password", dsn="localhost/xepdb1") as connection:
    # Create pipeline with potential errors
    pipeline = oracledb.create_pipeline()

    # Valid operation
    pipeline.add_execute(INSERT_EMPLOYEE_SQL, [3001, 'Valid', 'Employee'])

    # Invalid operation (duplicate key): same employee_id as above
    pipeline.add_execute(INSERT_EMPLOYEE_SQL, [3001, 'Duplicate', 'Employee'])

    # Another valid operation
    pipeline.add_execute(INSERT_EMPLOYEE_SQL, [3002, 'Another', 'Employee'])

    # Commit so that a fully successful run is persisted; without it the
    # inserts would be discarded when the connection is closed
    pipeline.add_commit()

    # Only the call that can raise sits inside the try body
    try:
        with connection.cursor() as cursor:
            results = pipeline.execute()
    except oracledb.DatabaseError as e:
        print(f"Pipeline execution failed: {e}")
        # Undo whatever part of the pipeline ran before the failure
        connection.rollback()
    else:
        print("All operations completed successfully")
        for i, result in enumerate(results):
            print(f"  Operation {i}: {result}")

Advanced Pipeline Usage

import oracledb


def create_monthly_report_pipeline(month, year):
    """
    Create a pipeline for generating monthly reports.

    The pipeline deletes the existing monthly_reports rows for the given
    month and year, inserts the EMPLOYEE_COUNT, TOTAL_SALARY and
    per-department DEPT_COUNT rows, and ends with a commit. It is returned
    without being executed.

    Parameters:
    - month (int): Month number compared with EXTRACT(MONTH FROM hire_date)
    - year (int): Year compared with EXTRACT(YEAR FROM hire_date)

    Returns:
    Pipeline: Pipeline holding the report operations
    """
    pipeline = oracledb.create_pipeline()

    # Clear previous report data
    pipeline.add_execute("DELETE FROM monthly_reports WHERE report_month = :1 AND report_year = :2", [month, year])

    # The INSERT ... SELECT statements bind four values by position, so every
    # placeholder gets its own number (:1-:4) to match the four values passed.
    period_binds = [month, year, month, year]

    # Generate employee summary
    pipeline.add_execute("""
        INSERT INTO monthly_reports (report_month, report_year, report_type, data)
        SELECT :1, :2, 'EMPLOYEE_COUNT', COUNT(*)
        FROM employees
        WHERE EXTRACT(MONTH FROM hire_date) = :3
        AND EXTRACT(YEAR FROM hire_date) = :4
    """, period_binds)

    # Generate salary summary
    pipeline.add_execute("""
        INSERT INTO monthly_reports (report_month, report_year, report_type, data)
        SELECT :1, :2, 'TOTAL_SALARY', SUM(salary)
        FROM employees
        WHERE EXTRACT(MONTH FROM hire_date) = :3
        AND EXTRACT(YEAR FROM hire_date) = :4
    """, period_binds)

    # Generate department breakdown
    pipeline.add_executemany("""
        INSERT INTO monthly_reports (report_month, report_year, report_type, data, department_id)
        SELECT :1, :2, 'DEPT_COUNT', COUNT(*), department_id
        FROM employees
        WHERE EXTRACT(MONTH FROM hire_date) = :3 
        AND EXTRACT(YEAR FROM hire_date) = :4
        GROUP BY department_id
    """, [(month, year, month, year)])

    # Commit all changes
    pipeline.add_commit()

    return pipeline


# Using the connection as a context manager closes it even if a step fails
with oracledb.connect(user="hr", password="password", dsn="localhost/xepdb1") as connection:
    # Generate reports for January 2024
    report_pipeline = create_monthly_report_pipeline(1, 2024)

    with connection.cursor() as cursor:
        results = report_pipeline.execute()
        print(f"Monthly report pipeline completed with {len(results)} operations")

    # Verify results
    with connection.cursor() as cursor:
        cursor.execute("""
        SELECT report_type, data, department_id
        FROM monthly_reports
        WHERE report_month = 1 AND report_year = 2024
        ORDER BY report_type
    """)

        print("Report Results:")
        for report_type, data, department_id in cursor:
            # Compare with None explicitly: a plain truthiness test would
            # also treat department_id 0 as "no department"
            if department_id is not None:
                print(f"  {report_type} (Dept {department_id}): {data}")
            else:
                print(f"  {report_type}: {data}")

Install with Tessl CLI

npx tessl i tessl/pypi-oracledb

docs

advanced-queuing.md

connection-pooling.md

connectivity.md

data-types.md

database-objects.md

index.md

lobs.md

pipeline.md

soda.md

sql-execution.md

subscriptions.md

tile.json