Python client for the Impala distributed query engine and HiveServer2 implementations
—
A fully DB API 2.0 (PEP 249) compliant interface providing connection management, query execution, and result fetching for HiveServer2 implementations (Impala and Hive).
Establishes and manages connections to HiveServer2. It supports several authentication mechanisms (NOSASL, PLAIN, GSSAPI, LDAP, JWT) over either binary or HTTP transport.
def connect(host='localhost', port=21050, database=None, timeout=None,
            use_ssl=False, ca_cert=None, auth_mechanism='NOSASL', user=None,
            password=None, kerberos_service_name='impala', use_ldap=None,
            ldap_user=None, ldap_password=None, use_kerberos=None,
            protocol=None, krb_host=None, use_http_transport=False,
            http_path='', auth_cookie_names=None, http_cookie_names=None,
            retries=3, jwt=None, user_agent=None,
            get_user_custom_headers_func=None):
    """
    Get a connection to HiveServer2 (HS2).

    Parameters:
        host (str): The hostname for HS2. For Impala, this can be any of the
            impalads.
        port (int): The port number for HS2. Default is 21050 for Impala.
        database (str): The default database. If None, implementation-dependent.
        timeout (int): Connection timeout in seconds. Default is no timeout.
        use_ssl (bool): Enable SSL.
        ca_cert (str): Local path to the third-party CA certificate. If SSL is
            enabled but no certificate is given, the server certificate is
            not validated.
        auth_mechanism (str): Authentication mechanism: 'NOSASL' (default),
            'PLAIN', 'GSSAPI', 'LDAP' or 'JWT'.
        user (str): User name for 'PLAIN' or 'LDAP' authentication.
        password (str): Password for 'PLAIN' or 'LDAP' authentication.
        kerberos_service_name (str): Kerberos service principal name used with
            'GSSAPI'. Default is 'impala'.
        use_ldap (bool): Deprecated; pass auth_mechanism='LDAP' instead.
        ldap_user (str): Deprecated; pass `user` instead.
        ldap_password (str): Deprecated; pass `password` instead.
        use_kerberos (bool): Deprecated; pass auth_mechanism='GSSAPI' instead.
        protocol (str): Deprecated; HiveServer2 is the only supported protocol.
        krb_host (str): Host name to use for the Kerberos ('GSSAPI') service
            principal when it differs from `host`.
        use_http_transport (bool): Use HTTP transport instead of binary
            transport.
        http_path (str): Path in the HTTP URL when using HTTP transport.
        auth_cookie_names (list or str): Deprecated alias of
            `http_cookie_names`.
        http_cookie_names (list or str): Cookie names used for cookie-based
            authentication and session management over HTTP transport.
        retries (int): Number of connection retries. Default is 3.
        jwt (str): JSON Web Token used when auth_mechanism='JWT' (intended
            for HTTP transport).
        user_agent (str): Custom user agent string (HTTP transport).
        get_user_custom_headers_func (callable): Function returning custom
            HTTP headers to send with requests (HTTP transport).

    Returns:
        HiveServer2Connection: Connection object implementing DB API 2.0.
    """

# Connection objects provide DB API 2.0 compliant database connection management.
class HiveServer2Connection:
    """Main connection class implementing the DB API 2.0 Connection interface.

    Usable as a context manager: ``with connect(...) as conn:`` binds the
    connection itself and closes it when the block exits.
    """

    def close(self):
        """Close the connection (the HS2 session and its transport)."""

    def commit(self):
        """Commit the current transaction.

        Impala/Hive do not support transactions, so this is a no-op.
        """

    def rollback(self):
        """Roll back the current transaction.

        Impala/Hive do not support transactions. Rather than silently
        succeeding, the implementation raises NotSupportedError.
        """

    def cursor(self, user=None, configuration=None, convert_types=True,
               dictify=False, fetch_error=True, close_finished_queries=True,
               convert_strings_to_unicode=True):
        """
        Return a new cursor object using the connection.

        Parameters:
            user (str): Optional user for the cursor session.
            configuration (dict): Configuration overlay for the HS2 session.
            convert_types (bool): Convert timestamps and decimals to Python
                types.
            dictify (bool): Return rows as dictionaries instead of tuples.
            fetch_error (bool): Whether to fetch error details on query
                failure.
            close_finished_queries (bool): Auto-close finished queries.
            convert_strings_to_unicode (bool): Convert strings to Unicode.

        Returns:
            HiveServer2Cursor or HiveServer2DictCursor: A HiveServer2DictCursor
            when `dictify` is True, otherwise a HiveServer2Cursor.
        """

    def reconnect(self):
        """Reconnect to the database."""

    def kerberized(self):
        """Return True if the connection uses Kerberos authentication."""

    def __enter__(self):
        """Enter the runtime context and return the connection itself."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit the runtime context by closing the connection."""

