# Apache Airflow Providers ODBC

ODBC database connectivity provider for Apache Airflow. This provider enables Airflow to connect to any ODBC-compliant database management system through ODBC (Open Database Connectivity) drivers, including legacy systems and proprietary databases.
## Package Information

- **Package Name**: apache-airflow-providers-odbc
- **Package Type**: Airflow Provider
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-odbc`
- **Dependencies**: `apache-airflow>=2.10.0`, `apache-airflow-providers-common-sql>=1.20.0`, `pyodbc>=5.0.0` (5.2.0+ for Python 3.13+)
## Core Imports

```python
from airflow.providers.odbc.hooks.odbc import OdbcHook
from airflow.providers.odbc.get_provider_info import get_provider_info
```
## Basic Usage

```python
from airflow.providers.odbc.hooks.odbc import OdbcHook

# Initialize ODBC hook with connection ID
hook = OdbcHook(odbc_conn_id='my_odbc_connection')

# Execute a query
result = hook.get_records("SELECT * FROM users WHERE active = 1")

# Get connection for manual operations
conn = hook.get_conn()
cursor = conn.cursor()
cursor.execute("INSERT INTO logs (message) VALUES (?)", ("Task completed",))
conn.commit()
cursor.close()
conn.close()
```
Using with SQL operators:

```python
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

# Create table
create_table = SQLExecuteQueryOperator(
    task_id="create_table",
    sql="""
        CREATE TABLE IF NOT EXISTS my_table (
            id INT PRIMARY KEY,
            name VARCHAR(100),
            created_date DATE
        );
    """,
    conn_id="my_odbc_connection",
    autocommit=True,
)

# Insert data with parameters
insert_data = SQLExecuteQueryOperator(
    task_id="insert_data",
    sql="INSERT INTO my_table (id, name, created_date) VALUES (?, ?, ?)",
    parameters=(1, "John Doe", "2025-01-01"),
    conn_id="my_odbc_connection",
    autocommit=True,
)
```
## Architecture

The ODBC provider extends Airflow's common SQL functionality through inheritance:

- **OdbcHook**: Extends `DbApiHook` from `airflow.providers.common.sql.hooks.sql`
- **Connection Management**: Handles ODBC connection strings, DSN configuration, and driver selection
- **SQL Operations**: Inherits the standard SQL methods (execute, fetch, transaction management); see the sketch below
- **SQLAlchemy Integration**: Provides engine creation and connection handling for ORM operations
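
Because `OdbcHook` inherits from `DbApiHook`, the common SQL API works unchanged against ODBC sources. A minimal sketch, assuming a hypothetical connection ID and table:

```python
from airflow.providers.odbc.hooks.odbc import OdbcHook

hook = OdbcHook(odbc_conn_id="my_odbc_connection")

# Inherited from DbApiHook: statement execution with bind parameters
hook.run("DELETE FROM staging WHERE load_date < ?", parameters=("2024-01-01",))

# Inherited fetch helpers
rows = hook.get_records("SELECT * FROM staging")
first = hook.get_first("SELECT COUNT(*) FROM staging")
```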
## Capabilities

### ODBC Hook

The main `OdbcHook` class provides comprehensive ODBC database connectivity with flexible configuration options, automatic connection string building, and full SQLAlchemy integration.

```python { .api }
class OdbcHook(DbApiHook):
    """Interact with ODBC data sources using pyodbc."""

    DEFAULT_SQLALCHEMY_SCHEME: str = "mssql+pyodbc"
    conn_name_attr: str = "odbc_conn_id"
    default_conn_name: str = "odbc_default"
    conn_type: str = "odbc"
    hook_name: str = "ODBC"
    supports_autocommit: bool = True
    supports_executemany: bool = True
    default_driver: str | None = None

    def __init__(
        self,
        *args,
        database: str | None = None,
        driver: str | None = None,
        dsn: str | None = None,
        connect_kwargs: dict | None = None,
        sqlalchemy_scheme: str | None = None,
        **kwargs,
    ) -> None: ...
```
### Connection Properties

Access connection configuration and build ODBC connection strings dynamically.

```python { .api }
@property
def database(self) -> str | None:
    """Database provided in init if exists; otherwise, schema from the Connection object."""

@property
def driver(self) -> str | None:
    """Driver from init param if given; else try to find one in connection extra."""

@property
def dsn(self) -> str | None:
    """DSN from init param if given; else try to find one in connection extra."""

@property
def odbc_connection_string(self) -> str:
    """ODBC connection string built from connection parameters."""

@property
def connect_kwargs(self) -> dict:
    """Effective kwargs to be passed to pyodbc.connect."""

@property
def sqlalchemy_scheme(self) -> str:
    """SQLAlchemy scheme either from constructor, connection extras or default."""
```
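
As the docstrings indicate, these properties resolve in a consistent order: an explicit constructor argument wins, then the connection's `extra` field, then the class default. A quick sketch, with a hypothetical connection ID and database name:

```python
from airflow.providers.odbc.hooks.odbc import OdbcHook

hook = OdbcHook(
    odbc_conn_id="my_odbc_connection",
    database="analytics",  # overrides the schema stored on the Connection
)

hook.database           # "analytics" (constructor wins over Connection.schema)
hook.driver             # constructor -> connection extra -> OdbcHook.default_driver
hook.sqlalchemy_scheme  # constructor -> connection extra -> "mssql+pyodbc"

# The assembled string that will be handed to pyodbc.connect
print(hook.odbc_connection_string)
```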
### Connection Management

Create and manage database connections with full pyodbc and SQLAlchemy support.

```python { .api }
def get_conn(self) -> Connection:
    """Return pyodbc connection object."""

def get_uri(self) -> str:
    """URI invoked in get_sqlalchemy_engine method."""

def get_sqlalchemy_engine(self, engine_kwargs=None):
    """
    Get an sqlalchemy_engine object.

    Parameters:
    - engine_kwargs: Kwargs used in sqlalchemy.create_engine

    Returns:
    The created engine.
    """

def get_sqlalchemy_connection(
    self,
    connect_kwargs: dict | None = None,
    engine_kwargs: dict | None = None,
) -> Any:
    """SQLAlchemy connection object."""

def _make_common_data_structure(
    self,
    result: Sequence[Row] | Row,
) -> list[tuple] | tuple:
    """
    Transform pyodbc.Row objects returned from SQL command into namedtuples.

    Parameters:
    - result: Sequence of Row objects or single Row object from query execution

    Returns:
    List of namedtuples for multiple rows, single namedtuple for single row
    """
```
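
The SQLAlchemy entry points make the hook usable with libraries that expect an engine or connection, such as pandas. A minimal sketch, assuming a hypothetical connection ID and table (pandas is not a dependency of this provider):

```python
import pandas as pd

from airflow.providers.odbc.hooks.odbc import OdbcHook

hook = OdbcHook(odbc_conn_id="my_odbc_connection")

# Engine built from get_uri(); extra engine kwargs pass through to sqlalchemy.create_engine
engine = hook.get_sqlalchemy_engine(engine_kwargs={"pool_pre_ping": True})

with engine.connect() as connection:
    df = pd.read_sql("SELECT * FROM users", connection)
```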
### Provider Metadata

Access provider metadata and configuration details through the standalone provider info function.

```python { .api }
def get_provider_info() -> dict:
    """
    Return provider configuration metadata.

    Returns:
    Dictionary containing:
    - package-name: "apache-airflow-providers-odbc"
    - name: Provider display name
    - description: Provider description
    - integrations: List of integrations with external systems
    - hooks: List of hook classes provided
    - connection-types: List of connection types supported
    """
```
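
A quick sketch of inspecting the metadata; the exact set of keys beyond those listed above is an implementation detail of the provider:

```python
from airflow.providers.odbc.get_provider_info import get_provider_info

info = get_provider_info()
print(info["package-name"])       # "apache-airflow-providers-odbc"
print(info["connection-types"])   # connection types registered by this provider
```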
## Connection Configuration

### Airflow Connection Setup

Configure ODBC connections in the Airflow UI or programmatically (see the sketch after this list):

- **Connection Type**: `odbc`
- **Host**: Database server hostname or IP
- **Schema**: Database name (optional, can be specified in the hook)
- **Login**: Username for authentication
- **Password**: Password for authentication
- **Port**: Database port (optional)
- **Extra**: JSON with additional ODBC parameters
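
For the programmatic route, the standard Airflow `Connection` model works; a minimal sketch with hypothetical host and credentials:

```python
import json

from airflow.models.connection import Connection

conn = Connection(
    conn_id="my_odbc_connection",
    conn_type="odbc",
    host="dbserver.example.com",
    schema="mydb",
    login="svc_airflow",
    password="change-me",
    port=1433,
    extra=json.dumps({"driver": "ODBC Driver 18 for SQL Server"}),
)

# One common way to register it is via an environment variable:
# AIRFLOW_CONN_MY_ODBC_CONNECTION = conn.get_uri()
```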
### Extra Parameters

The connection `extra` field supports:

```json
{
    "driver": "ODBC Driver 17 for SQL Server",
    "dsn": "MyDataSourceName",
    "connect_kwargs": {
        "timeout": 30,
        "attrs_before": {"1": "2"}
    },
    "sqlalchemy_scheme": "mssql+pyodbc",
    "ApplicationIntent": "ReadOnly",
    "Encrypt": "yes"
}
```
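
Keys the hook consumes itself (`driver`, `dsn`, `connect_kwargs`, `sqlalchemy_scheme`) configure the hook; remaining keys such as `ApplicationIntent` and `Encrypt` are passed through into the ODBC connection string. A sketch of inspecting the result, assuming the extra shown above on a hypothetical connection:

```python
from airflow.providers.odbc.hooks.odbc import OdbcHook

hook = OdbcHook(odbc_conn_id="my_odbc_connection")

# The built string should include the pass-through keys,
# e.g. ...;ApplicationIntent=ReadOnly;Encrypt=yes;
print(hook.odbc_connection_string)
```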
### Driver Configuration

Configure ODBC drivers through any of the following methods (method 4 is sketched below):

1. **Constructor parameter**: `OdbcHook(driver="ODBC Driver 17 for SQL Server")`
2. **Connection extra**: Set `driver` in the connection extra, with `allow_driver_in_extra = True` in `airflow.cfg`
3. **Default driver**: Patch `OdbcHook.default_driver` in `local_settings.py`
4. **Hook parameters**: Use `hook_params` with SQL operators
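
For method 4, `SQLExecuteQueryOperator` forwards `hook_params` to the hook constructor; a sketch with a hypothetical driver name:

```python
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

query = SQLExecuteQueryOperator(
    task_id="query_with_explicit_driver",
    sql="SELECT 1",
    conn_id="my_odbc_connection",
    hook_params={"driver": "ODBC Driver 18 for SQL Server"},  # passed to OdbcHook.__init__
)
```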
### DSN (Data Source Name) Configuration

Use pre-configured DSN entries:

```python
# Via constructor
hook = OdbcHook(dsn="MyDSN", odbc_conn_id="my_conn")

# Via connection extra:
# set "dsn": "MyDSN" in the connection extra field
```
## Error Handling

The provider raises specific exceptions for various error conditions (a sketch of handling driver-level errors follows the list):

- **RuntimeError**: When the Airflow version is incompatible (< 2.10.0)
- **Configuration warnings**: When a driver is specified in the connection extra without the corresponding `airflow.cfg` setting
- **SQLAlchemy scheme validation**: Rejects malformed schemes in connection extras to prevent injection attacks
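
Driver- and database-level failures surface as `pyodbc` exceptions, which follow the DB-API 2.0 hierarchy. A sketch of catching them around hook calls, assuming a hypothetical connection ID:

```python
import pyodbc

from airflow.providers.odbc.hooks.odbc import OdbcHook

hook = OdbcHook(odbc_conn_id="my_odbc_connection")

try:
    rows = hook.get_records("SELECT 1")
except pyodbc.OperationalError:
    # Connectivity or login failures reported by the driver
    raise
except pyodbc.Error:
    # Base class for all pyodbc DB-API errors
    raise
```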
## Types

```python { .api }
from typing import Any, Sequence
from collections import namedtuple
from pyodbc import Connection, Row

# pyodbc types (external dependency)
Connection: type  # pyodbc.Connection object
Row: type         # pyodbc.Row object for query results

# Python standard library types
namedtuple: type  # collections.namedtuple for result transformation
```
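
In practice this means `get_records` returns rows already converted from `pyodbc.Row` to namedtuples by `_make_common_data_structure`. A sketch, with a hypothetical table and columns:

```python
from airflow.providers.odbc.hooks.odbc import OdbcHook

hook = OdbcHook(odbc_conn_id="my_odbc_connection")
rows = hook.get_records("SELECT id, name FROM users")

first = rows[0]
print(first[0], first[1])    # positional access always works
print(first.id, first.name)  # attribute access, if column names are valid identifiers
```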
## Integration Patterns

### With Airflow DAGs

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

with DAG(
    'odbc_example',
    start_date=datetime(2025, 1, 1),
    schedule=timedelta(days=1),
    catchup=False,
) as dag:
    task = SQLExecuteQueryOperator(
        task_id='query_data',
        sql='SELECT COUNT(*) FROM users WHERE created_date = ?',
        parameters=('{{ ds }}',),
        conn_id='my_odbc_connection',
    )
```
### With TaskFlow API

```python
from datetime import datetime

from airflow.decorators import dag, task
from airflow.providers.odbc.hooks.odbc import OdbcHook

@dag(start_date=datetime(2025, 1, 1), schedule=None, catchup=False)
def odbc_taskflow_example():

    @task
    def extract_data():
        hook = OdbcHook(odbc_conn_id='my_odbc_connection')
        records = hook.get_records("SELECT * FROM source_table")
        return records

    @task
    def process_data(records):
        # Process the data
        return len(records)

    data = extract_data()
    result = process_data(data)

dag_instance = odbc_taskflow_example()
```
### With Custom Operators

```python
from airflow.models import BaseOperator
from airflow.providers.odbc.hooks.odbc import OdbcHook

class CustomOdbcOperator(BaseOperator):
    def __init__(self, odbc_conn_id, sql, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.odbc_conn_id = odbc_conn_id
        self.sql = sql

    def execute(self, context):
        hook = OdbcHook(odbc_conn_id=self.odbc_conn_id)
        return hook.get_records(self.sql)
```
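
Instantiating the custom operator then looks like any other task definition; task ID, connection ID, and SQL below are illustrative:

```python
query_task = CustomOdbcOperator(
    task_id="custom_query",
    odbc_conn_id="my_odbc_connection",
    sql="SELECT * FROM my_table",
)
```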