or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

connection-management.md · index.md · python-hook.md · sql-hook.md · task-logging.md · version-compatibility.md

connection-management.mddocs/

0

# Connection Management

1

2

Low-level connection utilities and cursor implementations for direct database-style access to Elasticsearch with SQL query support and result pagination. These components provide the foundation for the SQL Hook's database-like interface.

3

4

## Capabilities

5

6

### Connection Factory Function

7

8

Factory function that creates configured ESConnection instances with authentication and connection parameters.

9

10

```python { .api }

11

def connect(

12

host: str = "localhost",

13

port: int = 9200,

14

user: str | None = None,

15

password: str | None = None,

16

scheme: str = "http",

17

**kwargs: Any

18

) -> ESConnection:

19

"""

20

Create an ESConnection instance with specified parameters.

21

22

Parameters:

23

- host: Elasticsearch server hostname (default: "localhost")

24

- port: Elasticsearch server port (default: 9200)

25

- user: Username for authentication (optional)

26

- password: Password for authentication (optional)

27

- scheme: Connection scheme - "http" or "https" (default: "http")

28

- **kwargs: Additional connection arguments

29

30

Returns:

31

Configured ESConnection instance

32

"""

33

```

34

35

### Connection Class

36

37

Wrapper class for `elasticsearch.Elasticsearch` that provides a database-like connection interface with cursor support.

38

39

```python { .api }

40

class ESConnection:

41

"""

42

Wrapper class for elasticsearch.Elasticsearch.

43

44

Provides a database-like connection interface with cursor support

45

and SQL query execution capabilities.

46

"""

47

48

def __init__(

49

self,

50

host: str = "localhost",

51

port: int = 9200,

52

user: str | None = None,

53

password: str | None = None,

54

scheme: str = "http",

55

**kwargs: Any

56

):

57

"""

58

Initialize ESConnection with connection parameters.

59

60

Parameters:

61

- host: Elasticsearch server hostname

62

- port: Elasticsearch server port

63

- user: Username for authentication (optional)

64

- password: Password for authentication (optional)

65

- scheme: Connection scheme ("http" or "https")

66

- **kwargs: Additional Elasticsearch client arguments

67

"""

68

69

def cursor(self) -> ElasticsearchSQLCursor:

70

"""

71

Create a new cursor for executing SQL queries.

72

73

Returns:

74

ElasticsearchSQLCursor instance for query execution

75

"""

76

77

def close(self):

78

"""

79

Close the Elasticsearch connection.

80

"""

81

82

def commit(self):

83

"""

84

Commit transaction (no-op for Elasticsearch).

85

"""

86

87

def execute_sql(

88

self,

89

query: str,

90

params: Iterable | Mapping[str, Any] | None = None

91

) -> ObjectApiResponse:

92

"""

93

Execute a SQL query directly on the connection.

94

95

Parameters:

96

- query: SQL query string to execute

97

- params: Query parameters (optional)

98

99

Returns:

100

ObjectApiResponse from Elasticsearch SQL API

101

"""

102

```

103

104

### Cursor Class

105

106

PEP 249-like cursor class for executing SQL queries against Elasticsearch with full result pagination support.

107

108

```python { .api }

109

class ElasticsearchSQLCursor:

110

"""

111

A PEP 249-like Cursor class for Elasticsearch SQL API.

112

113

Provides standard database cursor interface for SQL query execution

114

with support for result pagination and metadata access.

115

"""

116

117

def __init__(self, es: Elasticsearch, **kwargs):

118

"""

119

Initialize cursor with Elasticsearch client and options.

120

121

Parameters:

122

- es: Elasticsearch client instance

123

- **kwargs: Additional cursor options (fetch_size, field_multi_value_leniency)

124

"""

125

126

@property

127

def response(self) -> ObjectApiResponse:

128

"""

129

Get the current query response.

130

131

Returns:

132

ObjectApiResponse from the last executed query

133

"""

134

135

@property

136

def cursor(self):

137

"""

138

Get the cursor token for pagination.

139

140

Returns:

141

Cursor token string for next page, or None if no more results

142

"""

143

144

@property

145

def rows(self):

146

"""

147

Get the rows from the current response.

148

149

Returns:

150

List of result rows from current query

151

"""

152

153

@property

154

def rowcount(self) -> int:

155

"""

156

Get the number of rows in the current result set.

157

158

Returns:

159

Integer count of rows in current result

160

"""

161

162

@property

163

def description(self) -> list[tuple]:

164

"""

165

Get column descriptions for the result set.

166

167

Returns:

168

List of (column_name, column_type) tuples

169

"""

170

171

def execute(

172

self,

173

statement: str,

174

params: Iterable | Mapping[str, Any] | None = None

175

) -> ObjectApiResponse:

176

"""

177

Execute a SQL statement.

178

179

Parameters:

180

- statement: SQL statement to execute

181

- params: Statement parameters (optional)

182

183

Returns:

184

ObjectApiResponse from Elasticsearch SQL API

185

"""

186

187

def fetchone(self):

188

"""

189

Fetch the next row from the result set.

190

191

Returns:

192

Single row as list, or None if no more rows

193

"""

194

195

def fetchmany(self, size: int | None = None):

196

"""

197

Fetch multiple rows from the result set.

198

199

Parameters:

200

- size: Number of rows to fetch (optional)

201

202

Raises:

203

NotImplementedError (not currently supported)

204

"""

205

206

def fetchall(self):

207

"""

208

Fetch all remaining rows from the result set.

209

210

Automatically handles pagination using cursor tokens.

211

212

Returns:

213

List of all remaining rows

214

"""

215

216

def close(self):

217

"""

218

Close the cursor and clean up resources.

219

"""

220

```