# Cursor objects provide DB API 2.0 compliant query execution and result fetching.
class HiveServer2Cursor:
    """Cursor implementation for executing queries (DB API 2.0 Cursor).

    Attributes:
        description (list): Sequence of 7-item sequences describing each
            result column. Per PEP 249 it is None until an operation has been
            executed, and for operations that return no rows.
        rowcount (int): Number of rows that the last execute() produced or
            affected; PEP 249 specifies -1 when the count cannot be
            determined.
        arraysize (int): Read/write; number of rows fetchmany() returns when
            called without a size.
        buffersize (int): Buffer size for fetching results.

    The cursor is iterable and can be used as a context manager.
    """

    # Properties -- documented in the class docstring, because bare strings
    # placed after annotations are discarded no-op statements.
    description: list
    rowcount: int
    arraysize: int
    buffersize: int

    # Methods
    def execute(self, query, parameters=None):
        """
        Execute a database operation (query or command).

        Parameters:
            query (str): SQL query to execute; may contain pyformat
                placeholders such as %(name)s.
            parameters (dict): Values for the placeholders. Pass values here
                rather than formatting them into `query` yourself.
        """

    def executemany(self, query, seq_of_parameters):
        """
        Execute a database operation once per parameter set.

        Parameters:
            query (str): SQL query to execute.
            seq_of_parameters (sequence): Sequence of parameter dictionaries,
                one per execution.
        """

    def fetchone(self):
        """
        Fetch the next row of a query result set.

        Returns:
            tuple or None: Next row, or None when no more data is available.
        """

    def fetchmany(self, size=None):
        """
        Fetch the next set of rows of a query result set.

        Parameters:
            size (int): Number of rows to fetch. Uses `arraysize` if None.

        Returns:
            list: List of tuples representing rows; may be shorter than
            `size`, and is empty when no rows remain.
        """

    def fetchall(self):
        """
        Fetch all (remaining) rows of a query result set.

        Returns:
            list: List of tuples representing all remaining rows; empty when
            no rows remain.
        """

    def close(self):
        """Close the cursor."""

    def get_profile(self):
        """Get the query profile information."""

    def get_summary(self):
        """Get the query execution summary."""

    def get_log(self):
        """Get the query execution log."""

    def ping(self):
        """Ping the server to check connection status."""

    def __iter__(self):
        """Iterate over the remaining result rows."""

    def __enter__(self):
        """Enter the runtime context and return the cursor itself."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit the runtime context by closing the cursor."""

# Specialized cursor that returns results as dictionaries instead of tuples.
class HiveServer2DictCursor(HiveServer2Cursor):
    """
    Cursor variant whose fetch methods produce dictionaries.

    Identical to HiveServer2Cursor, except that each row comes back as a
    dict mapping column name to value instead of as a tuple. This is the
    cursor type returned by HiveServer2Connection.cursor(dictify=True).
    """

    def fetchone(self):
        """
        Return the next result row as a dict.

        Returns:
            dict or None: Column-name-to-value mapping for the next row, or
            None once the result set is exhausted.
        """

    def fetchmany(self, size=None):
        """
        Return the next batch of result rows as dicts.

        Parameters:
            size (int): How many rows to fetch; falls back to `arraysize`
                when None.

        Returns:
            list: Dictionaries, one per fetched row.
        """

    def fetchall(self):
        """
        Return every remaining result row as a dict.

        Returns:
            list: Dictionaries, one per remaining row.
        """

# Type objects for proper data type handling according to DB API 2.0 specification.
# Type objects for data type identification (DB API 2.0 type objects).
# Each one matches the HS2 type name(s) noted beside it.

STRING: _DBAPITypeObject  # matches 'STRING'
BINARY: _DBAPITypeObject  # matches 'BINARY'
# NUMBER matches 'BOOLEAN', 'TINYINT', 'SMALLINT', 'INT', 'BIGINT', 'FLOAT',
# 'DOUBLE' and 'DECIMAL'.
NUMBER: _DBAPITypeObject
DATETIME: _DBAPITypeObject  # matches 'TIMESTAMP'
DATE: _DBAPITypeObject  # matches 'DATE'
ROWID: _DBAPITypeObject  # row identifiers; its value set is empty

# DB API 2.0 compliant date and time constructors.
def Date(year, month, day):
    """
    Construct a date value.

    Parameters:
        year (int): Year
        month (int): Month (1-12)
        day (int): Day (1-31)

    Returns:
        datetime.date: Date object

    Raises:
        ValueError: If the components do not form a valid calendar date.
    """
    import datetime  # local import keeps this constructor self-contained

    return datetime.date(year, month, day)
def Time(hour, minute, second):
    """
    Construct a time value.

    Parameters:
        hour (int): Hour (0-23)
        minute (int): Minute (0-59)
        second (int): Second (0-59)

    Returns:
        datetime.time: Time object

    Raises:
        ValueError: If a component is out of range.
    """
    import datetime  # local import keeps this constructor self-contained

    return datetime.time(hour, minute, second)
