Provider package for Apache Airflow integration with Hadoop Distributed File System (HDFS) and WebHDFS operations
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Task handlers for storing Airflow task logs in HDFS, enabling centralized log management and integration with Hadoop ecosystem logging infrastructure. This allows organizations to store all task execution logs in their data lake for long-term retention and analysis.
Main handler class for managing task logs in HDFS with automatic upload and retrieval capabilities.
class HdfsTaskHandler:
    """
    HDFS Task Handler for storing and retrieving Airflow task logs in HDFS.

    Extends airflow FileTaskHandler and uploads to and reads from HDFS storage.
    NOTE(review): remote I/O appears to be delegated to HdfsRemoteLogIO below —
    confirm against the provider source.
    """

    def __init__(self, base_log_folder: str, hdfs_log_folder: str, **kwargs):
        """
        Initialize HDFS task handler.

        Parameters:
            base_log_folder: Local base folder for temporary log storage
            hdfs_log_folder: HDFS folder for permanent log storage
            **kwargs: Additional configuration options including delete_local_copy
        """

    def set_context(self, ti, *, identifier: str | None = None) -> None:
        """
        Set the task instance context for log handling.

        Parameters:
            ti: TaskInstance object
            identifier: Optional identifier for the context
        """

    def close(self) -> None:
        """
        Close handler and upload local log file to HDFS.

        Automatically uploads logs when upload_on_close is True and marks
        handler as closed to prevent duplicate uploads.
        """

Low-level class for handling HDFS log upload and retrieval operations.
class HdfsRemoteLogIO:
    """
    Handles remote log I/O operations for HDFS storage.

    Attributes:
        remote_base (str): Remote base path in HDFS
        base_log_folder (Path): Local base log folder path
        delete_local_copy (bool): Whether to delete local copies after upload
        processors (tuple): Log processors (empty tuple)
    """

    def upload(self, path: os.PathLike | str, ti: RuntimeTI) -> None:
        """
        Upload the given log path to HDFS remote storage.

        Parameters:
            path: Local log file path to upload
            ti: Task instance for context
        """

    def read(self, relative_path: str, ti: RuntimeTI) -> tuple[LogSourceInfo, LogMessages]:
        """
        Read log file from HDFS remote storage.

        Parameters:
            relative_path: Relative path to log file in HDFS
            ti: Task instance for context

        Returns:
            tuple: (LogSourceInfo, LogMessages) where LogSourceInfo contains metadata
            and LogMessages contains the actual log content lines
        """

    @property
    def hook(self):
        """
        Get WebHDFS hook instance for HDFS operations.

        Returns:
            WebHDFSHook: Configured hook using REMOTE_LOG_CONN_ID from config
        """

Configure HDFS logging in airflow.cfg:
[logging]
# Enable remote logging
remote_logging = True
# HDFS connection for log storage
remote_log_conn_id = hdfs_logs
# Base log folder in HDFS
remote_base_log_folder = hdfs://namenode:9000/airflow/logs
# Task log reader — this option expects a log-handler *name* (normally "task"),
# not a class path; the HdfsTaskHandler class is wired in via the logging config
task_log_reader = task
# Local log cleanup
delete_local_logs = True
[core]
# Set logging configuration file
logging_config_class = airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG

Create HDFS connection for logging:
# Connection configuration for HDFS logs
conn_id = 'hdfs_logs'
conn_type = 'webhdfs'
host = 'namenode.hadoop.cluster'
port = 9870
login = 'airflow'
schema = 'webhdfs/v1'
# For Kerberos environments
extras = {
"use_ssl": True,
"verify": True
}

