# Database Hooks

Database hooks provide the foundation for connecting to and interacting with SQL databases in Airflow. `DbApiHook` is an abstract base class that supplies the common database operations, and each database provider extends it for its own database system.

## Capabilities

### Core Database Hook

The primary abstract base class for all SQL database hooks, providing connection management, query execution, and data operations.

```python { .api }
class DbApiHook:
    """
    Abstract base class for SQL hooks that provides common database operations.

    Attributes:
        conn_name_attr (str): Name of the default connection attribute
        default_conn_name (str): Default connection ID
        supports_autocommit (bool): Whether database supports autocommit
        supports_executemany (bool): Whether database supports executemany
        placeholder (str): SQL placeholder character for parameters
        connection: Database connection object
        sqlalchemy_url: SQLAlchemy URL object
        inspector: SQLAlchemy inspector for schema operations
        dialect: Database dialect object for SQL formatting
        reserved_words: Set of database reserved words
    """

    def get_conn(self):
        """
        Get database connection.

        Returns:
            Database connection object
        """

    def get_uri(self):
        """
        Extract URI from connection.

        Returns:
            str: Database connection URI
        """

    def get_sqlalchemy_engine(self, engine_kwargs=None):
        """
        Get SQLAlchemy engine for the connection.

        Args:
            engine_kwargs (dict, optional): Additional engine parameters

        Returns:
            sqlalchemy.engine.Engine: SQLAlchemy engine
        """

    def get_df(self, sql, parameters=None, **kwargs):
        """
        Execute SQL query and return results as a DataFrame.

        Args:
            sql (str): SQL query to execute
            parameters (iterable or mapping, optional): Query parameters
            **kwargs: Additional arguments for DataFrame creation

        Returns:
            pandas.DataFrame or polars.DataFrame: Query results as DataFrame
        """

    def get_df_by_chunks(self, sql, parameters=None, chunksize=None, **kwargs):
        """
        Execute SQL query and return results as chunked DataFrames.

        Args:
            sql (str): SQL query to execute
            parameters (iterable or mapping, optional): Query parameters
            chunksize (int, optional): Number of rows per chunk
            **kwargs: Additional arguments for DataFrame creation

        Returns:
            Generator yielding DataFrame chunks
        """

    def get_records(self, sql, parameters=None):
        """
        Execute SQL query and return results as list of tuples.

        Args:
            sql (str): SQL query to execute
            parameters (iterable or mapping, optional): Query parameters

        Returns:
            list: List of result tuples
        """

    def get_first(self, sql, parameters=None):
        """
        Execute SQL query and return first row.

        Args:
            sql (str): SQL query to execute
            parameters (iterable or mapping, optional): Query parameters

        Returns:
            tuple or None: First result row or None if no results
        """

    def run(self, sql, autocommit=False, parameters=None, handler=None, split_statements=True, return_last=True):
        """
        Execute SQL command(s) with various options.

        Args:
            sql (str or list): SQL statement(s) to execute
            autocommit (bool): Whether to commit automatically
            parameters (iterable or mapping, optional): Query parameters
            handler (callable, optional): Result handler function
            split_statements (bool): Whether to split multiple statements
            return_last (bool): Whether to return only last result

        Returns:
            Any: Query results based on handler and return_last settings
        """

    def insert_rows(self, table, rows, target_fields=None, commit_every=1000, replace=False):
        """
        Insert rows into database table.

        Args:
            table (str): Target table name
            rows (list): List of row tuples to insert
            target_fields (list, optional): Target column names
            commit_every (int): Commit after this many rows
            replace (bool): Whether to use REPLACE instead of INSERT
        """

    def bulk_dump(self, table, tmp_file):
        """
        Dump table contents to file.

        Args:
            table (str): Table name to dump
            tmp_file (str): Target file path
        """

    def bulk_load(self, table, tmp_file):
        """
        Load file contents into table.

        Args:
            table (str): Target table name
            tmp_file (str): Source file path
        """

    def test_connection(self):
        """
        Test database connection.

        Returns:
            tuple: (success: bool, message: str)
        """
```
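The chunked-read idea behind `get_df_by_chunks` can be illustrated without Airflow or a DataFrame library. The sketch below is not the provider's implementation: `iter_row_chunks` is a hypothetical helper that uses the standard DB-API `fetchmany` call against an in-memory SQLite database and yields plain lists of rows where the hook would yield DataFrames.

```python
import sqlite3
from typing import Iterator


def iter_row_chunks(conn, sql, parameters=(), chunksize=2) -> Iterator[list]:
    """Yield query results as lists of at most `chunksize` rows.

    Hypothetical helper for illustration only; the real hook yields DataFrames.
    """
    cursor = conn.cursor()
    try:
        cursor.execute(sql, parameters)
        while True:
            rows = cursor.fetchmany(chunksize)
            if not rows:
                break
            yield rows
    finally:
        cursor.close()


# Assumed sample data: five rows in an in-memory SQLite table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT)")
conn.executemany("INSERT INTO users VALUES (?)", [("a",), ("b",), ("c",), ("d",), ("e",)])

chunks = list(iter_row_chunks(conn, "SELECT name FROM users ORDER BY name", chunksize=2))
chunk_sizes = [len(chunk) for chunk in chunks]
```

Because the generator pulls rows only as they are consumed, memory use stays bounded by the chunk size rather than the full result set.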

### Database Connection Protocol

A protocol describing the interface that a database connector is expected to implement.

```python { .api }
class ConnectorProtocol:
    """
    Protocol defining database connection interface.
    """

    def connect(self, host: str, port: int, username: str, schema: str):
        """
        Connect to database.

        Args:
            host (str): Database host
            port (int): Database port
            username (str): Username for connection
            schema (str): Database schema/name

        Returns:
            Database connection object
        """
```
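A protocol of this kind is typically satisfied structurally: any object with a matching `connect` method qualifies, with no inheritance required. The following is a minimal sketch using the standard library's `typing.Protocol`. `ConnectorLike` and `FakeConnector` are hypothetical names invented for illustration, and the dictionary returned by `connect` stands in for a real connection object.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class ConnectorLike(Protocol):
    """Hypothetical stand-in mirroring the documented connect() signature."""

    def connect(self, host: str, port: int, username: str, schema: str) -> Any: ...


class FakeConnector:
    """Satisfies the protocol structurally; it does not inherit from it."""

    def connect(self, host: str, port: int, username: str, schema: str) -> dict:
        # A real connector would open a network connection here.
        return {"host": host, "port": port, "user": username, "schema": schema}


connector = FakeConnector()
# runtime_checkable protocols only verify that the method exists, not its signature.
matches_protocol = isinstance(connector, ConnectorLike)
connection = connector.connect("localhost", 5432, "airflow", "analytics")
```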

### Result Handler Functions

Pre-built handler functions for processing query results.

```python { .api }
def fetch_all_handler(cursor):
    """
    Handler to fetch all query results.

    Args:
        cursor: Database cursor object

    Returns:
        list: All query results
    """

def fetch_one_handler(cursor):
    """
    Handler to fetch first query result.

    Args:
        cursor: Database cursor object

    Returns:
        Any: First query result or None
    """

def return_single_query_results(sql, return_last, split_statements, result):
    """
    Determine when to return single vs multiple query results.

    Args:
        sql (str or list): Original SQL statement(s)
        return_last (bool): Whether to return only last result
        split_statements (bool): Whether statements were split
        result: Query results

    Returns:
        Any: Processed results based on parameters
    """
```
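A handler is simply a callable that receives a cursor and returns whatever should be handed back to the caller. The sketch below shows that contract with two local stand-ins, `fetch_all` and `fetch_one`, run against an in-memory SQLite database. These are hypothetical simplifications written for this example; the provider's own handlers may perform additional checks that are not shown here.

```python
import sqlite3


def fetch_all(cursor):
    """Stand-in for a fetch-all handler: return every remaining row."""
    return cursor.fetchall()


def fetch_one(cursor):
    """Stand-in for a fetch-one handler: return the next row, or None."""
    return cursor.fetchone()


# Assumed sample data for the demonstration.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, total REAL)")
conn.executemany("INSERT INTO orders VALUES (?, ?)", [(1, 9.5), (2, 20.0)])

# sqlite3's Connection.execute returns a cursor, which is what a handler receives.
all_rows = fetch_all(conn.execute("SELECT id, total FROM orders ORDER BY id"))
first_row = fetch_one(conn.execute("SELECT COUNT(*) FROM orders"))
no_row = fetch_one(conn.execute("SELECT id FROM orders WHERE id = ?", (99,)))
```

Note that a single-row result is still a tuple, which is why callers usually index into it to get a scalar value.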

### Constants

```python { .api }
# Set of supported SQL placeholders
SQL_PLACEHOLDERS: frozenset = frozenset({"%s", "?"})
```
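The two placeholders correspond to the DB-API `format` (`%s`) and `qmark` (`?`) parameter styles, and which one applies depends on the driver. As a rough illustration, the sketch below builds a parameterized `INSERT` for either style. `build_insert` is a hypothetical helper, not a provider function, and SQLite is used only because its driver ships with Python and accepts `?`.

```python
import sqlite3


def build_insert(table, columns, placeholder):
    """Build a parameterized INSERT using the driver's placeholder token.

    Hypothetical helper; identifiers are not escaped, so pass trusted names only.
    """
    column_list = ", ".join(columns)
    value_list = ", ".join([placeholder] * len(columns))
    return f"INSERT INTO {table} ({column_list}) VALUES ({value_list})"


qmark_sql = build_insert("users", ["name", "email"], "?")
format_sql = build_insert("users", ["name", "email"], "%s")

# sqlite3 uses the "?" (qmark) style, so the first statement runs as-is.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT, email TEXT)")
conn.execute(qmark_sql, ("John", "john@example.com"))
stored = conn.execute("SELECT name, email FROM users").fetchall()
```

Values always travel separately from the SQL text, so the driver handles quoting regardless of which placeholder is in use.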

## Usage Examples

### Basic Database Operations

```python
from airflow.providers.common.sql.hooks.sql import DbApiHook

# Custom hook extending DbApiHook
class MyDatabaseHook(DbApiHook):
    conn_name_attr = 'my_conn_id'
    default_conn_name = 'my_default_conn'
    supports_autocommit = True

    def get_conn(self):
        # Implementation specific to your database:
        # return a DB-API 2.0 connection object here
        pass

# Use the hook; the keyword argument must match conn_name_attr
hook = MyDatabaseHook(my_conn_id='my_database')

# Execute query and get records
results = hook.get_records('SELECT * FROM users WHERE active = %s', parameters=[True])

# Execute query and get DataFrame
df = hook.get_df('SELECT name, email FROM users LIMIT 10')

# Insert data
rows = [('John', 'john@example.com'), ('Jane', 'jane@example.com')]
hook.insert_rows('users', rows, target_fields=['name', 'email'])

# Test connection
success, message = hook.test_connection()
```
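The `commit_every` argument of `insert_rows` controls how often the transaction is committed during a large insert. The sketch below shows the general batching pattern with plain `sqlite3`. It is an assumption about the pattern, not the provider's code, and `insert_in_batches` is a hypothetical helper that also reports how many commits it issued.

```python
import sqlite3


def insert_in_batches(conn, sql, rows, commit_every=1000):
    """Insert rows one at a time, committing after every `commit_every` rows.

    Hypothetical helper; returns the number of commits issued.
    """
    commits = 0
    cursor = conn.cursor()
    for count, row in enumerate(rows, start=1):
        cursor.execute(sql, row)
        if commit_every and count % commit_every == 0:
            conn.commit()
            commits += 1
    # Commit any rows left over after the last full batch.
    if not commit_every or len(rows) % commit_every != 0:
        conn.commit()
        commits += 1
    cursor.close()
    return commits


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT, email TEXT)")
rows = [(f"user{i}", f"user{i}@example.com") for i in range(5)]

commit_count = insert_in_batches(
    conn, "INSERT INTO users (name, email) VALUES (?, ?)", rows, commit_every=2
)
row_count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
```

A smaller `commit_every` limits how much work is lost if a failure occurs mid-load, at the cost of more round trips to the database.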

### Using Result Handlers

```python
from airflow.providers.common.sql.hooks.handlers import fetch_all_handler, fetch_one_handler

# `hook` is a DbApiHook subclass instance, such as the one created
# in the basic operations example

# Execute with custom handler
result = hook.run(
    'SELECT COUNT(*) FROM orders WHERE date = %s',
    parameters=['2023-01-01'],
    handler=fetch_one_handler
)

# Get only the count value (the handler returns a single row)
count = result[0] if result else 0
```