tessl/pypi-apache-airflow-providers-odbc

ODBC database connectivity provider for Apache Airflow

- **Workspace**: tessl
- **Visibility**: Public
- **Describes**: `pypi/apache-airflow-providers-odbc@4.10.x`

To install, run `npx @tessl/cli install tessl/pypi-apache-airflow-providers-odbc@4.10.0`.

# Apache Airflow Providers ODBC

ODBC database connectivity provider for Apache Airflow. This provider enables Airflow to connect to legacy systems, proprietary databases, and any other ODBC-compliant database management system through ODBC (Open Database Connectivity) drivers.

## Package Information

- **Package Name**: apache-airflow-providers-odbc
- **Package Type**: Airflow Provider
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-odbc`
- **Dependencies**: `apache-airflow>=2.10.0`, `apache-airflow-providers-common-sql>=1.20.0`, `pyodbc>=5.0.0` (`pyodbc>=5.2.0` on Python 3.13+)

## Core Imports

```python
from airflow.providers.odbc.hooks.odbc import OdbcHook
from airflow.providers.odbc.get_provider_info import get_provider_info
```

## Basic Usage

```python
from airflow.providers.odbc.hooks.odbc import OdbcHook

# Initialize ODBC hook with connection ID
hook = OdbcHook(odbc_conn_id='my_odbc_connection')

# Execute a query
result = hook.get_records("SELECT * FROM users WHERE active = 1")

# Get connection for manual operations
conn = hook.get_conn()
cursor = conn.cursor()
cursor.execute("INSERT INTO logs (message) VALUES (?)", ("Task completed",))
conn.commit()
cursor.close()
conn.close()
```

Using with SQL operators:

```python
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

# Create table
create_table = SQLExecuteQueryOperator(
    task_id="create_table",
    sql="""
        CREATE TABLE IF NOT EXISTS my_table (
            id INT PRIMARY KEY,
            name VARCHAR(100),
            created_date DATE
        );
    """,
    conn_id="my_odbc_connection",
    autocommit=True,
)

# Insert data with parameters
insert_data = SQLExecuteQueryOperator(
    task_id="insert_data",
    sql="INSERT INTO my_table (id, name, created_date) VALUES (?, ?, ?)",
    parameters=(1, "John Doe", "2025-01-01"),
    conn_id="my_odbc_connection",
    autocommit=True,
)
```

## Architecture

The ODBC provider extends Airflow's common SQL functionality through inheritance:

- **OdbcHook**: Extends `DbApiHook` from `airflow.providers.common.sql.hooks.sql`
- **Connection Management**: Handles ODBC connection strings, DSN configuration, and driver selection
- **SQL Operations**: Inherits standard SQL methods (execute, fetch, transaction management)
- **SQLAlchemy Integration**: Provides engine creation and connection handling for ORM operations
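
Because `OdbcHook` inherits from `DbApiHook`, the generic DB-API methods are available on it without any ODBC-specific code. A minimal sketch (connection ID and table are hypothetical):

```python
from airflow.providers.odbc.hooks.odbc import OdbcHook

hook = OdbcHook(odbc_conn_id="my_odbc_connection")  # hypothetical connection ID

# Inherited from DbApiHook (common.sql), not defined on OdbcHook itself:
rows = hook.get_records("SELECT id, name FROM my_table")        # fetch all rows
first = hook.get_first("SELECT COUNT(*) FROM my_table")         # fetch first row
hook.run("DELETE FROM my_table WHERE id = ?", parameters=(1,))  # execute a statement
```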

## Capabilities

### ODBC Hook

The main `OdbcHook` class provides comprehensive ODBC database connectivity with flexible configuration options, automatic connection string building, and full SQLAlchemy integration.

```python { .api }
class OdbcHook(DbApiHook):
    """Interact with ODBC data sources using pyodbc."""

    DEFAULT_SQLALCHEMY_SCHEME: str = "mssql+pyodbc"
    conn_name_attr: str = "odbc_conn_id"
    default_conn_name: str = "odbc_default"
    conn_type: str = "odbc"
    hook_name: str = "ODBC"
    supports_autocommit: bool = True
    supports_executemany: bool = True
    default_driver: str | None = None

    def __init__(
        self,
        *args,
        database: str | None = None,
        driver: str | None = None,
        dsn: str | None = None,
        connect_kwargs: dict | None = None,
        sqlalchemy_scheme: str | None = None,
        **kwargs,
    ) -> None: ...
```
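
For illustration, the constructor parameters can override what is stored on the Airflow connection (all values below are placeholders):

```python
from airflow.providers.odbc.hooks.odbc import OdbcHook

hook = OdbcHook(
    odbc_conn_id="my_odbc_connection",       # placeholder connection ID
    database="sales",                        # overrides the connection's schema field
    driver="ODBC Driver 17 for SQL Server",  # overrides driver from connection extra
    connect_kwargs={"timeout": 30},          # passed through to pyodbc.connect
)
```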

### Connection Properties

Access connection configuration and build ODBC connection strings dynamically.

```python { .api }
@property
def database(self) -> str | None:
    """Database provided in init if exists; otherwise, schema from Connection object."""

@property
def driver(self) -> str | None:
    """Driver from init param if given; else try to find one in connection extra."""

@property
def dsn(self) -> str | None:
    """DSN from init param if given; else try to find one in connection extra."""

@property
def odbc_connection_string(self) -> str:
    """ODBC connection string built from connection parameters."""

@property
def connect_kwargs(self) -> dict:
    """Effective kwargs to be passed to pyodbc.connect."""

@property
def sqlalchemy_scheme(self) -> str:
    """SQLAlchemy scheme either from constructor, connection extras or default."""
```
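
For example, you can inspect the connection string the hook will hand to `pyodbc.connect` (the connection ID is a placeholder, and the exact string depends on the connection's fields):

```python
hook = OdbcHook(
    odbc_conn_id="my_odbc_connection",
    driver="ODBC Driver 17 for SQL Server",
)
# Built from host, database, login, password, port, and extras, e.g.:
# "DRIVER={ODBC Driver 17 for SQL Server};SERVER=dbserver;DATABASE=mydb;UID=user;PWD=...;"
print(hook.odbc_connection_string)
```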

### Connection Management

Create and manage database connections with full pyodbc and SQLAlchemy support.

```python { .api }
def get_conn(self) -> Connection:
    """Return pyodbc connection object."""

def get_uri(self) -> str:
    """URI invoked in get_sqlalchemy_engine method."""

def get_sqlalchemy_engine(self, engine_kwargs=None):
    """
    Get an sqlalchemy_engine object.

    Parameters:
    - engine_kwargs: Kwargs used in sqlalchemy.create_engine

    Returns:
    The created engine.
    """

def get_sqlalchemy_connection(
    self,
    connect_kwargs: dict | None = None,
    engine_kwargs: dict | None = None,
) -> Any:
    """SQLAlchemy connection object."""

def _make_common_data_structure(
    self,
    result: Sequence[Row] | Row,
) -> list[tuple] | tuple:
    """
    Transform pyodbc.Row objects returned from SQL command into namedtuples.

    Parameters:
    - result: Sequence of Row objects or single Row object from query execution

    Returns:
    List of namedtuples for multiple rows, single namedtuple for single row
    """
```
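
A short sketch of the SQLAlchemy path (assumes SQLAlchemy 1.4+; connection ID and kwargs are illustrative):

```python
hook = OdbcHook(odbc_conn_id="my_odbc_connection")

# Engine built from the hook's URI; engine_kwargs are passed to sqlalchemy.create_engine
engine = hook.get_sqlalchemy_engine(engine_kwargs={"pool_pre_ping": True})
with engine.connect() as conn:
    result = conn.exec_driver_sql("SELECT 1")
    print(result.scalar())

# Or obtain a SQLAlchemy connection directly from the hook
sa_conn = hook.get_sqlalchemy_connection()
```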

### Provider Metadata

Access provider metadata and configuration details through the standalone provider info function.

```python { .api }
def get_provider_info() -> dict:
    """
    Return provider configuration metadata.

    Returns:
    Dictionary containing:
    - package-name: "apache-airflow-providers-odbc"
    - name: Provider display name
    - description: Provider description
    - integrations: List of integrations with external systems
    - hooks: List of hook classes provided
    - connection-types: List of connection types supported
    """
```
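
For example, the metadata can be read directly:

```python
from airflow.providers.odbc.get_provider_info import get_provider_info

info = get_provider_info()
print(info["package-name"])      # "apache-airflow-providers-odbc"
print(info["connection-types"])  # registration for the "odbc" connection type
```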

## Connection Configuration

### Airflow Connection Setup

Configure ODBC connections in the Airflow UI or programmatically:

- **Connection Type**: `odbc`
- **Host**: Database server hostname or IP
- **Schema**: Database name (optional, can be specified in hook)
- **Login**: Username for authentication
- **Password**: Password for authentication
- **Port**: Database port (optional)
- **Extra**: JSON with additional ODBC parameters
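
As a sketch of the programmatic route: Airflow resolves connections from `AIRFLOW_CONN_<CONN_ID>` environment variables, which since Airflow 2.3 may hold a JSON document (all values below are placeholders):

```python
import json
import os

# JSON-serialized connection; placeholder values throughout
os.environ["AIRFLOW_CONN_MY_ODBC_CONNECTION"] = json.dumps({
    "conn_type": "odbc",
    "host": "dbserver.example.com",
    "schema": "mydb",
    "login": "user",
    "password": "secret",
    "port": 1433,
    "extra": {"driver": "ODBC Driver 17 for SQL Server"},
})
```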

### Extra Parameters

The connection `extra` field supports:

```json
{
  "driver": "ODBC Driver 17 for SQL Server",
  "dsn": "MyDataSourceName",
  "connect_kwargs": {
    "timeout": 30,
    "attrs_before": {"1": "2"}
  },
  "sqlalchemy_scheme": "mssql+pyodbc",
  "ApplicationIntent": "ReadOnly",
  "Encrypt": "yes"
}
```

### Driver Configuration

Configure ODBC drivers through multiple methods:

1. **Constructor parameter**: `OdbcHook(driver="ODBC Driver 17 for SQL Server")`
2. **Connection extra**: Set `driver` in connection extra with `allow_driver_in_extra = True` in airflow.cfg
3. **Default driver**: Patch `OdbcHook.default_driver` in local_settings.py
4. **Hook parameters**: Use `hook_params` with SQL operators (see the sketch below)
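
For option 4, the common-SQL operators forward `hook_params` to the hook constructor; a minimal sketch (connection ID is a placeholder):

```python
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

query = SQLExecuteQueryOperator(
    task_id="query_with_driver",
    sql="SELECT 1",
    conn_id="my_odbc_connection",  # placeholder connection ID
    hook_params={"driver": "ODBC Driver 17 for SQL Server"},  # passed to OdbcHook(...)
)
```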

### DSN (Data Source Name) Configuration

Use pre-configured DSN entries:

```python
# Via constructor
hook = OdbcHook(dsn="MyDSN", odbc_conn_id="my_conn")

# Via connection extra
# Set "dsn": "MyDSN" in connection extra field
```

## Error Handling

The provider raises specific exceptions for various error conditions:

- **RuntimeError**: When the Airflow version is incompatible (< 2.10.0)
- **Configuration warnings**: When a driver is specified in connection extra without the proper airflow.cfg setting
- **SQLAlchemy validation**: Prevents injection attacks in scheme configuration
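
In task code, database-level failures surface as `pyodbc` exceptions rather than provider-specific ones; a sketch of defensive handling (table and connection ID are placeholders):

```python
import pyodbc
from airflow.providers.odbc.hooks.odbc import OdbcHook

hook = OdbcHook(odbc_conn_id="my_odbc_connection")
try:
    hook.run("DELETE FROM my_table WHERE id = ?", parameters=(42,))
except pyodbc.Error as exc:
    # pyodbc.Error is the base class for driver/database errors
    raise RuntimeError(f"ODBC operation failed: {exc}") from exc
```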

## Types

```python { .api }
from pyodbc import Connection, Row
from typing import Any, Sequence
from collections import namedtuple

# pyodbc types (external dependency)
Connection: type  # pyodbc.Connection object
Row: type         # pyodbc.Row object for query results

# Python standard library types
namedtuple: type  # collections.namedtuple for result transformation
```

## Integration Patterns

### With Airflow DAGs

```python
from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from datetime import datetime, timedelta

with DAG(
    'odbc_example',
    start_date=datetime(2025, 1, 1),
    schedule=timedelta(days=1),
    catchup=False,
) as dag:

    task = SQLExecuteQueryOperator(
        task_id='query_data',
        sql='SELECT COUNT(*) FROM users WHERE created_date = ?',
        parameters=('{{ ds }}',),
        conn_id='my_odbc_connection',
    )
```

### With TaskFlow API

```python
from datetime import datetime

from airflow.decorators import dag, task
from airflow.providers.odbc.hooks.odbc import OdbcHook

@dag(start_date=datetime(2025, 1, 1), schedule=None, catchup=False)
def odbc_taskflow_example():

    @task
    def extract_data():
        hook = OdbcHook(odbc_conn_id='my_odbc_connection')
        records = hook.get_records("SELECT * FROM source_table")
        return records

    @task
    def process_data(records):
        # Process the data
        return len(records)

    data = extract_data()
    result = process_data(data)

dag_instance = odbc_taskflow_example()
```

### With Custom Operators

```python
from airflow.providers.odbc.hooks.odbc import OdbcHook
from airflow.models import BaseOperator

class CustomOdbcOperator(BaseOperator):
    def __init__(self, odbc_conn_id, sql, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.odbc_conn_id = odbc_conn_id
        self.sql = sql

    def execute(self, context):
        hook = OdbcHook(odbc_conn_id=self.odbc_conn_id)
        return hook.get_records(self.sql)
```