Configure the HDFS task handler in your logging configuration:
# In airflow_local_settings.py or custom logging config
LOGGING_CONFIG = {
'version': 1,
'disable_existing_loggers': False,
'handlers': {
'hdfs_task': {
'class': 'airflow.providers.apache.hdfs.log.hdfs_task_handler.HdfsTaskHandler',
'base_log_folder': '/opt/airflow/logs',
'hdfs_log_folder': 'hdfs://namenode:9000/airflow/logs',
'delete_local_copy': True,
}
},
'loggers': {
'airflow.task': {
'handlers': ['hdfs_task'],
'level': 'INFO',
'propagate': False,
}
}
}Enable HDFS logging for all tasks:
# airflow.cfg configuration
[logging]
remote_logging = True
remote_log_conn_id = production_hdfs
remote_base_log_folder = hdfs://cluster:9000/logs/airflow
# task_log_reader expects a log-reader name (normally "task"), not a class path
task_log_reader = task
delete_local_logs = True
# DAG tasks will automatically use HDFS logging
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def my_task():
print("This log will be stored in HDFS")
# Task logic here
dag = DAG('hdfs_logged_dag', start_date=datetime(2024, 1, 1))
task = PythonOperator(
task_id='logged_task',
python_callable=my_task,
dag=dag
)
# Logs automatically uploaded to HDFS after task completionAccess task logs stored in HDFS programmatically:
from airflow.providers.apache.hdfs.log.hdfs_task_handler import HdfsTaskHandler, HdfsRemoteLogIO
from airflow.models import TaskInstance, DagRun
from pathlib import Path
def retrieve_task_logs(dag_id: str, task_id: str, execution_date: str):
"""Retrieve task logs from HDFS storage."""
# Initialize log I/O handler
log_io = HdfsRemoteLogIO(
remote_base='/airflow/logs',
base_log_folder=Path('/tmp/airflow/logs'),
delete_local_copy=False
)
# Construct log path
log_path = f"{dag_id}/{task_id}/{execution_date}/1.log"
# Create mock task instance for context
class MockTI:
dag_id = dag_id
task_id = task_id
execution_date = execution_date
mock_ti = MockTI()
# Read logs from HDFS
messages, logs = log_io.read(log_path, mock_ti)
return {
'messages': messages,
'logs': logs,
'path': log_path
}
# Example usage
log_data = retrieve_task_logs(
dag_id='data_pipeline',
task_id='extract_data',
execution_date='2024-01-15T10:00:00+00:00'
)
print("Log messages:", log_data['messages'])
print("Log content:", '\n'.join(log_data['logs']))Create custom HDFS task handler with specific settings:
from airflow.providers.apache.hdfs.log.hdfs_task_handler import HdfsTaskHandler
import logging
class CustomHdfsTaskHandler(HdfsTaskHandler):
    """Custom HDFS task handler with additional features."""

    def __init__(self, base_log_folder: str, hdfs_log_folder: str, **kwargs):
        # Delete local copies by default unless the caller overrides it
        kwargs.setdefault('delete_local_copy', True)
        super().__init__(base_log_folder, hdfs_log_folder, **kwargs)

        # Apply a custom line format to the underlying handler, when present
        custom_format = logging.Formatter(
            '[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s'
        )
        if self.handler:
            self.handler.setFormatter(custom_format)

    def close(self):
        """Custom close with additional cleanup."""
        # Announce the upload, delegate to the base class, then confirm
        self.log.info("Uploading task logs to HDFS with custom handler")
        super().close()
        self.log.info("Log upload completed")
# Use in logging configuration
CUSTOM_LOGGING_CONFIG = {
'handlers': {
'custom_hdfs_task': {
'class': '__main__.CustomHdfsTaskHandler',
'base_log_folder': '/opt/airflow/logs',
'hdfs_log_folder': 'hdfs://namenode:9000/logs/airflow',
'delete_local_copy': True,
}
}
}Implement log retention policies with HDFS:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook
from datetime import datetime, timedelta
import re
def cleanup_old_logs():
    """
    Clean up old log files from HDFS based on retention policy.

    Walks /airflow/logs/<dag>/<task>/<execution_date> and recursively deletes
    any execution-date directory older than the 90-day retention window.
    Failures are reported per DAG directory and do not stop the sweep.
    """
    hook = WebHDFSHook(webhdfs_conn_id='hdfs_logs')
    client = hook.get_conn()

    # Define retention period (e.g., 90 days)
    cutoff_date = datetime.now() - timedelta(days=90)

    # Hoisted loop invariant: compile the date pattern once instead of
    # re-matching it for every execution directory in the triple-nested loop
    date_pattern = re.compile(r'(\d{4}-\d{2}-\d{2})')

    # List all DAG directories in logs
    log_base = '/airflow/logs'
    dag_dirs = client.list(log_base)

    for dag_dir in dag_dirs:
        dag_path = f"{log_base}/{dag_dir}"
        try:
            # List task directories
            for task_dir in client.list(dag_path):
                task_path = f"{dag_path}/{task_dir}"
                # List execution date directories
                for exec_dir in client.list(task_path):
                    # Parse execution date from the directory name
                    date_match = date_pattern.match(exec_dir)
                    if not date_match:
                        continue
                    exec_date = datetime.strptime(date_match.group(1), '%Y-%m-%d')
                    if exec_date < cutoff_date:
                        # Delete old log directory
                        old_path = f"{task_path}/{exec_dir}"
                        client.delete(old_path, recursive=True)
                        print(f"Deleted old logs: {old_path}")
        except Exception as e:
            # Best-effort: report and continue with the remaining DAG dirs
            print(f"Error processing {dag_path}: {e}")
