CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-oracledb

Python interface to Oracle Database with thin and thick connectivity modes

Pending
Overview
Eval results
Files

docs/connection-pooling.md

Connection Pooling

Manage pools of database connections for scalable applications. Connection pooling improves performance by reusing connections and provides better resource management for multi-user applications. Supports both synchronous and asynchronous pool operations with extensive configuration options.

Capabilities

Creating Connection Pools

Create and configure connection pools with various settings for optimal performance and resource utilization.

def create_pool(
    user=None,
    password=None,
    dsn=None,
    min=1,
    max=2,
    increment=1,
    connectiontype=Connection,
    getmode=POOL_GETMODE_WAIT,
    homogeneous=True,
    timeout=0,
    wait_timeout=0,
    max_lifetime_session=0,
    session_callback=None,
    max_sessions_per_shard=0,
    soda_metadata_cache=False,
    ping_interval=60,
    **kwargs
) -> ConnectionPool:
    """
    Create a connection pool.

    Parameters:
    - user (str): Username for authentication
    - password (str): Password for authentication
    - dsn (str): Data source name
    - min (int): Minimum number of connections in pool (default 1)
    - max (int): Maximum number of connections in pool (default 2)
    - increment (int): Number of connections to create when pool needs to
      grow (default 1)
    - connectiontype: Class of the connections handed out by the pool
      (default Connection)
    - getmode (int): Pool get mode, one of the POOL_GETMODE_* constants
      (default POOL_GETMODE_WAIT)
    - homogeneous (bool): Whether all connections use same credentials
      (default True)
    - timeout (int): Idle timeout in seconds: how long a connection may sit
      unused in the pool before it is closed. 0 (default) means idle
      connections are never closed. This is not a connect/login timeout.
    - wait_timeout (int): Time in MILLISECONDS that acquire() waits for a
      free connection. Per the python-oracledb documentation it is only
      used when getmode is POOL_GETMODE_TIMEDWAIT (default 0).
    - max_lifetime_session (int): Maximum session lifetime in seconds;
      0 (default) means no limit
    - session_callback: Callback function for new sessions; the usage
      example on this page declares it as
      session_callback(connection, requested_tag)
    - max_sessions_per_shard (int): Maximum number of connections per shard
      (default 0)
    - soda_metadata_cache (bool): Whether the SODA metadata cache is enabled
      (default False)
    - ping_interval (int): Seconds a pooled connection may be unused before
      acquire() pings it to check it is still alive (default 60). This is
      checked on acquire, not by a periodic background ping.
    - **kwargs: Additional connection parameters (see PoolParams for the
      names documented on this page)

    Returns:
    ConnectionPool object
    """

def create_pool_async(
    user=None,
    password=None,
    dsn=None,
    min=1,
    max=2,
    increment=1,
    **kwargs
) -> AsyncConnectionPool:
    """
    Create an asynchronous connection pool.

    This is declared as a plain function (``def``, not ``async def``): it
    returns the AsyncConnectionPool directly. Only the pool's own methods
    (acquire, release, close) are asynchronous.

    Parameters: Same as create_pool(). Only the first six are spelled out in
    this signature; the remaining ones are passed through **kwargs.

    Returns:
    AsyncConnectionPool object
    """

def get_pool(name=None) -> ConnectionPool:
    """
    Get a named connection pool.

    Parameters:
    - name (str): Pool name (None for default pool)

    Returns:
    ConnectionPool object

    NOTE(review): the upstream python-oracledb API documents this lookup as
    get_pool(pool_alias), returning None when no pool is cached under that
    alias. Confirm the parameter name and the not-found behaviour against
    the installed version before relying on this stub.
    """

ConnectionPool Class

Manage a pool of database connections with automatic scaling and resource management.

