
tessl/pypi-apache-airflow-providers-common-sql

Provider package offering common SQL functionality for Apache Airflow including hooks, operators, sensors, and triggers for SQL database operations


SQL Dialects

SQL dialects encapsulate database-specific SQL formatting and operations. They abstract the differences between SQL databases in query formatting, data type handling, and schema operations.

Capabilities

Generic SQL Dialect

Base dialect implementation providing common SQL formatting and operations.

class Dialect:
    """
    Generic SQL dialect implementation.
    
    Attributes:
        placeholder (str): SQL placeholder character for parameters
        inspector: SQLAlchemy inspector for schema operations
        insert_statement_format (str): Format string for INSERT statements
        replace_statement_format (str): Format string for REPLACE statements  
        escape_word_format (str): Format string for escaping identifiers
        escape_column_names (bool): Whether to escape column names by default
    """
    
    def __init__(self, **kwargs):
        pass
    
    def escape_word(self, word):
        """
        Escape word if it's a reserved word or needs escaping.
        
        Args:
            word (str): Word to potentially escape
            
        Returns:
            str: Escaped word if necessary, original word otherwise
        """
        pass
    
    def unescape_word(self, word):
        """
        Unescape escaped word.
        
        Args:
            word (str): Potentially escaped word
            
        Returns:
            str: Unescaped word
        """
        pass
    
    def extract_schema_from_table(self, table):
        """
        Extract schema name from table identifier.
        
        Args:
            table (str): Table identifier (may include schema)
            
        Returns:
            tuple: (schema, table_name) or (None, table_name)
        """
        pass
    
    def get_column_names(self, table, schema=None):
        """
        Get column names for specified table.
        
        Args:
            table (str): Table name
            schema (str, optional): Schema name
            
        Returns:
            list: List of column names
        """
        pass
    
    def get_target_fields(self, table, schema=None):
        """
        Get target fields for table operations.
        
        Args:
            table (str): Table name
            schema (str, optional): Schema name
            
        Returns:
            list: List of target field names
        """
        pass
    
    def get_primary_keys(self, table, schema=None):
        """
        Get primary key columns for table.
        
        Args:
            table (str): Table name
            schema (str, optional): Schema name
            
        Returns:
            list: List of primary key column names
        """
        pass
    
    def generate_insert_sql(self, table, values, target_fields=None, replace=False, **kwargs):
        """
        Generate INSERT or REPLACE SQL statement.
        
        Args:
            table (str): Target table name
            values (list): List of value tuples to insert
            target_fields (list, optional): Target column names
            replace (bool): Use REPLACE instead of INSERT
            **kwargs: Additional formatting options
            
        Returns:
            str: Generated SQL statement
        """
        pass
    
    def generate_replace_sql(self, table, values, target_fields=None, **kwargs):
        """
        Generate REPLACE SQL statement.
        
        Args:
            table (str): Target table name
            values (list): List of value tuples to replace
            target_fields (list, optional): Target column names
            **kwargs: Additional formatting options
            
        Returns:
            str: Generated REPLACE SQL statement
        """
        pass
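
The method stubs above describe behavior without showing it. The following is a minimal standalone sketch of how `escape_word`, `unescape_word`, and `extract_schema_from_table` could behave under the contract documented here. `SketchDialect`, its reserved-word set, and the double-quote format are assumptions made for illustration; this is not the provider's implementation.

```python
from typing import Optional, Tuple


class SketchDialect:
    """Hypothetical stand-in for illustration; not the provider's Dialect class."""

    # Assumed values for this sketch only.
    escape_word_format = '"{}"'
    reserved_words = frozenset({"select", "from", "where", "order", "group"})

    def _affixes(self) -> Tuple[str, str]:
        # Split '"{}"' into the text before and after the identifier slot.
        prefix, suffix = self.escape_word_format.split("{}")
        return prefix, suffix

    def _is_escaped(self, word: str) -> bool:
        prefix, suffix = self._affixes()
        return (
            bool(prefix or suffix)
            and len(word) >= len(prefix) + len(suffix)
            and word.startswith(prefix)
            and word.endswith(suffix)
        )

    def escape_word(self, word: str) -> str:
        # Leave already-escaped words alone so escaping is idempotent.
        if self._is_escaped(word):
            return word
        if word.lower() in self.reserved_words or " " in word:
            return self.escape_word_format.format(word)
        return word

    def unescape_word(self, word: str) -> str:
        if not self._is_escaped(word):
            return word
        prefix, suffix = self._affixes()
        return word[len(prefix):len(word) - len(suffix)]

    def extract_schema_from_table(self, table: str) -> Tuple[Optional[str], str]:
        # Split on the last dot; an unqualified name has no schema.
        schema, _, name = table.rpartition(".")
        return (schema or None, name)
```

Making `escape_word` idempotent is a design choice of this sketch: it lets callers pass identifiers through without tracking whether they were escaped earlier.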

Usage Examples

Basic Dialect Usage

from airflow.providers.common.sql.dialects.dialect import Dialect

# Create dialect instance
dialect = Dialect()

# Escape reserved words or identifiers
escaped_table = dialect.escape_word('order')  # May become `order` or "order"
escaped_column = dialect.escape_word('select')  # May become `select` or "select"

# Extract schema from table identifier
schema, table = dialect.extract_schema_from_table('myschema.mytable')
# schema = 'myschema', table = 'mytable'

schema, table = dialect.extract_schema_from_table('mytable')
# schema = None, table = 'mytable'

Schema Operations

# Get table metadata
columns = dialect.get_column_names('users')
# Returns: ['id', 'name', 'email', 'created_at']

columns_with_schema = dialect.get_column_names('users', schema='public')
# Returns column names for public.users table

primary_keys = dialect.get_primary_keys('users')
# Returns: ['id']

target_fields = dialect.get_target_fields('users')
# Returns fields suitable for INSERT operations
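
To make the schema lookups concrete, here is a hedged sketch that answers the same two questions (column names and primary keys) against an in-memory SQLite database. It swaps the SQLAlchemy inspector mentioned above for SQLite's `pragma_table_info` table-valued function, so it illustrates the idea rather than the dialect's actual mechanism. The function names mirror the dialect methods but are standalone helpers.

```python
import sqlite3
from typing import List


def get_column_names(conn: sqlite3.Connection, table: str) -> List[str]:
    # pragma_table_info is SQLite's table-valued form of PRAGMA table_info;
    # it accepts the table name as a bound parameter.
    rows = conn.execute(
        "SELECT name FROM pragma_table_info(?) ORDER BY cid", (table,)
    )
    return [row[0] for row in rows]


def get_primary_keys(conn: sqlite3.Connection, table: str) -> List[str]:
    # pk is 0 for non-key columns, otherwise the column's 1-based position in the key.
    rows = conn.execute(
        "SELECT name FROM pragma_table_info(?) WHERE pk > 0 ORDER BY pk", (table,)
    )
    return [row[0] for row in rows]


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, email TEXT, created_at TEXT)"
)
print(get_column_names(conn, "users"))  # ['id', 'name', 'email', 'created_at']
print(get_primary_keys(conn, "users"))  # ['id']
```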

SQL Generation

# Generate INSERT statement
values = [
    (1, 'John Doe', 'john@example.com'),
    (2, 'Jane Smith', 'jane@example.com')
]

insert_sql = dialect.generate_insert_sql(
    table='users',
    values=values,
    target_fields=['id', 'name', 'email']
)
# Returns: INSERT INTO users (id, name, email) VALUES (%s, %s, %s)

# Generate REPLACE statement (if supported)
replace_sql = dialect.generate_replace_sql(
    table='users',
    values=values,
    target_fields=['id', 'name', 'email']
)
# Returns: REPLACE INTO users (id, name, email) VALUES (%s, %s, %s)

# Use replace=True flag in generate_insert_sql
replace_sql = dialect.generate_insert_sql(
    table='users',
    values=values,
    target_fields=['id', 'name', 'email'],
    replace=True
)
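
The generated statements above follow from combining a statement format with one placeholder per target field. A minimal sketch of that assembly, assuming named format fields (`{table}`, `{fields}`, `{values}`) like those used in the custom dialects later in this page; `build_insert_sql` is a hypothetical helper, not a provider function:

```python
from typing import Sequence

# Assumed format strings for this sketch.
INSERT_FORMAT = "INSERT INTO {table} ({fields}) VALUES {values}"
REPLACE_FORMAT = "REPLACE INTO {table} ({fields}) VALUES {values}"


def build_insert_sql(
    table: str,
    target_fields: Sequence[str],
    placeholder: str = "%s",
    replace: bool = False,
) -> str:
    statement_format = REPLACE_FORMAT if replace else INSERT_FORMAT
    fields = ", ".join(target_fields)
    # One placeholder per column; row values are bound by the driver at execution time.
    values = "(" + ", ".join([placeholder] * len(target_fields)) + ")"
    return statement_format.format(table=table, fields=fields, values=values)


print(build_insert_sql("users", ["id", "name", "email"]))
print(build_insert_sql("users", ["id", "name", "email"], replace=True))
```

Note that the statement carries only placeholders. The row data never appears in the SQL text, which is what keeps parameter binding safe against injection.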

Custom Dialect Implementation

class PostgreSQLDialect(Dialect):
    """PostgreSQL-specific dialect."""
    
    def __init__(self):
        super().__init__()
        self.placeholder = '%s'
        self.escape_word_format = '"{}"'
        self.insert_statement_format = 'INSERT INTO {table} ({fields}) VALUES {values}'
        self.replace_statement_format = '''
            INSERT INTO {table} ({fields}) VALUES {values}
            ON CONFLICT ({primary_keys}) DO UPDATE SET {updates}
        '''
    
    def generate_replace_sql(self, table, values, target_fields=None, **kwargs):
        # PostgreSQL uses UPSERT instead of REPLACE
        primary_keys = self.get_primary_keys(table)
        
        if not primary_keys:
            # Fallback to regular INSERT if no primary keys
            return self.generate_insert_sql(table, values, target_fields)
        
        # Generate UPSERT statement; resolve the field list once so that
        # target_fields=None falls back to the table's own columns
        fields = target_fields or self.get_target_fields(table)
        fields_str = ', '.join(fields)
        values_placeholder = ', '.join([self.placeholder] * len(fields))
        
        updates = ', '.join([
            f'{field} = EXCLUDED.{field}' 
            for field in fields 
            if field not in primary_keys
        ])
        
        return self.replace_statement_format.format(
            table=table,
            fields=fields_str,
            values=f'({values_placeholder})',
            primary_keys=', '.join(primary_keys),
            updates=updates
        )

class MySQLDialect(Dialect):
    """MySQL-specific dialect."""
    
    def __init__(self):
        super().__init__()
        self.placeholder = '%s'
        self.escape_word_format = '`{}`'
        self.insert_statement_format = 'INSERT INTO {table} ({fields}) VALUES {values}'
        self.replace_statement_format = 'REPLACE INTO {table} ({fields}) VALUES {values}'
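
One edge case in the PostgreSQL-style upsert is a table whose target fields are all primary key columns: the `DO UPDATE SET` clause would then be empty, which is not valid SQL. The sketch below shows one way to handle it, falling back to `DO NOTHING`. `build_upsert_sql` is a hypothetical standalone helper, and the fallback is an assumption of this sketch rather than documented provider behavior.

```python
from typing import Sequence


def build_upsert_sql(
    table: str,
    target_fields: Sequence[str],
    primary_keys: Sequence[str],
    placeholder: str = "%s",
) -> str:
    fields = ", ".join(target_fields)
    values = ", ".join([placeholder] * len(target_fields))
    keys = ", ".join(primary_keys)
    # Only non-key columns are overwritten on conflict.
    updates = ", ".join(
        f"{field} = EXCLUDED.{field}"
        for field in target_fields
        if field not in primary_keys
    )
    # With nothing left to update, keep the existing row instead of emitting an empty SET.
    action = f"DO UPDATE SET {updates}" if updates else "DO NOTHING"
    return f"INSERT INTO {table} ({fields}) VALUES ({values}) ON CONFLICT ({keys}) {action}"


print(build_upsert_sql("users", ["id", "name", "email"], ["id"]))
```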

Advanced Dialect Features

class AdvancedDialect(Dialect):
    """Example of advanced dialect features."""
    
    def __init__(self, connection, **kwargs):
        super().__init__(**kwargs)
        self.connection = connection
        self.reserved_words = {'select', 'from', 'where', 'order', 'group'}
    
    def escape_word(self, word):
        """Custom escaping logic."""
        if word.lower() in self.reserved_words or ' ' in word:
            return self.escape_word_format.format(word)
        return word
    
    def get_column_names(self, table, schema=None):
        """Get columns using database introspection."""
        full_table = f'{schema}.{table}' if schema else table
        
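        # DESCRIBE is MySQL syntax; the table name is interpolated into the
        # statement, so it must come from a trusted source
        cursor = self.connection.cursor()
        try:
            cursor.execute(f"DESCRIBE {full_table}")
            columns = [row[0] for row in cursor.fetchall()]
        finally:
            cursor.close()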
        
        return columns
    
    def generate_insert_sql(self, table, values, target_fields=None, replace=False, **kwargs):
        """Generate optimized INSERT with batch handling."""
        if not values:
            return None
        
        # Escape table and field names
        escaped_table = self.escape_word(table)
        
        # Resolve the field list once so that target_fields=None falls back
        # to the table's own columns for both the names and the placeholders
        fields = target_fields or self.get_target_fields(table)
        fields_str = ', '.join(self.escape_word(field) for field in fields)
        
        # Generate one placeholder group per row for a batch insert
        single_row_placeholder = f"({', '.join([self.placeholder] * len(fields))})"
        values_placeholder = ', '.join([single_row_placeholder] * len(values))
        
        statement_format = self.replace_statement_format if replace else self.insert_statement_format
        
        return statement_format.format(
            table=escaped_table,
            fields=fields_str,
            values=values_placeholder
        )
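
A multi-row `VALUES` clause like the one generated above expects its parameters as a single flat sequence, not as a list of row tuples. The following sketch pairs the statement with flattened parameters and runs it against in-memory SQLite (hence the `?` placeholder). `build_batch_insert` is a hypothetical helper introduced for illustration.

```python
import sqlite3
from itertools import chain
from typing import Any, List, Sequence, Tuple


def build_batch_insert(
    table: str,
    target_fields: Sequence[str],
    rows: Sequence[Sequence[Any]],
    placeholder: str = "?",
) -> Tuple[str, List[Any]]:
    width = len(target_fields)
    if any(len(row) != width for row in rows):
        raise ValueError("every row must have one value per target field")
    row_group = "(" + ", ".join([placeholder] * width) + ")"
    sql = (
        f"INSERT INTO {table} ({', '.join(target_fields)}) "
        f"VALUES {', '.join([row_group] * len(rows))}"
    )
    # Flatten [(1, 'a'), (2, 'b')] into [1, 'a', 2, 'b'] to line up with the placeholders.
    params = list(chain.from_iterable(rows))
    return sql, params


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT)")
sql, params = build_batch_insert(
    "users", ["id", "name"], [(1, "John Doe"), (2, "Jane Smith")]
)
conn.execute(sql, params)
```

Drivers typically cap the number of bound parameters per statement, so very large batches usually need to be split before using this form.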

Dialect Integration with Hooks

from airflow.providers.common.sql.hooks.sql import DbApiHook

class CustomDatabaseHook(DbApiHook):
    """Custom hook with dialect support."""
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Initialize dialect based on database type
        self.dialect = self._get_dialect()
    
    def _get_dialect(self):
        """Get appropriate dialect for the database."""
        # get_conn_id() resolves the attribute named by the hook's conn_name_attr
        conn = self.get_connection(self.get_conn_id())
        
        if 'postgres' in conn.conn_type:
            return PostgreSQLDialect()
        elif 'mysql' in conn.conn_type:
            return MySQLDialect()
        else:
            return Dialect()  # Generic dialect
    
    def insert_rows(self, table, rows, target_fields=None, commit_every=1000, replace=False):
        """Insert rows using dialect-specific SQL generation."""
        if not rows:
            return
        
        # Use dialect to generate appropriate SQL
        insert_sql = self.dialect.generate_insert_sql(
            table=table,
            values=rows,
            target_fields=target_fields,
            replace=replace
        )
        
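        # Execute the generated SQL. The shape of `parameters` must match the
        # placeholders the dialect emitted (one flat sequence for a multi-row
        # VALUES clause); commit_every is not applied in this simplified example.
        self.run(insert_sql, parameters=rows, autocommit=True)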
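
The `commit_every` parameter in the hook example suggests committing in batches. A minimal sketch of that pattern, using the standard-library `sqlite3` module in place of a real hook; `chunked` and `insert_in_batches` are hypothetical helpers, not provider functions:

```python
import sqlite3
from itertools import islice
from typing import Any, Iterable, Iterator, List, Sequence


def chunked(rows: Iterable[Sequence[Any]], size: int) -> Iterator[List[Sequence[Any]]]:
    """Yield successive lists of at most `size` rows."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


def insert_in_batches(
    conn: sqlite3.Connection,
    sql: str,
    rows: Iterable[Sequence[Any]],
    commit_every: int = 1000,
) -> int:
    """Run a single-row INSERT for every row, committing after each batch."""
    total = 0
    for batch in chunked(rows, commit_every):
        conn.executemany(sql, batch)
        conn.commit()
        total += len(batch)
    return total
```

Committing per batch bounds the size of each transaction, at the cost of leaving earlier batches in place if a later one fails.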

Dialect Properties

Standard Properties

  • placeholder: Parameter placeholder (e.g., '%s', '?', ':1')
  • escape_word_format: Format for escaping identifiers (e.g., '`{}`', '"{}"')
  • insert_statement_format: Template for INSERT statements
  • replace_statement_format: Template for REPLACE/UPSERT statements
  • escape_column_names: Whether to escape column names by default

Database-Specific Examples

  • PostgreSQL: placeholder='%s', escape_word_format='"{}"'
  • MySQL: placeholder='%s', escape_word_format='`{}`'
  • SQLite: placeholder='?', escape_word_format='[{}]'
  • SQL Server: placeholder='?', escape_word_format='[{}]'
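
These per-database settings lend themselves to a simple lookup table. The sketch below assumes hypothetical keys (`postgres`, `mysql`, `sqlite`, `mssql`) and an arbitrary generic fallback; neither is taken from the provider, and MySQL is given the backtick format used by `MySQLDialect` earlier on this page.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DialectSettings:
    placeholder: str
    escape_word_format: str


# Keys are hypothetical connection-type names chosen for this sketch.
SETTINGS = {
    "postgres": DialectSettings("%s", '"{}"'),
    "mysql": DialectSettings("%s", "`{}`"),
    "sqlite": DialectSettings("?", "[{}]"),
    "mssql": DialectSettings("?", "[{}]"),
}

# Assumed fallback for unknown databases.
DEFAULT_SETTINGS = DialectSettings("%s", '"{}"')


def settings_for(conn_type: str) -> DialectSettings:
    return SETTINGS.get(conn_type.lower(), DEFAULT_SETTINGS)


print(settings_for("mysql").escape_word_format.format("order"))  # `order`
```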

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-common-sql
