Provider package offering common SQL functionality for Apache Airflow including hooks, operators, sensors, and triggers for SQL database operations
—
Quality: Pending — a best-practices review has not yet been performed.
Impact: Pending — no eval scenarios have been run.
SQL dialects provide database-specific SQL formatting and operations, abstracting differences between SQL databases including query formatting, data type handling, and schema operations.
Base dialect implementation providing common SQL formatting and operations.
class Dialect:
    """
    Generic SQL dialect implementation.

    Provides database-agnostic SQL formatting helpers. Subclasses override
    the attributes below (and, where needed, the methods) to match a
    specific database's syntax.

    Attributes:
        placeholder (str): SQL placeholder character for parameters
        inspector: SQLAlchemy inspector for schema operations (may be None)
        insert_statement_format (str): Format string for INSERT statements
        replace_statement_format (str): Format string for REPLACE statements
        escape_word_format (str): Format string for escaping identifiers
        escape_column_names (bool): Whether to escape column names by default
    """

    def __init__(self, **kwargs):
        """
        Initialize the dialect.

        Args:
            **kwargs: Optional overrides for any documented attribute
                (placeholder, inspector, insert_statement_format,
                replace_statement_format, escape_word_format,
                escape_column_names).
        """
        self.placeholder = kwargs.get("placeholder", "%s")
        self.inspector = kwargs.get("inspector")
        self.insert_statement_format = kwargs.get(
            "insert_statement_format", "INSERT INTO {table} ({fields}) VALUES {values}"
        )
        self.replace_statement_format = kwargs.get(
            "replace_statement_format", "REPLACE INTO {table} ({fields}) VALUES {values}"
        )
        # ANSI SQL quotes identifiers with double quotes; subclasses swap in
        # backticks, brackets, etc.
        self.escape_word_format = kwargs.get("escape_word_format", '"{}"')
        self.escape_column_names = kwargs.get("escape_column_names", False)

    def escape_word(self, word):
        """
        Escape word if it's a reserved word or needs escaping.

        Args:
            word (str): Word to potentially escape
        Returns:
            str: Escaped word if necessary, original word otherwise
        """
        if not word or word != self.unescape_word(word):
            # Empty or already escaped: return unchanged.
            return word
        if self.escape_column_names or not word.isidentifier():
            return self.escape_word_format.format(word)
        return word

    def unescape_word(self, word):
        """
        Unescape escaped word.

        Args:
            word (str): Potentially escaped word
        Returns:
            str: Unescaped word (unchanged if it was not escaped)
        """
        if not word:
            return word
        prefix, sep, suffix = self.escape_word_format.partition("{}")
        if not sep:
            # Malformed escape format; nothing to strip.
            return word
        if (
            word.startswith(prefix)
            and word.endswith(suffix)
            and len(word) >= len(prefix) + len(suffix)
        ):
            return word[len(prefix):len(word) - len(suffix)]
        return word

    def extract_schema_from_table(self, table):
        """
        Extract schema name from table identifier.

        Args:
            table (str): Table identifier (may include schema)
        Returns:
            tuple: (schema, table_name) or (None, table_name)
        """
        schema, sep, name = table.rpartition(".")
        if sep:
            return schema, name
        return None, table

    def get_column_names(self, table, schema=None):
        """
        Get column names for specified table.

        Args:
            table (str): Table name
            schema (str, optional): Schema name
        Returns:
            list: List of column names, or None when no inspector is set
        """
        if self.inspector is None:
            return None
        return [
            column["name"]
            for column in self.inspector.get_columns(table_name=table, schema=schema)
        ]

    def get_target_fields(self, table, schema=None):
        """
        Get target fields for table operations.

        The generic dialect considers every column a valid insert target.

        Args:
            table (str): Table name
            schema (str, optional): Schema name
        Returns:
            list: List of target field names, or None when no inspector is set
        """
        return self.get_column_names(table, schema=schema)

    def get_primary_keys(self, table, schema=None):
        """
        Get primary key columns for table.

        Args:
            table (str): Table name
            schema (str, optional): Schema name
        Returns:
            list: List of primary key column names, or None when no
                inspector is set
        """
        if self.inspector is None:
            return None
        constraint = self.inspector.get_pk_constraint(table_name=table, schema=schema) or {}
        return constraint.get("constrained_columns", [])

    def generate_insert_sql(self, table, values, target_fields=None, replace=False, **kwargs):
        """
        Generate INSERT or REPLACE SQL statement.

        Args:
            table (str): Target table name
            values (list): List of value tuples to insert; used to size the
                placeholder list when target_fields cannot be resolved
            target_fields (list, optional): Target column names; introspected
                via get_target_fields when omitted
            replace (bool): Use REPLACE instead of INSERT
            **kwargs: Additional formatting options (unused here; accepted
                for subclass compatibility)
        Returns:
            str: Generated SQL statement with one placeholder group
        """
        fields = list(target_fields) if target_fields else (self.get_target_fields(table) or [])
        if self.escape_column_names:
            fields = [self.escape_word(field) for field in fields]
        # Fall back to the width of the first row when no field list exists.
        count = len(fields) if fields else (len(values[0]) if values else 0)
        placeholders = ", ".join([self.placeholder] * count)
        statement_format = self.replace_statement_format if replace else self.insert_statement_format
        return statement_format.format(
            table=table,
            fields=", ".join(fields),
            values=f"({placeholders})",
        )

    def generate_replace_sql(self, table, values, target_fields=None, **kwargs):
        """
        Generate REPLACE SQL statement.

        Args:
            table (str): Target table name
            values (list): List of value tuples to replace
            target_fields (list, optional): Target column names
            **kwargs: Additional formatting options
        Returns:
            str: Generated REPLACE SQL statement
        """
        return self.generate_insert_sql(
            table, values, target_fields=target_fields, replace=True, **kwargs
        )