class ConnectionPool:
    """Synchronous connection pool.

    Instances are obtained from create_pool(). Connections are handed out by
    acquire() and given back with release(), or automatically when the
    connection is used as a context manager.
    """

    # Properties
    min: int                     # minimum number of connections kept in the pool
    max: int                     # maximum number of connections the pool may open
    increment: int               # connections added each time the pool grows
    opened: int                  # connections currently open (busy + available)
    busy: int                    # connections currently acquired by callers
    timeout: int                 # idle timeout, in seconds
    getmode: int                 # one of the POOL_GETMODE_* constants
    homogeneous: bool            # True when all connections share one set of credentials
    name: str                    # pool name
    dsn: str                     # data source name the pool connects to
    username: str                # user the pool was created with
    wait_timeout: int            # acquire() wait, in milliseconds (see create_pool)
    max_lifetime_session: int    # maximum session lifetime, in seconds
    max_sessions_per_shard: int  # maximum connections per shard
    soda_metadata_cache: bool    # whether the SODA metadata cache is enabled
    ping_interval: int           # seconds unused before acquire() pings a connection

    def acquire(
        self,
        user=None,
        password=None,
        cclass=None,
        purity=PURITY_DEFAULT,
        tag=None,
        matchanytag=False,
        shardingkey=None,
        supershardingkey=None
    ) -> Connection:
        """
        Acquire a connection from the pool.

        What happens when no connection is free depends on the pool's
        getmode (see the POOL_GETMODE_* constants).

        Parameters:
        - user (str): Username (for heterogeneous pools)
        - password (str): Password (for heterogeneous pools)
        - cclass (str): Connection class for session pooling
        - purity (int): Session purity level (default PURITY_DEFAULT)
        - tag (str): Session tag being requested
        - matchanytag (bool): Match any tagged session (default False)
        - shardingkey (list): Sharding key for database sharding
        - supershardingkey (list): Super sharding key

        Returns:
        Connection object from pool
        """

    def release(self, connection, tag=None) -> None:
        """
        Release a connection back to the pool.

        Parameters:
        - connection: Connection object to release
        - tag (str): Tag to associate with released session, so that a later
          acquire(tag=...) can ask for it
        """

    def close(self, force=False) -> None:
        """
        Close the connection pool and all connections.

        Parameters:
        - force (bool): Force close even with active connections. Per the
          python-oracledb documentation, with the default False the call
          fails while any acquired connection has not been released.
        """

    def drop(self, connection) -> None:
        """
        Drop a connection from the pool permanently.

        Unlike release(), the connection is not kept for reuse.

        Parameters:
        - connection: Connection object to drop
        """

    def reconfigure(
        self,
        min=None,
        max=None,
        increment=None,
        timeout=None,
        getmode=None,
        wait_timeout=None,
        max_lifetime_session=None,
        ping_interval=None,
        **kwargs
    ) -> None:
        """
        Reconfigure pool parameters.

        Every parameter defaults to None; presumably a value left as None
        keeps its current setting (confirm against the library docs).

        Parameters:
        - min (int): New minimum connections
        - max (int): New maximum connections
        - increment (int): New increment value
        - timeout (int): New idle timeout, in seconds
        - getmode (int): New get mode
        - wait_timeout (int): New wait timeout, in milliseconds
        - max_lifetime_session (int): New max session lifetime, in seconds
        - ping_interval (int): New ping interval, in seconds
        - **kwargs: Further settings not listed in this signature
        """

AsyncConnectionPool Class

Asynchronous version of ConnectionPool with async/await support.

class AsyncConnectionPool:
    """Asynchronous connection pool.

    Instances are obtained from create_pool_async(). The methods listed here
    are coroutines and must be awaited.
    """

    # Properties (a subset of those listed for ConnectionPool, with the same
    # meaning; max_sessions_per_shard, soda_metadata_cache and ping_interval
    # are not listed here)
    min: int                   # minimum number of connections kept in the pool
    max: int                   # maximum number of connections the pool may open
    increment: int             # connections added each time the pool grows
    opened: int                # connections currently open
    busy: int                  # connections currently acquired by callers
    timeout: int               # idle timeout, in seconds
    getmode: int               # one of the POOL_GETMODE_* constants
    homogeneous: bool          # True when all connections share one set of credentials
    name: str                  # pool name
    dsn: str                   # data source name the pool connects to
    username: str              # user the pool was created with
    wait_timeout: int          # acquire() wait, in milliseconds (see create_pool)
    max_lifetime_session: int  # maximum session lifetime, in seconds

    async def acquire(
        self,
        user=None,
        password=None,
        cclass=None,
        purity=PURITY_DEFAULT,
        tag=None,
        matchanytag=False,
        shardingkey=None,
        supershardingkey=None
    ) -> AsyncConnection:
        """
        Acquire an async connection from the pool.

        Parameters: Same as ConnectionPool.acquire()

        Returns:
        AsyncConnection object from pool
        """

    async def release(self, connection, tag=None) -> None:
        """
        Release an async connection back to the pool.

        Parameters:
        - connection: AsyncConnection object to release
        - tag (str): Tag to associate with released session
        """

    async def close(self, force=False) -> None:
        """
        Close the async connection pool and all connections.

        Parameters:
        - force (bool): Force close even with active connections
        """

Pool Parameters

Configure pool parameters using the PoolParams class.