def Timestamp(year, month, day, hour, minute, second):
    """
    Construct a timestamp value.

    Parameters:
        year (int): Year
        month (int): Month (1-12)
        day (int): Day (1-31)
        hour (int): Hour (0-23)
        minute (int): Minute (0-59)
        second (int): Second (0-59)

    Returns:
        datetime.datetime: Timestamp object (naive, no time zone attached)

    Raises:
        ValueError: If the components do not form a valid timestamp.
    """
    import datetime  # local import keeps this constructor self-contained

    return datetime.datetime(year, month, day, hour, minute, second)
def DateFromTicks(ticks):
    """
    Construct a date from a Unix timestamp.

    Parameters:
        ticks (float): Seconds since the Unix epoch.

    Returns:
        datetime.date: The calendar date of `ticks` in the local time zone.
    """
    import datetime
    import time

    # PEP 249's reference implementation interprets ticks in local time.
    return datetime.date(*time.localtime(ticks)[:3])
def TimeFromTicks(ticks):
    """
    Construct a time of day from a Unix timestamp.

    Parameters:
        ticks (float): Seconds since the Unix epoch.

    Returns:
        datetime.time: Local time of day of `ticks`, truncated to whole
        seconds.
    """
    import datetime
    import time

    # Fields 3..5 of struct_time are hour, minute and second (local time),
    # so fractional seconds are dropped as in PEP 249's reference version.
    return datetime.time(*time.localtime(ticks)[3:6])
def TimestampFromTicks(ticks):
    """
    Construct a timestamp from a Unix timestamp.

    Parameters:
        ticks (float): Seconds since the Unix epoch.

    Returns:
        datetime.datetime: Naive local date and time of `ticks`, truncated
        to whole seconds.
    """
    import datetime
    import time

    # First six struct_time fields: year, month, day, hour, minute, second.
    return datetime.datetime(*time.localtime(ticks)[:6])
def Binary(data):
    """
    Construct binary data object.

    Parameters:
        data (bytes): Binary data (any bytes-like object).

    Returns:
        memoryview: View over `data`; no copy is made.
    """
    return memoryview(data)

# DB API 2.0 module-level constants indicating compliance and capabilities.
# DB API specification level this module implements.
apilevel = '2.0'
# PEP 249 thread-safety level 1: threads may share the module, not connections.
threadsafety = 1
# Parameter marker style: Python extended format codes such as %(name)s.
paramstyle = 'pyformat'
# Authentication mechanisms accepted via connect(auth_mechanism=...).
AUTH_MECHANISMS = ['NOSASL', 'PLAIN', 'GSSAPI', 'LDAP', 'JWT']

from impala.dbapi import connect
# Connect with basic authentication
conn = connect(
    host='impala-cluster.example.com',
    port=21050,
    auth_mechanism='PLAIN',
    user='username',
    password='password'
)
try:
    # Create cursor and execute query
    cursor = conn.cursor()
    try:
        cursor.execute("SELECT COUNT(*) FROM my_table")
        result = cursor.fetchone()
        print(f"Row count: {result[0]}")
    finally:
        # Clean up: close the cursor even if the query raises
        cursor.close()
finally:
    # ...and always release the connection (equivalently, use `with` blocks)
    conn.close()

from impala.dbapi import connect
# Using connection as context manager: a single `with` statement manages both
# the connection and the cursor, closing them in reverse order on exit.
with connect(host='impala-host', port=21050) as conn, conn.cursor() as cursor:
    cursor.execute("SELECT * FROM my_table LIMIT 5")
    rows = cursor.fetchall()
    for record in rows:
        print(record)

from impala.dbapi import connect
# Context managers close the cursor and connection even if the query fails.
with connect(host='impala-host', port=21050) as conn:
    with conn.cursor() as cursor:
        # Parameterized query using pyformat style
        query = "SELECT * FROM users WHERE age > %(min_age)s AND city = %(city)s"
        params = {'min_age': 25, 'city': 'San Francisco'}
        # Pass values separately so the driver substitutes them, instead of
        # formatting user input into the SQL string yourself.
        cursor.execute(query, params)
        results = cursor.fetchall()
        for row in results:
            print(row)

from impala.dbapi import connect
# Connect using Kerberos authentication
with connect(
    host='secure-impala.example.com',
    port=21050,
    auth_mechanism='GSSAPI',
    kerberos_service_name='impala',
    use_ssl=True,
    # Without a CA certificate the server's TLS certificate is not validated.
    ca_cert='/path/to/ca-cert.pem',
) as conn:
    with conn.cursor() as cursor:
        cursor.execute("SELECT version()")
        version = cursor.fetchone()
        print(f"Impala version: {version[0]}")

# Install with Tessl CLI
npx tessl i tessl/pypi-impyla