# DAG for log cleanup
cleanup_dag = DAG(
'hdfs_log_cleanup',
default_args={'owner': 'admin'},
description='Clean up old HDFS logs',
schedule_interval='@weekly', # Run weekly
start_date=datetime(2024, 1, 1),
catchup=False
)
cleanup_task = PythonOperator(
task_id='cleanup_old_logs',
python_callable=cleanup_old_logs,
dag=cleanup_dag
)Monitor HDFS log upload success and failures:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook
from airflow.models import TaskInstance, DagRun
from airflow.utils.state import State
from datetime import datetime, timedelta
def check_log_upload_status():
    """
    Check if logs were successfully uploaded to HDFS.

    Looks at task instances that finished successfully within the last hour
    and verifies a matching log file exists under /airflow/logs in HDFS.

    Returns:
        dict with 'success'/'missing' counters and a list of 'errors'.
    """
    hook = WebHDFSHook(webhdfs_conn_id='hdfs_logs')

    # Get recent task instances
    from airflow.models import Session
    session = Session()
    upload_stats = {'success': 0, 'missing': 0, 'errors': []}
    try:
        recent_tasks = session.query(TaskInstance).filter(
            TaskInstance.end_date > datetime.now() - timedelta(hours=1),
            TaskInstance.state == State.SUCCESS
        ).all()

        for ti in recent_tasks:
            try:
                # Construct expected log path
                log_path = f"/airflow/logs/{ti.dag_id}/{ti.task_id}/{ti.execution_date.strftime('%Y-%m-%dT%H:%M:%S+00:00')}/1.log"
                # Check if log exists in HDFS
                if hook.check_for_path(log_path):
                    upload_stats['success'] += 1
                else:
                    upload_stats['missing'] += 1
                    upload_stats['errors'].append(f"Missing log: {log_path}")
            except Exception as e:
                upload_stats['errors'].append(f"Error checking {ti}: {e}")
    finally:
        # FIX: always release the DB session — the original skipped close()
        # whenever the query or the loop raised, leaking the session
        session.close()

    # Report results
    print(f"Log upload status: {upload_stats['success']} successful, {upload_stats['missing']} missing")
    if upload_stats['errors']:
        print("Errors found:")
        for error in upload_stats['errors'][:10]:  # Show first 10 errors
            print(f" - {error}")
    return upload_stats
