or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

connection-management.md, index.md, python-hook.md, sql-hook.md, task-logging.md, version-compatibility.md

docs/python-hook.md

0

# Elasticsearch Python Hook

1

2

Native Elasticsearch operations using the official Elasticsearch Python client. This hook provides direct access to all Elasticsearch APIs including search, indexing, cluster management, and advanced features like aggregations and machine learning.

3

4

## Capabilities

5

6

### Python Hook Class

7

8

Hook class that provides native Elasticsearch client access with full API support for all Elasticsearch operations.

9

10

```python { .api }

11

class ElasticsearchPythonHook(BaseHook):

12

"""

13

Interacts with Elasticsearch. This hook uses the official Elasticsearch Python Client.

14

15

:param hosts: A list of a single or many Elasticsearch instances. Example: ["http://localhost:9200"]

16

:param es_conn_args: Additional arguments you might need to enter to connect to Elasticsearch.

17

Example: {"ca_certs": "/path/to/cert", "basic_auth": ("user", "pass")}

18

"""

19

20

def __init__(self, hosts: list[Any], es_conn_args: dict | None = None):

21

"""

22

Initialize the Elasticsearch Python Hook.

23

24

Parameters:

25

- hosts: List of Elasticsearch host URLs

26

- es_conn_args: Dictionary of additional connection arguments

27

"""

28

29

@cached_property

30

def get_conn(self) -> Elasticsearch:

31

"""

32

Return the Elasticsearch client (cached).

33

34

This is implemented as a cached_property, so the client connection

35

is created once and reused for subsequent calls.

36

37

Returns:

38

Cached Elasticsearch client instance with connection configured

39

"""

40

41

def search(self, query: dict[Any, Any], index: str = "_all") -> dict:

42

"""

43

Return results matching a query using Elasticsearch DSL.

44

45

Parameters:

46

- index: The index you want to query (default: "_all")

47

- query: The query you want to run as a dictionary

48

49

Returns:

50

The response 'hits' object from Elasticsearch

51

"""

52

```

53

54

### Usage Examples

55

56

#### Basic Search Operations

57

58

```python

59

from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook

60

61

# Initialize hook with single host

62

hook = ElasticsearchPythonHook(

63

hosts=["http://localhost:9200"]

64

)

65

66

# Simple search query

67

query = {

68

"query": {

69

"match": {

70

"message": "error"

71

}

72

}

73

}

74

75

results = hook.search(query=query, index="logs-*")

76

print(f"Found {len(results['hits'])} matching documents")

77

78

for hit in results['hits']:

79

print(f"Document ID: {hit['_id']}, Score: {hit['_score']}")

80

print(f"Source: {hit['_source']}")

81

```

82

83

#### Advanced Search with Authentication

84

85

```python

86

# Initialize with authentication and SSL settings

87

hook = ElasticsearchPythonHook(

88

hosts=["https://elasticsearch.example.com:9200"],

89

es_conn_args={

90

"basic_auth": ("username", "password"),

91

"ca_certs": "/path/to/ca.crt",

92

"verify_certs": True,

93

"ssl_show_warn": False

94

}

95

)

96

97

# Complex aggregation query

98

query = {

99

"query": {

100

"bool": {

101

"must": [

102

{"range": {"@timestamp": {"gte": "2024-01-01"}}},

103

{"term": {"service.name": "api"}}

104

]

105

}

106

},

107

"aggs": {

108

"status_codes": {

109

"terms": {

110

"field": "http.response.status_code",

111

"size": 10

112

}

113

}

114

},

115

"size": 0

116

}

117

118

results = hook.get_conn.search(index="logs-2024-*", body=query)  # hook.search() returns only the 'hits' object, which omits aggregations

119

status_codes = results.get('aggregations', {}).get('status_codes', {}).get('buckets', [])

120

121

for bucket in status_codes:

122

print(f"Status {bucket['key']}: {bucket['doc_count']} occurrences")

123

```

124

125

#### Using Native Elasticsearch Client

126

127

```python

128

# Get direct access to the Elasticsearch client

129

hook = ElasticsearchPythonHook(

130

hosts=["http://localhost:9200"],

131

es_conn_args={"timeout": 30}

132

)

133

134

es_client = hook.get_conn

135

136

# Index a document

137

doc = {

138

"timestamp": "2024-01-01T12:00:00",

139

"message": "Application started",

140

"level": "INFO"

141

}

142

143

response = es_client.index(

144

index="application-logs",

145

document=doc

146

)

147

print(f"Document indexed with ID: {response['_id']}")

148

149

# Get cluster health

150

health = es_client.cluster.health()

151

print(f"Cluster status: {health['status']}")

152

print(f"Number of nodes: {health['number_of_nodes']}")

153

154

# Create an index with mapping

155

mapping = {

156

"mappings": {

157

"properties": {

158

"timestamp": {"type": "date"},

159

"message": {"type": "text"},

160

"level": {"type": "keyword"}

161

}

162

}

163

}

164

165

es_client.indices.create(index="custom-logs", body=mapping)

166

```

167

168

#### Bulk Operations

169

170

```python

171

from elasticsearch import helpers

172

173

hook = ElasticsearchPythonHook(hosts=["http://localhost:9200"])

174

es_client = hook.get_conn

175

176

# Bulk index documents

177

docs = [

178

{

179

"_index": "bulk-logs",

180

"_source": {

181

"timestamp": "2024-01-01T12:00:00",

182

"message": f"Log message {i}",

183

"level": "INFO"

184

}

185

}

186

for i in range(1000)

187

]

188

189

# Use helpers for bulk operations

190

success, failed = helpers.bulk(es_client, docs)

191

print(f"Successfully indexed: {success}, Failed: {len(failed)}")

192

```

193

194

### Connection Configuration

195

196

The hook accepts various connection arguments through the `es_conn_args` parameter:

197

198

#### Authentication

199

```python

200

# Basic authentication

201

es_conn_args = {

202

"basic_auth": ("username", "password")

203

}

204

205

# API Key authentication

206

es_conn_args = {

207

"api_key": ("api_key_id", "api_key_secret")

208

}

209

210

# Bearer token authentication

211

es_conn_args = {

212

"bearer_auth": "bearer_token_string"

213

}

214

```

215

216

#### SSL/TLS Configuration

217

```python

218

es_conn_args = {

219

"ca_certs": "/path/to/ca.pem",

220

"client_cert": "/path/to/client.pem",

221

"client_key": "/path/to/client-key.pem",

222

"verify_certs": True,

223

"ssl_show_warn": False

224

}

225

```

226

227

#### Connection Tuning

228

```python

229

es_conn_args = {

230

"timeout": 30,

231

"max_retries": 3,

232

"retry_on_timeout": True,

233

"http_compress": True,

234

"headers": {"User-Agent": "My-App/1.0"}

235

}

236

```

237

238

### Notes

239

240

- The hook provides access to the full Elasticsearch Python client API

241

- All Elasticsearch operations are supported (search, index, delete, cluster management, etc.)

242

- Connection is cached using `@cached_property` for performance

243

- The hook supports all Elasticsearch client configuration options

244

- For basic search operations, use the `search()` method; for advanced operations, use `get_conn` to access the client directly