Python interface to Oracle Database implementing DB API 2.0 with Oracle-specific extensions
—
Support for Oracle user-defined types including object types, collections (VARRAYs and nested tables), and object attribute access for complex data structures and custom types.
Retrieve and work with Oracle user-defined object types from the database.
class Connection:
    """Connection methods related to object types (signature summary only)."""

    def gettype(self, name: str) -> "ObjectType":
        """
        Get Oracle object type by name.

        Parameters:
        - name (str): Object type name (schema.type_name or type_name)

        Returns:
        ObjectType object for creating instances
        """


class ObjectType:
    """Metadata describing an Oracle object type (signature summary only)."""

    @property
    def schema(self) -> str:
        """Object type schema name"""

    @property
    def name(self) -> str:
        """Object type name"""

    @property
    def attributes(self) -> list:
        """List of object attributes (ObjectAttr objects)"""

    @property
    def iscollection(self) -> bool:
        """Whether type is a collection (VARRAY or nested table)"""

    # The return annotation is quoted because Object is not defined at this
    # point; an unquoted name would raise NameError when the class is created.
    def newobject(self) -> "Object":
        """
        Create new instance of object type.

        Returns:
        Object instance ready for attribute assignment
        """


# Usage examples:
# Database setup (run in SQL*Plus or similar):
# CREATE TYPE address_type AS OBJECT (
#     street VARCHAR2(100),
#     city VARCHAR2(50),
#     state VARCHAR2(20),
#     zip_code VARCHAR2(10)
# );
#
# CREATE TYPE employee_type AS OBJECT (
#     emp_id NUMBER,
#     name VARCHAR2(100),
#     address address_type,
#     hire_date DATE
# );

# Get object types
address_type = connection.gettype("ADDRESS_TYPE")
employee_type = connection.gettype("EMPLOYEE_TYPE")
print(f"Employee type schema: {employee_type.schema}")
print(f"Employee type name: {employee_type.name}")
print(f"Is collection: {employee_type.iscollection}")

# List attributes
for attr in employee_type.attributes:
    print(f"Attribute: {attr.name}, Type: {attr.type}")


# Create and work with object instances.
class Object:
    """Instance of an Oracle object type (signature summary only)."""

    # The return annotation is quoted because the class name is not bound yet
    # while its own body is being executed.
    def copy(self) -> "Object":
        """Create copy of object"""

    def aslist(self) -> list:
        """Convert object to list (for collections)"""

    def asdict(self) -> dict:
        """Convert collection to dictionary keyed by element index (for collections)"""


# Usage examples:
import datetime

# Create address object
address = address_type.newobject()
address.STREET = "123 Main Street"
address.CITY = "Anytown"
address.STATE = "CA"
address.ZIP_CODE = "12345"

# Create employee object
employee = employee_type.newobject()
employee.EMP_ID = 1001
employee.NAME = "John Doe"
employee.ADDRESS = address
employee.HIRE_DATE = datetime.date(2023, 1, 15)

# Insert into database
cursor.execute("INSERT INTO employees VALUES (:1)", (employee,))
connection.commit()

# Copy object
employee_copy = employee.copy()
employee_copy.EMP_ID = 1002
employee_copy.NAME = "Jane Smith"

# Convert to dictionary
# asdict() only works on collections, so the dictionary for a plain object
# is built from the attribute metadata of its type instead.
emp_dict = {
    attr.name: getattr(employee, attr.name)
    for attr in employee_type.attributes
}
print(f"Employee data: {emp_dict}")