221

222

### Usage Examples

223

224

#### Basic Connection and Query

225

226

```python

227

from airflow.providers.elasticsearch.hooks.elasticsearch import connect

228

229

# Create connection

230

conn = connect(

231

host="localhost",

232

port=9200,

233

user="elastic",

234

password="password",

235

scheme="https"

236

)

237

238

# Execute SQL query

239

result = conn.execute_sql("SELECT * FROM my_index WHERE status = 'active' LIMIT 10")

240

241

print(f"Found {len(result['rows'])} rows")

242

for row in result['rows']:

243

print(row)

244

245

# Close connection

246

conn.close()

247

```

248

249

#### Cursor-based Query Execution

250

251

```python

252

from airflow.providers.elasticsearch.hooks.elasticsearch import connect

253

254

# Create connection and cursor

255

conn = connect(host="localhost", port=9200)

256

cursor = conn.cursor()

257

258

# Execute query

259

response = cursor.execute("SELECT name, age, city FROM users WHERE age > 25")

260

261

# Access result metadata

262

print(f"Columns: {cursor.description}")

263

print(f"Row count: {cursor.rowcount}")

264

265

# Fetch results

266

first_row = cursor.fetchone()

267

print(f"First row: {first_row}")

268

269

all_rows = cursor.fetchall()

270

print(f"All rows: {len(all_rows)}")

271

272

# Clean up

273

cursor.close()

274

conn.close()

275

```

276

277

#### Parameterized Queries

278

279

```python

280

conn = connect(host="localhost", port=9200)

281

cursor = conn.cursor()

282

283

# Query with parameters

284

query = "SELECT * FROM logs WHERE level = ? AND timestamp > ?"

285

params = ["ERROR", "2024-01-01T00:00:00"]

286

287

response = cursor.execute(query, params)

288

289

# Process results

290

for row in cursor.rows:

291

print(f"Log entry: {row}")

292

293

cursor.close()

294

conn.close()

295

```

296

297

#### Pagination with Large Result Sets

298

299

```python

300

conn = connect(

301

host="localhost",

302

port=9200,

303

fetch_size=1000 # Set page size

304

)

305

cursor = conn.cursor()

306

307

# Execute large query

308

cursor.execute("SELECT * FROM large_index")

309

310

# fetchall() automatically handles pagination

311

all_results = cursor.fetchall()

312

print(f"Retrieved {len(all_results)} total rows")

313

314

cursor.close()

315

conn.close()

316

```

317

318

#### Advanced Connection Configuration

319

320

```python

321

# Connection with additional Elasticsearch client options

322

conn = connect(

323

host="elasticsearch.example.com",

324

port=9200,

325

user="service_account",

326

password="secret_password",

327

scheme="https",

328

# Additional Elasticsearch client arguments

329

verify_certs=True,

330

ca_certs="/path/to/ca.pem",

331

timeout=30,

332

max_retries=3,

333

retry_on_status=[502, 503, 504],

334

http_compress=True,

335

fetch_size=5000,

336

field_multi_value_leniency=True

337

)

338

339

cursor = conn.cursor()

340

cursor.execute("SELECT * FROM secure_index")

341

results = cursor.fetchall()

342

343

cursor.close()

344

conn.close()

345

```

346

347

#### Error Handling

348

349

```python

350

from elasticsearch.exceptions import ConnectionError, RequestError

351

352

conn = connect(host="localhost", port=9200)

353

cursor = conn.cursor()

354

355

try:

356

cursor.execute("SELECT * FROM nonexistent_index")

357

results = cursor.fetchall()

358

except ConnectionError as e:

359

print(f"Connection failed: {e}")

360

except RequestError as e:

361

print(f"Query error: {e}")

362

finally:

363

cursor.close()

364

conn.close()

365

```

366

367

### Configuration Options

368

369

#### Connection Parameters

370

371

The connection accepts various configuration options:

372

373

```python

374

conn = connect(

375

host="localhost", # Elasticsearch host

376

port=9200, # Elasticsearch port

377

user="username", # Authentication username

378

password="password", # Authentication password

379

scheme="https", # Connection scheme

380

381

# Elasticsearch client options

382

verify_certs=True, # SSL certificate verification

383

ca_certs="/path/to/ca.pem", # CA certificate path

384

client_cert="/path/to/cert.pem", # Client certificate

385

client_key="/path/to/key.pem", # Client private key

386

timeout=30, # Request timeout

387

max_retries=3, # Maximum retry attempts

388

http_compress=True, # HTTP compression

389

390

# Cursor options

391

fetch_size=1000, # Page size for results

392

field_multi_value_leniency=False # Multi-value field handling

393

)

394

```

395

396

### Notes

397

398

- The connection wrapper provides a database-like interface over Elasticsearch's native client

399

- All SQL operations use Elasticsearch's SQL API for query execution

400

- Cursor pagination is handled automatically by `fetchall()`, which follows the cursor token across pages for large result sets

401

- The connection supports all standard Elasticsearch client configuration options

402

- Parameter binding is supported for secure query execution

403

- The cursor implementation follows the PEP 249 database API where applicable; `fetchmany()` is not currently supported and raises `NotImplementedError`