# Connection Management

Low-level connection utilities and cursor implementations for direct database-style access to Elasticsearch with SQL query support and result pagination. These components provide the foundation for the SQL Hook's database-like interface.

## Capabilities

### Connection Factory Function

Factory function that creates configured `ESConnection` instances with authentication and connection parameters.

```python { .api }
def connect(
    host: str = "localhost",
    port: int = 9200,
    user: str | None = None,
    password: str | None = None,
    scheme: str = "http",
    **kwargs: Any
) -> ESConnection:
    """
    Create an ESConnection instance with specified parameters.

    Parameters:
    - host: Elasticsearch server hostname (default: "localhost")
    - port: Elasticsearch server port (default: 9200)
    - user: Username for authentication (optional)
    - password: Password for authentication (optional)
    - scheme: Connection scheme - "http" or "https" (default: "http")
    - **kwargs: Additional connection arguments

    Returns:
    Configured ESConnection instance
    """
```
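
How these parameters combine is not spelled out above. As a rough sketch, assuming the connection assembles a base URL of the form `scheme://host:port/` (an assumption, not something this page confirms), the hypothetical helper `build_base_url` shows the idea using only the standard library:

```python
from urllib.parse import urlunparse


def build_base_url(host: str = "localhost", port: int = 9200, scheme: str = "http") -> str:
    """Hypothetical helper: combine scheme, host, and port into a base URL."""
    netloc = f"{host}:{port}"
    # urlunparse takes (scheme, netloc, path, params, query, fragment).
    return urlunparse((scheme, netloc, "/", "", "", ""))


print(build_base_url("localhost", 9200, "https"))
```

The defaults mirror those of `connect()`, so calling `build_base_url()` with no arguments describes a local, unencrypted node.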

### Connection Class

Wrapper class for `elasticsearch.Elasticsearch` that provides a database-like connection interface with cursor support.

```python { .api }
class ESConnection:
    """
    Wrapper class for elasticsearch.Elasticsearch.

    Provides a database-like connection interface with cursor support
    and SQL query execution capabilities.
    """

    def __init__(
        self,
        host: str = "localhost",
        port: int = 9200,
        user: str | None = None,
        password: str | None = None,
        scheme: str = "http",
        **kwargs: Any
    ):
        """
        Initialize ESConnection with connection parameters.

        Parameters:
        - host: Elasticsearch server hostname
        - port: Elasticsearch server port
        - user: Username for authentication (optional)
        - password: Password for authentication (optional)
        - scheme: Connection scheme ("http" or "https")
        - **kwargs: Additional Elasticsearch client arguments
        """

    def cursor(self) -> ElasticsearchSQLCursor:
        """
        Create a new cursor for executing SQL queries.

        Returns:
        ElasticsearchSQLCursor instance for query execution
        """

    def close(self):
        """
        Close the Elasticsearch connection.
        """

    def commit(self):
        """
        Commit transaction (no-op for Elasticsearch).
        """

    def execute_sql(
        self,
        query: str,
        params: Iterable | Mapping[str, Any] | None = None
    ) -> ObjectApiResponse:
        """
        Execute a SQL query directly on the connection.

        Parameters:
        - query: SQL query string to execute
        - params: Query parameters (optional)

        Returns:
        ObjectApiResponse from Elasticsearch SQL API
        """
```

### Cursor Class

PEP 249-like cursor class for executing SQL queries against Elasticsearch with full result pagination support.

```python { .api }
class ElasticsearchSQLCursor:
    """
    A PEP 249-like Cursor class for Elasticsearch SQL API.

    Provides standard database cursor interface for SQL query execution
    with support for result pagination and metadata access.
    """

    def __init__(self, es: Elasticsearch, **kwargs):
        """
        Initialize cursor with Elasticsearch client and options.

        Parameters:
        - es: Elasticsearch client instance
        - **kwargs: Additional cursor options (fetch_size, field_multi_value_leniency)
        """

    @property
    def response(self) -> ObjectApiResponse:
        """
        Get the current query response.

        Returns:
        ObjectApiResponse from the last executed query
        """

    @property
    def cursor(self):
        """
        Get the cursor token for pagination.

        Returns:
        Cursor token string for next page, or None if no more results
        """

    @property
    def rows(self):
        """
        Get the rows from the current response.

        Returns:
        List of result rows from current query
        """

    @property
    def rowcount(self) -> int:
        """
        Get the number of rows in the current result set.

        Returns:
        Integer count of rows in current result
        """

    @property
    def description(self) -> list[tuple]:
        """
        Get column descriptions for the result set.

        Returns:
        List of (column_name, column_type) tuples
        """

    def execute(
        self,
        statement: str,
        params: Iterable | Mapping[str, Any] | None = None
    ) -> ObjectApiResponse:
        """
        Execute a SQL statement.

        Parameters:
        - statement: SQL statement to execute
        - params: Statement parameters (optional)

        Returns:
        ObjectApiResponse from Elasticsearch SQL API
        """

    def fetchone(self):
        """
        Fetch the next row from the result set.

        Returns:
        Single row as list, or None if no more rows
        """

    def fetchmany(self, size: int | None = None):
        """
        Fetch multiple rows from the result set.

        Parameters:
        - size: Number of rows to fetch (optional)

        Raises:
        NotImplementedError (not currently supported)
        """

    def fetchall(self):
        """
        Fetch all remaining rows from the result set.

        Automatically handles pagination using cursor tokens.

        Returns:
        List of all remaining rows
        """

    def close(self):
        """
        Close the cursor and clean up resources.
        """
```
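
Because `fetchmany()` raises `NotImplementedError`, batch-wise processing has to be done by the caller. One possible workaround, sketched here, is to fetch the rows first (for example with `fetchall()`) and slice them into fixed-size batches. `iter_batches` is a hypothetical helper, not part of the provider, and the sample rows stand in for a real result set.

```python
from itertools import islice
from typing import Any, Iterable, Iterator


def iter_batches(rows: Iterable[Any], size: int) -> Iterator[list]:
    """Yield lists of at most `size` rows (hypothetical helper)."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(rows)
    while True:
        # islice consumes up to `size` items from the shared iterator.
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


# Stand-in for the list returned by cursor.fetchall()
sample_rows = [["alice", 31], ["bob", 45], ["carol", 28]]
batches = list(iter_batches(sample_rows, 2))
print(batches)
```

This only bounds the size of each processing step. The full result set is still held in memory once `fetchall()` returns.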

### Usage Examples

#### Basic Connection and Query

```python
from airflow.providers.elasticsearch.hooks.elasticsearch import connect

# Create connection
conn = connect(
    host="localhost",
    port=9200,
    user="elastic",
    password="password",
    scheme="https"
)

# Execute SQL query
result = conn.execute_sql("SELECT * FROM my_index WHERE status = 'active' LIMIT 10")

print(f"Found {len(result['rows'])} rows")
for row in result['rows']:
    print(row)

# Close connection
conn.close()
```

#### Cursor-based Query Execution

```python
from airflow.providers.elasticsearch.hooks.elasticsearch import connect

# Create connection and cursor
conn = connect(host="localhost", port=9200)
cursor = conn.cursor()

# Execute query
response = cursor.execute("SELECT name, age, city FROM users WHERE age > 25")

# Access result metadata
print(f"Columns: {cursor.description}")
print(f"Row count: {cursor.rowcount}")

# Fetch results
first_row = cursor.fetchone()
print(f"First row: {first_row}")

all_rows = cursor.fetchall()
print(f"All rows: {len(all_rows)}")

# Clean up
cursor.close()
conn.close()
```
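
Rows come back as plain lists, so pairing them with the column names from `cursor.description` is often convenient. A minimal sketch, assuming `description` is a list of `(column_name, column_type)` tuples as documented for the cursor class; `rows_as_dicts` is a hypothetical helper and the sample data is made up for illustration:

```python
from typing import Any, Sequence


def rows_as_dicts(description: Sequence[tuple], rows: Sequence[Sequence[Any]]) -> list:
    """Map each row onto the column names taken from the description tuples."""
    column_names = [column[0] for column in description]
    return [dict(zip(column_names, row)) for row in rows]


# Made-up stand-ins for cursor.description and cursor.rows
description = [("name", "text"), ("age", "integer"), ("city", "keyword")]
rows = [["Ada", 36, "London"], ["Linus", 54, "Portland"]]

records = rows_as_dicts(description, rows)
for record in records:
    print(record["name"], record["city"])
```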

#### Parameterized Queries

```python
from airflow.providers.elasticsearch.hooks.elasticsearch import connect

conn = connect(host="localhost", port=9200)
cursor = conn.cursor()

# Query with positional parameters; each ? is bound to the matching entry in params
query = "SELECT * FROM logs WHERE level = ? AND timestamp > ?"
params = ["ERROR", "2024-01-01T00:00:00"]

response = cursor.execute(query, params)

# Process results
for row in cursor.rows:
    print(f"Log entry: {row}")

cursor.close()
conn.close()
```
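
For orientation, the Elasticsearch SQL API takes the statement and its positional parameters as the `query` and `params` fields of the request body, so values never need to be spliced into the SQL string. The sketch below builds such a body by hand. `build_sql_body` is a hypothetical helper, and whether the cursor builds its request exactly this way is an assumption.

```python
from typing import Any, Optional, Sequence


def build_sql_body(
    query: str,
    params: Optional[Sequence[Any]] = None,
    fetch_size: Optional[int] = None,
) -> dict:
    """Hypothetical helper: assemble a request body for the SQL API."""
    body: dict = {"query": query}
    if params:
        # Values travel separately from the SQL text.
        body["params"] = list(params)
    if fetch_size is not None:
        body["fetch_size"] = fetch_size
    return body


body = build_sql_body(
    "SELECT * FROM logs WHERE level = ? AND timestamp > ?",
    ["ERROR", "2024-01-01T00:00:00"],
)
print(body)
```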

#### Pagination with Large Result Sets

```python
from airflow.providers.elasticsearch.hooks.elasticsearch import connect

conn = connect(
    host="localhost",
    port=9200,
    fetch_size=1000  # Set page size
)
cursor = conn.cursor()

# Execute large query
cursor.execute("SELECT * FROM large_index")

# fetchall() automatically handles pagination
all_results = cursor.fetchall()
print(f"Retrieved {len(all_results)} total rows")

cursor.close()
conn.close()
```
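
The paging that `fetchall()` hides can be pictured as a loop over cursor tokens: each SQL API response carries a page of `rows` and, while more pages remain, a `cursor` token to send with the next request. The sketch below is not the provider's implementation. `fetch_page` is a hypothetical callable standing in for the request, and the two fake pages exist only to make the example runnable.

```python
from typing import Any, Callable, Optional


def collect_all_rows(fetch_page: Callable[[Optional[str]], dict]) -> list:
    """Follow cursor tokens until a response arrives without one."""
    all_rows: list = []
    token: Optional[str] = None
    while True:
        page = fetch_page(token)
        all_rows.extend(page.get("rows", []))
        token = page.get("cursor")
        if not token:
            # No token means the last page has been read.
            return all_rows


# Fake responses keyed by the token that requests them (illustration only)
fake_pages: dict = {
    None: {"rows": [[1], [2]], "cursor": "token-1"},
    "token-1": {"rows": [[3]]},
}

all_rows = collect_all_rows(lambda token: fake_pages[token])
print(f"Retrieved {len(all_rows)} total rows")
```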

#### Advanced Connection Configuration

```python
from airflow.providers.elasticsearch.hooks.elasticsearch import connect

# Connection with additional Elasticsearch client options
conn = connect(
    host="elasticsearch.example.com",
    port=9200,
    user="service_account",
    password="secret_password",
    scheme="https",
    # Additional Elasticsearch client arguments
    verify_certs=True,
    ca_certs="/path/to/ca.pem",
    timeout=30,
    max_retries=3,
    retry_on_status=[502, 503, 504],  # the client's keyword is retry_on_status
    http_compress=True,
    # Cursor options
    fetch_size=5000,
    field_multi_value_leniency=True
)

cursor = conn.cursor()
cursor.execute("SELECT * FROM secure_index")
results = cursor.fetchall()

cursor.close()
conn.close()
```

#### Error Handling

```python
from airflow.providers.elasticsearch.hooks.elasticsearch import connect
from elasticsearch.exceptions import ConnectionError, RequestError

conn = connect(host="localhost", port=9200)
cursor = conn.cursor()

try:
    cursor.execute("SELECT * FROM nonexistent_index")
    results = cursor.fetchall()
except ConnectionError as e:
    # The cluster could not be reached
    print(f"Connection failed: {e}")
except RequestError as e:
    # The cluster rejected the query
    print(f"Query error: {e}")
finally:
    # Always release the cursor and connection
    cursor.close()
    conn.close()
```
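
The `try`/`finally` cleanup can also be written with `contextlib.closing`, which calls `close()` on exit for any object that has that method. This is a sketch under the assumption that the connection and cursor need nothing beyond `close()` for cleanup. `DemoResource` is a made-up stand-in so the example runs without a cluster.

```python
from contextlib import closing


class DemoResource:
    """Made-up stand-in for a connection or cursor; only close() matters here."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.closed = False

    def close(self) -> None:
        self.closed = True


conn = DemoResource("connection")
cursor = DemoResource("cursor")

try:
    # On exit the cursor is closed first, then the connection,
    # even though the body raises.
    with closing(conn), closing(cursor):
        raise RuntimeError("simulated query failure")
except RuntimeError as exc:
    print(f"Query error: {exc}")

print(conn.closed, cursor.closed)
```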

### Configuration Options

#### Connection Parameters

`connect()` takes the core connection parameters plus keyword arguments for the Elasticsearch client and for the cursors it creates:

```python
from airflow.providers.elasticsearch.hooks.elasticsearch import connect

conn = connect(
    host="localhost",                  # Elasticsearch host
    port=9200,                         # Elasticsearch port
    user="username",                   # Authentication username
    password="password",               # Authentication password
    scheme="https",                    # Connection scheme

    # Elasticsearch client options
    verify_certs=True,                 # SSL certificate verification
    ca_certs="/path/to/ca.pem",        # CA certificate path
    client_cert="/path/to/cert.pem",   # Client certificate
    client_key="/path/to/key.pem",     # Client private key
    timeout=30,                        # Request timeout
    max_retries=3,                     # Maximum retry attempts
    http_compress=True,                # HTTP compression

    # Cursor options
    fetch_size=1000,                   # Page size for results
    field_multi_value_leniency=False   # Multi-value field handling
)
```
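
Because client and cursor options share one keyword namespace, it can help to see how they might be separated. The sketch below assumes, as the grouping in the example suggests, that `fetch_size` and `field_multi_value_leniency` are the only cursor options and that everything else is meant for the Elasticsearch client. `split_options` is a hypothetical helper, not the provider's code.

```python
from typing import Any

# Assumed to be the complete set of cursor options (see the lead-in).
CURSOR_OPTION_NAMES = ("fetch_size", "field_multi_value_leniency")


def split_options(options: dict) -> tuple:
    """Hypothetical helper: return (client_options, cursor_options)."""
    cursor_options = {k: v for k, v in options.items() if k in CURSOR_OPTION_NAMES}
    client_options = {k: v for k, v in options.items() if k not in CURSOR_OPTION_NAMES}
    return client_options, cursor_options


client_options, cursor_options = split_options(
    {"verify_certs": True, "timeout": 30, "fetch_size": 1000}
)
print(client_options)
print(cursor_options)
```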

### Notes

- The connection wrapper provides a database-like interface over Elasticsearch's native client
- All SQL operations run through Elasticsearch's SQL API
- `fetchall()` follows cursor tokens automatically, so large result sets are paged without extra code
- Additional keyword arguments are passed to the Elasticsearch client, so its standard configuration options are available
- Parameter binding is supported, so values can be passed separately from the SQL text
- The cursor follows PEP 249 where applicable; `fetchmany()` is not implemented and raises `NotImplementedError`