Python interface to Oracle Database with thin and thick connectivity modes
—
Manage pools of database connections for scalable applications. Connection pooling improves performance by reusing connections and provides better resource management for multi-user applications. Supports both synchronous and asynchronous pool operations with extensive configuration options.
Create and configure connection pools with various settings for optimal performance and resource utilization.
def create_pool(
user=None,
password=None,
dsn=None,
min=1,
max=2,
increment=1,
connectiontype=Connection,
getmode=POOL_GETMODE_WAIT,
homogeneous=True,
timeout=0,
wait_timeout=0,
max_lifetime_session=0,
session_callback=None,
max_sessions_per_shard=0,
soda_metadata_cache=False,
ping_interval=60,
**kwargs
) -> ConnectionPool:
"""
Create a connection pool.
Parameters:
- user (str): Username for authentication
- password (str): Password for authentication
- dsn (str): Data source name
- min (int): Minimum number of connections in pool
- max (int): Maximum number of connections in pool
- increment (int): Number of connections to create when pool needs to grow
- connectiontype: Connection class to use (Connection or AsyncConnection)
- getmode (int): Pool get mode (POOL_GETMODE_WAIT, POOL_GETMODE_NOWAIT, etc.)
- homogeneous (bool): Whether all connections use same credentials
- timeout (int): Connection timeout in seconds
- wait_timeout (int): Time to wait for available connection
- max_lifetime_session (int): Maximum session lifetime in seconds
- session_callback: Callback function for new sessions
- ping_interval (int): Interval for connection health checks
Returns:
ConnectionPool object
"""
def create_pool_async(
user=None,
password=None,
dsn=None,
min=1,
max=2,
increment=1,
**kwargs
) -> AsyncConnectionPool:
"""
Create an asynchronous connection pool.
Parameters: Same as create_pool()
Returns:
AsyncConnectionPool object
"""
def get_pool(name=None) -> ConnectionPool:
"""
Get a named connection pool.
Parameters:
- name (str): Pool name (None for default pool)
Returns:
ConnectionPool object
"""Manage a pool of database connections with automatic scaling and resource management.
class ConnectionPool:
"""Synchronous connection pool."""
# Properties
min: int
max: int
increment: int
opened: int
busy: int
timeout: int
getmode: int
homogeneous: bool
name: str
dsn: str
username: str
wait_timeout: int
max_lifetime_session: int
max_sessions_per_shard: int
soda_metadata_cache: bool
ping_interval: int
def acquire(
self,
user=None,
password=None,
cclass=None,
purity=PURITY_DEFAULT,
tag=None,
matchanytag=False,
shardingkey=None,
supershardingkey=None
) -> Connection:
"""
Acquire a connection from the pool.
Parameters:
- user (str): Username (for heterogeneous pools)
- password (str): Password (for heterogeneous pools)
- cclass (str): Connection class for session pooling
- purity (int): Session purity level
- tag (str): Session tag
- matchanytag (bool): Match any tagged session
- shardingkey (list): Sharding key for database sharding
- supershardingkey (list): Super sharding key
Returns:
Connection object from pool
"""
def release(self, connection, tag=None) -> None:
"""
Release a connection back to the pool.
Parameters:
- connection: Connection object to release
- tag (str): Tag to associate with released session
"""
def close(self, force=False) -> None:
"""
Close the connection pool and all connections.
Parameters:
- force (bool): Force close even with active connections
"""
def drop(self, connection) -> None:
"""
Drop a connection from the pool permanently.
Parameters:
- connection: Connection object to drop
"""
def reconfigure(
self,
min=None,
max=None,
increment=None,
timeout=None,
getmode=None,
wait_timeout=None,
max_lifetime_session=None,
ping_interval=None,
**kwargs
) -> None:
"""
Reconfigure pool parameters.
Parameters:
- min (int): New minimum connections
- max (int): New maximum connections
- increment (int): New increment value
- timeout (int): New connection timeout
- getmode (int): New get mode
- wait_timeout (int): New wait timeout
- max_lifetime_session (int): New max session lifetime
- ping_interval (int): New ping interval
"""Asynchronous version of ConnectionPool with async/await support.
class AsyncConnectionPool:
"""Asynchronous connection pool."""
# Properties (same as ConnectionPool)
min: int
max: int
increment: int
opened: int
busy: int
timeout: int
getmode: int
homogeneous: bool
name: str
dsn: str
username: str
wait_timeout: int
max_lifetime_session: int
async def acquire(
self,
user=None,
password=None,
cclass=None,
purity=PURITY_DEFAULT,
tag=None,
matchanytag=False,
shardingkey=None,
supershardingkey=None
) -> AsyncConnection:
"""
Acquire an async connection from the pool.
Parameters: Same as ConnectionPool.acquire()
Returns:
AsyncConnection object from pool
"""
async def release(self, connection, tag=None) -> None:
"""
Release an async connection back to the pool.
Parameters:
- connection: AsyncConnection object to release
- tag (str): Tag to associate with released session
"""
async def close(self, force=False) -> None:
"""
Close the async connection pool and all connections.
Parameters:
- force (bool): Force close even with active connections
"""Configure pool parameters using PoolParams class.
class PoolParams:
"""Pool parameter configuration."""
user: str
password: str
dsn: str
min: int
max: int
increment: int
connectiontype: type
getmode: int
homogeneous: bool
timeout: int
wait_timeout: int
max_lifetime_session: int
session_callback: callable
max_sessions_per_shard: int
soda_metadata_cache: bool
ping_interval: int
stmtcachesize: int
edition: str
events: bool
externalauth: bool
mode: int
threaded: bool
appcontext: list
encoding: str
nencoding: str
tag: str
matchanytag: bool
config_dir: str
appname: str
disable_oob: boolConstants for controlling connection acquisition behavior.
# Pool Get Mode Constants
POOL_GETMODE_WAIT: int # Wait for available connection
POOL_GETMODE_NOWAIT: int # Return immediately if no connection available
POOL_GETMODE_FORCEGET: int # Create new connection beyond max limit
POOL_GETMODE_TIMEDWAIT: int # Wait with timeoutimport oracledb
# Create a basic connection pool
pool = oracledb.create_pool(
user="hr",
password="password",
dsn="localhost/xepdb1",
min=2,
max=10,
increment=2
)
print(f"Pool created with {pool.opened} connections")
# Acquire connection from pool
connection = pool.acquire()
print(f"Pool now has {pool.busy} busy connections")
with connection.cursor() as cursor:
cursor.execute("SELECT COUNT(*) FROM employees")
count = cursor.fetchone()[0]
print(f"Employee count: {count}")
# Release connection back to pool
pool.release(connection)
print(f"Pool now has {pool.busy} busy connections")
# Close the pool
pool.close()import oracledb
def session_callback(connection, requested_tag):
"""Callback function called when session is returned from pool."""
print(f"Session callback called with tag: {requested_tag}")
# Set session state based on tag
if requested_tag == "REPORTING":
with connection.cursor() as cursor:
cursor.execute("ALTER SESSION SET optimizer_mode = FIRST_ROWS")
elif requested_tag == "BATCH":
with connection.cursor() as cursor:
cursor.execute("ALTER SESSION SET optimizer_mode = ALL_ROWS")
# Create pool with advanced configuration
pool = oracledb.create_pool(
user="hr",
password="password",
dsn="localhost/xepdb1",
min=5,
max=20,
increment=3,
getmode=oracledb.POOL_GETMODE_WAIT,
timeout=300, # 5 minutes
wait_timeout=30, # 30 seconds wait for connection
max_lifetime_session=3600, # 1 hour max session life
session_callback=session_callback,
ping_interval=60, # Ping every 60 seconds
homogeneous=True
)
# Acquire connection with specific session requirements
connection = pool.acquire(
cclass="REPORTING",
purity=oracledb.PURITY_SELF,
tag="REPORTING"
)
# Use connection for reporting queries
with connection.cursor() as cursor:
cursor.execute("SELECT department_name, COUNT(*) FROM departments d JOIN employees e ON d.department_id = e.department_id GROUP BY department_name")
for row in cursor:
print(f"Department {row[0]}: {row[1]} employees")
# Release with tag for future reuse
pool.release(connection, tag="REPORTING")
pool.close()import oracledb
# Create heterogeneous pool (different users can connect)
pool = oracledb.create_pool(
dsn="localhost/xepdb1",
min=3,
max=15,
increment=2,
homogeneous=False # Allow different users
)
# Different users can acquire connections
hr_connection = pool.acquire(user="hr", password="hr_password")
sales_connection = pool.acquire(user="sales", password="sales_password")
# Use connections with different privileges
with hr_connection.cursor() as cursor:
cursor.execute("SELECT COUNT(*) FROM employees")
hr_count = cursor.fetchone()[0]
print(f"HR can see {hr_count} employees")
with sales_connection.cursor() as cursor:
cursor.execute("SELECT COUNT(*) FROM customers")
sales_count = cursor.fetchone()[0]
print(f"Sales can see {sales_count} customers")
# Release connections
pool.release(hr_connection)
pool.release(sales_connection)
pool.close()import asyncio
import oracledb
async def main():
# Create async connection pool
pool = await oracledb.create_pool_async(
user="hr",
password="password",
dsn="localhost/xepdb1",
min=3,
max=12,
increment=3
)
print(f"Async pool created with {pool.opened} connections")
# Acquire async connection
connection = await pool.acquire()
async with connection.cursor() as cursor:
await cursor.execute("SELECT employee_id, first_name FROM employees WHERE ROWNUM <= 5")
async for row in cursor:
print(f"Employee {row[0]}: {row[1]}")
# Release connection
await pool.release(connection)
# Close pool
await pool.close()
asyncio.run(main())import oracledb
import time
# Create pool with monitoring
pool = oracledb.create_pool(
user="hr",
password="password",
dsn="localhost/xepdb1",
min=2,
max=8,
increment=2
)
def monitor_pool():
"""Monitor pool statistics."""
print(f"Pool Stats:")
print(f" Opened: {pool.opened}")
print(f" Busy: {pool.busy}")
print(f" Available: {pool.opened - pool.busy}")
print(f" Min: {pool.min}, Max: {pool.max}")
print()
# Initial stats
monitor_pool()
# Acquire multiple connections to see pool growth
connections = []
for i in range(5):
conn = pool.acquire()
connections.append(conn)
print(f"Acquired connection {i+1}")
monitor_pool()
# Release connections
for i, conn in enumerate(connections):
pool.release(conn)
print(f"Released connection {i+1}")
monitor_pool()
# Reconfigure pool
print("Reconfiguring pool...")
pool.reconfigure(min=4, max=12, increment=3)
monitor_pool()
pool.close()import oracledb
# Using pool and connections with context managers
pool = oracledb.create_pool(
user="hr",
password="password",
dsn="localhost/xepdb1",
min=2,
max=8
)
# Context manager automatically handles acquire/release
with pool.acquire() as connection:
with connection.cursor() as cursor:
cursor.execute("SELECT department_id, department_name FROM departments")
for row in cursor:
print(f"Department {row[0]}: {row[1]}")
# Connection is automatically released here
# Pool remains open for reuse
print(f"Pool still has {pool.opened} connections")
pool.close()Install with Tessl CLI
npx tessl i tessl/pypi-oracledb