# Apache Airflow SQLite Provider
A provider package that integrates SQLite databases with Apache Airflow for workflow orchestration and data pipeline operations. This package extends Airflow's SQL capabilities with SQLite-specific connection handling, enabling seamless integration of SQLite databases into data workflows.

## Package Information

- **Package Name**: apache-airflow-providers-sqlite
- **Package Type**: pypi
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-sqlite`
- **Minimum Requirements**: Python >=3.10, Apache Airflow >=2.10.0, apache-airflow-providers-common-sql >=1.26.0

## Core Imports

```python
# Main hook import
from airflow.providers.sqlite.hooks.sqlite import SqliteHook

# For type hints and advanced usage
import sqlite3
from airflow.models import Connection
from sqlalchemy.engine import Engine, Inspector
from sqlalchemy.engine.url import URL
```

Additional imports for DataFrame operations:

```python
# For pandas DataFrames (optional dependency)
from pandas import DataFrame as PandasDataFrame

# For polars DataFrames (optional dependency)
from polars import DataFrame as PolarsDataFrame
```

## Basic Usage

```python
from airflow.providers.sqlite.hooks.sqlite import SqliteHook

# Initialize hook with connection ID
hook = SqliteHook(sqlite_conn_id='sqlite_default')

# Execute a query and get all results
results = hook.get_records("SELECT * FROM users WHERE active = ?", parameters=[True])

# Execute a query and get the first result only
first_result = hook.get_first("SELECT COUNT(*) FROM users")

# Run SQL commands (INSERT, UPDATE, DELETE)
hook.run("INSERT INTO users (name, email) VALUES (?, ?)", parameters=["John Doe", "john@example.com"])

# Get results as a pandas DataFrame (requires pandas)
df = hook.get_df("SELECT * FROM users", df_type="pandas")

# Get results as a polars DataFrame (requires polars)
df = hook.get_df("SELECT * FROM users", df_type="polars")

# Bulk insert multiple rows
rows = [("Alice", "alice@example.com"), ("Bob", "bob@example.com")]
hook.insert_rows(table="users", rows=rows, target_fields=["name", "email"])

# Test the connection
status, message = hook.test_connection()
if status:
    print("Connection successful")
else:
    print(f"Connection failed: {message}")
```

## Connection Configuration

SQLite connections support various URI formats:

```text
# File-based database (relative path)
sqlite:///path/to/database.db

# File-based database (absolute path)
sqlite:////absolute/path/to/database.db

# In-memory database
sqlite:///:memory:

# With query parameters
sqlite:///path/to/db.sqlite?mode=ro
sqlite:///path/to/db.sqlite?mode=rw
sqlite:///path/to/db.sqlite?cache=shared
```

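The hook converts these SQLAlchemy-style URIs into sqlite3 `file:` URIs. A stdlib-only sketch of that target format (no Airflow required; `demo_shared` is just an illustrative database name):

```python
import sqlite3

# sqlite3 accepts "file:" URIs when uri=True; this is the format the hook
# converts the SQLAlchemy-style URIs above into. mode=memory&cache=shared
# creates a named in-memory database shareable across connections.
conn = sqlite3.connect("file:demo_shared?mode=memory&cache=shared", uri=True)
conn.execute("CREATE TABLE t (x INTEGER)")
conn.execute("INSERT INTO t VALUES (42)")
row = conn.execute("SELECT x FROM t").fetchone()
print(row)  # (42,)
conn.close()
```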
## Capabilities

### Hook Class Definition

The SqliteHook class provides SQLite database integration.

```python { .api }
class SqliteHook(DbApiHook):
    """
    Interact with SQLite databases.

    Class Attributes:
        conn_name_attr: str = "sqlite_conn_id"
        default_conn_name: str = "sqlite_default"
        conn_type: str = "sqlite"
        hook_name: str = "Sqlite"
    """

    def __init__(self, *args, schema: str | None = None, log_sql: bool = True, **kwargs):
        """
        Initialize the SQLite hook.

        Args:
            *args: If a single positional argument is provided, it is used as the connection ID
            schema (str, optional): Database schema (typically not used with SQLite)
            log_sql (bool): Whether to log SQL statements (default: True)
            **kwargs: Additional keyword arguments, including the connection ID via conn_name_attr
        """
```

### Connection Management

Establish and manage SQLite database connections with proper URI handling.

```python { .api }
def get_conn(self) -> sqlite3.dbapi2.Connection:
    """
    Return a SQLite connection object with proper URI conversion.

    Converts the SQLAlchemy URI format to the sqlite3-compatible file URI format.
    Handles file paths, in-memory databases, and query parameters.

    Returns:
        sqlite3.dbapi2.Connection: SQLite database connection
    """

def get_uri(self) -> str:
    """
    Override the DbApiHook get_uri method for SQLAlchemy engine compatibility.

    Transforms the Airflow connection URI into a SQLAlchemy-compatible format,
    handling SQLite-specific URI requirements.

    Returns:
        str: SQLAlchemy-compatible URI string
    """

def get_conn_id(self) -> str:
    """
    Get the connection ID used by this hook.

    Returns:
        str: Connection ID
    """

def get_cursor(self):
    """
    Get a database cursor for executing SQL statements.

    Returns:
        sqlite3.Cursor: Database cursor object
    """

def test_connection(self):
    """
    Test the SQLite database connection.

    Returns:
        tuple[bool, str]: (connection_success, status_message)
    """
```

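To make the URI conversion described for `get_conn` concrete, here is a loose, hypothetical sketch (the helper name `to_sqlite3_uri` and its simplifications are ours, not the provider's API):

```python
# Hypothetical sketch of the conversion get_conn performs: turn a
# SQLAlchemy-style "sqlite:///path?params" URI into a sqlite3 "file:" URI.
# The real hook also handles absolute paths and :memory: specially.
def to_sqlite3_uri(sa_uri: str) -> str:
    rest = sa_uri.removeprefix("sqlite:///")
    return "file:" + rest

print(to_sqlite3_uri("sqlite:///tmp/db.sqlite?mode=ro"))  # file:tmp/db.sqlite?mode=ro
```

The resulting string can be passed to `sqlite3.connect(..., uri=True)`.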
### SQL Execution

Execute SQL statements with parameter binding and transaction control.

```python { .api }
def run(self, sql, autocommit: bool = False, parameters=None, handler=None,
        split_statements: bool = False, return_last: bool = True):
    """
    Execute SQL statement(s) with optional parameter binding.

    Args:
        sql (str | list[str]): SQL statement(s) to execute
        autocommit (bool): Enable autocommit mode (default: False)
        parameters (list | dict, optional): Query parameters for binding
        handler (callable, optional): Result handler function
        split_statements (bool): Split multiple statements (default: False)
        return_last (bool): Return the result of the last statement only (default: True)

    Returns:
        any: Query results produced by the handler, or None when no handler is provided
    """

def get_records(self, sql: str, parameters=None) -> list[tuple]:
    """
    Execute a SQL query and return all records.

    Args:
        sql (str): SQL query to execute
        parameters (list | dict, optional): Query parameters for binding

    Returns:
        list[tuple]: List of result tuples
    """

def get_first(self, sql: str, parameters=None):
    """
    Execute a SQL query and return the first record.

    Args:
        sql (str): SQL query to execute
        parameters (list | dict, optional): Query parameters for binding

    Returns:
        tuple | None: First result tuple, or None if there are no results
    """
```

### DataFrame Operations

Convert query results to pandas or polars DataFrames for data analysis.

```python { .api }
def get_df(self, sql: str, parameters=None, *, df_type: str = "pandas", **kwargs):
    """
    Execute a SQL query and return the results as a DataFrame.

    Args:
        sql (str): SQL query to execute
        parameters (list | dict, optional): Query parameters for binding
        df_type (str): DataFrame type - "pandas" or "polars" (default: "pandas")
        **kwargs: Additional arguments passed to the DataFrame constructor

    Returns:
        PandasDataFrame | PolarsDataFrame: DataFrame with query results
    """

def get_df_by_chunks(self, sql: str, parameters=None, *, chunksize: int,
                     df_type: str = "pandas", **kwargs):
    """
    Execute a SQL query and yield the results as DataFrame chunks.

    Args:
        sql (str): SQL query to execute
        parameters (list | dict, optional): Query parameters for binding
        chunksize (int): Number of rows per chunk (required)
        df_type (str): DataFrame type - "pandas" or "polars" (default: "pandas")
        **kwargs: Additional arguments passed to the DataFrame constructor

    Yields:
        PandasDataFrame | PolarsDataFrame: Iterator of DataFrame chunks
    """
```

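For intuition, here is a rough stdlib analogue of the chunking that `get_df_by_chunks` performs, using `cursor.fetchmany` (illustration only; the hook yields DataFrames, not lists of tuples):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER)")
conn.executemany("INSERT INTO events VALUES (?)", [(i,) for i in range(10)])

cursor = conn.execute("SELECT id FROM events")
chunks = []
while True:
    batch = cursor.fetchmany(4)  # analogous to chunksize=4
    if not batch:
        break
    chunks.append(batch)

print([len(c) for c in chunks])  # [4, 4, 2]
conn.close()
```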
### Bulk Operations

Efficiently insert multiple rows with batching and transaction control.

```python { .api }
def insert_rows(self, table: str, rows, target_fields=None, commit_every: int = 1000,
                replace: bool = False, *, executemany: bool = False,
                fast_executemany: bool = False, autocommit: bool = False, **kwargs) -> None:
    """
    Insert multiple rows into a table with batching and optional replacement.

    Args:
        table (str): Target table name
        rows (Iterable): Collection of row tuples to insert
        target_fields (list[str], optional): Column names for the insert
        commit_every (int): Commit the transaction every N rows (default: 1000)
        replace (bool): Use REPLACE INTO instead of INSERT INTO (default: False)
        executemany (bool): Use cursor.executemany() for batch insertion (default: False)
        fast_executemany (bool): Use fast executemany if supported (default: False)
        autocommit (bool): Enable autocommit mode (default: False)
        **kwargs: Additional arguments for customization
    """
```

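Since `replace=True` switches the generated statement to `REPLACE INTO`, a stdlib sketch of the resulting semantics (last write wins on a key conflict):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT PRIMARY KEY, email TEXT)")

rows = [("Alice", "alice@example.com"), ("Bob", "bob@example.com"),
        ("Alice", "alice@new.example.com")]  # duplicate primary key

# REPLACE INTO overwrites the existing row on a key conflict,
# which is what insert_rows(replace=True) emits.
conn.executemany("REPLACE INTO users (name, email) VALUES (?, ?)", rows)
conn.commit()

count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
print(count)  # 2: the duplicate "Alice" row replaced the first one
conn.close()
```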
### SQLAlchemy Integration

Access SQLAlchemy engines and metadata for advanced database operations.

```python { .api }
def get_sqlalchemy_engine(self, engine_kwargs=None) -> Engine:
    """
    Get a SQLAlchemy engine for advanced database operations.

    Args:
        engine_kwargs (dict, optional): Additional engine configuration parameters

    Returns:
        Engine: SQLAlchemy engine instance
    """

@property
def sqlalchemy_url(self) -> URL:
    """
    SQLAlchemy URL object for this connection.

    Returns:
        URL: SQLAlchemy URL object
    """

@property
def inspector(self) -> Inspector:
    """
    SQLAlchemy Inspector for database metadata.

    Returns:
        Inspector: Database inspector instance
    """
```

### Transaction Control

Manage database transactions and autocommit behavior.

```python { .api }
def get_autocommit(self, conn) -> bool:
    """
    Get the autocommit setting for a connection.

    Args:
        conn: Database connection object

    Returns:
        bool: Current autocommit status
    """

def set_autocommit(self, conn, autocommit: bool) -> None:
    """
    Set the autocommit flag on a connection.

    Args:
        conn: Database connection object
        autocommit (bool): Autocommit setting to apply
    """
```

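In the underlying sqlite3 driver, autocommit corresponds to `isolation_level=None`; a stdlib sketch of the difference this toggle makes:

```python
import sqlite3

# isolation_level=None puts sqlite3 in autocommit mode; the default opens
# an implicit transaction before DML that must be committed explicitly.
auto = sqlite3.connect(":memory:", isolation_level=None)
auto.execute("CREATE TABLE t (x INTEGER)")
auto.execute("INSERT INTO t VALUES (1)")  # committed immediately
auto_in_txn = auto.in_transaction

manual = sqlite3.connect(":memory:")
manual.execute("CREATE TABLE t (x INTEGER)")
manual.execute("INSERT INTO t VALUES (1)")
manual_in_txn = manual.in_transaction  # pending until commit()
manual.commit()

print(auto_in_txn, manual_in_txn)  # False True
auto.close()
manual.close()
```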
### Properties and Utilities

Helper methods and properties for SQL operations and metadata access.

```python { .api }
@property
def placeholder(self) -> str:
    """
    SQL parameter placeholder character for SQLite.

    Returns:
        str: "?" (question-mark placeholder)
    """

@property
def connection(self) -> Connection:
    """
    Airflow connection object for this hook.

    Returns:
        Connection: Connection object instance
    """

@property
def connection_extra(self) -> dict:
    """
    Connection extra parameters as a dictionary.

    Returns:
        dict: Extra connection parameters from the connection configuration
    """

@property
def last_description(self) -> list:
    """
    Description from the last executed cursor.

    Returns:
        list: Cursor description with column metadata
    """

@staticmethod
def split_sql_string(sql: str, strip_semicolon: bool = False) -> list[str]:
    """
    Split a SQL string into individual statements.

    Args:
        sql (str): SQL string with multiple statements
        strip_semicolon (bool): Remove trailing semicolons (default: False)

    Returns:
        list[str]: List of individual SQL statements
    """

@staticmethod
def strip_sql_string(sql: str) -> str:
    """
    Strip whitespace and comments from a SQL string.

    Args:
        sql (str): SQL string to clean

    Returns:
        str: Cleaned SQL string
    """
```

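The `?` reported by the `placeholder` property is sqlite3's qmark paramstyle; a stdlib illustration of why values should be bound rather than interpolated (the quote in the name would otherwise break the SQL):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT)")
# "?" is the qmark placeholder the hook's `placeholder` property reports;
# binding parameters avoids SQL injection and quoting bugs.
conn.execute("INSERT INTO users (name) VALUES (?)", ("O'Brien",))
name = conn.execute("SELECT name FROM users").fetchone()[0]
print(name)  # O'Brien
conn.close()
```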
### Provider Metadata

Access provider configuration and metadata.

```python { .api }
# From airflow.providers.sqlite.get_provider_info
def get_provider_info() -> dict:
    """
    Get provider metadata, including integrations and connection types.

    Returns:
        dict: Provider metadata containing:
            - package-name: "apache-airflow-providers-sqlite"
            - name: "SQLite"
            - description: SQLite provider description
            - integrations: List of SQLite integration info
            - hooks: List of available hook modules
            - connection-types: List of supported connection types
    """
```

## Types

```python { .api }
# Type aliases for clarity (name -> fully qualified import path)
PandasDataFrame = "pandas.DataFrame"
PolarsDataFrame = "polars.DataFrame"
Connection = "airflow.models.Connection"
Engine = "sqlalchemy.engine.Engine"
Inspector = "sqlalchemy.engine.Inspector"
URL = "sqlalchemy.engine.URL"
```

## Error Handling

The SQLite hook surfaces common database and connection issues:

- **Connection errors**: Invalid file paths, permission issues, database corruption
- **SQL errors**: Syntax errors, constraint violations, missing tables or columns
- **Transaction errors**: Deadlocks, lock timeouts, rollback scenarios
- **URI format errors**: Invalid connection string formats, parameter parsing failures

Common error-handling patterns:

```python
import sqlite3

from airflow.exceptions import AirflowException
from airflow.providers.sqlite.hooks.sqlite import SqliteHook

try:
    hook = SqliteHook(sqlite_conn_id='my_sqlite_conn')
    results = hook.get_records("SELECT * FROM users")
except sqlite3.Error as e:
    # Handle SQLite-specific errors
    print(f"Database error: {e}")
except AirflowException as e:
    # Handle Airflow-specific errors (connection not found, etc.)
    print(f"Airflow error: {e}")
except Exception as e:
    # Handle anything else
    print(f"General error: {e}")
```

## Usage Examples

### Working with In-Memory Databases

```python
from airflow.providers.sqlite.hooks.sqlite import SqliteHook

# Connection URI: sqlite:///:memory:
hook = SqliteHook(sqlite_conn_id='sqlite_memory')

# Note: each hook method call opens a new connection, and a plain
# :memory: database vanishes when its connection closes, so use a
# shared-cache URI if state must survive across calls.
hook.run("CREATE TABLE temp_data (id INTEGER, value TEXT)")
hook.insert_rows("temp_data", [(1, "test"), (2, "data")])
results = hook.get_records("SELECT * FROM temp_data")
```

### File Database with Custom Parameters

```python
# Connection URI: sqlite:///path/to/db.sqlite?mode=rw&cache=shared
hook = SqliteHook(sqlite_conn_id='sqlite_file')
results = hook.get_df("SELECT * FROM large_table", df_type="pandas")
```

### Batch Processing with Chunked DataFrames

```python
# Process large results in chunks to manage memory
for chunk_df in hook.get_df_by_chunks("SELECT * FROM big_table", chunksize=1000):
    # Process each chunk
    processed_chunk = chunk_df.groupby('category').sum()
    # Save or further process the results
    print(f"Processed chunk with {len(chunk_df)} rows")
```

### Transaction Management

```python
# Manual transaction control using the underlying sqlite3 connection
conn = hook.get_conn()
try:
    cursor = conn.cursor()
    cursor.execute("INSERT INTO users (name) VALUES (?)", ("Alice",))
    cursor.execute("UPDATE users SET active = 1 WHERE name = ?", ("Alice",))
    conn.commit()
except Exception:
    conn.rollback()
    raise
finally:
    conn.close()
```

### Using SQLAlchemy Engine

```python
from sqlalchemy import text

# Get a SQLAlchemy engine for advanced operations
engine = hook.get_sqlalchemy_engine()
with engine.connect() as conn:
    # SQLAlchemy 2.0 requires textual SQL to be wrapped in text()
    result = conn.execute(text("SELECT * FROM users"))
    for row in result:
        print(row)
```

## Connection Defaults

- **Connection Type**: `sqlite`
- **Hook Class**: `airflow.providers.sqlite.hooks.sqlite.SqliteHook`
- **Supported URI Schemes**: `sqlite://`
- **Default Connection ID**: `sqlite_default`

## Dependencies

- **Required**: `apache-airflow>=2.10.0`, `apache-airflow-providers-common-sql>=1.26.0`
- **Python**: >=3.10
- **Optional**: `pandas` (for pandas DataFrame support), `polars` (for polars DataFrame support)

## Provider Information

- **Package Name**: apache-airflow-providers-sqlite
- **Provider Name**: SQLite
- **Integration**: SQLite database integration
- **External Documentation**: https://www.sqlite.org/index.html