class PoolParams:
    """Pool parameter configuration.

    The first group of attributes mirrors the named parameters of
    create_pool(); the second group lists further connection-level settings.
    """

    # --- Same names as the create_pool() parameters ---
    user: str
    password: str
    dsn: str
    min: int
    max: int
    increment: int
    connectiontype: type
    getmode: int                 # one of the POOL_GETMODE_* constants
    homogeneous: bool
    timeout: int                 # idle timeout, in seconds
    wait_timeout: int            # acquire() wait, in milliseconds
    max_lifetime_session: int    # in seconds
    # NOTE(review): `callable` is the builtin function, not a typing
    # construct; type checkers expect typing.Callable here.
    session_callback: callable
    max_sessions_per_shard: int
    soda_metadata_cache: bool
    ping_interval: int           # in seconds
    # --- Additional connection-level settings ---
    # NOTE(review): not every name below may exist in current python-oracledb
    # releases (e.g. threaded / encoding / nencoding look like cx_Oracle-era
    # options); verify against the installed version.
    stmtcachesize: int
    edition: str
    events: bool
    externalauth: bool
    mode: int
    threaded: bool
    appcontext: list
    encoding: str
    nencoding: str
    tag: str
    matchanytag: bool
    config_dir: str
    appname: str
    disable_oob: bool

Pool Get Modes

Constants for controlling connection acquisition behavior.

# Pool Get Mode Constants: select what acquire() does when every pooled
# connection is busy. Passed as `getmode` to create_pool() / reconfigure().
POOL_GETMODE_WAIT: int        # Wait (without a time limit) for an available connection
POOL_GETMODE_NOWAIT: int      # Return immediately (with an error) if no connection is available
POOL_GETMODE_FORCEGET: int    # Create a new connection even beyond the max limit
POOL_GETMODE_TIMEDWAIT: int   # Wait up to wait_timeout (milliseconds), then fail

Usage Examples

Basic Connection Pool

import oracledb

# Create a basic connection pool
pool = oracledb.create_pool(
    user="hr",
    password="password",
    dsn="localhost/xepdb1",
    min=2,
    max=10,
    increment=2
)

try:
    print(f"Pool created with {pool.opened} connections")

    # Acquire connection from pool
    connection = pool.acquire()
    try:
        print(f"Pool now has {pool.busy} busy connections")

        with connection.cursor() as cursor:
            cursor.execute("SELECT COUNT(*) FROM employees")
            count = cursor.fetchone()[0]
            print(f"Employee count: {count}")
    finally:
        # Release connection back to pool, even if the query raised, so the
        # connection is never left checked out
        pool.release(connection)

    print(f"Pool now has {pool.busy} busy connections")
finally:
    # Close the pool (runs on the error path too; the connection has already
    # been released by the inner finally)
    pool.close()

Advanced Pool Configuration

import oracledb

def session_callback(connection, requested_tag):
    """Set up session state on a connection the pool is handing out.

    Registered through create_pool(session_callback=...). Receives the
    connection and the tag that was requested in pool.acquire().
    """
    print(f"Session callback called with tag: {requested_tag}")

    # Set session state based on tag
    if requested_tag == "REPORTING":
        with connection.cursor() as cursor:
            cursor.execute("ALTER SESSION SET optimizer_mode = FIRST_ROWS")
    elif requested_tag == "BATCH":
        with connection.cursor() as cursor:
            cursor.execute("ALTER SESSION SET optimizer_mode = ALL_ROWS")

# Create pool with advanced configuration
pool = oracledb.create_pool(
    user="hr",
    password="password",
    dsn="localhost/xepdb1",
    min=5,
    max=20,
    increment=3,
    # wait_timeout is only honoured in TIMEDWAIT mode; with POOL_GETMODE_WAIT
    # acquire() blocks without a time limit and wait_timeout is ignored.
    getmode=oracledb.POOL_GETMODE_TIMEDWAIT,
    timeout=300,  # close connections that stay idle for 5 minutes
    wait_timeout=30000,  # milliseconds: wait up to 30 seconds for a connection
    max_lifetime_session=3600,  # 1 hour max session life
    session_callback=session_callback,
    ping_interval=60,  # on acquire, ping connections unused for 60+ seconds
    homogeneous=True
)

# Acquire connection with specific session requirements
# NOTE(review): the python-oracledb docs describe connection tagging as a
# Thick-mode feature; confirm whether oracledb.init_oracle_client() must be
# called before creating the pool for the tag to take effect.
connection = pool.acquire(
    cclass="REPORTING",
    purity=oracledb.PURITY_SELF,
    tag="REPORTING"
)

