Python interface to Oracle Database implementing DB API 2.0 with Oracle-specific extensions
—
Document-oriented database operations for JSON documents with collection management, document operations, and query capabilities providing a NoSQL-style interface to Oracle Database.
Access SODA functionality through database connections for document operations.
class Connection:
def getSodaDatabase(self) -> SodaDatabase:
"""
Get SODA database object for document operations.
Returns:
SodaDatabase object for collection and document management
"""

class SodaDatabase:
def createCollection(self, name: str, metadata=None) -> SodaCollection:
"""
Create new document collection.
Parameters:
- name (str): Collection name
- metadata (dict): Collection metadata specification
Returns:
SodaCollection object
"""
def openCollection(self, name: str) -> SodaCollection:
"""
Open existing document collection.
Parameters:
- name (str): Collection name
Returns:
SodaCollection object or None if not found
"""
def getCollectionNames(self, startName=None, limit=None) -> list:
"""
Get list of collection names.
Parameters:
- startName (str): Starting name for filtering
- limit (int): Maximum number of names to return
Returns:
List of collection names
"""
def createDocument(self, content, key=None, mediaType=None) -> SodaDoc:
"""
Create document from content.
Parameters:
- content: Document content (dict, str, or bytes)
- key (str): Document key (auto-generated if None)
- mediaType (str): Media type (default: application/json)
Returns:
SodaDoc object
"""

Usage examples:
# Get SODA database
soda_db = connection.getSodaDatabase()
# Create collection with default settings
employees_coll = soda_db.createCollection("employees")
# Create collection with custom metadata
metadata = {
"keyColumn": {"name": "ID"},
"contentColumn": {"name": "JSON_DOCUMENT", "jsonFormat": "OSON"},
"versionColumn": {"name": "VERSION"},
"lastModifiedColumn": {"name": "LAST_MODIFIED"},
"creationTimeColumn": {"name": "CREATED_ON"}
}
products_coll = soda_db.createCollection("products", metadata)
# Open existing collection
orders_coll = soda_db.openCollection("orders")
if orders_coll is None:
print("Orders collection not found")
# List all collections
collection_names = soda_db.getCollectionNames()
print(f"Collections: {collection_names}")
# List collections starting with 'emp'
emp_collections = soda_db.getCollectionNames(startName="emp", limit=10)

Create and manage individual documents within collections.
class SodaDoc:
@property
def key(self) -> str:
"""Document key (unique identifier)"""
@property
def content(self) -> dict:
"""Document content as dictionary"""
@property
def mediaType(self) -> str:
"""Document media type"""
@property
def version(self) -> str:
"""Document version"""
@property
def createdOn(self) -> str:
"""Document creation timestamp"""
@property
def lastModified(self) -> str:
"""Document last modification timestamp"""
def getContent(self) -> dict:
"""Get document content as dictionary"""
def getContentAsBytes(self) -> bytes:
"""Get document content as bytes"""
def getContentAsString(self) -> str:
"""Get document content as JSON string"""

Usage examples:
# Create documents from different content types
employee_dict = {
"id": 1001,
"name": "John Doe",
"department": "Engineering",
"salary": 75000,
"skills": ["Python", "Oracle", "SQL"]
}
# Create document from dictionary
emp_doc = soda_db.createDocument(employee_dict)
print(f"Created document with key: {emp_doc.key}")
# Create document with specific key
emp_doc_2 = soda_db.createDocument(
{"id": 1002, "name": "Jane Smith", "department": "Sales"},
key="emp_1002"
)
# Create document from JSON string
json_str = '{"id": 1003, "name": "Bob Johnson", "active": true}'
emp_doc_3 = soda_db.createDocument(json_str)
# Access document properties
print(f"Document key: {emp_doc.key}")
print(f"Content: {emp_doc.content}")
print(f"Media type: {emp_doc.mediaType}")
print(f"Version: {emp_doc.version}")

