Apache Airflow provider package for Microsoft SQL Server (MSSQL) database integration with hooks, dialects, and connection management.
npx @tessl/cli install tessl/pypi-apache-airflow-providers-microsoft-mssql@4.3.00
# Apache Airflow Microsoft SQL Server Provider
Apache Airflow provider package for Microsoft SQL Server (MSSQL) database integration. This provider enables Airflow workflows to connect to and interact with Microsoft SQL Server databases through a standardized hook interface, a custom SQL dialect, and connection management.

## Package Information

- **Package Name**: apache-airflow-providers-microsoft-mssql
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-microsoft-mssql`
- **Requirements**: Apache Airflow ≥2.10.0, Python ≥3.10

## Core Imports

```python
from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook
from airflow.providers.microsoft.mssql.dialects.mssql import MsSqlDialect
```
## Basic Usage

```python
from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook

# Create a hook bound to an Airflow connection ID
hook = MsSqlHook(mssql_conn_id="my_mssql_connection")

# Execute a parameterized SQL query
results = hook.get_records("SELECT * FROM my_table WHERE id > %s", parameters=[100])

# Get a raw pymssql connection for manual operations
conn = hook.get_conn()
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM my_table")
count = cursor.fetchone()[0]
cursor.close()
conn.close()

# Use with SQLAlchemy (SQLAlchemy 2.0 requires text() for raw SQL strings)
from sqlalchemy import text

with hook.get_sqlalchemy_connection() as sqlalchemy_conn:
    result = sqlalchemy_conn.execute(text("SELECT * FROM my_table"))
    rows = result.fetchall()
```
## Architecture

This provider follows Apache Airflow's provider architecture pattern:

- **Hook (MsSqlHook)**: Primary interface for database operations, extending DbApiHook with MSSQL-specific functionality
- **Dialect (MsSqlDialect)**: MSSQL-specific SQL dialect implementation for advanced operations such as primary key discovery and MERGE statement generation
- **Provider Registration**: Integration with Airflow's provider system through get_provider_info and entry points

The provider integrates with Airflow's connection management system and supports both direct pymssql connections and SQLAlchemy engine connections for flexibility.
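The hook derives its SQLAlchemy URI from the fields of the configured Airflow connection. As a rough, illustrative sketch of that composition (`build_mssql_uri` is a hypothetical helper, not part of the provider; the real `get_uri()` reads these fields from the Airflow Connection object):

```python
from urllib.parse import quote_plus

def build_mssql_uri(login: str, password: str, host: str, port: int,
                    database: str, scheme: str = "mssql+pymssql") -> str:
    # Percent-encode credentials so special characters survive URI parsing.
    return (
        f"{scheme}://{quote_plus(login)}:{quote_plus(password)}"
        f"@{host}:{port}/{database}"
    )

uri = build_mssql_uri("airflow", "p@ss/word", "localhost", 1433, "my_database")
# -> "mssql+pymssql://airflow:p%40ss%2Fword@localhost:1433/my_database"
```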
## Capabilities

### Database Connection and Operations

Core MSSQL database connectivity and operations through the MsSqlHook class, which extends Airflow's standard SQL hook with MSSQL-specific features.

```python { .api }
class MsSqlHook(DbApiHook):
    """
    Interact with Microsoft SQL Server.

    Parameters:
    - mssql_conn_id: Airflow connection ID for MSSQL database
    - sqlalchemy_scheme: Custom SQLAlchemy scheme (default: 'mssql+pymssql')
    - schema: Database schema to use
    """

    conn_name_attr = "mssql_conn_id"
    default_conn_name = "mssql_default"
    conn_type = "mssql"
    hook_name = "Microsoft SQL Server"
    supports_autocommit = True
    DEFAULT_SQLALCHEMY_SCHEME = "mssql+pymssql"

    def __init__(self, *args, sqlalchemy_scheme: str | None = None, **kwargs) -> None: ...

    def get_conn(self) -> PymssqlConnection:
        """Return a pymssql connection object."""

    def get_uri(self) -> str:
        """Get the database URI with the proper SQLAlchemy scheme."""

    def get_sqlalchemy_connection(self, connect_kwargs: dict | None = None, engine_kwargs: dict | None = None) -> Any:
        """Get a SQLAlchemy connection object."""

    def set_autocommit(self, conn: PymssqlConnection, autocommit: bool) -> None:
        """Set autocommit mode on the connection."""

    def get_autocommit(self, conn: PymssqlConnection) -> bool:
        """Get the current autocommit state."""

    def get_openlineage_database_info(self, connection) -> DatabaseInfo:
        """Return MSSQL-specific information for OpenLineage."""

    def get_openlineage_database_dialect(self, connection) -> str:
        """Return the database dialect ('mssql')."""

    def get_openlineage_default_schema(self) -> str | None:
        """Return the current schema using SCHEMA_NAME()."""

    @property
    def sqlalchemy_scheme(self) -> str:
        """SQLAlchemy scheme from the constructor, connection extras, or the default."""

    @property
    def dialect_name(self) -> str:
        """Return 'mssql'."""

    @property
    def dialect(self) -> Dialect:
        """Return an MsSqlDialect instance."""
```
### SQL Dialect Operations

MSSQL-specific SQL dialect implementation providing advanced database operations such as primary key discovery and MERGE statement generation.

```python { .api }
class MsSqlDialect(Dialect):
    """Microsoft SQL Server dialect implementation."""

    def get_primary_keys(self, table: str, schema: str | None = None) -> list[str] | None:
        """
        Get primary key columns for a table.

        Parameters:
        - table: Table name to query
        - schema: Optional schema name

        Returns:
        List of primary key column names, or None if no primary keys are found
        """

    def generate_replace_sql(self, table, values, target_fields, **kwargs) -> str:
        """
        Generate a MERGE statement for upsert operations.

        Parameters:
        - table: Target table name
        - values: Values to insert/update
        - target_fields: List of field names

        Returns:
        MERGE SQL statement string
        """
```
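To illustrate the general shape of SQL that `generate_replace_sql` produces, here is a simplified sketch of a T-SQL MERGE upsert (`sketch_merge_sql` is a hypothetical helper, not the provider's implementation; the real method also handles value binding, quoting, and edge cases):

```python
def sketch_merge_sql(table: str, target_fields: list[str],
                     primary_keys: list[str]) -> str:
    # Match source rows to target rows on the primary keys, update the
    # non-key columns when matched, and insert the full row otherwise.
    on = " AND ".join(f"target.{k} = source.{k}" for k in primary_keys)
    non_keys = [f for f in target_fields if f not in primary_keys]
    set_clause = ", ".join(f"target.{c} = source.{c}" for c in non_keys)
    cols = ", ".join(target_fields)
    src_cols = ", ".join(f"source.{c}" for c in target_fields)
    placeholders = ", ".join("%s" for _ in target_fields)
    return (
        f"MERGE INTO {table} AS target "
        f"USING (VALUES ({placeholders})) AS source ({cols}) "
        f"ON {on} "
        f"WHEN MATCHED THEN UPDATE SET {set_clause} "
        f"WHEN NOT MATCHED THEN INSERT ({cols}) VALUES ({src_cols});"
    )

sql = sketch_merge_sql("my_table", ["id", "name", "value"], ["id"])
```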
### Provider Registration

Provider metadata and registration functionality for Airflow provider system integration.

```python { .api }
def get_provider_info() -> dict:
    """
    Return the provider metadata dictionary.

    Returns:
    Dictionary containing:
    - package-name: str
    - name: str
    - description: str
    - integrations: list[dict]
    - dialects: list[dict]
    - hooks: list[dict]
    - connection-types: list[dict]
    """
```
## Types

```python { .api }
from pymssql import Connection as PymssqlConnection
from airflow.providers.common.sql.dialects.dialect import Dialect
from airflow.providers.openlineage.sqlparser import DatabaseInfo

# Connection configuration for MSSQL
class MSSQLConnection:
    """
    MSSQL connection configuration.

    Attributes:
    - host: SQL Server hostname/IP
    - port: SQL Server port (default: 1433)
    - schema: Database name
    - login: Username
    - password: Password
    - extra: Additional connection parameters as JSON
    """
```
## Connection Configuration

To use this provider, configure an MSSQL connection in Airflow:

```python
# Connection configuration example
{
    "conn_id": "mssql_default",
    "conn_type": "mssql",
    "host": "localhost",
    "port": 1433,
    "schema": "my_database",
    "login": "username",
    "password": "password",
    "extra": {
        "sqlalchemy_scheme": "mssql+pymssql"  # optional
    }
}
```
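Airflow can also pick up connections from environment variables named `AIRFLOW_CONN_<CONN_ID>` in URI form. A sketch of building an equivalent URI for the configuration above (the `__extra__` query parameter is Airflow's convention for carrying the JSON extras; adjust to your Airflow version):

```python
import json
from urllib.parse import urlencode

# URI form of the connection above; Airflow reads it from an environment
# variable named AIRFLOW_CONN_<CONN_ID> (here, AIRFLOW_CONN_MSSQL_DEFAULT).
extra = {"sqlalchemy_scheme": "mssql+pymssql"}
uri = (
    "mssql://username:password@localhost:1433/my_database?"
    + urlencode({"__extra__": json.dumps(extra)})
)
# export AIRFLOW_CONN_MSSQL_DEFAULT='<the uri above>'
```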
## Dependencies

- **apache-airflow**: ≥2.10.0
- **apache-airflow-providers-common-sql**: ≥1.23.0
- **pymssql**: ≥2.3.5
- **methodtools**: ≥0.4.7

### Optional Dependencies

- **apache-airflow-providers-openlineage**: For data lineage tracking (install with `pip install apache-airflow-providers-microsoft-mssql[openlineage]`)
## Error Handling

The provider inherits error handling from the base DbApiHook class and the pymssql driver. Common exceptions include:

- **pymssql.Error**: Base exception for pymssql-related errors
- **sqlalchemy.exc.SQLAlchemyError**: SQLAlchemy connection and query errors
- **airflow.exceptions.AirflowException**: Airflow-specific errors for connection and configuration issues
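In DAG code, transient connection errors are often worth retrying. A generic, dependency-free retry sketch (`run_with_retry` is a hypothetical helper; in real code you would catch `pymssql.OperationalError` specifically rather than a broad exception tuple):

```python
import time

def run_with_retry(fn, retries=3, delay=0.0, exceptions=(Exception,)):
    # Re-invoke fn until it succeeds or the retry budget is exhausted,
    # then re-raise the last error seen.
    last_exc = None
    for _ in range(retries):
        try:
            return fn()
        except exceptions as exc:
            last_exc = exc
            time.sleep(delay)
    raise last_exc
```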