Python interface to Oracle Database with thin and thick connectivity modes
—
Handle Binary Large Objects (BLOB), Character Large Objects (CLOB), National Character Large Objects (NCLOB), and Binary Files (BFILE) with streaming read/write operations. LOBs provide efficient storage and manipulation of large data including documents, images, videos, and unstructured text.
Handle large object operations with streaming read/write capabilities for efficient memory usage.
class LOB:
    """Large Object for handling BLOB, CLOB, NCLOB, and BFILE data."""

    # Properties
    type: type  # DB_TYPE_BLOB, DB_TYPE_CLOB, DB_TYPE_NCLOB, or DB_TYPE_BFILE

    def read(self, offset=1, amount=None) -> bytes | str:
        """
        Read data from the LOB.

        Parameters:
        - offset (int): Starting position (1-based)
        - amount (int): Number of bytes/characters to read (None for all)

        Returns:
        bytes (for BLOB/BFILE) or str (for CLOB/NCLOB): LOB data
        """

    def write(self, data, offset=1) -> None:
        """
        Write data to the LOB.

        Parameters:
        - data (bytes|str): Data to write
        - offset (int): Starting position (1-based)
        """

    def size(self) -> int:
        """
        Get the size of the LOB.

        Returns:
        int: Size in bytes (BLOB/BFILE) or characters (CLOB/NCLOB)
        """

    def trim(self, new_size) -> None:
        """
        Trim the LOB to the specified size.

        Parameters:
        - new_size (int): New size in bytes or characters
        """

    def getchunksize(self) -> int:
        """
        Get the chunk size for optimal I/O operations.

        Returns:
        int: Optimal chunk size for read/write operations
        """

    def open(self) -> None:
        """Open the LOB for read/write operations."""

    def close(self) -> None:
        """Close the LOB and release resources."""

    def getfilename(self) -> tuple:
        """
        Get the directory alias and filename for BFILE LOBs.

        Returns:
        tuple: (directory_alias, filename) for BFILE LOBs

        Raises:
        TypeError: If LOB is not a BFILE
        """

    def setfilename(self, directory_alias, filename) -> None:
        """
        Set the directory alias and filename for BFILE LOBs.

        Parameters:
        - directory_alias (str): Oracle directory object name
        - filename (str): File name within the directory

        Raises:
        TypeError: If LOB is not a BFILE
        """

    def fileexists(self) -> bool:
        """
        Check if the BFILE exists on the file system.

        Returns:
        bool: True if file exists

        Raises:
        TypeError: If LOB is not a BFILE
        """

    def isopen(self) -> bool:
        """
        Check if the LOB is currently open.

        Returns:
        bool: True if LOB is open
        """


# Asynchronous version of LOB class with async/await support for all operations.
class AsyncLOB:
    """Asynchronous Large Object for handling BLOB, CLOB, NCLOB, and BFILE data."""

    # Properties (same as LOB)
    type: type

    async def read(self, offset=1, amount=None) -> bytes | str:
        """
        Read data from the LOB asynchronously.

        Parameters:
        - offset (int): Starting position (1-based)
        - amount (int): Number of bytes/characters to read (None for all)

        Returns:
        bytes (for BLOB/BFILE) or str (for CLOB/NCLOB): LOB data
        """

    async def write(self, data, offset=1) -> None:
        """
        Write data to the LOB asynchronously.

        Parameters:
        - data (bytes|str): Data to write
        - offset (int): Starting position (1-based)
        """

    async def size(self) -> int:
        """
        Get the size of the LOB asynchronously.

        Returns:
        int: Size in bytes (BLOB/BFILE) or characters (CLOB/NCLOB)
        """

    async def trim(self, new_size) -> None:
        """
        Trim the LOB to the specified size asynchronously.

        Parameters:
        - new_size (int): New size in bytes or characters
        """

    async def open(self) -> None:
        """Open the LOB for read/write operations asynchronously."""

    async def close(self) -> None:
        """Close the LOB and release resources asynchronously."""


# Constants for identifying different LOB types.
# LOB Type Constants
DB_TYPE_BLOB: type  # Binary Large Object
DB_TYPE_CLOB: type  # Character Large Object
DB_TYPE_NCLOB: type  # National Character Large Object
DB_TYPE_BFILE: type  # Binary File (external file reference)

# Legacy aliases
BLOB: type  # Alias for DB_TYPE_BLOB
CLOB: type  # Alias for DB_TYPE_CLOB
NCLOB: type  # Alias for DB_TYPE_NCLOB
BFILE: type  # Alias for DB_TYPE_BFILE
import oracledb

# Example: basic BLOB usage — create a table, insert binary data,
# fetch the LOB, and write its contents back out to a file.
connection = oracledb.connect(user="hr", password="password", dsn="localhost/xepdb1")

