ODBC database connectivity provider for Apache Airflow
npx @tessl/cli install tessl/pypi-apache-airflow-providers-odbc@4.10.0

ODBC database connectivity provider for Apache Airflow. This provider enables Airflow to connect to database systems through ODBC (Open Database Connectivity) drivers, providing integration with legacy systems, proprietary databases, and any ODBC-compliant database management system.

Installation:

pip install apache-airflow-providers-odbc

Requirements: apache-airflow>=2.10.0, apache-airflow-providers-common-sql>=1.20.0, pyodbc>=5.0.0 (5.2.0+ for Python 3.13+)

Public imports:

from airflow.providers.odbc.hooks.odbc import OdbcHook
from airflow.providers.odbc.get_provider_info import get_provider_info

Basic usage:

from airflow.providers.odbc.hooks.odbc import OdbcHook
# Initialize ODBC hook with connection ID
hook = OdbcHook(odbc_conn_id='my_odbc_connection')
# Execute a query
result = hook.get_records("SELECT * FROM users WHERE active = 1")
# Get connection for manual operations
conn = hook.get_conn()
cursor = conn.cursor()
cursor.execute("INSERT INTO logs (message) VALUES (?)", ("Task completed",))
conn.commit()
cursor.close()
conn.close()

Using with SQL operators:
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
# Create table
create_table = SQLExecuteQueryOperator(
task_id="create_table",
sql="""
CREATE TABLE IF NOT EXISTS my_table (
id INT PRIMARY KEY,
name VARCHAR(100),
created_date DATE
);
""",
conn_id="my_odbc_connection",
autocommit=True,
)
# Insert data with parameters
insert_data = SQLExecuteQueryOperator(
task_id="insert_data",
sql="INSERT INTO my_table (id, name, created_date) VALUES (?, ?, ?)",
parameters=(1, "John Doe", "2025-01-01"),
conn_id="my_odbc_connection",
autocommit=True,
)

The ODBC provider extends Airflow's common SQL functionality by inheriting from DbApiHook (airflow.providers.common.sql.hooks.sql).

The main OdbcHook class provides comprehensive ODBC database connectivity with flexible configuration options, automatic connection string building, and full SQLAlchemy integration.
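Because OdbcHook inherits from DbApiHook, the generic SQL helper methods (run, get_records, get_first, insert_rows) are available on it as well. A minimal sketch, assuming an existing my_odbc_connection connection and a users table (both are examples, not defaults):

from airflow.providers.odbc.hooks.odbc import OdbcHook

hook = OdbcHook(odbc_conn_id="my_odbc_connection")

# Inherited from DbApiHook: execute arbitrary statements
hook.run("UPDATE users SET active = 0 WHERE last_login < '2024-01-01'")

# Inherited helpers for fetching results
first_row = hook.get_first("SELECT COUNT(*) FROM users")

# Inherited bulk insert helper
hook.insert_rows(
    table="users",
    rows=[(1, "Jane"), (2, "John")],
    target_fields=["id", "name"],
)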
class OdbcHook(DbApiHook):
"""Interact with ODBC data sources using pyodbc."""
DEFAULT_SQLALCHEMY_SCHEME: str = "mssql+pyodbc"
conn_name_attr: str = "odbc_conn_id"
default_conn_name: str = "odbc_default"
conn_type: str = "odbc"
hook_name: str = "ODBC"
supports_autocommit: bool = True
supports_executemany: bool = True
default_driver: str | None = None
def __init__(
self,
*args,
database: str | None = None,
driver: str | None = None,
dsn: str | None = None,
connect_kwargs: dict | None = None,
sqlalchemy_scheme: str | None = None,
**kwargs,
) -> None: ...

Access connection configuration and build ODBC connection strings dynamically.
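For instance, the properties listed below can be inspected to see how the ODBC connection string is assembled; a brief sketch, assuming a connection named my_odbc_connection exists:

from airflow.providers.odbc.hooks.odbc import OdbcHook

hook = OdbcHook(odbc_conn_id="my_odbc_connection", driver="ODBC Driver 17 for SQL Server")

print(hook.driver)                  # driver from init param or connection extra
print(hook.dsn)                     # DSN from init param or connection extra, if any
print(hook.database)                # database from init param or connection schema
print(hook.sqlalchemy_scheme)       # e.g. "mssql+pyodbc" unless overridden
print(hook.odbc_connection_string)  # full string passed to pyodbc.connect
print(hook.connect_kwargs)          # effective kwargs for pyodbc.connect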
@property
def database(self) -> str | None:
"""Database provided in init if exists; otherwise, schema from Connection object."""
@property
def driver(self) -> str | None:
"""Driver from init param if given; else try to find one in connection extra."""
@property
def dsn(self) -> str | None:
"""DSN from init param if given; else try to find one in connection extra."""
@property
def odbc_connection_string(self) -> str:
"""ODBC connection string built from connection parameters."""
@property
def connect_kwargs(self) -> dict:
"""Effective kwargs to be passed to pyodbc.connect."""
@property
def sqlalchemy_scheme(self) -> str:
"""SQLAlchemy scheme either from constructor, connection extras or default."""Create and manage database connections with full pyodbc and SQLAlchemy support.
def get_conn(self) -> Connection:
"""Return pyodbc connection object."""
def get_uri(self) -> str:
"""URI invoked in get_sqlalchemy_engine method."""
def get_sqlalchemy_engine(self, engine_kwargs=None):
"""
Get an sqlalchemy_engine object.
Parameters:
- engine_kwargs: Kwargs used in sqlalchemy.create_engine
Returns:
The created engine.
"""
def get_sqlalchemy_connection(
self,
connect_kwargs: dict | None = None,
engine_kwargs: dict | None = None
) -> Any:
"""SQLAlchemy connection object."""
def _make_common_data_structure(
self,
result: Sequence[Row] | Row
) -> list[tuple] | tuple:
"""
Transform pyodbc.Row objects returned from SQL command into namedtuples.
Parameters:
- result: Sequence of Row objects or single Row object from query execution
Returns:
List of namedtuples for multiple rows, single namedtuple for single row
"""Access provider metadata and configuration details through the standalone provider info function.
def get_provider_info() -> dict:
"""
Return provider configuration metadata.
Returns:
Dictionary containing:
- package-name: "apache-airflow-providers-odbc"
- name: Provider display name
- description: Provider description
- integrations: List of integrations with external systems
- hooks: List of hook classes provided
- connection-types: List of connection types supported
"""Configure ODBC connections in Airflow UI or programmatically:
Connection type: odbc

Connection extra field supports:
{
"driver": "ODBC Driver 17 for SQL Server",
"dsn": "MyDataSourceName",
"connect_kwargs": {
"timeout": 30,
"attrs_before": {"1": "2"}
},
"sqlalchemy_scheme": "mssql+pyodbc",
"ApplicationIntent": "ReadOnly",
"Encrypt": "yes"
}

Configure ODBC drivers through multiple methods:

- Constructor parameter: OdbcHook(driver="ODBC Driver 17 for SQL Server")
- driver in connection extra, with allow_driver_in_extra = True in airflow.cfg
- OdbcHook.default_driver in local_settings.py
- hook_params with SQL operators

Use pre-configured DSN entries:
# Via constructor
hook = OdbcHook(dsn="MyDSN", odbc_conn_id="my_conn")
# Via connection extra
# Set "dsn": "MyDSN" in connection extra fieldThe provider raises specific exceptions for various error conditions:
from pyodbc import Connection, Row
from typing import Any, Sequence
from collections import namedtuple
# pyodbc types (external dependency)
Connection: type # pyodbc.Connection object
Row: type # pyodbc.Row object for query results
# Python standard library types
namedtuple: type  # collections.namedtuple for result transformation

Example DAG using an ODBC connection with SQLExecuteQueryOperator:

from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from datetime import datetime, timedelta
with DAG(
'odbc_example',
start_date=datetime(2025, 1, 1),
schedule=timedelta(days=1),
catchup=False,
) as dag:
task = SQLExecuteQueryOperator(
task_id='query_data',
sql='SELECT COUNT(*) FROM users WHERE created_date = ?',
parameters=('{{ ds }}',),
conn_id='my_odbc_connection',
)

TaskFlow API example:

from datetime import datetime

from airflow.decorators import dag, task
from airflow.providers.odbc.hooks.odbc import OdbcHook
@dag(start_date=datetime(2025, 1, 1), schedule=None, catchup=False)
def odbc_taskflow_example():
@task
def extract_data():
hook = OdbcHook(odbc_conn_id='my_odbc_connection')
records = hook.get_records("SELECT * FROM source_table")
return records
@task
def process_data(records):
# Process the data
return len(records)
data = extract_data()
result = process_data(data)
dag_instance = odbc_taskflow_example()

Custom operator example using OdbcHook directly:

from airflow.providers.odbc.hooks.odbc import OdbcHook
from airflow.models import BaseOperator
class CustomOdbcOperator(BaseOperator):
def __init__(self, odbc_conn_id, sql, *args, **kwargs):
super().__init__(*args, **kwargs)
self.odbc_conn_id = odbc_conn_id
self.sql = sql
def execute(self, context):
hook = OdbcHook(odbc_conn_id=self.odbc_conn_id)
return hook.get_records(self.sql)
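A minimal usage sketch for the custom operator above, reusing the my_odbc_connection connection from the earlier examples:

from datetime import datetime

from airflow import DAG

with DAG(
    "custom_odbc_example",
    start_date=datetime(2025, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    fetch_users = CustomOdbcOperator(
        task_id="fetch_users",
        odbc_conn_id="my_odbc_connection",
        sql="SELECT id, name FROM users",
    )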