# Task Logging

Advanced logging capabilities that write Airflow task logs to Elasticsearch with support for JSON formatting, external log viewer integration (Kibana), and configurable index patterns. This enables centralized log management and search capabilities for Airflow task execution.

## Capabilities

### Task Handler Class

Main logging handler that extends FileTaskHandler to write task logs to Elasticsearch with configurable formatting and indexing options.

```python { .api }
class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin):
    """
    Handler for logging tasks to Elasticsearch.

    Supports writing logs to both local files and Elasticsearch with configurable
    JSON formatting, external viewer integration, and index management.
    """

    def __init__(
        self,
        base_log_folder,
        end_of_log_mark="end_of_log",
        write_stdout=True,
        json_format=False,
        json_fields="asctime, filename, lineno, levelname, message",
        host_field="host",
        offset_field="offset",
        filename_template=None,
        elasticsearch_configs=None,
        es_kwargs=None
    ):
        """
        Initialize the Elasticsearch Task Handler.

        Parameters:
        - base_log_folder: Base directory for log files
        - end_of_log_mark: Marker string for end of log stream
        - write_stdout: Whether to write to stdout
        - json_format: Whether to format logs as JSON
        - json_fields: Fields to include in JSON logs
        - host_field: Field name for host information
        - offset_field: Field name for log offset
        - filename_template: Template for log file names
        - elasticsearch_configs: Elasticsearch configuration dictionary
        - es_kwargs: Additional Elasticsearch connection arguments
        """

    def emit(self, record):
        """
        Emit a log record to both file and Elasticsearch.

        Parameters:
        - record: LogRecord instance to emit
        """

    def set_context(self, ti: TaskInstance, *, identifier: str | None = None) -> None:
        """
        Set the logging context for a task instance.

        Parameters:
        - ti: TaskInstance to set context for
        - identifier: Optional identifier for the logging context
        """

    def close(self) -> None:
        """
        Close the handler and clean up resources.
        """

    @property
    def log_name(self) -> str:
        """
        Get the name of the current log.

        Returns:
        String name of the log
        """

    def get_external_log_url(self, task_instance: TaskInstance, try_number: int) -> str:
        """
        Get the external log URL (e.g., Kibana) for a task instance.

        Parameters:
        - task_instance: TaskInstance to get URL for
        - try_number: Try number for the task execution

        Returns:
        String URL for external log viewer
        """

    @property
    def supports_external_link(self) -> bool:
        """
        Check if external log links are supported.

        Returns:
        Boolean indicating external link support
        """

    @staticmethod
    def format_url(host: str) -> str:
        """
        Format the given host string to ensure it starts with 'http' and check if it represents a valid URL.

        Parameters:
        - host: The host string to format and check

        Returns:
        Properly formatted host URL string

        Raises:
        ValueError: If the host is not a valid URL
        """
```
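
As a small illustration of the static helper above, `format_url` can be exercised directly on a frontend host string; a minimal sketch (the host value is illustrative, and the comments simply restate the documented behaviour):

```python
from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchTaskHandler

# Per the documented contract, a host without a scheme is prefixed with "http",
# and a ValueError is raised if the value cannot be treated as a valid URL.
frontend_host = "kibana.example.com:5601"  # illustrative value
print(ElasticsearchTaskHandler.format_url(frontend_host))
```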

### JSON Formatter Class

Specialized JSON formatter for Elasticsearch log entries with ISO 8601 timestamp formatting.

```python { .api }
class ElasticsearchJSONFormatter(JSONFormatter):
    """
    Convert a log record to JSON with ISO 8601 date and time format.
    """

    default_time_format = "%Y-%m-%dT%H:%M:%S"
    default_msec_format = "%s.%03d"
    default_tz_format = "%z"

    def formatTime(self, record, datefmt=None):
        """
        Return the creation time of the LogRecord in ISO 8601 date/time format in the local time zone.

        Parameters:
        - record: LogRecord instance
        - datefmt: Optional date format string

        Returns:
        Formatted timestamp string
        """
```
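
To see the ISO 8601 layout these class attributes produce, `formatTime` can be called directly on an ordinary `LogRecord`; a minimal sketch (the `es_json_formatter` import path and the no-argument constructor are assumptions):

```python
import logging

from airflow.providers.elasticsearch.log.es_json_formatter import ElasticsearchJSONFormatter

# Build a bare LogRecord so that only the timestamp formatting is exercised.
record = logging.LogRecord(
    name="airflow.task",
    level=logging.INFO,
    pathname="example.py",
    lineno=1,
    msg="task started",
    args=None,
    exc_info=None,
)

formatter = ElasticsearchJSONFormatter()
# Expected shape (approximate): 2024-01-01T12:00:00.123+0000
print(formatter.formatTime(record))
```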

### Configuration Functions

Utility functions for retrieving and managing Elasticsearch logging configuration.

```python { .api }
def get_es_kwargs_from_config() -> dict[str, Any]:
    """
    Get Elasticsearch connection kwargs from Airflow configuration.

    Returns:
    Dictionary of Elasticsearch connection parameters
    """
```
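
A brief sketch of how the returned kwargs are typically consumed (the `es_task_handler` import path is an assumption based on where the handler lives):

```python
from airflow.providers.elasticsearch.log.es_task_handler import get_es_kwargs_from_config

# Read the [elasticsearch_configs] options (e.g. verify_certs, http_compress)
# from airflow.cfg as a plain dict of connection keyword arguments.
es_kwargs = get_es_kwargs_from_config()
print(es_kwargs)

# The dict can be forwarded to the handler unchanged, e.g.:
# ElasticsearchTaskHandler(base_log_folder="/opt/airflow/logs", es_kwargs=es_kwargs)
```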

### Response Classes

Classes for handling and accessing Elasticsearch search responses and document hits.

```python { .api }
class AttributeList:
    """
    Helper class to provide attribute-like access to list objects.
    """

    def __init__(self, _list):
        """Initialize with a list object."""

    def __getitem__(self, k):
        """Retrieve an item or a slice from the list."""

    def __iter__(self):
        """Provide an iterator for the list."""

    def __bool__(self):
        """Check if the list is non-empty."""


class AttributeDict:
    """
    Helper class to provide attribute-like access to dictionary objects.
    """

    def __init__(self, d):
        """Initialize with a dictionary object."""

    def __getattr__(self, attr_name):
        """Retrieve an item as an attribute from the dictionary."""

    def __getitem__(self, key):
        """Retrieve an item using a key from the dictionary."""

    def to_dict(self):
        """Convert back to a regular dictionary."""


class Hit(AttributeDict):
    """
    The Hit class is used to manage and access elements in a document.

    It inherits from the AttributeDict class and provides
    attribute-like access to its elements, similar to a dictionary.
    """

    def __init__(self, document):
        """Initialize with document data and metadata."""


class HitMeta(AttributeDict):
    """
    The HitMeta class is used to manage and access metadata of a document.

    This class inherits from the AttributeDict class and provides
    attribute-like access to its elements.
    """

    def __init__(self, document, exclude=("_source", "_fields")):
        """Initialize with document metadata, excluding specified fields."""


class ElasticSearchResponse(AttributeDict):
    """
    The ElasticSearchResponse class is used to manage and access the response from an Elasticsearch search.

    This class can be iterated over directly to access hits in the response. Indexing the class instance
    with an integer or slice will also access the hits. The class also evaluates to True
    if there are any hits in the response.
    """

    def __init__(self, search, response, doc_class=None):
        """Initialize with search instance, response data, and optional document class."""

    def __iter__(self) -> Iterator[Hit]:
        """Provide an iterator over the hits in the Elasticsearch response."""

    def __getitem__(self, key):
        """Retrieve a specific hit or a slice of hits from the Elasticsearch response."""

    def __bool__(self):
        """Evaluate the presence of hits in the Elasticsearch response."""

    @property
    def hits(self) -> list[Hit]:
        """
        Access to the hits (results) of the Elasticsearch response.

        The hits are represented as an AttributeList of Hit instances, which allow for easy,
        attribute-like access to the hit data. Hits are lazily loaded upon first access.
        """
```
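
The attribute-access behaviour is easiest to see on plain Python data; a minimal sketch (the sample document is made up, and the `es_response` import path is an assumption):

```python
from airflow.providers.elasticsearch.log.es_response import AttributeDict, AttributeList

# Attribute-style and key-style access over an ordinary dict.
doc = AttributeDict({"message": "task started", "offset": 1})
print(doc.message)    # "task started"
print(doc["offset"])  # 1
print(doc.to_dict())  # back to a plain dict

# AttributeList behaves like the underlying list for iteration and truthiness.
hits = AttributeList([doc])
for hit in hits:
    print(hit["message"])
print(bool(AttributeList([])))  # False
```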

### Usage Examples

#### Basic Task Logging Setup

```python
from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchTaskHandler

# Configure handler with basic settings
handler = ElasticsearchTaskHandler(
    base_log_folder="/opt/airflow/logs",
    write_stdout=True,
    json_format=True,
    elasticsearch_configs={
        "host": "localhost:9200",
        "target_index": "airflow-logs"
    }
)

# Set context for a task
# (task_instance is a TaskInstance supplied by Airflow at runtime)
from airflow.models import TaskInstance

handler.set_context(task_instance)

# The handler will automatically write logs to Elasticsearch
```

#### Advanced Configuration with Kibana Integration

```python
# Advanced configuration with external viewer
handler = ElasticsearchTaskHandler(
    base_log_folder="/opt/airflow/logs",
    json_format=True,
    json_fields="asctime, filename, lineno, levelname, message, dag_id, task_id",
    elasticsearch_configs={
        "host": "elasticsearch.example.com:9200",
        "target_index": "airflow-logs-{ds}",
        "frontend": "https://kibana.example.com/app/discover?_a=(query:(language:kuery,query:'log_id: \"{log_id}\"'))",
        "write_to_es": True,
        "verify_certs": True
    },
    es_kwargs={
        "basic_auth": ("elastic", "password"),
        "ca_certs": "/etc/ssl/certs/ca.pem"
    }
)
```

#### Custom Log Processing

```python
import logging

from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchTaskHandler


class CustomElasticsearchHandler(ElasticsearchTaskHandler):
    def emit(self, record):
        # Add custom fields to the log record
        record.custom_field = "custom_value"
        record.environment = "production"

        # Call the parent emit method
        super().emit(record)

    def _format_log_message(self, record):
        # Custom helper for log message formatting (not part of the base handler API)
        return f"[{record.levelname}] {record.getMessage()}"


# Use the custom handler
handler = CustomElasticsearchHandler(
    base_log_folder="/opt/airflow/logs",
    json_format=True
)
```
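
Airflow only picks up a subclass like this if the task handler in its logging config points at it. A minimal sketch, assuming the standard `logging_config_class` override mechanism and that the subclass lives in an importable module such as `my_company.logging` (both names are illustrative):

```python
# my_company/log_config.py -- referenced from airflow.cfg via
# [logging] logging_config_class = my_company.log_config.LOGGING_CONFIG
from copy import deepcopy

from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG

LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)

# Point the task handler at the subclass instead of the stock handler.
# Extra keys become constructor kwargs under dictConfig semantics.
LOGGING_CONFIG["handlers"]["task"] = {
    "class": "my_company.logging.CustomElasticsearchHandler",
    "formatter": "airflow",
    "base_log_folder": "/opt/airflow/logs",
    "json_format": True,
}
```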

### Configuration Options

#### Airflow Configuration

Configure in `airflow.cfg`:

```ini
[elasticsearch]
# Elasticsearch host
host = localhost:9200

# Log ID template for query construction
log_id_template = {dag_id}-{task_id}-{run_id}-{map_index}-{try_number}

# End of log marker
end_of_log_mark = end_of_log

# Kibana frontend URL template
frontend = http://localhost:5601/app/kibana#/discover?_a=(query:(language:kuery,query:'log_id: "{log_id}"'))

# Write to stdout
write_stdout = False

# Write to Elasticsearch
write_to_es = True

# Target index name
target_index = airflow-logs

# JSON formatting
json_format = True
json_fields = asctime, filename, lineno, levelname, message

# Field mappings
host_field = host
offset_field = offset

# Index patterns for search
index_patterns = _all
index_patterns_callable =

[elasticsearch_configs]
# HTTP compression
http_compress = False

# Certificate verification
verify_certs = True
```
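
These options can also be inspected programmatically; a minimal sketch using Airflow's configuration accessor (the printed values depend on your `airflow.cfg`):

```python
from airflow.configuration import conf

# Read the [elasticsearch] options that drive the task handler.
host = conf.get("elasticsearch", "host")
write_to_es = conf.getboolean("elasticsearch", "write_to_es")
json_format = conf.getboolean("elasticsearch", "json_format")
print(host, write_to_es, json_format)

# The same keys can be supplied as environment variables,
# e.g. AIRFLOW__ELASTICSEARCH__HOST, following Airflow's
# AIRFLOW__{SECTION}__{KEY} convention.
```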

#### Dynamic Index Patterns

```python
def custom_index_pattern(task_instance):
    """Custom index pattern based on task instance."""
    dag_id = task_instance.dag_id
    execution_date = task_instance.execution_date

    # Create date-based index pattern
    date_str = execution_date.strftime("%Y.%m.%d")
    return f"airflow-{dag_id}-{date_str}"

# Configure in airflow.cfg:
# index_patterns_callable = mymodule.custom_index_pattern
```

#### External Log Viewer Integration

The handler supports integration with external log viewers like Kibana:

```python
# Get external log URL for a task
handler = ElasticsearchTaskHandler(...)
task_instance = TaskInstance(...)

if handler.supports_external_link:
    external_url = handler.get_external_log_url(task_instance, try_number=1)
    print(f"View logs in Kibana: {external_url}")
```

### Notes

- The handler writes logs to both local files and Elasticsearch simultaneously
- JSON formatting is recommended for structured log analysis
- Index patterns support date-based partitioning for better performance
- External viewer integration requires proper frontend URL configuration
- The handler supports authentication and SSL/TLS connections to Elasticsearch
- Log records are automatically enriched with task metadata (dag_id, task_id, etc.)