passfrom airflow.providers.common.sql.dialects.dialect import Dialect
# Usage examples for the generic Dialect API.

# Create dialect instance
dialect = Dialect()
# Escape reserved words or identifiers (exact quoting is dialect-specific)
escaped_table = dialect.escape_word('order') # May become `order` or "order"
escaped_column = dialect.escape_word('select') # May become `select` or "select"
# Extract schema from a possibly schema-qualified table identifier
schema, table = dialect.extract_schema_from_table('myschema.mytable')
# schema = 'myschema', table = 'mytable'
schema, table = dialect.extract_schema_from_table('mytable')
# schema = None, table = 'mytable'

# Get table metadata (requires an inspector to be configured)
columns = dialect.get_column_names('users')
# Returns: ['id', 'name', 'email', 'created_at']
columns_with_schema = dialect.get_column_names('users', schema='public')
# Returns column names for public.users table
primary_keys = dialect.get_primary_keys('users')
# Returns: ['id']
target_fields = dialect.get_target_fields('users')
# Returns fields suitable for INSERT operations

# Generate INSERT statement
values = [
(1, 'John Doe', 'john@example.com'),
(2, 'Jane Smith', 'jane@example.com')
]
insert_sql = dialect.generate_insert_sql(
table='users',
values=values,
target_fields=['id', 'name', 'email']
)
# Returns: INSERT INTO users (id, name, email) VALUES (%s, %s, %s)
# Generate REPLACE statement (if supported by the dialect)
replace_sql = dialect.generate_replace_sql(
table='users',
values=values,
target_fields=['id', 'name', 'email']
)
# Returns: REPLACE INTO users (id, name, email) VALUES (%s, %s, %s)
# Alternatively, pass replace=True to generate_insert_sql
replace_sql = dialect.generate_insert_sql(
table='users',
values=values,
target_fields=['id', 'name', 'email'],
replace=True
)class PostgreSQLDialect(Dialect):
"""PostgreSQL-specific dialect."""
def __init__(self):
super().__init__()
self.placeholder = '%s'
self.escape_word_format = '"{}"'
self.insert_statement_format = 'INSERT INTO {table} ({fields}) VALUES {values}'
self.replace_statement_format = '''
INSERT INTO {table} ({fields}) VALUES {values}
ON CONFLICT ({primary_keys}) DO UPDATE SET {updates}
'''
def generate_replace_sql(self, table, values, target_fields=None, **kwargs):
# PostgreSQL uses UPSERT instead of REPLACE
primary_keys = self.get_primary_keys(table)
if not primary_keys:
# Fallback to regular INSERT if no primary keys
return self.generate_insert_sql(table, values, target_fields)
# Generate UPSERT statement
fields_str = ', '.join(target_fields or self.get_target_fields(table))
values_placeholder = ', '.join(['%s'] * len(target_fields or []))
updates = ', '.join([
f'{field} = EXCLUDED.{field}'
for field in target_fields
if field not in primary_keys
])
return self.replace_statement_format.format(
table=table,
fields=fields_str,
values=f'({values_placeholder})',
primary_keys=', '.join(primary_keys),
updates=updates
)
class MySQLDialect(Dialect):
"""MySQL-specific dialect."""
def __init__(self):
super().__init__()
self.placeholder = '%s'
self.escape_word_format = '`{}`'
self.insert_statement_format = 'INSERT INTO {table} ({fields}) VALUES {values}'
self.replace_statement_format = 'REPLACE INTO {table} ({fields}) VALUES {values}'class AdvancedDialect(Dialect):
"""Example of advanced dialect features."""
def __init__(self, connection, **kwargs):
super().__init__(**kwargs)
self.connection = connection
self.reserved_words = {'select', 'from', 'where', 'order', 'group'}
def escape_word(self, word):
"""Custom escaping logic."""
if word.lower() in self.reserved_words or ' ' in word:
return self.escape_word_format.format(word)
return word
def get_column_names(self, table, schema=None):
"""Get columns using database introspection."""
full_table = f'{schema}.{table}' if schema else table
cursor = self.connection.cursor()
cursor.execute(f"DESCRIBE {full_table}")
columns = [row[0] for row in cursor.fetchall()]
cursor.close()
return columns
def generate_insert_sql(self, table, values, target_fields=None, replace=False, **kwargs):
"""Generate optimized INSERT with batch handling."""
if not values:
return None
# Escape table and field names
escaped_table = self.escape_word(table)
if target_fields:
escaped_fields = [self.escape_word(field) for field in target_fields]
fields_str = ', '.join(escaped_fields)
else:
fields_str = ', '.join([self.escape_word(f) for f in self.get_target_fields(table)])
# Generate placeholders for batch insert
single_row_placeholder = f"({', '.join([self.placeholder] * len(target_fields or []))})"
values_placeholder = ', '.join([single_row_placeholder] * len(values))
statement_format = self.replace_statement_format if replace else self.insert_statement_format
return statement_format.format(
table=escaped_table,
fields=fields_str,
values=values_placeholder
)from airflow.providers.common.sql.hooks.sql import DbApiHook
class CustomDatabaseHook(DbApiHook):
"""Custom hook with dialect support."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Initialize dialect based on database type
self.dialect = self._get_dialect()
def _get_dialect(self):
"""Get appropriate dialect for the database."""
conn = self.get_connection(self.conn_id)
if 'postgres' in conn.conn_type:
return PostgreSQLDialect()
elif 'mysql' in conn.conn_type:
return MySQLDialect()
else:
return Dialect() # Generic dialect
def insert_rows(self, table, rows, target_fields=None, commit_every=1000, replace=False):
"""Insert rows using dialect-specific SQL generation."""
if not rows:
return
# Use dialect to generate appropriate SQL
insert_sql = self.dialect.generate_insert_sql(
table=table,
values=rows,
target_fields=target_fields,
replace=replace
)
# Execute the generated SQL
self.run(insert_sql, parameters=rows, autocommit=True)placeholder: Parameter placeholder (e.g., '%s', '?', ':1')escape_word_format: Format for escaping identifiers (e.g., '{}', '"{}"')insert_statement_format: Template for INSERT statementsreplace_statement_format: Template for REPLACE/UPSERT statementsescape_column_names: Whether to escape column names by defaultPostgreSQL: placeholder='%s', escape_word_format='"{}"'
MySQL: placeholder='%s', escape_word_format='`{}`'
SQLite: placeholder='?', escape_word_format='[{}]'
SQL Server: placeholder='?', escape_word_format='[{}]'
Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-common-sql