tessl/pypi-apache-airflow-providers-microsoft-mssql

Apache Airflow provider package for Microsoft SQL Server (MSSQL) database integration with hooks, dialects, and connection management.

Workspace: tessl
Visibility: Public
Describes: pkg:pypi/apache-airflow-providers-microsoft-mssql@4.3.x (PyPI)

To install, run

npx @tessl/cli install tessl/pypi-apache-airflow-providers-microsoft-mssql@4.3.0


Apache Airflow Microsoft SQL Server Provider

Apache Airflow provider package for Microsoft SQL Server (MSSQL) database integration. This provider enables Airflow workflows to connect to and interact with Microsoft SQL Server databases through a standardized hook interface, custom SQL dialect, and connection management.

Package Information

  • Package Name: apache-airflow-providers-microsoft-mssql
  • Language: Python
  • Installation: pip install apache-airflow-providers-microsoft-mssql
  • Requirements: Apache Airflow ≥2.10.0, Python ≥3.10

Core Imports

from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook
from airflow.providers.microsoft.mssql.dialects.mssql import MsSqlDialect

Basic Usage

from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook

# Create connection to MSSQL database
hook = MsSqlHook(mssql_conn_id='my_mssql_connection')

# Execute SQL query (pymssql uses %s placeholders; pass parameters as a tuple)
results = hook.get_records("SELECT * FROM my_table WHERE id > %s", parameters=(100,))

# Get connection for manual operations
conn = hook.get_conn()
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM my_table")
count = cursor.fetchone()[0]
cursor.close()
conn.close()

# Use with SQLAlchemy (wrap textual SQL in text(), which SQLAlchemy 2.x requires)
from sqlalchemy import text

with hook.get_sqlalchemy_connection() as sqlalchemy_conn:
    result = sqlalchemy_conn.execute(text("SELECT * FROM my_table"))
    rows = result.fetchall()

Architecture

This provider follows Apache Airflow's provider architecture pattern:

  • Hook (MsSqlHook): Primary interface for database operations, extending DbApiHook with MSSQL-specific functionality
  • Dialect (MsSqlDialect): MSSQL-specific SQL dialect implementation for advanced operations like primary key discovery and MERGE statements
  • Provider Registration: Integration with Airflow's provider system through get_provider_info and entry points

The provider integrates with Airflow's connection management system and supports both direct pymssql connections and SQLAlchemy engine connections for flexibility.

Capabilities

Database Connection and Operations

Core MSSQL database connectivity and operations through the MsSqlHook class, which extends Airflow's standard SQL hook with MSSQL-specific features.

class MsSqlHook(DbApiHook):
    """
    Interact with Microsoft SQL Server.
    
    Parameters:
    - mssql_conn_id: Airflow connection ID for MSSQL database
    - sqlalchemy_scheme: Custom SQLAlchemy scheme (default: 'mssql+pymssql')
    - schema: Database name to connect to (takes precedence over the connection's schema field)
    """
    
    conn_name_attr = "mssql_conn_id"
    default_conn_name = "mssql_default" 
    conn_type = "mssql"
    hook_name = "Microsoft SQL Server"
    supports_autocommit = True
    DEFAULT_SQLALCHEMY_SCHEME = "mssql+pymssql"

    def __init__(self, *args, sqlalchemy_scheme: str | None = None, **kwargs) -> None: ...
    
    def get_conn(self) -> PymssqlConnection:
        """Return pymssql connection object."""
    
    def get_uri(self) -> str:
        """Get database URI with proper SQLAlchemy scheme."""
    
    def get_sqlalchemy_connection(self, connect_kwargs: dict | None = None, engine_kwargs: dict | None = None) -> Any:
        """Get SQLAlchemy connection object."""
        
    def set_autocommit(self, conn: PymssqlConnection, autocommit: bool) -> None:
        """Set autocommit mode on connection."""
        
    def get_autocommit(self, conn: PymssqlConnection) -> bool:
        """Get current autocommit state."""

    def get_openlineage_database_info(self, connection) -> DatabaseInfo:
        """Return MSSQL-specific information for OpenLineage."""
        
    def get_openlineage_database_dialect(self, connection) -> str:
        """Return database dialect ('mssql')."""
        
    def get_openlineage_default_schema(self) -> str | None:
        """Return current schema using SCHEMA_NAME()."""

    @property
    def sqlalchemy_scheme(self) -> str:
        """SQLAlchemy scheme from constructor, connection extras, or default."""
        
    @property  
    def dialect_name(self) -> str:
        """Return 'mssql'."""
        
    @property
    def dialect(self) -> Dialect:
        """Return MsSqlDialect instance."""
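
The lookup order described for sqlalchemy_scheme (constructor argument, then connection extras, then default) can be sketched in plain Python. This is a minimal illustration, not the provider's source: resolve_sqlalchemy_scheme and swap_scheme are hypothetical helper names, and the idea that get_uri swaps only the scheme portion of the URI is an assumption inferred from its docstring.

```python
from __future__ import annotations

# Mirrors MsSqlHook.DEFAULT_SQLALCHEMY_SCHEME as documented above.
DEFAULT_SQLALCHEMY_SCHEME = "mssql+pymssql"


def resolve_sqlalchemy_scheme(
    constructor_scheme: str | None,
    connection_extra: dict | None,
) -> str:
    """Pick the scheme: constructor argument, then connection extra, then default."""
    if constructor_scheme:
        return constructor_scheme
    extra_scheme = (connection_extra or {}).get("sqlalchemy_scheme")
    if extra_scheme:
        return extra_scheme
    return DEFAULT_SQLALCHEMY_SCHEME


def swap_scheme(uri: str, scheme: str) -> str:
    """Replace the scheme part of a connection URI (the text before '://')."""
    _, separator, rest = uri.partition("://")
    if not separator:
        raise ValueError(f"not a URI with a scheme: {uri!r}")
    return f"{scheme}://{rest}"


# Hypothetical inputs: no constructor argument, scheme supplied via extras.
scheme = resolve_sqlalchemy_scheme(None, {"sqlalchemy_scheme": "mssql+pyodbc"})
uri = swap_scheme("mssql://username:password@localhost:1433/my_database", scheme)
```

Under these assumptions, a scheme passed to the hook constructor always wins, which makes it possible to override a connection-level setting for a single task.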

SQL Dialect Operations

MSSQL-specific SQL dialect implementation providing advanced database operations like primary key discovery and MERGE statement generation.

class MsSqlDialect(Dialect):
    """Microsoft SQL Server dialect implementation."""
    
    def get_primary_keys(self, table: str, schema: str | None = None) -> list[str] | None:
        """
        Get primary key columns for a table.
        
        Parameters:
        - table: Table name to query
        - schema: Optional schema name
        
        Returns:
        List of primary key column names or None if no primary keys found
        """
        
    def generate_replace_sql(self, table, values, target_fields, **kwargs) -> str:
        """
        Generate MERGE statement for upsert operations.
        
        Parameters:
        - table: Target table name
        - values: Values to insert/update
        - target_fields: List of field names
        
        Returns:
        MERGE SQL statement string
        """
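
To make the MERGE-based upsert concrete, the sketch below builds a statement of the general shape such a method could return. build_merge_sql is a hypothetical helper written for illustration; the exact SQL text the provider emits may differ, and the key columns are passed in explicitly here rather than discovered through get_primary_keys.

```python
from __future__ import annotations


def build_merge_sql(table: str, target_fields: list[str], primary_keys: list[str]) -> str:
    """Build a T-SQL MERGE upsert with %s placeholders (illustrative layout only)."""
    if not primary_keys:
        raise ValueError("MERGE needs at least one key column to match on")

    columns = ", ".join(target_fields)
    placeholders = ", ".join(["%s"] * len(target_fields))
    # Rows match when every key column is equal in target and source.
    on_clause = " AND ".join(f"target.{key} = source.{key}" for key in primary_keys)
    non_key_fields = [field for field in target_fields if field not in primary_keys]

    lines = [
        f"MERGE INTO {table} AS target",
        f"USING (VALUES ({placeholders})) AS source ({columns})",
        f"ON {on_clause}",
    ]
    # Only emit an UPDATE branch when there is something besides the keys to update.
    if non_key_fields:
        updates = ", ".join(f"target.{field} = source.{field}" for field in non_key_fields)
        lines.append(f"WHEN MATCHED THEN UPDATE SET {updates}")
    insert_values = ", ".join(f"source.{field}" for field in target_fields)
    lines.append(f"WHEN NOT MATCHED THEN INSERT ({columns}) VALUES ({insert_values});")
    return "\n".join(lines)


# Hypothetical table and columns.
merge_sql = build_merge_sql("dbo.users", ["id", "name", "email"], ["id"])
```

The sketch interpolates identifiers directly into the SQL string, so table and column names must come from trusted code, never from user input; only the row values go through placeholders.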

Provider Registration

Provider metadata and registration functionality for Airflow provider system integration.

def get_provider_info() -> dict:
    """
    Return provider metadata dictionary.
    
    Returns:
    Dictionary containing:
    - package-name: str
    - name: str  
    - description: str
    - integrations: list[dict]
    - dialects: list[dict]
    - hooks: list[dict]
    - connection-types: list[dict]
    """

Types

from pymssql import Connection as PymssqlConnection
from airflow.providers.common.sql.dialects.dialect import Dialect
from airflow.providers.openlineage.sqlparser import DatabaseInfo

# Connection fields for MSSQL (descriptive placeholder; in Airflow these are fields of the Connection model)
class MSSQLConnection:
    """
    MSSQL connection configuration.
    
    Attributes:
    - host: SQL Server hostname/IP
    - port: SQL Server port (default: 1433)
    - schema: Database name
    - login: Username
    - password: Password
    - extra: Additional connection parameters as JSON
    """

Connection Configuration

To use this provider, configure an MSSQL connection in Airflow:

# Connection configuration example
{
    "conn_id": "mssql_default",
    "conn_type": "mssql", 
    "host": "localhost",
    "port": 1433,
    "schema": "my_database",
    "login": "username",
    "password": "password",
    "extra": {
        "sqlalchemy_scheme": "mssql+pymssql"  # optional
    }
}
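
One way to supply this configuration without the UI is an environment variable. The sketch below assumes Airflow's JSON connection format (supported since Airflow 2.3) and the AIRFLOW_CONN_<CONN_ID> naming convention; connection_env_var is a hypothetical helper, and the credentials are placeholders.

```python
import json
import os

# Same fields as the example above; conn_id is carried by the variable name instead.
connection = {
    "conn_type": "mssql",
    "host": "localhost",
    "port": 1433,
    "schema": "my_database",
    "login": "username",
    "password": "password",
    "extra": {"sqlalchemy_scheme": "mssql+pymssql"},  # optional
}


def connection_env_var(conn_id: str) -> str:
    """Return the environment variable name Airflow checks for a connection ID."""
    return f"AIRFLOW_CONN_{conn_id.upper()}"


# Must be set in the environment of the process that runs the task.
os.environ[connection_env_var("mssql_default")] = json.dumps(connection)
```

In practice the variable would usually be set by the deployment (container environment, secrets manager) rather than from Python; setting it in-process is shown only to keep the example self-contained.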

Dependencies

  • apache-airflow: ≥2.10.0
  • apache-airflow-providers-common-sql: ≥1.23.0
  • pymssql: ≥2.3.5
  • methodtools: ≥0.4.7

Optional Dependencies

  • apache-airflow-providers-openlineage: For data lineage tracking (install with pip install apache-airflow-providers-microsoft-mssql[openlineage])

Error Handling

The provider inherits its error handling from the base DbApiHook class and the pymssql driver. Common exceptions include:

  • pymssql.Error: Base exception for pymssql-related errors
  • sqlalchemy.exc.SQLAlchemyError: SQLAlchemy connection and query errors
  • airflow.exceptions.AirflowException: Airflow-specific errors for connection and configuration issues
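
Transient database errors are often handled by retrying the call. The helper below is a generic sketch, not part of the provider: run_with_retry is a hypothetical name, and the caller decides which exception types count as retryable (for example pymssql.OperationalError, if that is what the driver raises for dropped connections in your environment).

```python
from __future__ import annotations

import time
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retry(
    operation: Callable[[], T],
    retryable: tuple[type[BaseException], ...],
    attempts: int = 3,
    delay_seconds: float = 1.0,
) -> T:
    """Call operation, retrying on the given exception types up to attempts times."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except retryable:
            # Re-raise the original error once the retry budget is exhausted.
            if attempt == attempts:
                raise
            time.sleep(delay_seconds)
    raise AssertionError("unreachable")  # the loop always returns or raises


# Stand-in for a database call that fails twice before succeeding.
calls: list[int] = []


def flaky_query() -> str:
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError("simulated transient failure")
    return "ok"


result = run_with_retry(flaky_query, retryable=(ConnectionError,), delay_seconds=0)
```

With a real hook, the operation would be something like a lambda wrapping hook.get_records. Inside a DAG, Airflow's task-level retries are usually the simpler choice; an in-task helper like this mainly makes sense for a single flaky statement within a longer task.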