# Work with Oracle collection types (VARRAYs and nested tables).
class Object:
    """Collection-related methods of Object (signature summary only)."""

    def append(self, value) -> None:
        """Append element to collection"""

    def extend(self, sequence) -> None:
        """Extend collection with sequence of values"""

    def getelement(self, index: int):
        """Get collection element at index (1-based)"""

    def setelement(self, index: int, value) -> None:
        """Set collection element at index (1-based)"""

    def exists(self, index: int) -> bool:
        """Check if collection element exists at index"""

    def delete(self, index: int) -> None:
        """Delete collection element at index"""

    def trim(self, count: int) -> None:
        """Remove count elements from end of collection"""

    def size(self) -> int:
        """Get number of elements in collection"""

    def first(self) -> int:
        """Get first valid index in collection"""

    def last(self) -> int:
        """Get last valid index in collection"""

    def next(self, index: int) -> int:
        """Get next valid index after given index"""

    def prev(self, index: int) -> int:
        """Get previous valid index before given index"""


# Usage examples:
# Database setup for collections:
# CREATE TYPE phone_list AS VARRAY(5) OF VARCHAR2(20);
# CREATE TYPE employee_with_phones AS OBJECT (
#     emp_id NUMBER,
#     name VARCHAR2(100),
#     phones phone_list
# );

# Get collection type
phone_list_type = connection.gettype("PHONE_LIST")
emp_phones_type = connection.gettype("EMPLOYEE_WITH_PHONES")

# Create collection
phones = phone_list_type.newobject()
phones.append("555-1234")
phones.append("555-5678")
phones.append("555-9012")

# Create employee with phone collection
emp_with_phones = emp_phones_type.newobject()
emp_with_phones.EMP_ID = 2001
emp_with_phones.NAME = "Alice Johnson"
emp_with_phones.PHONES = phones

# Work with collection elements
# Indexes are taken from first()/last()/next() rather than hard-coded, so the
# example does not depend on where collection indexing starts.
first_index = phones.first()
print(f"Number of phones: {phones.size()}")
print(f"First phone: {phones.getelement(first_index)}")
print(f"Last phone: {phones.getelement(phones.last())}")

# Modify collection
phones.setelement(phones.next(first_index), "555-WORK")  # Change second phone
phones.append("555-HOME")  # Add another phone

# Check if element exists
if phones.exists(first_index):
    print(f"First phone exists: {phones.getelement(first_index)}")

# Remove element
# VARRAYs are dense: individual elements cannot be deleted (delete() is for
# nested tables), so elements are removed from the end with trim().
phones.trim(1)  # Remove last phone

# Convert collection to Python list
phone_list = phones.aslist()
print(f"Phone list: {phone_list}")

# Extend collection
# Three phones remain, so adding two stays within the VARRAY(5) limit.
additional_phones = ["555-CELL", "555-FAX"]
phones.extend(additional_phones)


# Access and work with object attribute metadata.
class ObjectAttr:
    """Metadata for one attribute of an object type (signature summary only)."""

    @property
    def name(self) -> str:
        """Attribute name"""

    @property
    def type(self):
        """Attribute type information"""


# Usage examples:
# Examine object type attributes
for attr in employee_type.attributes:
    print(f"Attribute: {attr.name}")
    print(f"Type: {attr.type}")
    # Access nested object attributes
    if attr.name == "ADDRESS":
        # Only object types expose .attributes; fall back to an empty list
        # for any other kind of attribute type.
        address_attrs = getattr(attr.type, "attributes", [])
        for nested_attr in address_attrs:
            print(f" Nested: {nested_attr.name} - {nested_attr.type}")


# Handle complex nested object structures:
# Database setup for nested objects:
# CREATE TYPE department_type AS OBJECT (
#     dept_id NUMBER,
#     dept_name VARCHAR2(50),
#     manager employee_type
# );
dept_type = connection.gettype("DEPARTMENT_TYPE")

# Create nested object structure
manager = employee_type.newobject()
manager.EMP_ID = 5001
manager.NAME = "Jane Manager"
manager.ADDRESS = address  # Reuse address from previous example
manager.HIRE_DATE = datetime.date(2020, 1, 1)

department = dept_type.newobject()
department.DEPT_ID = 100
department.DEPT_NAME = "Engineering"
department.MANAGER = manager

