Pure-Python PostgreSQL driver providing DB-API 2.0 compliant database connectivity with native pg8000 API and comprehensive PostgreSQL data type support.
—
Modern streamlined interface optimized for direct query execution with automatic parameter handling and simplified result processing, providing an alternative to the legacy DB-API 2.0 interface.
import pg8000.native
# Available exports from pg8000.native
conn = pg8000.native.Connection(...) # Native Connection class
pg8000.native.identifier("table_name") # Identifier escaping function
pg8000.native.literal(value) # Literal value formatting function
# Type constants are also available
from pg8000.native import INTEGER, TEXT, BOOLEAN, JSON, UUID_TYPE

Note: PreparedStatement objects are created through the Connection.prepare() method and are not directly importable.
Streamlined connection class with direct query execution and simplified parameter handling.
class Connection:
"""
Native pg8000 connection with modern API.
Properties:
- columns: list - Column information from last query
- row_count: int - Number of rows affected by last query
"""
def __init__(
self,
user: str,
host: str = "localhost",
database: str = None,
port: int = 5432,
password: str = None,
source_address: tuple = None,
unix_sock: str = None,
ssl_context = None,
timeout: float = None,
tcp_keepalive: bool = True,
application_name: str = None,
replication: str = None,
startup_params: dict = None,
sock = None
):
"""
Create native connection to PostgreSQL database.
Parameters:
- user: Username for database authentication
- host: Database server hostname or IP address
- database: Database name to connect to
- port: Database server port number
- password: Password for database authentication
- source_address: (host, port) tuple for local socket binding
- unix_sock: Path to Unix domain socket for local connections
- ssl_context: SSL context for encrypted connections
- timeout: Socket timeout in seconds
- tcp_keepalive: Enable TCP keepalive for connection health
- application_name: Application name for connection identification
- replication: Replication mode ('database' or 'physical')
- startup_params: Additional connection parameters dict
- sock: Existing socket connection for database communication
Raises:
InterfaceError: Connection parameter or interface errors
OperationalError: Database connection errors
"""
def run(self, sql: str, stream=None, types: dict = None, **params) -> list:
"""
Execute SQL statement with named parameters and return results.
Parameters:
- sql: SQL statement with :param placeholders
- stream: Optional stream for COPY operations
- types: Dict mapping parameter names to PostgreSQL type OIDs
- **params: Named parameters for SQL statement
Returns:
List of result rows as lists, or None for non-SELECT statements
Raises:
ProgrammingError: Invalid SQL syntax
DataError: Invalid data values
OperationalError: Database execution errors
"""
def prepare(self, sql: str) -> PreparedStatement:
"""
Create a prepared statement for efficient repeated execution.
Parameters:
- sql: SQL statement to prepare with :param placeholders
Returns:
Native PreparedStatement object
Raises:
ProgrammingError: Invalid SQL syntax
"""
def close(self) -> None:
"""
Close the database connection and free resources.
"""

Prepared statement optimized for the native interface with simplified parameter handling.
class PreparedStatement:
"""
Native prepared statement for efficient repeated execution.
Properties:
- columns: list - Column information from prepared statement
"""
def run(self, stream=None, **params) -> list:
"""
Execute prepared statement with named parameters.
Parameters:
- stream: Optional stream for COPY operations
- **params: Named parameter values
Returns:
List of result rows as lists, or None for non-SELECT statements
Raises:
DataError: Invalid parameter values
OperationalError: Execution errors
"""
def close(self) -> None:
"""
Close the prepared statement and free resources.
"""

Utility functions for parsing SQL queries with named parameters.
def to_statement(query: str) -> tuple:
"""
Parse SQL query with named parameters into parameterized form.
Parameters:
- query: SQL query string with :param placeholders
Returns:
Tuple of (parsed_sql, param_function)
- parsed_sql: SQL with placeholders converted to $1, $2, etc.
- param_function: Function to extract parameter values
Raises:
ProgrammingError: Invalid parameter syntax
"""

Internal enumeration for query parsing states.
class State(Enum):
"""
Parser states for SQL query parsing.
Values:
- OUT: Outside any special context
- IN_SQ: Inside single-quoted string
- IN_QI: Inside quoted identifier
- IN_ES: Inside escaped single-quoted string (E'...')
- IN_PN: Inside parameter name
- IN_CO: Inside comment
- IN_DQ: Inside dollar-quoted string ($$...$$)
- IN_DP: Inside dollar parameter (e.g. $1)
"""
OUT = "OUT"
IN_SQ = "IN_SQ"
IN_QI = "IN_QI"
IN_ES = "IN_ES"
IN_PN = "IN_PN"
IN_CO = "IN_CO"
IN_DQ = "IN_DQ"
IN_DP = "IN_DP"

import pg8000.native
# Connect using native interface
conn = pg8000.native.Connection(
user="myuser",
password="mypass",
database="mydb"
)
# Execute query with named parameters
results = conn.run(
"SELECT id, name, email FROM users WHERE active = :active AND age > :min_age",
active=True,
min_age=18
)
# Process results
for row in results:
print(f"ID: {row[0]}, Name: {row[1]}, Email: {row[2]}")
# Check column information
print("Columns:", conn.columns)
print("Row count:", conn.row_count)
conn.close()

import pg8000.native
import datetime
conn = pg8000.native.Connection(
user="myuser",
password="mypass",
database="mydb"
)
# Insert new record
conn.run(
"""INSERT INTO users (name, email, created_at, active)
VALUES (:name, :email, :created, :active)""",
name="Jane Smith",
email="jane@example.com",
created=datetime.datetime.now(),
active=True
)
print(f"Inserted {conn.row_count} row(s)")
# Update existing records
conn.run(
"UPDATE users SET last_login = :login_time WHERE email = :email",
login_time=datetime.datetime.now(),
email="jane@example.com"
)
print(f"Updated {conn.row_count} row(s)")
conn.close()

import pg8000.native
conn = pg8000.native.Connection(
user="myuser",
password="mypass",
database="mydb"
)
# Prepare statement for repeated execution
stmt = conn.prepare(
"INSERT INTO event_log (event_type, user_id, message, timestamp) VALUES (:type, :user, :msg, :ts)"
)
# Execute multiple times with different parameters
import datetime
events = [
("login", 123, "User logged in", datetime.datetime.now()),
("page_view", 123, "Viewed dashboard", datetime.datetime.now()),
("logout", 123, "User logged out", datetime.datetime.now())
]
for event_type, user_id, message, timestamp in events:
result = stmt.run(
type=event_type,
user=user_id,
msg=message,
ts=timestamp
)
print(f"Logged event: {event_type}")
# Clean up
stmt.close()
conn.close()

import pg8000.native
from pg8000 import INTEGER, TEXT, BOOLEAN
conn = pg8000.native.Connection(
user="myuser",
password="mypass",
database="mydb"
)
# Execute query with explicit type specification
results = conn.run(
"""SELECT u.id, u.name, u.email, p.title
FROM users u
JOIN posts p ON u.id = p.user_id
WHERE u.active = :active AND p.published = :published
ORDER BY p.created_at DESC
LIMIT :limit""",
types={
'active': BOOLEAN,
'published': BOOLEAN,
'limit': INTEGER
},
active=True,
published=True,
limit=10
)
# Process results
for row in results:
user_id, name, email, post_title = row
print(f"User: {name} ({email}) - Post: {post_title}")
conn.close()

Install with Tessl CLI
npx tessl i tessl/pypi-pg8000