# HDFS Logging Integration

Task handlers for storing Airflow task logs in HDFS, enabling centralized log management and integration with the Hadoop ecosystem's logging infrastructure. This allows organizations to store all task execution logs in their data lake for long-term retention and analysis.

## Capabilities

### HDFS Task Handler

Main handler class for managing task logs in HDFS, with automatic upload and retrieval capabilities.

```python { .api }
class HdfsTaskHandler:
    """
    HDFS Task Handler for storing and retrieving Airflow task logs in HDFS.

    Extends Airflow's FileTaskHandler; uploads logs to and reads them from HDFS storage.
    """

    def __init__(self, base_log_folder: str, hdfs_log_folder: str, **kwargs):
        """
        Initialize the HDFS task handler.

        Parameters:
            base_log_folder: Local base folder for temporary log storage
            hdfs_log_folder: HDFS folder for permanent log storage
            **kwargs: Additional configuration options, including delete_local_copy
        """

    def set_context(self, ti, *, identifier: str | None = None) -> None:
        """
        Set the task instance context for log handling.

        Parameters:
            ti: TaskInstance object
            identifier: Optional identifier for the context
        """

    def close(self) -> None:
        """
        Close the handler and upload the local log file to HDFS.

        Automatically uploads logs when upload_on_close is True and marks the
        handler as closed to prevent duplicate uploads.
        """
```
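
For illustration, a minimal sketch of constructing the handler directly, using only the constructor signature documented above; the folder paths are placeholders, and in a real deployment Airflow builds the handler from the logging configuration rather than from user code.

```python
# Hypothetical direct construction of the handler; mirrors the documented
# __init__(base_log_folder, hdfs_log_folder, **kwargs) signature. Paths are examples.
from airflow.providers.apache.hdfs.log.hdfs_task_handler import HdfsTaskHandler

handler = HdfsTaskHandler(
    base_log_folder="/opt/airflow/logs",                  # local staging folder
    hdfs_log_folder="hdfs://namenode:9000/airflow/logs",  # permanent HDFS location
    delete_local_copy=True,                               # remove the local file after upload
)

# In normal operation, Airflow calls set_context(ti) when a task starts and
# close() when it finishes, which triggers the upload to HDFS.
```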

### Remote Log I/O Operations

Low-level class for handling HDFS log upload and retrieval operations.

```python { .api }
class HdfsRemoteLogIO:
    """
    Handles remote log I/O operations for HDFS storage.

    Attributes:
        remote_base (str): Remote base path in HDFS
        base_log_folder (Path): Local base log folder path
        delete_local_copy (bool): Whether to delete local copies after upload
        processors (tuple): Log processors (empty tuple)
    """

    def upload(self, path: os.PathLike | str, ti: RuntimeTI) -> None:
        """
        Upload the given log path to HDFS remote storage.

        Parameters:
            path: Local log file path to upload
            ti: Task instance for context
        """

    def read(self, relative_path: str, ti: RuntimeTI) -> tuple[LogSourceInfo, LogMessages]:
        """
        Read a log file from HDFS remote storage.

        Parameters:
            relative_path: Relative path to the log file in HDFS
            ti: Task instance for context

        Returns:
            tuple: (LogSourceInfo, LogMessages) where LogSourceInfo contains metadata
                and LogMessages contains the actual log content lines
        """

    @property
    def hook(self):
        """
        Get a WebHDFS hook instance for HDFS operations.

        Returns:
            WebHDFSHook: Configured hook using REMOTE_LOG_CONN_ID from config
        """
```
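
As a sketch of the write path (the Programmatic Log Access example later in this document covers reads), assuming the constructor and `upload()` signature shown above; `ti` stands in for a real task instance and the paths are placeholders.

```python
from pathlib import Path

from airflow.providers.apache.hdfs.log.hdfs_task_handler import HdfsRemoteLogIO

# Illustrative configuration; remote_base and base_log_folder depend on your deployment.
log_io = HdfsRemoteLogIO(
    remote_base="/airflow/logs",
    base_log_folder=Path("/opt/airflow/logs"),
    delete_local_copy=False,
)

# Upload a finished local log file for a given task instance `ti`:
# log_io.upload("data_pipeline/extract_data/2024-01-15T10:00:00+00:00/1.log", ti)
```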

## Configuration

### Airflow Configuration

Configure HDFS logging in `airflow.cfg`:

```ini
[logging]
# Enable remote logging
remote_logging = True

# HDFS connection for log storage
remote_log_conn_id = hdfs_logs

# Base log folder in HDFS
remote_base_log_folder = hdfs://namenode:9000/airflow/logs

# Task handler class
task_log_reader = airflow.providers.apache.hdfs.log.hdfs_task_handler.HdfsTaskHandler

# Local log cleanup
delete_local_logs = True

[core]
# Set logging configuration file
logging_config_class = airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG
```

### Connection Setup

Create an HDFS connection for logging:

```python
# Connection configuration for HDFS logs
conn_id = 'hdfs_logs'
conn_type = 'webhdfs'
host = 'namenode.hadoop.cluster'
port = 9870
login = 'airflow'
schema = 'webhdfs/v1'

# For TLS-secured (e.g., Kerberized) environments
extras = {
    "use_ssl": True,
    "verify": True
}
```
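
If you prefer to register the connection in code rather than through the UI, a minimal sketch using Airflow's `Connection` model and session is shown below; it assumes the same field values as above, and the extra-field format should be checked against your provider version.

```python
import json

from airflow.models import Connection
from airflow.settings import Session

# Build the connection from the values shown above; extras are stored as JSON.
conn = Connection(
    conn_id="hdfs_logs",
    conn_type="webhdfs",
    host="namenode.hadoop.cluster",
    port=9870,
    login="airflow",
    schema="webhdfs/v1",
    extra=json.dumps({"use_ssl": True, "verify": True}),
)

session = Session()
session.add(conn)
session.commit()
session.close()
```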

### Handler Configuration

Configure the HDFS task handler in your logging configuration:

```python
# In airflow_local_settings.py or a custom logging config

LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'handlers': {
        'hdfs_task': {
            'class': 'airflow.providers.apache.hdfs.log.hdfs_task_handler.HdfsTaskHandler',
            'base_log_folder': '/opt/airflow/logs',
            'hdfs_log_folder': 'hdfs://namenode:9000/airflow/logs',
            'delete_local_copy': True,
        }
    },
    'loggers': {
        'airflow.task': {
            'handlers': ['hdfs_task'],
            'level': 'INFO',
            'propagate': False,
        }
    }
}
```
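
Rather than writing a logging configuration from scratch, it is usually easier to start from Airflow's default configuration and override only the task handler. A sketch of that pattern follows, reusing the handler settings above; the `"formatter": "airflow"` entry assumes the default formatter name from `DEFAULT_LOGGING_CONFIG`.

```python
from copy import deepcopy

from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG

# Copy the defaults so formatters, filters, and other handlers stay intact.
LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)

# Replace the default task handler with the HDFS handler.
LOGGING_CONFIG["handlers"]["task"] = {
    "class": "airflow.providers.apache.hdfs.log.hdfs_task_handler.HdfsTaskHandler",
    "formatter": "airflow",  # assumes the default formatter name
    "base_log_folder": "/opt/airflow/logs",
    "hdfs_log_folder": "hdfs://namenode:9000/airflow/logs",
    "delete_local_copy": True,
}

# Point logging_config_class at the module that defines this LOGGING_CONFIG.
```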

## Usage Examples

### Basic HDFS Logging Setup

Enable HDFS logging for all tasks:

```python
# airflow.cfg configuration:
#   [logging]
#   remote_logging = True
#   remote_log_conn_id = production_hdfs
#   remote_base_log_folder = hdfs://cluster:9000/logs/airflow
#   task_log_reader = airflow.providers.apache.hdfs.log.hdfs_task_handler.HdfsTaskHandler
#   delete_local_logs = True

# With remote logging enabled, DAG tasks automatically use HDFS logging.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def my_task():
    print("This log will be stored in HDFS")
    # Task logic here

dag = DAG('hdfs_logged_dag', start_date=datetime(2024, 1, 1))

task = PythonOperator(
    task_id='logged_task',
    python_callable=my_task,
    dag=dag
)
# Logs are automatically uploaded to HDFS after task completion
```

### Programmatic Log Access

Access task logs stored in HDFS programmatically:

```python
from pathlib import Path
from types import SimpleNamespace

from airflow.providers.apache.hdfs.log.hdfs_task_handler import HdfsRemoteLogIO

def retrieve_task_logs(dag_id: str, task_id: str, execution_date: str):
    """Retrieve task logs from HDFS storage."""

    # Initialize the log I/O handler
    log_io = HdfsRemoteLogIO(
        remote_base='/airflow/logs',
        base_log_folder=Path('/tmp/airflow/logs'),
        delete_local_copy=False
    )

    # Construct the relative log path
    log_path = f"{dag_id}/{task_id}/{execution_date}/1.log"

    # Minimal stand-in task instance carrying only the fields needed for context
    mock_ti = SimpleNamespace(
        dag_id=dag_id,
        task_id=task_id,
        execution_date=execution_date,
    )

    # Read logs from HDFS
    messages, logs = log_io.read(log_path, mock_ti)

    return {
        'messages': messages,
        'logs': logs,
        'path': log_path
    }

# Example usage
log_data = retrieve_task_logs(
    dag_id='data_pipeline',
    task_id='extract_data',
    execution_date='2024-01-15T10:00:00+00:00'
)

print("Log source info:", log_data['messages'])
print("Log content:", '\n'.join(log_data['logs']))
```

### Custom Handler Configuration

Create a custom HDFS task handler with specific settings:

```python
import logging

from airflow.providers.apache.hdfs.log.hdfs_task_handler import HdfsTaskHandler

class CustomHdfsTaskHandler(HdfsTaskHandler):
    """Custom HDFS task handler with additional features."""

    def __init__(self, base_log_folder: str, hdfs_log_folder: str, **kwargs):
        # Custom configuration defaults
        kwargs.setdefault('delete_local_copy', True)
        super().__init__(base_log_folder, hdfs_log_folder, **kwargs)

    def set_context(self, ti, *, identifier: str | None = None) -> None:
        super().set_context(ti, identifier=identifier)

        # The underlying file handler exists only after set_context(), so apply
        # custom formatting here rather than in __init__.
        formatter = logging.Formatter(
            '[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s'
        )
        if self.handler:
            self.handler.setFormatter(formatter)

    def close(self):
        """Custom close with additional logging around the upload."""
        self.log.info("Uploading task logs to HDFS with custom handler")
        super().close()
        self.log.info("Log upload completed")

# Use in logging configuration
CUSTOM_LOGGING_CONFIG = {
    'handlers': {
        'custom_hdfs_task': {
            'class': '__main__.CustomHdfsTaskHandler',
            'base_log_folder': '/opt/airflow/logs',
            'hdfs_log_folder': 'hdfs://namenode:9000/logs/airflow',
            'delete_local_copy': True,
        }
    }
}
```

### Log Retention and Management

Implement log retention policies with HDFS:

```python
import re
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook

def cleanup_old_logs():
    """Clean up old log files from HDFS based on a retention policy."""

    hook = WebHDFSHook(webhdfs_conn_id='hdfs_logs')
    client = hook.get_conn()

    # Define the retention period (e.g., 90 days)
    cutoff_date = datetime.now() - timedelta(days=90)

    # List all DAG directories under the log root
    log_base = '/airflow/logs'
    dag_dirs = client.list(log_base)

    for dag_dir in dag_dirs:
        dag_path = f"{log_base}/{dag_dir}"

        try:
            # List task directories
            task_dirs = client.list(dag_path)

            for task_dir in task_dirs:
                task_path = f"{dag_path}/{task_dir}"

                # List execution date directories
                exec_dirs = client.list(task_path)

                for exec_dir in exec_dirs:
                    # Parse the execution date from the directory name
                    date_match = re.match(r'(\d{4}-\d{2}-\d{2})', exec_dir)
                    if date_match:
                        exec_date = datetime.strptime(date_match.group(1), '%Y-%m-%d')

                        if exec_date < cutoff_date:
                            # Delete the old log directory
                            old_path = f"{task_path}/{exec_dir}"
                            client.delete(old_path, recursive=True)
                            print(f"Deleted old logs: {old_path}")

        except Exception as e:
            print(f"Error processing {dag_path}: {e}")

# DAG for log cleanup
cleanup_dag = DAG(
    'hdfs_log_cleanup',
    default_args={'owner': 'admin'},
    description='Clean up old HDFS logs',
    schedule_interval='@weekly',  # Run weekly
    start_date=datetime(2024, 1, 1),
    catchup=False
)

cleanup_task = PythonOperator(
    task_id='cleanup_old_logs',
    python_callable=cleanup_old_logs,
    dag=cleanup_dag
)
```

### Monitoring Log Upload Status

Monitor HDFS log upload success and failures:

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import TaskInstance
from airflow.operators.python import PythonOperator
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook
from airflow.settings import Session
from airflow.utils.state import State

def check_log_upload_status():
    """Check whether logs were successfully uploaded to HDFS."""

    hook = WebHDFSHook(webhdfs_conn_id='hdfs_logs')

    # Get recently finished task instances from the metadata database
    session = Session()

    recent_tasks = session.query(TaskInstance).filter(
        TaskInstance.end_date > datetime.now() - timedelta(hours=1),
        TaskInstance.state == State.SUCCESS
    ).all()

    upload_stats = {'success': 0, 'missing': 0, 'errors': []}

    for ti in recent_tasks:
        try:
            # Construct the expected log path
            log_path = f"/airflow/logs/{ti.dag_id}/{ti.task_id}/{ti.execution_date.strftime('%Y-%m-%dT%H:%M:%S+00:00')}/1.log"

            # Check whether the log exists in HDFS
            if hook.check_for_path(log_path):
                upload_stats['success'] += 1
            else:
                upload_stats['missing'] += 1
                upload_stats['errors'].append(f"Missing log: {log_path}")

        except Exception as e:
            upload_stats['errors'].append(f"Error checking {ti}: {e}")

    session.close()

    # Report results
    print(f"Log upload status: {upload_stats['success']} successful, {upload_stats['missing']} missing")

    if upload_stats['errors']:
        print("Errors found:")
        for error in upload_stats['errors'][:10]:  # Show the first 10 errors
            print(f" - {error}")

    return upload_stats

# Monitoring DAG
monitoring_dag = DAG(
    'hdfs_log_monitoring',
    default_args={'owner': 'admin'},
    description='Monitor HDFS log uploads',
    schedule_interval=timedelta(hours=1),
    start_date=datetime(2024, 1, 1),
    catchup=False
)

monitor_task = PythonOperator(
    task_id='check_log_uploads',
    python_callable=check_log_upload_status,
    dag=monitoring_dag
)
```

## Integration with Log Analysis

### Log Aggregation and Analysis

Use HDFS-stored logs for analysis and monitoring:

```python
import re
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook

def analyze_task_performance():
    """Analyze task performance from HDFS logs."""

    hook = WebHDFSHook(webhdfs_conn_id='hdfs_logs')

    # Analysis results
    performance_data = {
        'task_durations': {},
        'error_patterns': {},
        'memory_usage': []
    }

    # Log root in HDFS
    log_base = '/airflow/logs'

    # Example: analyze logs for a specific DAG
    dag_logs = f"{log_base}/data_pipeline"

    try:
        task_dirs = hook.get_conn().list(dag_logs)

        for task_dir in task_dirs:
            task_path = f"{dag_logs}/{task_dir}"

            # Get recent execution logs
            exec_dirs = hook.get_conn().list(task_path)

            for exec_dir in sorted(exec_dirs)[-5:]:  # Last 5 executions
                log_file = f"{task_path}/{exec_dir}/1.log"

                if hook.check_for_path(log_file):
                    # Read and analyze the log content
                    log_content = hook.read_file(log_file).decode('utf-8')

                    # Extract task duration
                    duration_match = re.search(r'Task exited with return code 0.*?(\d+\.\d+)s', log_content)
                    if duration_match:
                        duration = float(duration_match.group(1))
                        if task_dir not in performance_data['task_durations']:
                            performance_data['task_durations'][task_dir] = []
                        performance_data['task_durations'][task_dir].append(duration)

                    # Extract error patterns
                    error_lines = [line for line in log_content.split('\n') if 'ERROR' in line]
                    for error_line in error_lines:
                        error_type = error_line.split('ERROR')[1].strip()[:50]
                        if error_type not in performance_data['error_patterns']:
                            performance_data['error_patterns'][error_type] = 0
                        performance_data['error_patterns'][error_type] += 1

    except Exception as e:
        print(f"Error analyzing logs: {e}")

    # Generate a performance report
    print("=== Task Performance Analysis ===")
    for task, durations in performance_data['task_durations'].items():
        avg_duration = sum(durations) / len(durations)
        print(f"{task}: avg {avg_duration:.2f}s, executions: {len(durations)}")

    print("\n=== Error Patterns ===")
    for error, count in sorted(performance_data['error_patterns'].items(), key=lambda x: x[1], reverse=True)[:5]:
        print(f"{error}: {count} occurrences")

    return performance_data

# Analysis DAG
analysis_dag = DAG(
    'hdfs_log_analysis',
    default_args={'owner': 'data_team'},
    description='Analyze task logs from HDFS',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False
)

analysis_task = PythonOperator(
    task_id='analyze_performance',
    python_callable=analyze_task_performance,
    dag=analysis_dag
)
```

## Troubleshooting

### Common Configuration Issues

```python
from airflow.configuration import conf
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook

def diagnose_hdfs_logging():
    """Diagnose common HDFS logging configuration issues."""

    issues = []

    # Check the connection
    try:
        conn_id = conf.get('logging', 'REMOTE_LOG_CONN_ID', fallback='webhdfs_default')
        hook = WebHDFSHook(webhdfs_conn_id=conn_id)
        client = hook.get_conn()

        # Test connectivity
        client.status('/')
        print(f"✓ HDFS connection '{conn_id}' is working")

    except Exception as e:
        issues.append(f"✗ HDFS connection failed: {e}")

    # Check the configuration
    try:
        remote_logging = conf.getboolean('logging', 'remote_logging', fallback=False)
        if not remote_logging:
            issues.append("✗ remote_logging is not enabled in airflow.cfg")
        else:
            print("✓ Remote logging is enabled")

        remote_base = conf.get('logging', 'remote_base_log_folder', fallback=None)
        if not remote_base:
            issues.append("✗ remote_base_log_folder is not configured")
        else:
            print(f"✓ Remote log folder: {remote_base}")

    except Exception as e:
        issues.append(f"✗ Configuration check failed: {e}")

    # Report issues
    if issues:
        print("\n=== Issues Found ===")
        for issue in issues:
            print(issue)
    else:
        print("\n✓ All checks passed - HDFS logging should be working")

# Run the diagnostic
if __name__ == "__main__":
    diagnose_hdfs_logging()
```