# Insert complex object
cursor.execute("INSERT INTO departments VALUES (:1)", (department,))
connection.commit()

# Query and access nested data
# SELECT * on an object table returns one column per attribute, so the first
# column would be DEPT_ID; VALUE(d) returns the whole row as one object.
# NOTE(review): assumes departments is an object table, e.g.
# CREATE TABLE departments OF department_type - confirm against the schema.
cursor.execute(
    "SELECT VALUE(d) FROM departments d WHERE d.dept_id = :1", (100,)
)
dept_row = cursor.fetchone()[0]
print(f"Department: {dept_row.DEPT_NAME}")
print(f"Manager: {dept_row.MANAGER.NAME}")
print(f"Manager Address: {dept_row.MANAGER.ADDRESS.CITY}, {dept_row.MANAGER.ADDRESS.STATE}")


# Work with collections containing object instances:
# Database setup for object collections:
# CREATE TYPE employee_list AS TABLE OF employee_type;
# CREATE TYPE project_type AS OBJECT (
#     project_id NUMBER,
#     project_name VARCHAR2(100),
#     team_members employee_list
# );
employee_list_type = connection.gettype("EMPLOYEE_LIST")
project_type = connection.gettype("PROJECT_TYPE")

# Create collection of employees
team = employee_list_type.newobject()

# Add team members
for i, name in enumerate(["Alice", "Bob", "Charlie"], 1):
    member = employee_type.newobject()
    member.EMP_ID = 3000 + i
    member.NAME = name
    member.ADDRESS = address  # Shared address for example
    member.HIRE_DATE = datetime.date(2023, 1, i)
    team.append(member)

# Create project with team
project = project_type.newobject()
project.PROJECT_ID = 1001
project.PROJECT_NAME = "Database Migration"
project.TEAM_MEMBERS = team

# Insert project
cursor.execute("INSERT INTO projects VALUES (:1)", (project,))
connection.commit()

# Query and process team members
# SELECT * on an object table returns one column per attribute; VALUE(p)
# returns the whole row as one object.
# NOTE(review): assumes projects is an object table of project_type - confirm
# against the schema.
cursor.execute(
    "SELECT VALUE(p) FROM projects p WHERE p.project_id = :1", (1001,)
)
proj_row = cursor.fetchone()[0]
print(f"Project: {proj_row.PROJECT_NAME}")
print("Team members:")
# aslist() yields the elements directly, so no index arithmetic is needed.
for member in proj_row.TEAM_MEMBERS.aslist():
    print(f" {member.EMP_ID}: {member.NAME}")


# Optimize object operations for better performance:
def bulk_object_operations(object_type, data_list):
    """Efficiently create and insert multiple objects.

    Parameters:
    - object_type: type used to create one object per entry via newobject()
    - data_list: iterable of dictionaries mapping attribute names to values;
      names are upper-cased before being assigned on the object

    Uses the module-level ``connection``: all objects are inserted into
    ``object_table`` with a single executemany() call and then committed.
    The cursor is closed even if the insert or the commit raises.
    """
    rows = []
    for data in data_list:
        obj = object_type.newobject()
        # Set attributes from data dictionary
        for attr_name, value in data.items():
            setattr(obj, attr_name.upper(), value)
        # executemany() expects one parameter tuple per row
        rows.append((obj,))

    # Bulk insert
    cursor = connection.cursor()
    try:
        cursor.executemany("INSERT INTO object_table VALUES (:1)", rows)
        connection.commit()
    finally:
        cursor.close()
def cache_object_types(
    connection,
    type_names=("EMPLOYEE_TYPE", "ADDRESS_TYPE", "DEPARTMENT_TYPE"),
):
    """Cache frequently used object types.

    Parameters:
    - connection: connection whose gettype() is used to look up each type
    - type_names: iterable of type names to pre-load; defaults to the
      common types used in these examples

    Returns:
    Dictionary mapping each type name that could be loaded to its type.
    A name whose lookup raises cx_Oracle.DatabaseError is reported with a
    warning and left out of the dictionary.
    """
    type_cache = {}
    # Pre-load the requested types
    for type_name in type_names:
        try:
            type_cache[type_name] = connection.gettype(type_name)
        except cx_Oracle.DatabaseError:
            print(f"Warning: Type {type_name} not found")
    return type_cache