Manage documents within collections with insert, update, and delete operations.
class SodaCollection:
@property
def name(self) -> str:
"""Collection name"""
@property
def metadata(self) -> dict:
"""Collection metadata"""
def insertOne(self, doc) -> None:
"""
Insert single document into collection.
Parameters:
- doc: Document content (dict, SodaDoc, or JSON string)
Returns:
None (use insertOneAndGet to obtain a SodaDoc with the generated metadata)
"""
def insertMany(self, docs: list) -> None:
"""
Insert multiple documents into collection.
Parameters:
- docs (list): List of documents to insert
Returns:
None (use insertManyAndGet to obtain a list of SodaDoc objects for the inserted documents)
"""
def insertOneAndGet(self, doc) -> SodaDoc:
"""
Insert document and return complete document with metadata.
Parameters:
- doc: Document content to insert
Returns:
Complete SodaDoc with all metadata
"""
def save(self, doc) -> None:
"""
Save document (insert or replace).
Parameters:
- doc: Document to save
"""
def saveAndGet(self, doc) -> SodaDoc:
"""
Save document and return complete document with metadata.
Parameters:
- doc: Document to save
Returns:
Complete SodaDoc with all metadata
"""
def truncate(self) -> None:
"""Remove all documents from collection"""
def drop(self) -> bool:
"""
Drop collection and all documents.
Returns:
True if collection was dropped, False if not found
"""

Usage examples:
# Insert single document
employees_coll.insertOne({
"id": 2001,
"name": "Alice Brown",
"department": "Marketing",
"hire_date": "2023-01-15"
})
print("Inserted one document")  # insertOne() returns None; use insertOneAndGet() when the key is needed
# Insert multiple documents
new_employees = [
{"id": 2002, "name": "Charlie Davis", "department": "IT"},
{"id": 2003, "name": "Diana Wilson", "department": "HR"},
{"id": 2004, "name": "Eve Miller", "department": "Finance"}
]
employees_coll.insertMany(new_employees)  # returns None
print(f"Inserted {len(new_employees)} documents")
# Insert and get complete document
doc_with_metadata = employees_coll.insertOneAndGet({
"id": 2005,
"name": "Frank Garcia",
"department": "Operations"
})
print(f"Document created on: {doc_with_metadata.createdOn}")
# Save or update document
employee_update = {
"id": 2001,
"name": "Alice Brown",
"department": "Marketing",
"salary": 65000, # Added salary
"promotion_date": "2023-06-01" # Added promotion
}
employees_coll.save(employee_update)

Find and retrieve documents using various query methods.
class SodaCollection:
def find(self) -> SodaOperation:
"""
Create operation for finding documents.
Returns:
SodaOperation object for building queries
"""

class SodaOperation:
def key(self, key: str) -> SodaOperation:
"""Filter by document key"""
def keys(self, keys: list) -> SodaOperation:
"""Filter by multiple document keys"""
def filter(self, filterSpec: dict) -> SodaOperation:
"""Apply JSON filter specification"""
def version(self, version: str) -> SodaOperation:
"""Filter by document version"""
def limit(self, limit: int) -> SodaOperation:
"""Limit number of results"""
def skip(self, skip: int) -> SodaOperation:
"""Skip number of documents"""
def count(self) -> int:
"""Count matching documents"""
def getCursor(self) -> SodaDocCursor:
"""Get cursor for iterating results"""
def getDocuments(self) -> list:
"""Get all matching documents as list"""
def getOne(self) -> SodaDoc:
"""Get first matching document"""
def replaceOne(self, doc) -> bool:
"""Replace the document selected with key(); returns True if a document was replaced"""
def replaceOneAndGet(self, doc) -> SodaDoc:
"""Replace first matching document and return result"""
def remove(self) -> int:
"""
Remove matching documents.
Returns:
Number of documents removed
"""

