Python interface to Oracle Database implementing DB API 2.0 with Oracle-specific extensions
—
Connection pooling for scalable applications with configurable pool sizes, connection sharing, and automatic connection management for high-performance Oracle Database access.
Create and configure session pools for optimal connection management.
class SessionPool:
def __init__(self, user: str, password: str, dsn: str, min: int, max: int,
increment: int, connectiontype=Connection, threaded=True,
getmode=SPOOL_ATTRVAL_NOWAIT, events=False, homogeneous=True,
externalauth=False, encoding=None, nencoding=None, edition=None,
timeout=0, waitTimeout=0, maxLifetimeSession=0,
sessionCallback=None, maxSessionsPerShard=0, stmtcachesize=20,
ping_interval=60, **kwargs):
"""
Create session pool for connection management.
Parameters:
- user (str): Database username
- password (str): Database password
- dsn (str): Data source name
- min (int): Minimum number of connections in pool
- max (int): Maximum number of connections in pool
- increment (int): Number of connections to create when pool is exhausted
- connectiontype: Connection class to use (default: Connection)
- threaded (bool): Enable thread safety
- getmode (int): Connection acquisition mode
- events (bool): Enable Oracle events
- homogeneous (bool): All connections use same credentials
- externalauth (bool): Use external authentication
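- encoding (str): Character encoding
- nencoding (str): National character set encoding
- edition (str): Edition name to use for sessions (Edition-Based Redefinition)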
- timeout (int): Session timeout in seconds
- waitTimeout (int): Time to wait for connection (seconds)
- maxLifetimeSession (int): Maximum session lifetime (seconds)
- sessionCallback: Callable invoked with (connection, requestedTag) when a new session is created or the requested tag does not match the session's tag
- stmtcachesize (int): Statement cache size per connection
- ping_interval (int): Connection ping interval (seconds)
"""

Usage examples:
# Basic pool creation
pool = cx_Oracle.SessionPool("scott", "tiger", "localhost:1521/XE",
min=2, max=10, increment=2)
# Pool with custom configuration
pool = cx_Oracle.SessionPool(
user="hr",
password="password",
dsn="prod_db",
min=5,
max=50,
increment=5,
timeout=300, # 5 minute session timeout
waitTimeout=10, # 10 second wait for connection
stmtcachesize=50, # Larger statement cache
ping_interval=30 # Ping every 30 seconds
)
# Pool with session callback
def session_callback(connection, requestedTag, actualTag=None):
    """Initialize a session handed out by the pool.

    The pool invokes the session callback with only the connection and the
    requested tag. The tag the session actually carries is read from
    ``connection.tag``.

    Parameters:
    - connection: Connection being returned by the pool
    - requestedTag (str): Tag requested in acquire(), or None
    - actualTag (str): Optional override, kept for callers that pass the
      actual tag explicitly; defaults to connection.tag
    """
    if actualTag is None:
        # The pool does not pass the actual tag; read it from the connection.
        actualTag = connection.tag
    print(f"Session acquired with tag: {actualTag}")
    connection.current_schema = "HR"
pool = cx_Oracle.SessionPool("user", "pass", "dsn", 2, 10, 2,
sessionCallback=session_callback)

Acquire and release connections from the pool.
class SessionPool:
def acquire(self, user=None, password=None, cclass=None, purity=None,
tag=None, matchanytag=False, shardingkey=[],
supershardingkey=[]) -> Connection:
"""
Acquire connection from pool.
Parameters:
- user (str): Override username for heterogeneous pools
- password (str): Override password for heterogeneous pools
- cclass (str): Connection class for DRCP
- purity (int): Session purity (ATTR_PURITY_NEW, ATTR_PURITY_SELF)
- tag (str): Requested session tag
- matchanytag (bool): Accept any tagged session
- shardingkey (list): Sharding key for sharded databases
- supershardingkey (list): Super sharding key
Returns:
Connection object from pool
"""
def release(self, connection: Connection, tag=None) -> None:
"""
Return connection to pool.
Parameters:
- connection: Connection object to return
- tag (str): Tag to associate with returned session
"""
def drop(self, connection: Connection) -> None:
"""
Drop connection from pool permanently.
Parameters:
- connection: Connection object to drop
"""
def close(self, force=False) -> None:
"""
Close pool and all connections.
Parameters:
- force (bool): Force close even with active connections
"""

Usage examples:
# Basic connection acquisition
conn = pool.acquire()
try:
cursor = conn.cursor()
cursor.execute("SELECT * FROM employees")
# Use connection...
finally:
pool.release(conn)
# Context manager (automatic release)
with pool.acquire() as conn:
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM employees")
count = cursor.fetchone()[0]
print(f"Employee count: {count}")
# Tagged session management
with pool.acquire(tag="readonly") as conn:
conn.current_schema = "HR_READONLY"
# Connection returned with "readonly" tag
# Acquire tagged session (reuses previous session if available)
with pool.acquire(tag="readonly") as conn:
# Gets same session configuration as above
pass
# Drop problematic connection
conn = pool.acquire()
try:
# If connection has issues...
pool.drop(conn) # Permanently remove from pool
except:
# Don't release back to pool
pool.drop(conn)Monitor pool status and configuration.
class SessionPool:
@property
def username(self) -> str:
"""Pool username"""
@property
def dsn(self) -> str:
"""Data source name"""
@property
def tnsentry(self) -> str:
"""TNS entry (alias for dsn)"""
@property
def max(self) -> int:
"""Maximum number of connections"""
@property
def min(self) -> int:
"""Minimum number of connections"""
@property
def increment(self) -> int:
"""Connection increment value"""
@property
def opened(self) -> int:
"""Current number of opened connections"""
@property
def busy(self) -> int:
"""Current number of busy connections"""
@property
def timeout(self) -> int:
"""Session timeout in seconds"""
@property
def getmode(self) -> int:
"""Connection acquisition mode"""
@property
def homogeneous(self) -> bool:
"""Whether pool is homogeneous"""
@property
def name(self) -> str:
"""Pool name"""
@property
def stmtcachesize(self) -> int:
"""Statement cache size per connection"""
@property
def ping_interval(self) -> int:
"""Connection ping interval in seconds"""

Usage examples:
# Monitor pool status
print(f"Pool status: {pool.busy}/{pool.opened}/{pool.max} (busy/opened/max)")
# A pool may have no opened connections (e.g. min=0); skip the ratio to
# avoid ZeroDivisionError in that case.
if pool.opened:
    print(f"Pool efficiency: {(pool.busy/pool.opened)*100:.1f}%")
# Pool configuration info
print(f"Min connections: {pool.min}")
print(f"Max connections: {pool.max}")
print(f"Increment: {pool.increment}")
print(f"Timeout: {pool.timeout} seconds")
# Check if pool needs tuning
if pool.busy == pool.max:
print("Warning: Pool at maximum capacity")
elif pool.opened > pool.min and pool.busy < pool.min:
print("Info: Pool may be oversized")

Control how connections are acquired from the pool.
SPOOL_ATTRVAL_WAIT: int # Wait for available connection
SPOOL_ATTRVAL_NOWAIT: int # Don't wait, raise error if none available
SPOOL_ATTRVAL_FORCEGET: int # Force new connection beyond max limit
SPOOL_ATTRVAL_TIMEDWAIT: int # Wait with timeout for connection

Usage examples:
# Pool that waits for connections
wait_pool = cx_Oracle.SessionPool("user", "pass", "dsn", 2, 5, 1,
getmode=cx_Oracle.SPOOL_ATTRVAL_WAIT)
# Pool that fails immediately if no connections available
nowait_pool = cx_Oracle.SessionPool("user", "pass", "dsn", 2, 5, 1,
getmode=cx_Oracle.SPOOL_ATTRVAL_NOWAIT)
# Try to acquire with specific behavior
try:
conn = nowait_pool.acquire()
except cx_Oracle.DatabaseError:
print("No connections immediately available")

Control session state and reusability.
ATTR_PURITY_DEFAULT: int # Default purity
ATTR_PURITY_NEW: int # Require new session
ATTR_PURITY_SELF: int # Session may be reused (prior session state may be retained)

Usage examples:
# Require fresh session (no previous state)
conn = pool.acquire(purity=cx_Oracle.ATTR_PURITY_NEW)
# Reuse a pooled session if available (prior session state may be retained)
conn = pool.acquire(purity=cx_Oracle.ATTR_PURITY_SELF)

Create pools that support multiple user credentials.
# Create heterogeneous pool
hetero_pool = cx_Oracle.SessionPool(None, None, "dsn", 2, 10, 2,
homogeneous=False,
externalauth=True)
# Acquire connections with different credentials
hr_conn = hetero_pool.acquire(user="hr", password="hr_pass")
sales_conn = hetero_pool.acquire(user="sales", password="sales_pass")Use Oracle's Database Resident Connection Pooling for additional scalability.
# Connect to DRCP-enabled service
drcp_dsn = "hostname:1521/service_name:POOLED"
pool = cx_Oracle.SessionPool("user", "pass", drcp_dsn, 0, 5, 1)
# Use connection class for better pooling
conn = pool.acquire(cclass="MYAPP")

Implement session callbacks for connection initialization.
def init_session(connection, requestedTag, actualTag=None):
    """Initialize a session handed out by the pool.

    The pool invokes the session callback with only the connection and the
    requested tag. The tag the session actually carries is read from
    ``connection.tag``.

    Parameters:
    - connection: Connection being returned by the pool
    - requestedTag (str): Tag requested in acquire(), or None
    - actualTag (str): Optional override, kept for callers that pass the
      actual tag explicitly; defaults to connection.tag
    """
    if actualTag is None:
        # The pool does not pass the actual tag; read it from the connection.
        actualTag = connection.tag
    # Set session-specific configuration
    connection.current_schema = "APP_SCHEMA"
    # Set session parameters; close the cursor even if the statement fails
    cursor = connection.cursor()
    try:
        cursor.execute("ALTER SESSION SET NLS_DATE_FORMAT = 'YYYY-MM-DD'")
    finally:
        cursor.close()
    # Log session acquisition
    print(f"Session initialized with tag: {actualTag}")
pool = cx_Oracle.SessionPool("user", "pass", "dsn", 2, 10, 2,
sessionCallback=init_session)

Recommendations for pool configuration:
# Conservative pool for low-traffic applications
small_pool = cx_Oracle.SessionPool("user", "pass", "dsn",
min=2, max=10, increment=2)
# Aggressive pool for high-traffic applications
large_pool = cx_Oracle.SessionPool("user", "pass", "dsn",
min=10, max=100, increment=10)
# Web application pool (typical sizing)
web_pool = cx_Oracle.SessionPool("user", "pass", "dsn",
min=5, # Always keep 5 connections
max=50, # Scale up to 50 under load
increment=5, # Add 5 at a time
timeout=300, # 5 minute session timeout
waitTimeout=30) # Wait up to 30s for connection

Pool-specific error handling patterns:
try:
    conn = pool.acquire()
except cx_Oracle.DatabaseError as e:
    error_obj, = e.args
    if error_obj.code in (24418, 24496):
        # ORA-24418: cannot open further sessions (pool at max, NOWAIT mode)
        # ORA-24496: timed out waiting for a free connection
        print("Connection pool exhausted")
    elif error_obj.code == 24457:
        # ORA-24457: no free session found within the wait timeout
        print("Timed out waiting for a pooled connection")
    else:
        print(f"Database error: {error_obj.message}")
# Always release connections, even on error
conn = None
try:
conn = pool.acquire()
# Use connection...
except Exception as e:
print(f"Error: {e}")
finally:
if conn:
pool.release(conn)

Proper pool cleanup and resource management:
try:
# Use pool for application lifetime
pool = cx_Oracle.SessionPool("user", "pass", "dsn", 5, 50, 5)
# Application logic using pool...
finally:
# Clean shutdown
if pool:
pool.close(force=True) # Close all connectionsInstall with Tessl CLI
npx tessl i tessl/pypi-cx-oracle