# Usage
type_cache = cache_object_types(connection)
employee_type = type_cache.get("EMPLOYEE_TYPE")


# Convert objects to/from standard Python data structures:
def object_to_dict(obj):
    """Convert Oracle object to plain Python data structures recursively.

    Parameters:
    - obj: object instance, collection instance, scalar value or None

    Returns:
    - None when obj is None
    - a list of converted elements when obj is a collection
    - a dictionary of attribute name -> converted value when obj is a
      non-collection object
    - obj itself for any value that is not an Oracle object

    Collections are told apart from plain objects through
    ``obj.type.iscollection``. Checking for an ``asdict``/``aslist`` method
    cannot do that, because every object instance has both methods.
    """
    # Values without the object API (numbers, strings, dates, ...) pass through.
    if obj is None or not hasattr(obj, "aslist"):
        return obj

    obj_type = obj.type
    if obj_type.iscollection:
        return [object_to_dict(item) for item in obj.aslist()]

    # Plain object: read every attribute listed in the type metadata.
    return {
        attr.name: object_to_dict(getattr(obj, attr.name))
        for attr in obj_type.attributes
    }
def dict_to_object(data_dict, object_type):
    """Convert Python dictionary to Oracle object.

    Parameters:
    - data_dict: dictionary mapping attribute names to values; names are
      upper-cased before being assigned on the object
    - object_type: type used to create the object via newobject()

    Returns:
    The newly created object with its attributes set.

    Nested dictionaries and lists are converted when ``object_type.attributes``
    supplies a type for that attribute which can create objects. When no such
    type information is available the entry is skipped, as before.
    """
    obj = object_type.newobject()
    attr_types = {
        attr.name: attr.type
        for attr in (getattr(object_type, "attributes", None) or [])
    }
    for attr_name, value in data_dict.items():
        name = attr_name.upper()
        if isinstance(value, (dict, list)):
            value = _nested_to_object(value, attr_types.get(name))
            if value is None:
                # No usable type information for this nested value.
                continue
        setattr(obj, name, value)
    return obj


def _nested_to_object(value, db_type):
    """Build a nested object or collection from a dict/list, or return None
    when db_type cannot create objects."""
    if not hasattr(db_type, "newobject"):
        return None
    if isinstance(value, dict):
        return dict_to_object(value, db_type)

    collection = db_type.newobject()
    # Collection types are expected to describe their elements through
    # element_type; scalar elements need no conversion.
    element_type = getattr(db_type, "element_type", None)
    for item in value:
        if isinstance(item, (dict, list)):
            item = _nested_to_object(item, element_type)
            if item is None:
                continue
        collection.append(item)
    return collection
# Usage
emp_dict = object_to_dict(employee)
print(f"Employee as dict: {emp_dict}")


# Handle object-related errors and exceptions:
try:
    # Object operations
    obj_type = connection.gettype("NONEXISTENT_TYPE")
except cx_Oracle.DatabaseError as e:
    error_obj, = e.args
    if error_obj.code == 942:  # ORA-00942: table or view does not exist
        print("Object type not found")
    elif error_obj.code == 22303:  # Type not found
        print("Type not found in database")
    else:
        print(f"Object type error: {error_obj.message}")

try:
    # Object attribute access
    # obj_type is unbound when the lookup above fails, so this uses
    # employee_type from the earlier examples, which is known to exist.
    obj = employee_type.newobject()
    obj.NONEXISTENT_ATTR = "value"
except AttributeError as e:
    print(f"Attribute error: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")


# Install with Tessl CLI
npx tessl i tessl/pypi-cx-oracle