# HDFS Logging Integration

Task handlers for storing Airflow task logs in HDFS, enabling centralized log management and integration with the Hadoop ecosystem's logging infrastructure. This allows organizations to keep all task execution logs in their data lake for long-term retention and analysis.

## Capabilities

### HDFS Task Handler

Main handler class for managing task logs in HDFS with automatic upload and retrieval capabilities.

```python { .api }
class HdfsTaskHandler:
    """
    HDFS Task Handler for storing and retrieving Airflow task logs in HDFS.

    Extends airflow FileTaskHandler and uploads to and reads from HDFS storage.
    """

    def __init__(self, base_log_folder: str, hdfs_log_folder: str, **kwargs):
        """
        Initialize HDFS task handler.

        Parameters:
            base_log_folder: Local base folder for temporary log storage
            hdfs_log_folder: HDFS folder for permanent log storage
            **kwargs: Additional configuration options including delete_local_copy
        """

    def set_context(self, ti, *, identifier: str | None = None) -> None:
        """
        Set the task instance context for log handling.

        Parameters:
            ti: TaskInstance object
            identifier: Optional identifier for the context
        """

    def close(self) -> None:
        """
        Close handler and upload local log file to HDFS.

        Automatically uploads logs when upload_on_close is True and marks
        the handler as closed to prevent duplicate uploads.
        """
```
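
The handler is normally instantiated and driven by Airflow's logging framework, but a minimal sketch of its lifecycle can clarify the call order. The folder paths below are assumptions, and `ti` stands in for the real `TaskInstance` Airflow supplies at runtime:

```python
# Minimal sketch of the handler lifecycle; Airflow performs these calls itself
# once the handler is registered through the logging configuration.
import logging

from airflow.providers.apache.hdfs.log.hdfs_task_handler import HdfsTaskHandler

def attach_hdfs_handler(ti) -> HdfsTaskHandler:
    """Attach an HDFS task handler for the given TaskInstance and return it."""
    handler = HdfsTaskHandler(
        base_log_folder="/opt/airflow/logs",                  # local staging folder (assumed)
        hdfs_log_folder="hdfs://namenode:9000/airflow/logs",  # remote destination (assumed)
        delete_local_copy=True,
    )
    handler.set_context(ti)  # resolves the per-task log path
    logging.getLogger("airflow.task").addHandler(handler)
    return handler

# Later, handler.close() flushes the local log file and uploads it to HDFS.
```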

### Remote Log I/O Operations

Low-level class for handling HDFS log upload and retrieval operations.

```python { .api }
class HdfsRemoteLogIO:
    """
    Handles remote log I/O operations for HDFS storage.

    Attributes:
        remote_base (str): Remote base path in HDFS
        base_log_folder (Path): Local base log folder path
        delete_local_copy (bool): Whether to delete local copies after upload
        processors (tuple): Log processors (empty tuple)
    """

    def upload(self, path: os.PathLike | str, ti: RuntimeTI) -> None:
        """
        Upload the given log path to HDFS remote storage.

        Parameters:
            path: Local log file path to upload
            ti: Task instance for context
        """

    def read(self, relative_path: str, ti: RuntimeTI) -> tuple[LogSourceInfo, LogMessages]:
        """
        Read log file from HDFS remote storage.

        Parameters:
            relative_path: Relative path to log file in HDFS
            ti: Task instance for context

        Returns:
            tuple: (LogSourceInfo, LogMessages) where LogSourceInfo contains metadata
            and LogMessages contains the actual log content lines
        """

    @property
    def hook(self):
        """
        Get WebHDFS hook instance for HDFS operations.

        Returns:
            WebHDFSHook: Configured hook using REMOTE_LOG_CONN_ID from config
        """
```
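
For example, a small sketch of pushing a locally staged log file to HDFS with `HdfsRemoteLogIO`. The constructor arguments mirror the usage shown in the Programmatic Log Access example later in this page; the paths and the `ti` argument are illustrative:

```python
# Illustrative sketch: upload a locally staged log file to HDFS.
from pathlib import Path

from airflow.providers.apache.hdfs.log.hdfs_task_handler import HdfsRemoteLogIO

def push_log_to_hdfs(relative_log_path: str, ti) -> None:
    """Upload <base_log_folder>/<relative_log_path> to the remote log folder."""
    log_io = HdfsRemoteLogIO(
        remote_base="/airflow/logs",               # assumed HDFS base path
        base_log_folder=Path("/opt/airflow/logs"), # assumed local staging folder
        delete_local_copy=False,
    )
    log_io.upload(relative_log_path, ti)
```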

## Configuration

### Airflow Configuration

Configure HDFS logging in `airflow.cfg`:

```ini
[logging]
# Enable remote logging
remote_logging = True

# HDFS connection for log storage
remote_log_conn_id = hdfs_logs

# Base log folder in HDFS
remote_base_log_folder = hdfs://namenode:9000/airflow/logs

# Handler name used when reading task logs; the HDFS handler class itself
# is registered through the logging config (see "Handler Configuration" below)
task_log_reader = task

# Local log cleanup after upload
delete_local_logs = True

# Custom logging configuration module (must be importable,
# e.g. a log_config.py on the PYTHONPATH)
logging_config_class = log_config.LOGGING_CONFIG
```

### Connection Setup

Create HDFS connection for logging:

```python
# Connection fields for the HDFS log connection
conn_id = 'hdfs_logs'
conn_type = 'webhdfs'
host = 'namenode.hadoop.cluster'
port = 9870
login = 'airflow'
schema = 'webhdfs/v1'

# Optional extras, stored as JSON in the connection's Extra field
# (e.g. TLS settings for secured clusters)
extra = {
    "use_ssl": True,
    "verify": True
}
```
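
The same connection can also be created programmatically. A sketch using the metadata database session follows; the values mirror the fields above, and the connection could equally be supplied through the UI or an `AIRFLOW_CONN_HDFS_LOGS` environment variable:

```python
# Sketch: register the HDFS log connection in the metadata database.
import json

from airflow.models import Connection
from airflow.settings import Session

conn = Connection(
    conn_id="hdfs_logs",
    conn_type="webhdfs",
    host="namenode.hadoop.cluster",
    port=9870,
    login="airflow",
    extra=json.dumps({"use_ssl": True, "verify": True}),
)

session = Session()
# Only add the connection if it does not exist yet.
if not session.query(Connection).filter(Connection.conn_id == conn.conn_id).first():
    session.add(conn)
    session.commit()
session.close()
```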

### Handler Configuration

Configure the HDFS task handler in your logging configuration:

```python
# In airflow_local_settings.py or custom logging config

LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'handlers': {
        'hdfs_task': {
            'class': 'airflow.providers.apache.hdfs.log.hdfs_task_handler.HdfsTaskHandler',
            'base_log_folder': '/opt/airflow/logs',
            'hdfs_log_folder': 'hdfs://namenode:9000/airflow/logs',
            'delete_local_copy': True,
        }
    },
    'loggers': {
        'airflow.task': {
            'handlers': ['hdfs_task'],
            'level': 'INFO',
            'propagate': False,
        }
    }
}
```
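
In practice it is often safer to start from Airflow's default logging configuration and override only the `task` handler, so the rest of the logging setup stays intact. A sketch of that approach, assuming a `log_config.py` module matching the `logging_config_class` example above:

```python
# log_config.py -- sketch that extends Airflow's default logging config
# instead of defining one from scratch; the handler name 'task' is what
# the default config (and task_log_reader) expects.
from copy import deepcopy

from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG

LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)
LOGGING_CONFIG["handlers"]["task"] = {
    "class": "airflow.providers.apache.hdfs.log.hdfs_task_handler.HdfsTaskHandler",
    "formatter": "airflow",
    "base_log_folder": "/opt/airflow/logs",                  # assumed local folder
    "hdfs_log_folder": "hdfs://namenode:9000/airflow/logs",  # assumed HDFS folder
    "delete_local_copy": True,
}
```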

## Usage Examples

### Basic HDFS Logging Setup

Enable HDFS logging for all tasks:

```python
# airflow.cfg configuration:
#
#   [logging]
#   remote_logging = True
#   remote_log_conn_id = production_hdfs
#   remote_base_log_folder = hdfs://cluster:9000/logs/airflow
#   delete_local_logs = True

# DAG tasks will automatically use HDFS logging
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def my_task():
    print("This log will be stored in HDFS")
    # Task logic here

dag = DAG('hdfs_logged_dag', start_date=datetime(2024, 1, 1))

task = PythonOperator(
    task_id='logged_task',
    python_callable=my_task,
    dag=dag
)
# Logs are uploaded to HDFS automatically after task completion
```

### Programmatic Log Access

Access task logs stored in HDFS programmatically:

```python
from pathlib import Path
from types import SimpleNamespace

from airflow.providers.apache.hdfs.log.hdfs_task_handler import HdfsRemoteLogIO

def retrieve_task_logs(dag_id: str, task_id: str, execution_date: str):
    """Retrieve task logs from HDFS storage."""

    # Initialize log I/O handler
    log_io = HdfsRemoteLogIO(
        remote_base='/airflow/logs',
        base_log_folder=Path('/tmp/airflow/logs'),
        delete_local_copy=False
    )

    # Construct log path relative to the remote base
    log_path = f"{dag_id}/{task_id}/{execution_date}/1.log"

    # Lightweight stand-in for a TaskInstance, used only for context
    mock_ti = SimpleNamespace(
        dag_id=dag_id,
        task_id=task_id,
        execution_date=execution_date,
    )

    # Read logs from HDFS
    messages, logs = log_io.read(log_path, mock_ti)

    return {
        'messages': messages,
        'logs': logs,
        'path': log_path
    }

# Example usage
log_data = retrieve_task_logs(
    dag_id='data_pipeline',
    task_id='extract_data',
    execution_date='2024-01-15T10:00:00+00:00'
)

print("Log messages:", log_data['messages'])
print("Log content:", '\n'.join(log_data['logs']))
```

### Custom Handler Configuration

Create custom HDFS task handler with specific settings:

```python
import logging

from airflow.providers.apache.hdfs.log.hdfs_task_handler import HdfsTaskHandler

log = logging.getLogger(__name__)

class CustomHdfsTaskHandler(HdfsTaskHandler):
    """Custom HDFS task handler with additional features."""

    def __init__(self, base_log_folder: str, hdfs_log_folder: str, **kwargs):
        # Custom default configuration
        kwargs.setdefault('delete_local_copy', True)
        super().__init__(base_log_folder, hdfs_log_folder, **kwargs)

        # Custom formatting applied once the file handler exists
        self._formatter = logging.Formatter(
            '[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s'
        )

    def set_context(self, ti, *, identifier: str | None = None) -> None:
        # The underlying file handler is created in set_context, so apply
        # the custom format after the parent has set it up.
        super().set_context(ti, identifier=identifier)
        if getattr(self, 'handler', None):
            self.handler.setFormatter(self._formatter)

    def close(self):
        """Custom close with additional logging around the upload."""
        log.info("Uploading task logs to HDFS with custom handler")
        super().close()
        log.info("Log upload completed")

# Use in logging configuration
# (in practice, reference an importable module path rather than __main__)
CUSTOM_LOGGING_CONFIG = {
    'handlers': {
        'custom_hdfs_task': {
            'class': '__main__.CustomHdfsTaskHandler',
            'base_log_folder': '/opt/airflow/logs',
            'hdfs_log_folder': 'hdfs://namenode:9000/logs/airflow',
            'delete_local_copy': True,
        }
    }
}
```

### Log Retention and Management

Implement log retention policies with HDFS:

```python
import re
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook

def cleanup_old_logs():
    """Clean up old log files from HDFS based on retention policy."""

    hook = WebHDFSHook(webhdfs_conn_id='hdfs_logs')
    client = hook.get_conn()

    # Define retention period (e.g., 90 days)
    cutoff_date = datetime.now() - timedelta(days=90)

    # List all DAG directories in logs
    log_base = '/airflow/logs'
    dag_dirs = client.list(log_base)

    for dag_dir in dag_dirs:
        dag_path = f"{log_base}/{dag_dir}"

        try:
            # List task directories
            task_dirs = client.list(dag_path)

            for task_dir in task_dirs:
                task_path = f"{dag_path}/{task_dir}"

                # List execution date directories
                exec_dirs = client.list(task_path)

                for exec_dir in exec_dirs:
                    # Parse execution date from directory name
                    date_match = re.match(r'(\d{4}-\d{2}-\d{2})', exec_dir)
                    if date_match:
                        exec_date = datetime.strptime(date_match.group(1), '%Y-%m-%d')

                        if exec_date < cutoff_date:
                            # Delete old log directory
                            old_path = f"{task_path}/{exec_dir}"
                            client.delete(old_path, recursive=True)
                            print(f"Deleted old logs: {old_path}")

        except Exception as e:
            print(f"Error processing {dag_path}: {e}")

# DAG for log cleanup
cleanup_dag = DAG(
    'hdfs_log_cleanup',
    default_args={'owner': 'admin'},
    description='Clean up old HDFS logs',
    schedule_interval='@weekly',  # Run weekly
    start_date=datetime(2024, 1, 1),
    catchup=False
)

cleanup_task = PythonOperator(
    task_id='cleanup_old_logs',
    python_callable=cleanup_old_logs,
    dag=cleanup_dag
)
```

### Monitoring Log Upload Status

Monitor HDFS log upload success and failures:

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import TaskInstance
from airflow.operators.python import PythonOperator
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook
from airflow.settings import Session
from airflow.utils.state import State

def check_log_upload_status():
    """Check if logs were successfully uploaded to HDFS."""

    hook = WebHDFSHook(webhdfs_conn_id='hdfs_logs')

    # Get recent task instances from the metadata database
    session = Session()

    recent_tasks = session.query(TaskInstance).filter(
        TaskInstance.end_date > datetime.now() - timedelta(hours=1),
        TaskInstance.state == State.SUCCESS
    ).all()

    upload_stats = {'success': 0, 'missing': 0, 'errors': []}

    for ti in recent_tasks:
        try:
            # Construct expected log path
            log_path = f"/airflow/logs/{ti.dag_id}/{ti.task_id}/{ti.execution_date.strftime('%Y-%m-%dT%H:%M:%S+00:00')}/1.log"

            # Check if log exists in HDFS
            if hook.check_for_path(log_path):
                upload_stats['success'] += 1
            else:
                upload_stats['missing'] += 1
                upload_stats['errors'].append(f"Missing log: {log_path}")

        except Exception as e:
            upload_stats['errors'].append(f"Error checking {ti}: {e}")

    session.close()

    # Report results
    print(f"Log upload status: {upload_stats['success']} successful, {upload_stats['missing']} missing")

    if upload_stats['errors']:
        print("Errors found:")
        for error in upload_stats['errors'][:10]:  # Show first 10 errors
            print(f" - {error}")

    return upload_stats

# Monitoring DAG
monitoring_dag = DAG(
    'hdfs_log_monitoring',
    default_args={'owner': 'admin'},
    description='Monitor HDFS log uploads',
    schedule_interval=timedelta(hours=1),
    start_date=datetime(2024, 1, 1),
    catchup=False
)

monitor_task = PythonOperator(
    task_id='check_log_uploads',
    python_callable=check_log_upload_status,
    dag=monitoring_dag
)
```

## Integration with Log Analysis

### Log Aggregation and Analysis

Use HDFS-stored logs for analysis and monitoring:

```python
import re
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook

def analyze_task_performance():
    """Analyze task performance from HDFS logs."""

    hook = WebHDFSHook(webhdfs_conn_id='hdfs_logs')

    # Analysis results
    performance_data = {
        'task_durations': {},
        'error_patterns': {},
        'memory_usage': []
    }

    # Base path for task logs in HDFS
    log_base = '/airflow/logs'

    # Example: analyze specific DAG logs
    dag_logs = f"{log_base}/data_pipeline"

    try:
        task_dirs = hook.get_conn().list(dag_logs)

        for task_dir in task_dirs:
            task_path = f"{dag_logs}/{task_dir}"

            # Get recent execution logs
            exec_dirs = hook.get_conn().list(task_path)

            for exec_dir in sorted(exec_dirs)[-5:]:  # Last 5 executions
                log_file = f"{task_path}/{exec_dir}/1.log"

                if hook.check_for_path(log_file):
                    # Read and analyze log content
                    log_content = hook.read_file(log_file).decode('utf-8')

                    # Extract task duration (heuristic pattern match)
                    duration_match = re.search(r'Task exited with return code 0.*?(\d+\.\d+)s', log_content)
                    if duration_match:
                        duration = float(duration_match.group(1))
                        if task_dir not in performance_data['task_durations']:
                            performance_data['task_durations'][task_dir] = []
                        performance_data['task_durations'][task_dir].append(duration)

                    # Extract error patterns
                    error_lines = [line for line in log_content.split('\n') if 'ERROR' in line]
                    for error_line in error_lines:
                        error_type = error_line.split('ERROR')[1].strip()[:50]
                        if error_type not in performance_data['error_patterns']:
                            performance_data['error_patterns'][error_type] = 0
                        performance_data['error_patterns'][error_type] += 1

    except Exception as e:
        print(f"Error analyzing logs: {e}")

    # Generate performance report
    print("=== Task Performance Analysis ===")
    for task, durations in performance_data['task_durations'].items():
        avg_duration = sum(durations) / len(durations)
        print(f"{task}: avg {avg_duration:.2f}s, executions: {len(durations)}")

    print("\n=== Error Patterns ===")
    for error, count in sorted(performance_data['error_patterns'].items(), key=lambda x: x[1], reverse=True)[:5]:
        print(f"{error}: {count} occurrences")

    return performance_data

# Analysis DAG
analysis_dag = DAG(
    'hdfs_log_analysis',
    default_args={'owner': 'data_team'},
    description='Analyze task logs from HDFS',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False
)

analysis_task = PythonOperator(
    task_id='analyze_performance',
    python_callable=analyze_task_performance,
    dag=analysis_dag
)
```

## Troubleshooting

### Common Configuration Issues

```python
from airflow.configuration import conf
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook

def diagnose_hdfs_logging():
    """Diagnose common HDFS logging configuration issues."""

    issues = []

    # Check connection
    try:
        conn_id = conf.get('logging', 'remote_log_conn_id', fallback='webhdfs_default')
        hook = WebHDFSHook(webhdfs_conn_id=conn_id)
        client = hook.get_conn()

        # Test connectivity
        client.status('/')
        print(f"✓ HDFS connection '{conn_id}' is working")

    except Exception as e:
        issues.append(f"✗ HDFS connection failed: {e}")

    # Check configuration
    try:
        remote_logging = conf.getboolean('logging', 'remote_logging', fallback=False)
        if not remote_logging:
            issues.append("✗ remote_logging is not enabled in airflow.cfg")
        else:
            print("✓ Remote logging is enabled")

        remote_base = conf.get('logging', 'remote_base_log_folder', fallback=None)
        if not remote_base:
            issues.append("✗ remote_base_log_folder not configured")
        else:
            print(f"✓ Remote log folder: {remote_base}")

    except Exception as e:
        issues.append(f"✗ Configuration check failed: {e}")

    # Report issues
    if issues:
        print("\n=== Issues Found ===")
        for issue in issues:
            print(issue)
    else:
        print("\n✓ All checks passed - HDFS logging should be working")

# Run diagnostic
if __name__ == "__main__":
    diagnose_hdfs_logging()
```