# Apache Airflow SQLite Provider

A provider package that integrates SQLite databases with Apache Airflow for workflow orchestration and data pipeline operations. This package extends Airflow's SQL capabilities with SQLite-specific connection handling, enabling seamless integration of SQLite databases into data workflows.

## Package Information

- **Package Name**: apache-airflow-providers-sqlite
- **Package Type**: pypi
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-sqlite`
- **Minimum Requirements**: Python >=3.10, Apache Airflow >=2.10.0, apache-airflow-providers-common-sql >=1.26.0

## Core Imports

```python
# Main hook import
from airflow.providers.sqlite.hooks.sqlite import SqliteHook

# For type hints and advanced usage
import sqlite3
from airflow.models import Connection
from sqlalchemy.engine import Engine, Inspector
from sqlalchemy.engine.url import URL
```

Additional imports for DataFrame operations:

```python
# For pandas DataFrames (optional dependency)
from pandas import DataFrame as PandasDataFrame

# For polars DataFrames (optional dependency)
from polars import DataFrame as PolarsDataFrame
```

## Basic Usage

```python
from airflow.providers.sqlite.hooks.sqlite import SqliteHook

# Initialize hook with connection ID
hook = SqliteHook(sqlite_conn_id='sqlite_default')

# Execute a query and get all results
results = hook.get_records("SELECT * FROM users WHERE active = ?", parameters=[True])

# Execute a query and get first result only
first_result = hook.get_first("SELECT COUNT(*) FROM users")

# Run SQL commands (INSERT, UPDATE, DELETE)
hook.run("INSERT INTO users (name, email) VALUES (?, ?)", parameters=["John Doe", "john@example.com"])

# Get results as pandas DataFrame (requires pandas)
df = hook.get_df("SELECT * FROM users", df_type="pandas")

# Get results as polars DataFrame (requires polars)
df = hook.get_df("SELECT * FROM users", df_type="polars")

# Bulk insert multiple rows
rows = [("Alice", "alice@example.com"), ("Bob", "bob@example.com")]
hook.insert_rows(table="users", rows=rows, target_fields=["name", "email"])

# Test connection
status, message = hook.test_connection()
if status:
    print("Connection successful")
else:
    print(f"Connection failed: {message}")
```

## Connection Configuration

SQLite connections support various URI formats:

```text
# File-based database (relative path)
sqlite:///path/to/database.db

# File-based database (absolute path)
sqlite:////absolute/path/to/database.db

# In-memory database
sqlite:///:memory:

# With query parameters
sqlite:///path/to/db.sqlite?mode=ro
sqlite:///path/to/db.sqlite?mode=rw
sqlite:///path/to/db.sqlite?cache=shared
```
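
For reference, a minimal sketch of registering one of these URIs as an Airflow connection through an environment variable; the connection ID `sqlite_reporting` and the `/tmp/reporting.db` path are illustrative, not part of the provider:

```python
import os

from airflow.providers.sqlite.hooks.sqlite import SqliteHook

# Airflow resolves connections from AIRFLOW_CONN_<CONN_ID> environment variables,
# so a file-backed SQLite connection can be registered without touching the metadata DB.
os.environ["AIRFLOW_CONN_SQLITE_REPORTING"] = "sqlite:////tmp/reporting.db"

hook = SqliteHook(sqlite_conn_id="sqlite_reporting")
print(hook.get_uri())  # SQLAlchemy-compatible form of the same connection
```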

## Capabilities

### Hook Class Definition

The SqliteHook class provides SQLite database integration.

```python { .api }
class SqliteHook(DbApiHook):
    """
    Interact with SQLite databases.

    Class Attributes:
        conn_name_attr: str = "sqlite_conn_id"
        default_conn_name: str = "sqlite_default"
        conn_type: str = "sqlite"
        hook_name: str = "Sqlite"
    """

    def __init__(self, *args, schema: str | None = None, log_sql: bool = True, **kwargs):
        """
        Initialize SQLite hook.

        Args:
            *args: If single positional arg provided, used as connection ID
            schema (str, optional): Database schema (typically not used with SQLite)
            log_sql (bool): Whether to log SQL statements (default: True)
            **kwargs: Additional keyword arguments, including connection ID via conn_name_attr
        """
```
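
A short sketch of constructing the hook using the class attributes documented above; the `sqlite_reporting` connection ID is illustrative:

```python
from airflow.providers.sqlite.hooks.sqlite import SqliteHook

# With no arguments the hook falls back to default_conn_name ("sqlite_default").
default_hook = SqliteHook()

# An explicit connection ID can be passed via the conn_name_attr keyword.
named_hook = SqliteHook(sqlite_conn_id="sqlite_reporting")

print(SqliteHook.conn_type, SqliteHook.default_conn_name)  # sqlite sqlite_default
```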

### Connection Management

Establish and manage SQLite database connections with proper URI handling.

```python { .api }
def get_conn(self) -> sqlite3.dbapi2.Connection:
    """
    Return SQLite connection object with proper URI conversion.

    Converts SQLAlchemy URI format to sqlite3-compatible file URI format.
    Handles file paths, in-memory databases, and query parameters.

    Returns:
        sqlite3.dbapi2.Connection: SQLite database connection
    """

def get_uri(self) -> str:
    """
    Override DbApiHook get_uri method for SQLAlchemy engine compatibility.

    Transforms Airflow connection URI to SQLAlchemy-compatible format,
    handling SQLite-specific URI requirements.

    Returns:
        str: SQLAlchemy-compatible URI string
    """

def get_conn_id(self) -> str:
    """
    Get the connection ID used by this hook.

    Returns:
        str: Connection ID
    """

def get_cursor(self):
    """
    Get database cursor for executing SQL statements.

    Returns:
        sqlite3.Cursor: Database cursor object
    """

def test_connection(self):
    """
    Test the SQLite database connection.

    Returns:
        tuple[bool, str]: (connection_success, status_message)
    """
```
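
A hedged sketch exercising this surface; it assumes the `sqlite_default` connection resolves to a reachable database file:

```python
from airflow.providers.sqlite.hooks.sqlite import SqliteHook

hook = SqliteHook(sqlite_conn_id="sqlite_default")

ok, message = hook.test_connection()
print(ok, message)

print(hook.get_conn_id())  # "sqlite_default"
print(hook.get_uri())      # SQLAlchemy-compatible URI for the same connection

conn = hook.get_conn()     # raw sqlite3 connection
try:
    cursor = conn.cursor()
    cursor.execute("SELECT sqlite_version()")
    print(cursor.fetchone())
finally:
    conn.close()
```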

### SQL Execution

Execute SQL statements with parameter binding and transaction control.

```python { .api }
def run(self, sql, autocommit: bool = False, parameters=None, handler=None,
        split_statements: bool = False, return_last: bool = True):
    """
    Execute SQL statement(s) with optional parameter binding.

    Args:
        sql (str | list[str]): SQL statement(s) to execute
        autocommit (bool): Enable autocommit mode (default: False)
        parameters (list | dict, optional): Query parameters for binding
        handler (callable, optional): Result handler function
        split_statements (bool): Split multiple statements (default: False)
        return_last (bool): Return result from last statement only (default: True)

    Returns:
        any: Query results based on handler, or None for non-SELECT statements
    """

def get_records(self, sql: str, parameters=None) -> list[tuple]:
    """
    Execute SQL query and return all records.

    Args:
        sql (str): SQL query to execute
        parameters (list | dict, optional): Query parameters for binding

    Returns:
        list[tuple]: List of result tuples
    """

def get_first(self, sql: str, parameters=None):
    """
    Execute SQL query and return first record.

    Args:
        sql (str): SQL query to execute
        parameters (list | dict, optional): Query parameters for binding

    Returns:
        tuple | None: First result tuple or None if no results
    """
```
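
For illustration, a small multi-statement script followed by a parameterised read, sketched against the API above; the `events` table is illustrative:

```python
from airflow.providers.sqlite.hooks.sqlite import SqliteHook

hook = SqliteHook(sqlite_conn_id="sqlite_default")

# split_statements executes each statement in the string one by one.
hook.run(
    """
    CREATE TABLE IF NOT EXISTS events (id INTEGER PRIMARY KEY, name TEXT);
    INSERT INTO events (name) VALUES ('started');
    """,
    split_statements=True,
)

# SQLite uses "?" placeholders (see the placeholder property below).
rows = hook.get_records("SELECT id, name FROM events WHERE name = ?", parameters=["started"])
first = hook.get_first("SELECT COUNT(*) FROM events")
```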

### DataFrame Operations

Convert query results to pandas or polars DataFrames for data analysis.

```python { .api }
def get_df(self, sql: str, parameters=None, *, df_type: str = "pandas", **kwargs):
    """
    Execute SQL query and return results as DataFrame.

    Args:
        sql (str): SQL query to execute
        parameters (list | dict, optional): Query parameters for binding
        df_type (str): DataFrame type - "pandas" or "polars" (default: "pandas")
        **kwargs: Additional arguments passed to DataFrame constructor

    Returns:
        PandasDataFrame | PolarsDataFrame: DataFrame with query results
    """

def get_df_by_chunks(self, sql: str, parameters=None, *, chunksize: int,
                     df_type: str = "pandas", **kwargs):
    """
    Execute SQL query and return results as DataFrame chunks.

    Args:
        sql (str): SQL query to execute
        parameters (list | dict, optional): Query parameters for binding
        chunksize (int): Number of rows per chunk (required)
        df_type (str): DataFrame type - "pandas" or "polars" (default: "pandas")
        **kwargs: Additional arguments passed to DataFrame constructor

    Yields:
        PandasDataFrame | PolarsDataFrame: Iterator of DataFrame chunks
    """
```
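
A brief sketch of both helpers; the `orders` table and its columns are illustrative, and pandas must be installed for the default `df_type`:

```python
from airflow.providers.sqlite.hooks.sqlite import SqliteHook

hook = SqliteHook(sqlite_conn_id="sqlite_default")

# Whole result set in one DataFrame.
orders = hook.get_df(
    "SELECT customer_id, total FROM orders WHERE total > ?",
    parameters=[100],
    df_type="pandas",
)

# The same query streamed in fixed-size chunks to bound memory usage.
for chunk in hook.get_df_by_chunks("SELECT * FROM orders", chunksize=500):
    print(len(chunk))
```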

### Bulk Operations

Efficiently insert multiple rows with batching and transaction control.

```python { .api }
def insert_rows(self, table: str, rows, target_fields=None, commit_every: int = 1000,
                replace: bool = False, *, executemany: bool = False,
                fast_executemany: bool = False, autocommit: bool = False, **kwargs) -> None:
    """
    Insert multiple rows into table with batching and optional replacement.

    Args:
        table (str): Target table name
        rows (Iterable): Collection of row tuples to insert
        target_fields (list[str], optional): Column names for insertion
        commit_every (int): Commit transaction every N rows (default: 1000)
        replace (bool): Use REPLACE INTO instead of INSERT INTO (default: False)
        executemany (bool): Use cursor.executemany() for batch insertion (default: False)
        fast_executemany (bool): Use fast executemany if supported (default: False)
        autocommit (bool): Enable autocommit mode (default: False)
        **kwargs: Additional arguments for customization
    """
```
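
A sketch of a bulk load using the options above; the `users` table and its primary key are illustrative:

```python
from airflow.providers.sqlite.hooks.sqlite import SqliteHook

hook = SqliteHook(sqlite_conn_id="sqlite_default")

rows = [(1, "Alice", "alice@example.com"), (2, "Bob", "bob@example.com")]

# replace=True issues REPLACE INTO, so rows whose primary key already exists are overwritten;
# commit_every bounds the size of each transaction during large loads.
hook.insert_rows(
    table="users",
    rows=rows,
    target_fields=["id", "name", "email"],
    commit_every=500,
    replace=True,
)
```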

### SQLAlchemy Integration

Access SQLAlchemy engines and metadata for advanced database operations.

```python { .api }
def get_sqlalchemy_engine(self, engine_kwargs=None) -> Engine:
    """
    Get SQLAlchemy engine for advanced database operations.

    Args:
        engine_kwargs (dict, optional): Additional engine configuration parameters

    Returns:
        Engine: SQLAlchemy engine instance
    """

@property
def sqlalchemy_url(self) -> URL:
    """
    SQLAlchemy URL object for this connection.

    Returns:
        URL: SQLAlchemy URL object
    """

@property
def inspector(self) -> Inspector:
    """
    SQLAlchemy Inspector for database metadata.

    Returns:
        Inspector: Database inspector instance
    """
```
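
A sketch of the SQLAlchemy surface, useful for schema inspection; the `engine_kwargs` value shown is optional and illustrative:

```python
from airflow.providers.sqlite.hooks.sqlite import SqliteHook

hook = SqliteHook(sqlite_conn_id="sqlite_default")

print(hook.sqlalchemy_url)           # SQLAlchemy URL object for this connection

inspector = hook.inspector
print(inspector.get_table_names())   # tables visible in the SQLite database

engine = hook.get_sqlalchemy_engine(engine_kwargs={"echo": False})
```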

### Transaction Control

Manage database transactions and autocommit behavior.

```python { .api }
def get_autocommit(self, conn) -> bool:
    """
    Get autocommit setting for connection.

    Args:
        conn: Database connection object

    Returns:
        bool: Current autocommit status
    """

def set_autocommit(self, conn, autocommit: bool) -> None:
    """
    Set autocommit flag on connection.

    Args:
        conn: Database connection object
        autocommit (bool): Autocommit setting to apply
    """
```
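
A minimal sketch of reading the autocommit state on a raw connection obtained from the hook:

```python
from airflow.providers.sqlite.hooks.sqlite import SqliteHook

hook = SqliteHook(sqlite_conn_id="sqlite_default")

conn = hook.get_conn()
try:
    # Reports the connection's current autocommit status; set_autocommit(conn, True)
    # can be used to change it where the driver supports it.
    print(hook.get_autocommit(conn))
finally:
    conn.close()
```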

### Properties and Utilities

Helper methods and properties for SQL operations and metadata access.

```python { .api }
@property
def placeholder(self) -> str:
    """
    SQL parameter placeholder character for SQLite.

    Returns:
        str: "?" (question mark placeholder)
    """

@property
def connection(self) -> Connection:
    """
    Airflow connection object for this hook.

    Returns:
        Connection: Connection object instance
    """

@property
def connection_extra(self) -> dict:
    """
    Connection extra parameters as dictionary.

    Returns:
        dict: Extra connection parameters from connection configuration
    """

@property
def last_description(self) -> list:
    """
    Description from last executed cursor.

    Returns:
        list: Cursor description with column metadata
    """

@staticmethod
def split_sql_string(sql: str, strip_semicolon: bool = False) -> list[str]:
    """
    Split SQL string into individual statements.

    Args:
        sql (str): SQL string with multiple statements
        strip_semicolon (bool): Remove trailing semicolons (default: False)

    Returns:
        list[str]: List of individual SQL statements
    """

@staticmethod
def strip_sql_string(sql: str) -> str:
    """
    Strip whitespace and comments from SQL string.

    Args:
        sql (str): SQL string to clean

    Returns:
        str: Cleaned SQL string
    """
```
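
A sketch of the utility helpers; the staging and landing tables are illustrative:

```python
from airflow.providers.sqlite.hooks.sqlite import SqliteHook

hook = SqliteHook(sqlite_conn_id="sqlite_default")

print(hook.placeholder)  # "?"

script = "DELETE FROM staging; INSERT INTO staging SELECT * FROM landing;"
for statement in hook.split_sql_string(script):
    print(statement)

print(hook.strip_sql_string("  SELECT 1;  "))
```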

### Provider Metadata

Access provider configuration and metadata.

```python { .api }
# From airflow.providers.sqlite.get_provider_info
def get_provider_info() -> dict:
    """
    Get provider metadata including integrations and connection types.

    Returns:
        dict: Provider metadata containing:
            - package-name: "apache-airflow-providers-sqlite"
            - name: "SQLite"
            - description: SQLite provider description
            - integrations: List of SQLite integration info
            - hooks: List of available hook modules
            - connection-types: List of supported connection types
    """
```
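
A sketch of reading the metadata dictionary exposed by the module above:

```python
from airflow.providers.sqlite.get_provider_info import get_provider_info

info = get_provider_info()
print(info["package-name"])      # apache-airflow-providers-sqlite
print(info["connection-types"])  # connection types contributed by this provider
```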

## Types

```python { .api }
# Type aliases for clarity
PandasDataFrame = "pandas.DataFrame"
PolarsDataFrame = "polars.DataFrame"
Connection = "airflow.models.Connection"
Engine = "sqlalchemy.engine.Engine"
Inspector = "sqlalchemy.engine.Inspector"
URL = "sqlalchemy.engine.URL"
```

## Error Handling

The SQLite hook handles common database errors and connection issues:

- **Connection errors**: Invalid file paths, permission issues, database corruption
- **SQL errors**: Syntax errors, constraint violations, table/column not found
- **Transaction errors**: Deadlocks, lock timeouts, rollback scenarios
- **URI format errors**: Invalid connection string formats, parameter parsing

Common error patterns:

```python
import sqlite3

from airflow.exceptions import AirflowException
from airflow.providers.sqlite.hooks.sqlite import SqliteHook

try:
    hook = SqliteHook(sqlite_conn_id='my_sqlite_conn')
    results = hook.get_records("SELECT * FROM users")
except sqlite3.Error as e:
    # Handle SQLite-specific errors
    print(f"Database error: {e}")
except AirflowException as e:
    # Handle Airflow-specific errors (connection not found, etc.)
    print(f"Airflow error: {e}")
except Exception as e:
    # Handle other errors
    print(f"General error: {e}")
```

## Usage Examples

### Working with In-Memory Databases

```python
# Connection URI: sqlite:///:memory:
# Each new connection gets its own in-memory database, and hook methods such as run()
# open a fresh connection per call, so keep all work on one connection from get_conn().
hook = SqliteHook(sqlite_conn_id='sqlite_memory')
conn = hook.get_conn()
cursor = conn.cursor()
cursor.execute("CREATE TABLE temp_data (id INTEGER, value TEXT)")
cursor.executemany("INSERT INTO temp_data VALUES (?, ?)", [(1, "test"), (2, "data")])
results = cursor.execute("SELECT * FROM temp_data").fetchall()
```

### File Database with Custom Parameters

```python
# Connection URI: sqlite:///path/to/db.sqlite?mode=rw&cache=shared
hook = SqliteHook(sqlite_conn_id='sqlite_file')
results = hook.get_df("SELECT * FROM large_table", df_type="pandas")
```

### Batch Processing with Chunked DataFrames

```python
# Process large results in chunks to manage memory
for chunk_df in hook.get_df_by_chunks("SELECT * FROM big_table", chunksize=1000):
    # Process each chunk
    processed_chunk = chunk_df.groupby('category').sum()
    # Save or further process results
    print(f"Processed chunk with {len(chunk_df)} rows")
```

### Transaction Management

```python
# Manual transaction control
with hook._create_autocommit_connection(autocommit=False) as conn:
    cursor = conn.cursor()
    try:
        cursor.execute("INSERT INTO users (name) VALUES (?)", ("Alice",))
        cursor.execute("UPDATE users SET active = 1 WHERE name = ?", ("Alice",))
        conn.commit()
    except Exception:
        conn.rollback()
        raise
```

### Using SQLAlchemy Engine

```python
from sqlalchemy import text

# Get SQLAlchemy engine for advanced operations
engine = hook.get_sqlalchemy_engine()
with engine.connect() as conn:
    result = conn.execute(text("SELECT * FROM users"))
    for row in result:
        print(row)
```

## Connection Configuration

- **Connection Type**: `sqlite`
- **Hook Class**: `airflow.providers.sqlite.hooks.sqlite.SqliteHook`
- **Supported URI Schemes**: `sqlite://`
- **Default Connection ID**: `sqlite_default`

## Dependencies

- **Required**: `apache-airflow>=2.10.0`, `apache-airflow-providers-common-sql>=1.26.0`
- **Python**: >=3.10
- **Optional**: `pandas` (for pandas DataFrame support), `polars` (for polars DataFrame support)

## Provider Information

- **Package Name**: apache-airflow-providers-sqlite
- **Provider Name**: SQLite
- **Integration**: SQLite database integration
- **External Documentation**: https://www.sqlite.org/index.html