# Monitoring DAG
monitoring_dag = DAG(
'hdfs_log_monitoring',
default_args={'owner': 'admin'},
description='Monitor HDFS log uploads',
schedule_interval=timedelta(hours=1),
start_date=datetime(2024, 1, 1),
catchup=False
)
monitor_task = PythonOperator(
task_id='check_log_uploads',
python_callable=check_log_upload_status,
dag=monitoring_dag
)Use HDFS-stored logs for analysis and monitoring:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook
from datetime import datetime, timedelta
import json
import re
def analyze_task_performance():
    """
    Analyze task performance from HDFS logs.

    Scans the last five executions of each task under
    /airflow/logs/data_pipeline, extracting task durations and ERROR-line
    patterns from the stored log files.

    Returns:
        dict with 'task_durations', 'error_patterns' and 'memory_usage'.
    """
    hook = WebHDFSHook(webhdfs_conn_id='hdfs_logs')
    # FIX: reuse one client — the original called hook.get_conn() inside the
    # loops, re-resolving the connection for every directory listing
    client = hook.get_conn()

    # Analysis results
    performance_data = {
        'task_durations': {},
        'error_patterns': {},
        'memory_usage': []
    }

    # Get logs from last 24 hours
    log_base = '/airflow/logs'
    # Example: analyze specific DAG logs
    dag_logs = f"{log_base}/data_pipeline"

    # Hoisted loop invariant: compile the duration pattern once
    duration_pattern = re.compile(r'Task exited with return code 0.*?(\d+\.\d+)s')

    try:
        for task_dir in client.list(dag_logs):
            task_path = f"{dag_logs}/{task_dir}"
            # Get recent execution logs
            for exec_dir in sorted(client.list(task_path))[-5:]:  # Last 5 executions
                log_file = f"{task_path}/{exec_dir}/1.log"
                if not hook.check_for_path(log_file):
                    continue
                # Read and analyze log content
                log_content = hook.read_file(log_file).decode('utf-8')

                # Extract task duration
                duration_match = duration_pattern.search(log_content)
                if duration_match:
                    duration = float(duration_match.group(1))
                    performance_data['task_durations'].setdefault(task_dir, []).append(duration)

                # Extract error patterns (first 50 chars after the first ERROR)
                for error_line in (line for line in log_content.split('\n') if 'ERROR' in line):
                    error_type = error_line.split('ERROR')[1].strip()[:50]
                    counts = performance_data['error_patterns']
                    counts[error_type] = counts.get(error_type, 0) + 1
    except Exception as e:
        print(f"Error analyzing logs: {e}")

    # Generate performance report
    print("=== Task Performance Analysis ===")
    for task, durations in performance_data['task_durations'].items():
        avg_duration = sum(durations) / len(durations)
        print(f"{task}: avg {avg_duration:.2f}s, executions: {len(durations)}")

    print("\n=== Error Patterns ===")
    for error, count in sorted(performance_data['error_patterns'].items(), key=lambda x: x[1], reverse=True)[:5]:
        print(f"{error}: {count} occurrences")

    return performance_data
# Analysis DAG
analysis_dag = DAG(
'hdfs_log_analysis',
default_args={'owner': 'data_team'},
description='Analyze task logs from HDFS',
schedule_interval='@daily',
start_date=datetime(2024, 1, 1),
catchup=False
)
analysis_task = PythonOperator(
task_id='analyze_performance',
python_callable=analyze_task_performance,
dag=analysis_dag
)def diagnose_hdfs_logging():
"""Diagnose common HDFS logging configuration issues."""
issues = []
# Check connection
try:
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook
from airflow.configuration import conf
conn_id = conf.get('logging', 'REMOTE_LOG_CONN_ID', fallback='webhdfs_default')
hook = WebHDFSHook(webhdfs_conn_id=conn_id)
client = hook.get_conn()
# Test connectivity
client.status('/')
print(f"✓ HDFS connection '{conn_id}' is working")
except Exception as e:
issues.append(f"✗ HDFS connection failed: {e}")
# Check configuration
try:
remote_logging = conf.getboolean('logging', 'remote_logging', fallback=False)
if not remote_logging:
issues.append("✗ remote_logging is not enabled in airflow.cfg")
else:
print("✓ Remote logging is enabled")
remote_base = conf.get('logging', 'remote_base_log_folder', fallback=None)
if not remote_base:
issues.append("✗ remote_base_log_folder not configured")
else:
print(f"✓ Remote log folder: {remote_base}")
except Exception as e:
issues.append(f"✗ Configuration check failed: {e}")
# Report issues
if issues:
print("\n=== Issues Found ===")
for issue in issues:
print(issue)
else:
print("\n✓ All checks passed - HDFS logging should be working")
# Run diagnostic
if __name__ == "__main__":
diagnose_hdfs_logging()Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-apache-hdfs