with connection.cursor() as cursor:
    # Create a table with BLOB column
    cursor.execute("""
        CREATE TABLE documents (
            id NUMBER PRIMARY KEY,
            name VARCHAR2(100),
            content BLOB
        )
    """)

    # Insert binary data (bytes bind directly into a BLOB column)
    with open("document.pdf", "rb") as f:
        pdf_data = f.read()

    cursor.execute("""
        INSERT INTO documents (id, name, content)
        VALUES (:1, :2, :3)
    """, [1, "Sample Document", pdf_data])
    connection.commit()

    # Read BLOB data — the fetched column value is a LOB object
    cursor.execute("SELECT content FROM documents WHERE id = :1", [1])
    lob = cursor.fetchone()[0]

    # Read entire BLOB (no amount => read to the end)
    blob_data = lob.read()
    print(f"BLOB size: {len(blob_data)} bytes")

    # Write BLOB data to file
    with open("downloaded_document.pdf", "wb") as f:
        f.write(blob_data)

connection.close()
import oracledb

# Example: CLOB usage — insert large text, use a temporary CLOB as a bind
# value, and read a stored CLOB back in chunks of the optimal size.
connection = oracledb.connect(user="hr", password="password", dsn="localhost/xepdb1")

with connection.cursor() as cursor:
    # Create table with CLOB column
    cursor.execute("""
        CREATE TABLE articles (
            id NUMBER PRIMARY KEY,
            title VARCHAR2(200),
            content CLOB
        )
    """)

    # Insert large text content (str binds directly into a CLOB column)
    large_text = "This is a very long article content..." * 1000
    cursor.execute("""
        INSERT INTO articles (id, title, content)
        VALUES (:1, :2, :3)
    """, [1, "Sample Article", large_text])
    connection.commit()

    # Create a temporary CLOB
    temp_clob = connection.createlob(oracledb.DB_TYPE_CLOB)
    temp_clob.write("Temporary CLOB content")
    cursor.execute("""
        INSERT INTO articles (id, title, content)
        VALUES (:1, :2, :3)
    """, [2, "Temp Article", temp_clob])
    connection.commit()
    # Fix: close the temporary LOB once it is no longer needed — the
    # original example leaked its database-side resources (compare the
    # temp_lob_usage() example later in this file, which closes its LOB).
    temp_clob.close()

    # Read CLOB data in chunks
    cursor.execute("SELECT content FROM articles WHERE id = :1", [1])
    clob = cursor.fetchone()[0]

    chunk_size = clob.getchunksize()
    print(f"Optimal chunk size: {chunk_size}")

    # Read CLOB in chunks — LOB offsets are 1-based
    offset = 1
    total_size = clob.size()
    while offset <= total_size:
        chunk = clob.read(offset, chunk_size)
        print(f"Read chunk of {len(chunk)} characters starting at {offset}")
        offset += len(chunk)

connection.close()
import oracledb

# Example: streaming writes — lock an EMPTY_BLOB() row FOR UPDATE, then
# write a large file into it chunk by chunk to keep memory usage flat.
connection = oracledb.connect(user="hr", password="password", dsn="localhost/xepdb1")

with connection.cursor() as cursor:
    # Create empty BLOB
    cursor.execute("""
        INSERT INTO documents (id, name, content)
        VALUES (:1, :2, EMPTY_BLOB())
    """, [2, "Large File"])
    connection.commit()

    # Get the BLOB for writing — FOR UPDATE locks the row so the LOB
    # locator can be written through
    cursor.execute("""
        SELECT content FROM documents WHERE id = :1 FOR UPDATE
    """, [2])
    blob = cursor.fetchone()[0]

    # Stream write large file
    with open("large_file.bin", "rb") as f:
        chunk_size = blob.getchunksize()
        offset = 1  # LOB offsets are 1-based
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            blob.write(chunk, offset)
            offset += len(chunk)
            print(f"Written {len(chunk)} bytes at offset {offset - len(chunk)}")

    connection.commit()
    print(f"Final BLOB size: {blob.size()} bytes")

connection.close()
import oracledb

# Example: BFILE usage — reference an external file via a directory
# object, then check existence, inspect, open, read, and close it.
connection = oracledb.connect(user="hr", password="password", dsn="localhost/xepdb1")

with connection.cursor() as cursor:
    # Create directory object (requires DBA privileges)
    cursor.execute("CREATE OR REPLACE DIRECTORY FILE_DIR AS '/path/to/files'")

    # Create table with BFILE column
    cursor.execute("""
        CREATE TABLE file_references (
            id NUMBER PRIMARY KEY,
            name VARCHAR2(100),
            file_ref BFILE
        )
    """)

    # Create BFILE reference
    cursor.execute("""
        INSERT INTO file_references (id, name, file_ref)
        VALUES (:1, :2, BFILENAME('FILE_DIR', 'example.txt'))
    """, [1, "External File"])
    connection.commit()

    # Read BFILE
    cursor.execute("SELECT file_ref FROM file_references WHERE id = :1", [1])
    bfile = cursor.fetchone()[0]

    # Check if file exists
    if bfile.fileexists():
        # Get file information
        directory, filename = bfile.getfilename()
        # Fix: print the actual filename — the original example printed a
        # literal "(unknown)" placeholder instead of the unpacked variable.
        print(f"File: {directory}/{filename}")

        # Open and read BFILE (BFILE data is returned as bytes)
        bfile.open()
        file_content = bfile.read()
        print(f"File content: {file_content.decode('utf-8')}")
        bfile.close()
    else:
        print("File does not exist")

