Apache Airflow provider package for Microsoft SQL Server (MSSQL) database integration with hooks, dialects, and connection management.
npx @tessl/cli install tessl/pypi-apache-airflow-providers-microsoft-mssql@4.3.0

Apache Airflow provider package for Microsoft SQL Server (MSSQL) database integration. This provider enables Airflow workflows to connect to and interact with Microsoft SQL Server databases through a standardized hook interface, a custom SQL dialect, and connection management.
pip install apache-airflow-providers-microsoft-mssql

from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook
from airflow.providers.microsoft.mssql.dialects.mssql import MsSqlDialect
# Create connection to MSSQL database
hook = MsSqlHook(mssql_conn_id='my_mssql_connection')
# Execute SQL query
results = hook.get_records("SELECT * FROM my_table WHERE id > %s", parameters=[100])
# Get connection for manual operations
conn = hook.get_conn()
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM my_table")
count = cursor.fetchone()[0]
cursor.close()
conn.close()
# Use with SQLAlchemy
sqlalchemy_conn = hook.get_sqlalchemy_connection()
with sqlalchemy_conn:
    result = sqlalchemy_conn.execute("SELECT * FROM my_table")
    rows = result.fetchall()

This provider follows Apache Airflow's provider architecture pattern:
The provider integrates with Airflow's connection management system and supports both direct pymssql connections and SQLAlchemy engine connections for flexibility.
Core MSSQL database connectivity and operations through the MsSqlHook class, which extends Airflow's standard SQL hook with MSSQL-specific features.
class MsSqlHook(DbApiHook):
    """
    Interact with Microsoft SQL Server.

    Parameters:
    - mssql_conn_id: Airflow connection ID for the MSSQL database
    - sqlalchemy_scheme: Custom SQLAlchemy scheme (default: 'mssql+pymssql')
    - schema: Database schema to use
    """

    # Attributes used by Airflow's connection framework to register this hook.
    conn_name_attr = "mssql_conn_id"
    default_conn_name = "mssql_default"
    conn_type = "mssql"
    hook_name = "Microsoft SQL Server"
    supports_autocommit = True
    DEFAULT_SQLALCHEMY_SCHEME = "mssql+pymssql"

    def __init__(self, *args, sqlalchemy_scheme: str | None = None, **kwargs) -> None: ...

    def get_conn(self) -> PymssqlConnection:
        """Return a pymssql connection object."""

    def get_uri(self) -> str:
        """Get the database URI with the proper SQLAlchemy scheme."""

    def get_sqlalchemy_connection(
        self, connect_kwargs: dict | None = None, engine_kwargs: dict | None = None
    ) -> Any:
        """Get a SQLAlchemy connection object."""

    def set_autocommit(self, conn: PymssqlConnection, autocommit: bool) -> None:
        """Set autocommit mode on the connection."""

    def get_autocommit(self, conn: PymssqlConnection) -> bool:
        """Get the current autocommit state."""

    def get_openlineage_database_info(self, connection) -> DatabaseInfo:
        """Return MSSQL-specific information for OpenLineage."""

    def get_openlineage_database_dialect(self, connection) -> str:
        """Return database dialect ('mssql')."""

    def get_openlineage_default_schema(self) -> str | None:
        """Return current schema using SCHEMA_NAME()."""

    @property
    def sqlalchemy_scheme(self) -> str:
        """SQLAlchemy scheme from constructor, connection extras, or default."""

    @property
    def dialect_name(self) -> str:
        """Return 'mssql'."""

    @property
    def dialect(self) -> Dialect:
        """Return MsSqlDialect instance."""


# MSSQL-specific SQL dialect implementation providing advanced database
# operations like primary key discovery and MERGE statement generation.
class MsSqlDialect(Dialect):
    """Microsoft SQL Server dialect implementation."""

    def get_primary_keys(self, table: str, schema: str | None = None) -> list[str] | None:
        """
        Get primary key columns for a table.

        Parameters:
        - table: Table name to query
        - schema: Optional schema name

        Returns:
        List of primary key column names or None if no primary keys found
        """

    def generate_replace_sql(self, table, values, target_fields, **kwargs) -> str:
        """
        Generate MERGE statement for upsert operations.

        Parameters:
        - table: Target table name
        - values: Values to insert/update
        - target_fields: List of field names

        Returns:
        MERGE SQL statement string
        """


# Provider metadata and registration functionality for Airflow provider
# system integration.
def get_provider_info() -> dict:
    """
    Return provider metadata dictionary.

    Returns:
    Dictionary containing:
    - package-name: str
    - name: str
    - description: str
    - integrations: list[dict]
    - dialects: list[dict]
    - hooks: list[dict]
    - connection-types: list[dict]
    """


from pymssql import Connection as PymssqlConnection
from airflow.providers.common.sql.dialects.dialect import Dialect
from airflow.providers.openlineage.sqlparser import DatabaseInfo
# Connection configuration for MSSQL
class MSSQLConnection:
    """
    MSSQL connection configuration.

    Attributes:
    - host: SQL Server hostname/IP
    - port: SQL Server port (default: 1433)
    - schema: Database name
    - login: Username
    - password: Password
    - extra: Additional connection parameters as JSON
    """


# To use this provider, configure an MSSQL connection in Airflow:
# Connection configuration example
{
"conn_id": "mssql_default",
"conn_type": "mssql",
"host": "localhost",
"port": 1433,
"schema": "my_database",
"login": "username",
"password": "password",
"extra": {
"sqlalchemy_scheme": "mssql+pymssql" # optional
}
}

pip install apache-airflow-providers-microsoft-mssql[openlineage]

The provider inherits error handling from the base DbApiHook class and the pymssql driver. Common exceptions include: