tessl/pypi-apache-airflow-providers-elasticsearch

Provider package that enables Elasticsearch integration for Apache Airflow workflows with hooks, logging, and SQL capabilities

Describes: pypipkg:pypi/apache-airflow-providers-elasticsearch@6.3.x

To install, run `npx @tessl/cli install tessl/pypi-apache-airflow-providers-elasticsearch@6.3.0`

# Apache Airflow Elasticsearch Provider

A provider package that enables Elasticsearch integration for Apache Airflow workflows. This package provides hooks for connecting to Elasticsearch clusters, logging capabilities that write task logs directly to Elasticsearch indexes, and SQL query execution against Elasticsearch using the SQL API.

## Package Information

- **Package Name**: apache-airflow-providers-elasticsearch
- **Package Type**: pip
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-elasticsearch`
- **Version**: 6.3.2
- **Requirements**:
  - apache-airflow >= 2.10.0
  - elasticsearch >= 8.10, < 9
  - apache-airflow-providers-common-sql >= 1.27.0
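
To compare an environment against the requirements listed above, the installed versions can be read with the standard library's `importlib.metadata`. This is a minimal sketch: `installed_versions` is a hypothetical helper, and it only reports versions rather than enforcing the version bounds.

```python
from __future__ import annotations

from importlib.metadata import PackageNotFoundError, version

# Distribution names copied from the requirements list above.
REQUIRED_PACKAGES = [
    "apache-airflow",
    "elasticsearch",
    "apache-airflow-providers-common-sql",
]


def installed_versions(names: list[str]) -> dict[str, str | None]:
    """Map each distribution name to its installed version, or None if absent."""
    found: dict[str, str | None] = {}
    for name in names:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


report = installed_versions(REQUIRED_PACKAGES)
for name, installed in report.items():
    print(f"{name}: {installed or 'not installed'}")
```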

## Core Imports

Standard imports for using the provider:

```python
# Main package version
from airflow.providers.elasticsearch import __version__

# SQL Hook for database-like operations
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchSQLHook

# Python Hook for native Elasticsearch operations
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook

# Task logging handler
from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchTaskHandler

# JSON formatter for structured logging
from airflow.providers.elasticsearch.log.es_json_formatter import ElasticsearchJSONFormatter

# Response classes for handling Elasticsearch results
from airflow.providers.elasticsearch.log.es_response import (
    AttributeList, AttributeDict, Hit, HitMeta, ElasticSearchResponse
)

# Version compatibility utilities
from airflow.providers.elasticsearch.version_compat import (
    get_base_airflow_version_tuple, AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_1_PLUS,
    BaseHook, EsLogMsgType
)
```

Connection components:

```python
# Direct connection utilities
from airflow.providers.elasticsearch.hooks.elasticsearch import connect, ESConnection, ElasticsearchSQLCursor
```

## Basic Usage

### Setting up an Elasticsearch Connection

```python
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchSQLHook

# Using Airflow connection
hook = ElasticsearchSQLHook(elasticsearch_conn_id='my_elasticsearch_conn')
conn = hook.get_conn()

# Execute SQL query
result = conn.execute_sql("SELECT * FROM my_index LIMIT 10")
print(result)
```
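
The JSON body returned by the Elasticsearch SQL API lists column metadata under `columns` and row values under `rows`. As a minimal sketch, assuming the result of `execute_sql` exposes that shape, the hypothetical helper `rows_as_dicts` (not part of the provider) pairs each row with its column names. The sample payload is invented for illustration.

```python
from collections.abc import Mapping
from typing import Any


def rows_as_dicts(sql_response: Mapping[str, Any]) -> list[dict[str, Any]]:
    """Pair each row of an Elasticsearch SQL response with its column names."""
    names = [column["name"] for column in sql_response["columns"]]
    return [dict(zip(names, row)) for row in sql_response["rows"]]


# Invented sample shaped like an Elasticsearch SQL API JSON response.
sample = {
    "columns": [
        {"name": "level", "type": "keyword"},
        {"name": "message", "type": "text"},
    ],
    "rows": [["ERROR", "disk full"], ["INFO", "task started"]],
}

records = rows_as_dicts(sample)
print(records[0])
```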

### Basic Search Operations

```python
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook

# Initialize with hosts
hook = ElasticsearchPythonHook(
    hosts=["http://localhost:9200"],
    es_conn_args={"basic_auth": ("user", "password")}
)

# Perform search
query = {
    "query": {
        "match": {
            "message": "error"
        }
    }
}

results = hook.search(query=query, index="logs-*")
print(f"Found {len(results['hits'])} results")
```
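
The `len(results['hits'])` expression above counts the documents returned in this page, which can be smaller than the total number of matches. The following is a minimal sketch, assuming `results` has the shape of the `hits` section of a standard Elasticsearch search response (`total`, `hits`, and a `_source` per document). `summarize_hits` is a hypothetical helper and the sample data is invented.

```python
from typing import Any


def summarize_hits(hits_section: dict[str, Any]) -> tuple[int, list[dict[str, Any]]]:
    """Return (total matches, list of document sources) from a hits section."""
    total = hits_section.get("total", 0)
    # Elasticsearch 7+ reports the total as {"value": N, "relation": "eq" | "gte"}.
    total_count = total.get("value", 0) if isinstance(total, dict) else int(total)
    sources = [hit.get("_source", {}) for hit in hits_section.get("hits", [])]
    return total_count, sources


# Invented sample shaped like the "hits" section of a search response.
sample_hits = {
    "total": {"value": 42, "relation": "eq"},
    "hits": [
        {"_index": "logs-2024", "_id": "1", "_source": {"message": "error: disk full"}},
        {"_index": "logs-2024", "_id": "2", "_source": {"message": "error: timeout"}},
    ],
}

total_count, sources = summarize_hits(sample_hits)
print(f"{len(sources)} of {total_count} matching documents returned")
```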

### Logging Configuration

```ini
# In airflow.cfg
[elasticsearch]
host = localhost:9200
write_to_es = True
target_index = airflow-logs
json_format = True
```
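
To sanity-check a fragment like this outside Airflow, the standard library's `configparser` can parse it. This is only an illustrative sketch: Airflow loads `airflow.cfg` through its own configuration layer, and the keys shown are copied from the example above.

```python
from configparser import ConfigParser

CONFIG_TEXT = """
[elasticsearch]
host = localhost:9200
write_to_es = True
target_index = airflow-logs
json_format = True
"""

parser = ConfigParser()
parser.read_string(CONFIG_TEXT)
section = parser["elasticsearch"]

# Values are stored as strings; getboolean() interprets True/False style flags.
settings = {
    "host": section["host"],
    "write_to_es": section.getboolean("write_to_es"),
    "target_index": section["target_index"],
    "json_format": section.getboolean("json_format"),
}
print(settings)
```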

## Architecture

The provider follows Airflow's standard provider architecture with three main components:

- **Hooks**: Provide connection interfaces to Elasticsearch clusters
- **Logging**: Integrate Elasticsearch as a logging backend for task outputs
- **Version Compatibility**: Handle version differences between Airflow releases

The package supports both SQL-based queries through the Elasticsearch SQL API and native Elasticsearch operations through the Python client. Its logging is configurable: task outputs can be written to Elasticsearch indexes, with optional JSON formatting and Kibana frontend integration.

## Capabilities

### Elasticsearch SQL Hook

Database-like interface using Elasticsearch's SQL API with PEP 249 compliance. Supports connection management, cursor operations, and SQL query execution against Elasticsearch indexes.

```python { .api }
class ElasticsearchSQLHook(DbApiHook):
    def __init__(self, schema: str = "http", connection: AirflowConnection | None = None, *args, **kwargs): ...
    def get_conn(self) -> ESConnection: ...
    def get_uri(self) -> str: ...
```

[SQL Hook](./sql-hook.md)

### Elasticsearch Python Hook

Native Elasticsearch operations using the official Python client. Provides direct access to Elasticsearch APIs for search, indexing, and cluster management operations.

```python { .api }
class ElasticsearchPythonHook(BaseHook):
    def __init__(self, hosts: list[Any], es_conn_args: dict | None = None): ...
    def get_conn(self) -> Elasticsearch: ...  # Note: This is a cached_property in implementation
    def search(self, query: dict[Any, Any], index: str = "_all") -> dict: ...
```

[Python Hook](./python-hook.md)
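
Because `search` takes a plain query dictionary, queries can be assembled with ordinary Python before being passed to the hook. The following is a small sketch, assuming standard Elasticsearch Query DSL and documents that carry `message` and `@timestamp` fields. `build_error_query` is a hypothetical helper, not part of the provider.

```python
from typing import Any


def build_error_query(text: str, since: str = "now-1h", size: int = 10) -> dict[str, Any]:
    """Build a Query DSL body matching `text` in `message` within a time window."""
    return {
        "size": size,
        "query": {
            "bool": {
                # Full-text match on the message field.
                "must": [{"match": {"message": text}}],
                # Non-scoring filter restricting results to recent documents.
                "filter": [{"range": {"@timestamp": {"gte": since}}}],
            }
        },
    }


query = build_error_query("error", since="now-15m")
print(query)
```

The resulting dictionary could then be passed along as `hook.search(query=query, index="logs-*")`.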

### Task Logging

Advanced logging capabilities that write Airflow task logs to Elasticsearch with support for JSON formatting, external log viewer integration (Kibana), and configurable index patterns.

```python { .api }
class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin):
    def __init__(
        self,
        base_log_folder: str,
        end_of_log_mark: str,
        write_stdout: bool,
        json_format: bool,
        json_fields: str,
        write_to_es: bool = False,
        target_index: str = "airflow-logs",
        host_field: str = "host",
        offset_field: str = "offset",
        host: str = "http://localhost:9200",
        frontend: str = "localhost:5601",
        index_patterns: str = ...,
        index_patterns_callable: str = "",
        es_kwargs: dict | None | Literal["default_es_kwargs"] = "default_es_kwargs",
        **kwargs
    ): ...
    def emit(self, record): ...
    def set_context(self, ti: TaskInstance, *, identifier: str | None = None) -> None: ...
    def get_external_log_url(self, task_instance: TaskInstance, try_number: int) -> str: ...
```

[Task Logging](./task-logging.md)
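
JSON formatting here means each log record is serialized as one JSON object restricted to a configured set of fields. The standalone sketch below illustrates that idea with the standard `logging` module, assuming `json_fields` is a comma-separated list of record attribute names. `SketchJSONFormatter` is a hypothetical stand-in, not the provider's `ElasticsearchJSONFormatter`.

```python
import json
import logging


class SketchJSONFormatter(logging.Formatter):
    """Serialize selected LogRecord attributes as a single JSON object."""

    def __init__(self, json_fields: str) -> None:
        super().__init__()
        self.fields = [name.strip() for name in json_fields.split(",") if name.strip()]

    def format(self, record: logging.LogRecord) -> str:
        # Populate the derived `message` attribute before reading fields.
        record.message = record.getMessage()
        payload = {name: getattr(record, name, None) for name in self.fields}
        return json.dumps(payload, default=str)


record = logging.LogRecord(
    name="airflow.task",
    level=logging.INFO,
    pathname="example.py",
    lineno=10,
    msg="Task %s finished",
    args=("extract",),
    exc_info=None,
)
line = SketchJSONFormatter("levelname, lineno, message").format(record)
print(line)
```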

### Connection Management

Low-level connection utilities and cursor implementations for direct database-style access to Elasticsearch with SQL query support and result pagination.

```python { .api }
def connect(host: str = "localhost", port: int = 9200, user: str | None = None, password: str | None = None, scheme: str = "http", **kwargs: Any) -> ESConnection: ...

class ESConnection:
    def __init__(self, host: str = "localhost", port: int = 9200, user: str | None = None, password: str | None = None, scheme: str = "http", **kwargs: Any): ...
    def cursor(self) -> ElasticsearchSQLCursor: ...
    def execute_sql(self, query: str, params: Iterable | Mapping[str, Any] | None = None) -> ObjectApiResponse: ...
```

[Connection Management](./connection-management.md)
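
Result pagination in the Elasticsearch SQL API works by returning a `cursor` token with each page until the results are exhausted. The sketch below shows that loop in isolation, assuming a caller-supplied `fetch_page` function that sends a request body and returns the decoded JSON. `iter_sql_rows`, `fetch_page`, and the fake pages are hypothetical and are not the provider's cursor implementation.

```python
from collections.abc import Callable, Iterator
from typing import Any

Page = dict[str, Any]


def iter_sql_rows(
    fetch_page: Callable[[Page], Page], query: str, fetch_size: int = 1000
) -> Iterator[list[Any]]:
    """Yield rows page by page, following the `cursor` token until it disappears."""
    page = fetch_page({"query": query, "fetch_size": fetch_size})
    while True:
        yield from page.get("rows", [])
        cursor = page.get("cursor")
        if not cursor:
            break
        # Follow-up requests carry only the cursor returned by the previous page.
        page = fetch_page({"cursor": cursor})


# Fake two-page result set standing in for a real cluster.
_FAKE_PAGES = {
    None: {"rows": [[1], [2]], "cursor": "page-2"},
    "page-2": {"rows": [[3]]},
}


def fake_fetch_page(body: Page) -> Page:
    return _FAKE_PAGES[body.get("cursor")]


rows = list(iter_sql_rows(fake_fetch_page, "SELECT id FROM my_index", fetch_size=2))
print(rows)
```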

### Version Compatibility

Version compatibility utilities that handle differences between Airflow releases and provide conditional imports for cross-version compatibility.

```python { .api }
def get_base_airflow_version_tuple() -> tuple[int, int, int]: ...

AIRFLOW_V_3_0_PLUS: bool  # Version compatibility flag
AIRFLOW_V_3_1_PLUS: bool  # Version compatibility flag
BaseHook: type  # Base hook class, conditionally imported
EsLogMsgType: type  # Type alias for log message types
```

[Version Compatibility](./version-compatibility.md)
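
Flags such as `AIRFLOW_V_3_0_PLUS` are typically derived by comparing a version tuple against a threshold. The following is a rough sketch of that pattern, assuming the version arrives as a plain string. `base_version_tuple` and the `*_sketch` flags are hypothetical names, and the real module reads the version of the installed Airflow rather than a literal.

```python
import re


def base_version_tuple(version: str) -> tuple[int, int, int]:
    """Extract (major, minor, micro) from a version string, ignoring suffixes."""
    match = re.match(r"(\d+)\.(\d+)(?:\.(\d+))?", version)
    if match is None:
        raise ValueError(f"Unrecognized version string: {version!r}")
    major, minor, micro = match.groups(default="0")
    return int(major), int(minor), int(micro)


# Stand-in for the installed Airflow version string.
installed = base_version_tuple("3.0.2.dev0")

# Tuple comparison is lexicographic, so it orders versions correctly.
v3_0_plus_sketch = installed >= (3, 0, 0)
v3_1_plus_sketch = installed >= (3, 1, 0)
print(installed, v3_0_plus_sketch, v3_1_plus_sketch)
```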