connection.close()
import asyncio
import oracledb

# Example: async LOB access — every LOB operation (read, size, ...) is
# awaited when using connect_async / AsyncLOB.


async def main():
    connection = await oracledb.connect_async(user="hr", password="password", dsn="localhost/xepdb1")

    async with connection.cursor() as cursor:
        # Create table
        await cursor.execute("""
            CREATE TABLE async_docs (
                id NUMBER PRIMARY KEY,
                content CLOB
            )
        """)

        # Insert large text
        large_text = "Async CLOB content " * 10000
        await cursor.execute("""
            INSERT INTO async_docs (id, content) VALUES (:1, :2)
        """, [1, large_text])
        await connection.commit()

        # Read CLOB asynchronously
        await cursor.execute("SELECT content FROM async_docs WHERE id = :1", [1])
        result = await cursor.fetchone()
        clob = result[0]

        # Async read
        content = await clob.read()
        print(f"Async read CLOB size: {len(content)} characters")

        # Async size
        size = await clob.size()
        print(f"CLOB size: {size}")

    await connection.close()


asyncio.run(main())
import oracledb

# Example: in-place CLOB manipulation — append, overwrite at an offset,
# and trim a CLOB fetched FOR UPDATE.
connection = oracledb.connect(user="hr", password="password", dsn="localhost/xepdb1")

with connection.cursor() as cursor:
    # Create table with CLOB
    cursor.execute("""
        CREATE TABLE text_docs (
            id NUMBER PRIMARY KEY,
            content CLOB
        )
    """)

    # Insert initial content
    cursor.execute("""
        INSERT INTO text_docs (id, content)
        VALUES (:1, :2)
    """, [1, "Initial content for manipulation"])
    connection.commit()

    # Get CLOB for manipulation (FOR UPDATE locks the row for writing)
    cursor.execute("""
        SELECT content FROM text_docs WHERE id = :1 FOR UPDATE
    """, [1])
    clob = cursor.fetchone()[0]

    # Append content by writing just past the current end (1-based offsets)
    current_size = clob.size()
    clob.write("\nAppended content", current_size + 1)

    # Overwrite content at a specific position — note LOB.write replaces
    # characters in place, it does not insert/shift existing data
    clob.write(" [INSERTED] ", 10)

    # Read modified content
    final_content = clob.read()
    print(f"Modified content: {final_content}")

    # Trim CLOB
    clob.trim(50)  # Keep only first 50 characters
    trimmed_content = clob.read()
    print(f"Trimmed content: {trimmed_content}")

    connection.commit()

connection.close()
import oracledb

# Example: LOB best practices — chunked processing for bounded memory use,
# and proper lifecycle management of temporary LOBs.
connection = oracledb.connect(user="hr", password="password", dsn="localhost/xepdb1")


def efficient_lob_processing():
    """Demonstrate efficient LOB processing techniques."""
    with connection.cursor() as cursor:
        # Use optimal chunk size for I/O
        cursor.execute("SELECT content FROM large_documents WHERE id = :1", [1])
        blob = cursor.fetchone()[0]

        # Get optimal chunk size
        chunk_size = blob.getchunksize()

        # Process BLOB in chunks to manage memory (offsets are 1-based)
        total_size = blob.size()
        processed = 0
        offset = 1
        while processed < total_size:
            # Read chunk
            chunk = blob.read(offset, chunk_size)
            # Process chunk (example: calculate checksum)
            # process_chunk(chunk)
            processed += len(chunk)
            offset += len(chunk)
            print(f"Processed {processed}/{total_size} bytes ({processed/total_size*100:.1f}%)")


def temp_lob_usage():
    """Demonstrate temporary LOB creation and usage."""
    # Create temporary BLOB
    temp_blob = connection.createlob(oracledb.DB_TYPE_BLOB)
    try:
        # Write data to temporary BLOB
        temp_blob.write(b"Temporary binary data")

        # Use temporary BLOB in SQL
        with connection.cursor() as cursor:
            cursor.execute("""
                INSERT INTO temp_storage (id, data) VALUES (:1, :2)
            """, [1, temp_blob])
            connection.commit()
    finally:
        # Always close temporary LOBs
        temp_blob.close()


# Run examples
efficient_lob_processing()
temp_lob_usage()
connection.close()

Install with Tessl CLI
npx tessl i tessl/pypi-oracledb