Usage examples:
# Find document by key
doc = employees_coll.find().key("emp_1002").getOne()
if doc:
print(f"Found employee: {doc.content['name']}")
# Find documents by multiple keys
keys = ["emp_1001", "emp_1002", "emp_1003"]
docs = employees_coll.find().keys(keys).getDocuments()
print(f"Found {len(docs)} employees")
# Query with JSON filter - find by department
dept_filter = {"department": "Engineering"}
eng_employees = employees_coll.find().filter(dept_filter).getDocuments()
for emp in eng_employees:
print(f"Engineer: {emp.content['name']}")
# Complex filter - salary range query
salary_filter = {"salary": {"$gte": 50000, "$lte": 80000}}
mid_range = employees_coll.find().filter(salary_filter).getDocuments()
# Query with limit and skip (pagination)
page_size = 10
page_num = 2
page_docs = (employees_coll.find()
.skip((page_num - 1) * page_size)
.limit(page_size)
.getDocuments())
# Count documents matching criteria
total_engineers = employees_coll.find().filter({"department": "Engineering"}).count()
print(f"Total engineers: {total_engineers}")
# Replace a document selected by key (replaceOne() requires key() and replaces the whole document)
employees_coll.find().key("emp_1002").replaceOne({
"id": 1002,
"name": "Jane Smith",
"department": "Information Technology",
"updated": True
})
# Remove documents
removed_count = employees_coll.find().filter({"active": False}).remove()
print(f"Removed {removed_count} inactive employees")

Iterate through large result sets efficiently using cursors.
class SodaDocCursor:
def getNext(self) -> SodaDoc:
"""
Get next document from cursor.
Returns:
Next SodaDoc or None if no more documents
"""
def close(self) -> None:
"""Close cursor and free resources"""

Usage examples:
# Iterate through all documents using cursor
cursor = employees_coll.find().getCursor()
try:
while True:
doc = cursor.getNext()
if doc is None:
break
employee = doc.content
print(f"Employee {employee['id']}: {employee['name']}")
finally:
cursor.close()
# Context manager for automatic cursor cleanup
with employees_coll.find().getCursor() as cursor:
doc = cursor.getNext()
while doc:
# Process document
print(f"Processing: {doc.key}")
doc = cursor.getNext()

Create and manage indexes for better query performance.
class SodaCollection:
def createIndex(self, spec: dict) -> None:
"""
Create index on collection.
Parameters:
- spec (dict): Index specification
"""
def dropIndex(self, name: str, force=False) -> bool:
"""
Drop index from collection.
Parameters:
- name (str): Index name
- force (bool): Force drop even if index is being used
Returns:
True if index was dropped
"""
def getDataGuide(self) -> SodaDoc:
"""
Get collection data guide (schema summary).
Returns:
SodaDoc containing data guide information
"""

Usage examples:
# Create simple index on department field
dept_index = {
"name": "dept_idx",
"fields": [{"path": "department", "datatype": "varchar2", "maxLength": 100}]
}
employees_coll.createIndex(dept_index)
# Create composite index
composite_index = {
"name": "dept_salary_idx",
"fields": [
{"path": "department", "datatype": "varchar2", "maxLength": 100},
{"path": "salary", "datatype": "number"}
]
}
employees_coll.createIndex(composite_index)
# Create functional index
functional_index = {
"name": "upper_name_idx",
"fields": [{"path": "upper(name)", "datatype": "varchar2", "maxLength": 200}]
}
employees_coll.createIndex(functional_index)
# Drop index
success = employees_coll.dropIndex("dept_idx")
print(f"Index dropped: {success}")
# Get data guide to understand document structure
data_guide = employees_coll.getDataGuide()
if data_guide:
guide_content = data_guide.content
print("Collection schema summary:")
print(json.dumps(guide_content, indent=2))  # requires `import json`

