# Task Logging

Advanced logging capabilities that write Airflow task logs to Elasticsearch with support for JSON formatting, external log viewer integration (Kibana), and configurable index patterns. This enables centralized log management and search capabilities for Airflow task execution.

## Capabilities

### Task Handler Class

Main logging handler that extends `FileTaskHandler` to write task logs to Elasticsearch with configurable formatting and indexing options.

```python { .api }
class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin):
    """
    Handler for logging tasks to Elasticsearch.

    Supports writing logs to both local files and Elasticsearch with configurable
    JSON formatting, external viewer integration, and index management.
    """

    def __init__(
        self,
        base_log_folder,
        end_of_log_mark="end_of_log",
        write_stdout=True,
        json_format=False,
        json_fields="asctime, filename, lineno, levelname, message",
        host_field="host",
        offset_field="offset",
        filename_template=None,
        elasticsearch_configs=None,
        es_kwargs=None,
    ):
        """
        Initialize the Elasticsearch Task Handler.

        Parameters:
        - base_log_folder: Base directory for log files
        - end_of_log_mark: Marker string for end of log stream
        - write_stdout: Whether to write to stdout
        - json_format: Whether to format logs as JSON
        - json_fields: Fields to include in JSON logs
        - host_field: Field name for host information
        - offset_field: Field name for log offset
        - filename_template: Template for log file names
        - elasticsearch_configs: Elasticsearch configuration dictionary
        - es_kwargs: Additional Elasticsearch connection arguments
        """

    def emit(self, record):
        """
        Emit a log record to both file and Elasticsearch.

        Parameters:
        - record: LogRecord instance to emit
        """

    def set_context(self, ti: TaskInstance, *, identifier: str | None = None) -> None:
        """
        Set the logging context for a task instance.

        Parameters:
        - ti: TaskInstance to set context for
        - identifier: Optional identifier for the logging context
        """

    def close(self) -> None:
        """
        Close the handler and clean up resources.
        """

    @property
    def log_name(self) -> str:
        """
        Get the name of the current log.

        Returns:
        String name of the log
        """

    def get_external_log_url(self, task_instance: TaskInstance, try_number: int) -> str:
        """
        Get the external log URL (e.g., Kibana) for a task instance.

        Parameters:
        - task_instance: TaskInstance to get URL for
        - try_number: Try number for the task execution

        Returns:
        String URL for external log viewer
        """

    @property
    def supports_external_link(self) -> bool:
        """
        Check if external log links are supported.

        Returns:
        Boolean indicating external link support
        """

    @staticmethod
    def format_url(host: str) -> str:
        """
        Format the given host string to ensure it starts with 'http' and check if it represents a valid URL.

        Parameters:
        - host: The host string to format and check

        Returns:
        Properly formatted host URL string

        Raises:
        ValueError: If the host is not a valid URL
        """
```
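
For example, a minimal sketch of `format_url` usage. It is a static method, so no handler instance is needed; the exact output shown in the comment is illustrative:

```python
from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchTaskHandler

# A bare host is normalized so it starts with "http"; hosts that cannot
# form a valid URL raise ValueError
url = ElasticsearchTaskHandler.format_url("localhost:9200")
print(url)  # e.g. "http://localhost:9200"
```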

### JSON Formatter Class

Specialized JSON formatter for Elasticsearch log entries with ISO 8601 timestamp formatting.

```python { .api }
class ElasticsearchJSONFormatter(JSONFormatter):
    """
    Convert a log record to JSON with ISO 8601 date and time format.
    """

    default_time_format = "%Y-%m-%dT%H:%M:%S"
    default_msec_format = "%s.%03d"
    default_tz_format = "%z"

    def formatTime(self, record, datefmt=None):
        """
        Return the creation time of the LogRecord in ISO 8601 date/time format in the local time zone.

        Parameters:
        - record: LogRecord instance
        - datefmt: Optional date format string

        Returns:
        Formatted timestamp string
        """
```
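
For illustration, a minimal sketch of the timestamp behavior, assuming the formatter is importable from the provider's `es_json_formatter` module and that the base `JSONFormatter` accepts `fmt` and a `json_fields` list, as in Airflow's logging utilities:

```python
import logging

# Assumed import path within the provider package
from airflow.providers.elasticsearch.log.es_json_formatter import ElasticsearchJSONFormatter

# Including %(asctime)s in fmt makes the formatter call formatTime(),
# which produces the ISO 8601 timestamp
formatter = ElasticsearchJSONFormatter(
    fmt="%(asctime)s %(levelname)s %(message)s",
    json_fields=["asctime", "levelname", "message"],
)
record = logging.LogRecord(
    name="airflow.task",
    level=logging.INFO,
    pathname=__file__,
    lineno=1,
    msg="task started",
    args=None,
    exc_info=None,
)
print(formatter.format(record))
# e.g. {"asctime": "2024-01-01T12:00:00.000+0000", "levelname": "INFO", "message": "task started"}
```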

### Configuration Functions

Utility functions for retrieving and managing Elasticsearch logging configuration.

```python { .api }
def get_es_kwargs_from_config() -> dict[str, Any]:
    """
    Get Elasticsearch connection kwargs from Airflow configuration.

    Returns:
    Dictionary of Elasticsearch connection parameters
    """
```
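
As a sketch, the returned kwargs can be passed straight to an `elasticsearch-py` client. This assumes the function is importable from the provider's `es_task_handler` module, the `elasticsearch` package is installed, and `[elasticsearch_configs]` is populated in `airflow.cfg`:

```python
from elasticsearch import Elasticsearch

# Assumed import location within the provider package
from airflow.providers.elasticsearch.log.es_task_handler import get_es_kwargs_from_config

# Reads the [elasticsearch_configs] section of airflow.cfg,
# e.g. {"http_compress": False, "verify_certs": True}
es_kwargs = get_es_kwargs_from_config()
client = Elasticsearch("http://localhost:9200", **es_kwargs)
```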

### Response Classes

Classes for handling and accessing Elasticsearch search responses and document hits.

```python { .api }
class AttributeList:
    """
    Helper class to provide attribute-like access to list objects.
    """

    def __init__(self, _list):
        """Initialize with a list object."""

    def __getitem__(self, k):
        """Retrieve an item or a slice from the list."""

    def __iter__(self):
        """Provide an iterator for the list."""

    def __bool__(self):
        """Check if the list is non-empty."""


class AttributeDict:
    """
    Helper class to provide attribute-like access to dictionary objects.
    """

    def __init__(self, d):
        """Initialize with a dictionary object."""

    def __getattr__(self, attr_name):
        """Retrieve an item as an attribute from the dictionary."""

    def __getitem__(self, key):
        """Retrieve an item using a key from the dictionary."""

    def to_dict(self):
        """Convert back to a regular dictionary."""


class Hit(AttributeDict):
    """
    The Hit class is used to manage and access elements in a document.

    It inherits from the AttributeDict class and provides
    attribute-like access to its elements, similar to a dictionary.
    """

    def __init__(self, document):
        """Initialize with document data and metadata."""


class HitMeta(AttributeDict):
    """
    The HitMeta class is used to manage and access metadata of a document.

    This class inherits from the AttributeDict class and provides
    attribute-like access to its elements.
    """

    def __init__(self, document, exclude=("_source", "_fields")):
        """Initialize with document metadata, excluding the specified fields."""


class ElasticSearchResponse(AttributeDict):
    """
    The ElasticSearchResponse class is used to manage and access the response from an Elasticsearch search.

    This class can be iterated over directly to access hits in the response. Indexing the class instance
    with an integer or slice will also access the hits. The class also evaluates to True
    if there are any hits in the response.
    """

    def __init__(self, search, response, doc_class=None):
        """Initialize with the search instance, response data, and an optional document class."""

    def __iter__(self) -> Iterator[Hit]:
        """Provide an iterator over the hits in the Elasticsearch response."""

    def __getitem__(self, key):
        """Retrieve a specific hit or a slice of hits from the Elasticsearch response."""

    def __bool__(self):
        """Evaluate the presence of hits in the Elasticsearch response."""

    @property
    def hits(self) -> list[Hit]:
        """
        Access the hits (results) of the Elasticsearch response.

        The hits are represented as an AttributeList of Hit instances, which allows for easy,
        attribute-like access to the hit data. Hits are lazily loaded upon first access.
        """
```
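
For illustration, a minimal sketch of the attribute-style access these helpers provide, assuming they are importable from the provider's `es_response` module:

```python
# Assumed import path within the provider package
from airflow.providers.elasticsearch.log.es_response import AttributeDict

source = AttributeDict({"message": "task started", "offset": 1})
print(source.message)    # attribute-style access: "task started"
print(source["offset"])  # key access still works: 1
print(source.to_dict())  # back to a plain dict: {"message": "task started", "offset": 1}
```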

### Usage Examples

#### Basic Task Logging Setup

```python
from airflow.models import TaskInstance
from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchTaskHandler

# Configure handler with basic settings
handler = ElasticsearchTaskHandler(
    base_log_folder="/opt/airflow/logs",
    write_stdout=True,
    json_format=True,
    elasticsearch_configs={
        "host": "localhost:9200",
        "target_index": "airflow-logs",
    },
)

# Set context for a task; `task_instance` is a TaskInstance
# obtained from a running task, not constructed here
task_instance: TaskInstance = ...
handler.set_context(task_instance)

# The handler will automatically write logs to Elasticsearch
```

#### Advanced Configuration with Kibana Integration

```python
# Advanced configuration with external viewer
handler = ElasticsearchTaskHandler(
    base_log_folder="/opt/airflow/logs",
    json_format=True,
    json_fields="asctime, filename, lineno, levelname, message, dag_id, task_id",
    elasticsearch_configs={
        "host": "elasticsearch.example.com:9200",
        "target_index": "airflow-logs-{ds}",
        "frontend": "https://kibana.example.com/app/discover?_a=(query:(language:kuery,query:'log_id: \"{log_id}\"'))",
        "write_to_es": True,
        "verify_certs": True,
    },
    es_kwargs={
        "basic_auth": ("elastic", "password"),
        "ca_certs": "/etc/ssl/certs/ca.pem",
    },
)
```

#### Custom Log Processing

```python
from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchTaskHandler


class CustomElasticsearchHandler(ElasticsearchTaskHandler):
    def emit(self, record):
        # Add custom fields to the log record
        record.custom_field = "custom_value"
        record.environment = "production"

        # Call parent emit method
        super().emit(record)

    def _format_log_message(self, record):
        # Custom helper for message formatting (not an override of a base-class hook)
        return f"[{record.levelname}] {record.getMessage()}"


# Use custom handler
handler = CustomElasticsearchHandler(
    base_log_folder="/opt/airflow/logs",
    json_format=True,
)
```

### Configuration Options

#### Airflow Configuration

Configure in `airflow.cfg`:

```ini
[elasticsearch]
# Elasticsearch host
host = localhost:9200

# Log ID template for query construction
log_id_template = {dag_id}-{task_id}-{run_id}-{map_index}-{try_number}

# End of log marker
end_of_log_mark = end_of_log

# Kibana frontend URL template
frontend = http://localhost:5601/app/kibana#/discover?_a=(query:(language:kuery,query:'log_id: "{log_id}"'))

# Write to stdout
write_stdout = False

# Write to Elasticsearch
write_to_es = True

# Target index name
target_index = airflow-logs

# JSON formatting
json_format = True
json_fields = asctime, filename, lineno, levelname, message

# Field mappings
host_field = host
offset_field = offset

# Index patterns for search
index_patterns = _all
index_patterns_callable =

[elasticsearch_configs]
# HTTP compression
http_compress = False

# Certificate verification
verify_certs = True
```

#### Dynamic Index Patterns

```python
def custom_index_pattern(task_instance):
    """Custom index pattern based on task instance."""
    dag_id = task_instance.dag_id
    execution_date = task_instance.execution_date

    # Create date-based index pattern
    date_str = execution_date.strftime("%Y.%m.%d")
    return f"airflow-{dag_id}-{date_str}"


# Configure in airflow.cfg:
# index_patterns_callable = mymodule.custom_index_pattern
```
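
A quick way to sanity-check the pattern outside Airflow, using a simple stand-in object with hypothetical values:

```python
from datetime import datetime
from types import SimpleNamespace

# Stand-in for a TaskInstance with just the attributes the callable reads
ti = SimpleNamespace(dag_id="example_dag", execution_date=datetime(2024, 1, 1))
print(custom_index_pattern(ti))  # airflow-example_dag-2024.01.01
```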

#### External Log Viewer Integration

The handler supports integration with external log viewers like Kibana:

```python
# Get external log URL for a task
handler = ElasticsearchTaskHandler(...)
task_instance = TaskInstance(...)

if handler.supports_external_link:
    external_url = handler.get_external_log_url(task_instance, try_number=1)
    print(f"View logs in Kibana: {external_url}")
```

### Notes

- The handler can write logs to local files, stdout, and Elasticsearch, depending on the `write_stdout` and `write_to_es` settings
- JSON formatting is recommended for structured log analysis
- Index patterns support date-based partitioning for better performance
- External viewer integration requires proper frontend URL configuration
- The handler supports authentication and SSL/TLS connections to Elasticsearch
- Log records are automatically enriched with task metadata (dag_id, task_id, etc.)
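
As a final illustration, a sketch of querying task logs directly with `elasticsearch-py`, assuming the default `log_id_template` shown above and hypothetical dag/task/run values:

```python
from elasticsearch import Elasticsearch

client = Elasticsearch("http://localhost:9200")

# Hypothetical log_id built from the default template
# {dag_id}-{task_id}-{run_id}-{map_index}-{try_number}
log_id = "example_dag-example_task-manual__2024-01-01T00:00:00+00:00--1-1"

response = client.search(
    index="airflow-logs",
    query={"match_phrase": {"log_id": log_id}},
    sort=[{"offset": {"order": "asc"}}],
)
for hit in response["hits"]["hits"]:
    print(hit["_source"]["message"])
```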