tessl/pypi-apache-airflow-providers-odbc

ODBC database connectivity provider for Apache Airflow

Workspace: tessl
Visibility: Public
Describes: pypipkg:pypi/apache-airflow-providers-odbc@4.10.x

To install, run

npx @tessl/cli install tessl/pypi-apache-airflow-providers-odbc@4.10.0


Apache Airflow Providers ODBC

ODBC database connectivity provider for Apache Airflow. This provider enables Airflow to connect to database systems through ODBC (Open Database Connectivity) drivers, providing integration with legacy systems, proprietary databases, and any ODBC-compliant database management system.

Package Information

  • Package Name: apache-airflow-providers-odbc
  • Package Type: Airflow Provider
  • Language: Python
  • Installation: pip install apache-airflow-providers-odbc
  • Dependencies: apache-airflow>=2.10.0, apache-airflow-providers-common-sql>=1.20.0, pyodbc>=5.0.0 (5.2.0+ for Python 3.13+)

Core Imports

from airflow.providers.odbc.hooks.odbc import OdbcHook
from airflow.providers.odbc.get_provider_info import get_provider_info

Basic Usage

from airflow.providers.odbc.hooks.odbc import OdbcHook

# Initialize ODBC hook with connection ID
hook = OdbcHook(odbc_conn_id='my_odbc_connection')

# Execute a query
result = hook.get_records("SELECT * FROM users WHERE active = 1")

# Get connection for manual operations
conn = hook.get_conn()
cursor = conn.cursor()
cursor.execute("INSERT INTO logs (message) VALUES (?)", ("Task completed",))
conn.commit()
cursor.close()
conn.close()

Using with SQL operators:

from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

# Create table
create_table = SQLExecuteQueryOperator(
    task_id="create_table",
    sql="""
    CREATE TABLE IF NOT EXISTS my_table (
        id INT PRIMARY KEY,
        name VARCHAR(100),
        created_date DATE
    );
    """,
    conn_id="my_odbc_connection",
    autocommit=True,
)

# Insert data with parameters
insert_data = SQLExecuteQueryOperator(
    task_id="insert_data",
    sql="INSERT INTO my_table (id, name, created_date) VALUES (?, ?, ?)",
    parameters=(1, "John Doe", "2025-01-01"),
    conn_id="my_odbc_connection",
    autocommit=True,
)

Architecture

The ODBC provider extends Airflow's common SQL functionality through inheritance (a short usage sketch follows this list):

  • OdbcHook: Extends DbApiHook from airflow.providers.common.sql.hooks.sql
  • Connection Management: Handles ODBC connection strings, DSN configuration, and driver selection
  • SQL Operations: Inherits standard SQL methods (execute, fetch, transaction management)
  • SQLAlchemy Integration: Provides engine creation and connection handling for ORM operations
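
Because OdbcHook extends DbApiHook, the standard SQL helper methods are available directly on the hook. A minimal sketch of the inherited methods in use; the connection ID and SQL are placeholders:

from airflow.providers.odbc.hooks.odbc import OdbcHook

hook = OdbcHook(odbc_conn_id="my_odbc_connection")  # placeholder connection ID

# Inherited from DbApiHook: run one or more statements
hook.run("UPDATE users SET active = 0 WHERE last_login < '2024-01-01'", autocommit=True)

# Inherited from DbApiHook: fetch a single row or all matching rows
first_row = hook.get_first("SELECT COUNT(*) FROM users")
all_rows = hook.get_records("SELECT id, name FROM users WHERE active = 1")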

Capabilities

ODBC Hook

The main OdbcHook class provides comprehensive ODBC database connectivity with flexible configuration options, automatic connection string building, and full SQLAlchemy integration.

class OdbcHook(DbApiHook):
    """Interact with ODBC data sources using pyodbc."""
    
    DEFAULT_SQLALCHEMY_SCHEME: str = "mssql+pyodbc"
    conn_name_attr: str = "odbc_conn_id"
    default_conn_name: str = "odbc_default"
    conn_type: str = "odbc"
    hook_name: str = "ODBC"
    supports_autocommit: bool = True
    supports_executemany: bool = True
    default_driver: str | None = None

    def __init__(
        self,
        *args,
        database: str | None = None,
        driver: str | None = None,
        dsn: str | None = None,
        connect_kwargs: dict | None = None,
        sqlalchemy_scheme: str | None = None,
        **kwargs,
    ) -> None: ...
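
A sketch of constructing the hook with explicit options; every value below (connection ID, database, driver name, timeout) is illustrative:

from airflow.providers.odbc.hooks.odbc import OdbcHook

hook = OdbcHook(
    odbc_conn_id="my_odbc_connection",       # Airflow connection to resolve
    database="sales",                        # overrides the schema from the Connection
    driver="ODBC Driver 17 for SQL Server",  # overrides any driver in the connection extra
    connect_kwargs={"timeout": 30},          # passed through to pyodbc.connect
    sqlalchemy_scheme="mssql+pyodbc",        # scheme used when building the SQLAlchemy URI
)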

Connection Properties

Access connection configuration and build ODBC connection strings dynamically.

@property
def database(self) -> str | None:
    """Database provided in init if exists; otherwise, schema from Connection object."""

@property
def driver(self) -> str | None:
    """Driver from init param if given; else try to find one in connection extra."""

@property
def dsn(self) -> str | None:
    """DSN from init param if given; else try to find one in connection extra."""

@property
def odbc_connection_string(self) -> str:
    """ODBC connection string built from connection parameters."""

@property
def connect_kwargs(self) -> dict:
    """Effective kwargs to be passed to pyodbc.connect."""

@property
def sqlalchemy_scheme(self) -> str:
    """SQLAlchemy scheme either from constructor, connection extras or default."""

Connection Management

Create and manage database connections with full pyodbc and SQLAlchemy support.

def get_conn(self) -> Connection:
    """Return pyodbc connection object."""

def get_uri(self) -> str:
    """URI invoked in get_sqlalchemy_engine method."""

def get_sqlalchemy_engine(self, engine_kwargs=None):
    """
    Get an sqlalchemy_engine object.
    
    Parameters:
    - engine_kwargs: Kwargs used in sqlalchemy.create_engine
    
    Returns:
    The created engine.
    """

def get_sqlalchemy_connection(
    self, 
    connect_kwargs: dict | None = None, 
    engine_kwargs: dict | None = None
) -> Any:
    """SQLAlchemy connection object."""

def _make_common_data_structure(
    self, 
    result: Sequence[Row] | Row
) -> list[tuple] | tuple:
    """
    Transform pyodbc.Row objects returned from SQL command into namedtuples.
    
    Parameters:
    - result: Sequence of Row objects or single Row object from query execution
    
    Returns:
    List of namedtuples for multiple rows, single namedtuple for single row
    """

Provider Metadata

Access provider metadata and configuration details through the standalone provider info function.

def get_provider_info() -> dict:
    """
    Return provider configuration metadata.
    
    Returns:
    Dictionary containing:
    - package-name: "apache-airflow-providers-odbc"
    - name: Provider display name
    - description: Provider description
    - integrations: List of integrations with external systems
    - hooks: List of hook classes provided
    - connection-types: List of connection types supported
    """

Connection Configuration

Airflow Connection Setup

Configure ODBC connections in the Airflow UI or programmatically (see the sketch after this list):

  • Connection Type: odbc
  • Host: Database server hostname or IP
  • Schema: Database name (optional, can be specified in hook)
  • Login: Username for authentication
  • Password: Password for authentication
  • Port: Database port (optional)
  • Extra: JSON with additional ODBC parameters
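
Outside the UI, a connection can also be supplied as an environment variable named AIRFLOW_CONN_<CONN_ID> containing an Airflow connection URI, where extras become query parameters. A sketch with placeholder host, credentials, and driver, set from Python purely for illustration (in practice the variable is usually exported in the scheduler and worker environment):

import os

os.environ["AIRFLOW_CONN_MY_ODBC_CONNECTION"] = (
    "odbc://my_user:my_password@db.example.com:1433/my_database"
    "?driver=ODBC+Driver+17+for+SQL+Server"
)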

Extra Parameters

Connection extra field supports:

{
  "driver": "ODBC Driver 17 for SQL Server",
  "dsn": "MyDataSourceName",
  "connect_kwargs": {
    "timeout": 30,
    "attrs_before": {"1": "2"}
  },
  "sqlalchemy_scheme": "mssql+pyodbc",
  "ApplicationIntent": "ReadOnly",
  "Encrypt": "yes"
}

Driver Configuration

Configure ODBC drivers through multiple methods:

  1. Constructor parameter: OdbcHook(driver="ODBC Driver 17 for SQL Server")
  2. Connection extra: Set driver in connection extra with allow_driver_in_extra = True in airflow.cfg
  3. Default driver: Patch OdbcHook.default_driver in local_settings.py
  4. Hook parameters: Use hook_params with SQL operators (see the sketch below)
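
For option 4, hook_params forwards keyword arguments to the hook constructor, so the driver can be chosen per task. A sketch with placeholder task and connection IDs:

from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

query_task = SQLExecuteQueryOperator(
    task_id="query_with_explicit_driver",
    sql="SELECT COUNT(*) FROM users",
    conn_id="my_odbc_connection",
    hook_params={"driver": "ODBC Driver 17 for SQL Server"},
)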

DSN (Data Source Name) Configuration

Use pre-configured DSN entries:

# Via constructor
hook = OdbcHook(dsn="MyDSN", odbc_conn_id="my_conn")

# Via connection extra
# Set "dsn": "MyDSN" in connection extra field

Error Handling

The provider raises specific exceptions for various error conditions:

  • RuntimeError: When Airflow version is incompatible (< 2.10.0)
  • Configuration warnings: When driver is specified in connection extra without proper airflow.cfg setting
  • SQLAlchemy validation: Prevents injection attacks in scheme configuration

Types

from pyodbc import Connection, Row
from typing import Any, Sequence
from collections import namedtuple

# pyodbc types (external dependency)
Connection: type  # pyodbc.Connection object
Row: type  # pyodbc.Row object for query results

# Python standard library types
namedtuple: type  # collections.namedtuple for result transformation

Integration Patterns

With Airflow DAGs

from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from datetime import datetime, timedelta

with DAG(
    'odbc_example',
    start_date=datetime(2025, 1, 1),
    schedule=timedelta(days=1),
    catchup=False,
) as dag:
    
    task = SQLExecuteQueryOperator(
        task_id='query_data',
        sql='SELECT COUNT(*) FROM users WHERE created_date = ?',
        parameters=('{{ ds }}',),
        conn_id='my_odbc_connection',
    )

With TaskFlow API

from datetime import datetime
from airflow.decorators import dag, task
from airflow.providers.odbc.hooks.odbc import OdbcHook

@dag(start_date=datetime(2025, 1, 1), schedule=None, catchup=False)
def odbc_taskflow_example():
    
    @task
    def extract_data():
        hook = OdbcHook(odbc_conn_id='my_odbc_connection')
        records = hook.get_records("SELECT * FROM source_table")
        return records
    
    @task
    def process_data(records):
        # Process the data
        return len(records)
    
    data = extract_data()
    result = process_data(data)

dag_instance = odbc_taskflow_example()

With Custom Operators

from airflow.providers.odbc.hooks.odbc import OdbcHook
from airflow.models import BaseOperator

class CustomOdbcOperator(BaseOperator):
    def __init__(self, odbc_conn_id, sql, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.odbc_conn_id = odbc_conn_id
        self.sql = sql
    
    def execute(self, context):
        hook = OdbcHook(odbc_conn_id=self.odbc_conn_id)
        return hook.get_records(self.sql)