Efficiently handle large numbers of documents:
def bulk_insert_employees(collection, employee_data, batch_size=1000):
    """Insert a large number of employee documents in fixed-size batches.

    Parameters:
    - collection: SODA collection (any object exposing ``insertMany(list)``)
    - employee_data (list): Documents to insert
    - batch_size (int): Maximum number of documents per ``insertMany`` call
      (default: 1000)

    Raises:
    ValueError if ``batch_size`` is not a positive integer
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")
    for start in range(0, len(employee_data), batch_size):
        batch = employee_data[start:start + batch_size]
        # cx_Oracle documents insertMany() as returning None (insertManyAndGet()
        # returns the documents), so report the size of the batch that was sent
        # rather than calling len() on the return value.
        collection.insertMany(batch)
        print(f"Inserted batch: {len(batch)} documents")
def bulk_update_salaries(collection, salary_increases, last_updated="2023-12-01"):
    """Update the salary of multiple employees, keeping their other fields.

    Parameters:
    - collection: SODA collection holding the employee documents
    - salary_increases (dict): Maps employee ``id`` to the new salary
    - last_updated (str): Value stored in each updated document's
      ``last_updated`` field (default: "2023-12-01")

    Employee ids with no matching document are skipped. Every document whose
    ``id`` matches is updated.
    """
    for emp_id, new_salary in salary_increases.items():
        matches = collection.find().filter({"id": emp_id}).getDocuments()
        for doc in matches:
            # replaceOne() overwrites the whole document, so merge the new
            # fields into a copy of the existing content instead of sending
            # only the changed fields.
            content = dict(doc.getContent())
            content["salary"] = new_salary
            content["last_updated"] = last_updated
            # cx_Oracle requires key() to be set before replaceOne(); a
            # filter() alone is not accepted for replacement.
            collection.find().key(doc.key).replaceOne(content)
# Usage
large_employee_list = [{"id": i, "name": f"Employee {i}"} for i in range(1, 10001)]
bulk_insert_employees(employees_coll, large_employee_list)
salary_updates = {1001: 80000, 1002: 75000, 1003: 85000}
bulk_update_salaries(employees_coll, salary_updates)

Use MongoDB-style query operators in filters:
# Comparison operators
employees_coll.find().filter({"salary": {"$gt": 70000}}) # Greater than
employees_coll.find().filter({"salary": {"$gte": 70000}}) # Greater than or equal
employees_coll.find().filter({"salary": {"$lt": 50000}}) # Less than
employees_coll.find().filter({"salary": {"$lte": 50000}}) # Less than or equal
employees_coll.find().filter({"salary": {"$ne": 60000}}) # Not equal
# Array operators
employees_coll.find().filter({"skills": {"$in": ["Python", "Java"]}}) # In array
employees_coll.find().filter({"skills": {"$nin": ["COBOL", "Fortran"]}}) # Not in array
employees_coll.find().filter({"skills": {"$all": ["Python", "SQL"]}}) # All elements
# Logical operators
employees_coll.find().filter({
"$and": [
{"department": "Engineering"},
{"salary": {"$gt": 70000}}
]
})
employees_coll.find().filter({
"$or": [
{"department": "Sales"},
{"department": "Marketing"}
]
})
# Existence and type checking
employees_coll.find().filter({"email": {"$exists": True}}) # Field exists
employees_coll.find().filter({"salary": {"$type": "number"}}) # Field type
# Regular expressions
employees_coll.find().filter({"name": {"$regex": "^John.*"}}) # Name starts with "John"

Handle SODA-specific errors and exceptions:
try:
# SODA operations
collection = soda_db.createCollection("test_collection")
doc = collection.insertOne({"test": "data"})
except cx_Oracle.DatabaseError as e:
error_obj, = e.args
if error_obj.code == 40842: # Collection already exists
print("Collection already exists")
collection = soda_db.openCollection("test_collection")
elif error_obj.code == 40623: # Invalid document
print("Invalid document format")
else:
print(f"SODA error: {error_obj.message}")
except Exception as e:
print(f"Unexpected error: {e}")
finally:
# Cleanup resources
if 'cursor' in locals():
cursor.close()

Install with Tessl CLI
npx tessl i tessl/pypi-cx-oracle