# Use connection for reporting queries
with connection.cursor() as cursor:
    cursor.execute("SELECT department_name, COUNT(*) FROM departments d JOIN employees e ON d.department_id = e.department_id GROUP BY department_name")
    for row in cursor:
        print(f"Department {row[0]}: {row[1]} employees")

# Release with tag for future reuse
pool.release(connection, tag="REPORTING")

pool.close()

Heterogeneous Pool

import oracledb

# Heterogeneous pools are only supported in python-oracledb Thick mode, so
# enable it before creating the pool. This needs Oracle Client libraries to
# be installed; pass lib_dir=... if they are not on the default search path.
oracledb.init_oracle_client()

# Create heterogeneous pool (different users can connect)
pool = oracledb.create_pool(
    dsn="localhost/xepdb1",
    min=3,
    max=15,
    increment=2,
    homogeneous=False  # Allow different users
)

# Different users can acquire connections
hr_connection = pool.acquire(user="hr", password="hr_password")
sales_connection = pool.acquire(user="sales", password="sales_password")

# Use connections with different privileges
with hr_connection.cursor() as cursor:
    cursor.execute("SELECT COUNT(*) FROM employees")
    hr_count = cursor.fetchone()[0]
    print(f"HR can see {hr_count} employees")

with sales_connection.cursor() as cursor:
    cursor.execute("SELECT COUNT(*) FROM customers")
    sales_count = cursor.fetchone()[0]
    print(f"Sales can see {sales_count} customers")

# Release connections
pool.release(hr_connection)
pool.release(sales_connection)

pool.close()

Async Connection Pool

import asyncio
import oracledb

async def main():
    """Create an async pool, run one query on a pooled connection, clean up."""
    # Create async connection pool. create_pool_async() is a regular function
    # that returns the AsyncConnectionPool directly, so it must NOT be
    # awaited; only the pool's methods are coroutines.
    pool = oracledb.create_pool_async(
        user="hr",
        password="password",
        dsn="localhost/xepdb1",
        min=3,
        max=12,
        increment=3
    )

    print(f"Async pool created with {pool.opened} connections")

    # Acquire async connection
    connection = await pool.acquire()

    async with connection.cursor() as cursor:
        await cursor.execute("SELECT employee_id, first_name FROM employees WHERE ROWNUM <= 5")
        async for row in cursor:
            print(f"Employee {row[0]}: {row[1]}")

    # Release connection
    await pool.release(connection)

    # Close pool
    await pool.close()

asyncio.run(main())

Pool Monitoring and Management

import oracledb
import time

# Create pool with monitoring
pool = oracledb.create_pool(
    user="hr",
    password="password",
    dsn="localhost/xepdb1",
    min=2,
    max=8,
    increment=2
)

def monitor_pool():
    """Print the pool's current size, usage and configured limits."""
    # Plain string: there is nothing to interpolate on this line
    print("Pool Stats:")
    print(f"  Opened: {pool.opened}")
    print(f"  Busy: {pool.busy}")
    # Open connections that are not checked out are available for acquire()
    print(f"  Available: {pool.opened - pool.busy}")
    print(f"  Min: {pool.min}, Max: {pool.max}")
    print()

# Initial stats
monitor_pool()

# Acquire multiple connections to see pool growth
connections = []
for number in range(1, 6):
    connections.append(pool.acquire())
    print(f"Acquired connection {number}")
    monitor_pool()

# Release connections
for number, conn in enumerate(connections, start=1):
    pool.release(conn)
    print(f"Released connection {number}")
    monitor_pool()

# Reconfigure pool
print("Reconfiguring pool...")
pool.reconfigure(min=4, max=12, increment=3)
monitor_pool()

pool.close()

Context Manager Usage

import oracledb

# Build the pool once; connections are borrowed from it via context managers
pool = oracledb.create_pool(
    user="hr",
    password="password",
    dsn="localhost/xepdb1",
    min=2,
    max=8
)

# One `with` statement manages both resources: on exit the cursor is closed
# first, then the connection is released back to the pool
with pool.acquire() as connection, connection.cursor() as cursor:
    cursor.execute("SELECT department_id, department_name FROM departments")
    for row in cursor:
        print(f"Department {row[0]}: {row[1]}")
# Cursor is closed and connection is released at this point

# The pool itself stays open and can hand out connections again
print(f"Pool still has {pool.opened} connections")

pool.close()

Install with Tessl CLI

npx tessl i tessl/pypi-oracledb

docs

advanced-queuing.md

connection-pooling.md

connectivity.md

data-types.md

database-objects.md

index.md

lobs.md

pipeline.md

soda.md

sql-execution.md

subscriptions.md

tile.json