# Apache Airflow Providers Apache HDFS

Provider package for Apache Airflow integration with Hadoop Distributed File System (HDFS) and WebHDFS operations. This package enables data engineers to interact with HDFS clusters through Airflow workflows, providing hooks for WebHDFS API operations, sensors for monitoring HDFS file states, and task handlers for logging to HDFS storage.

## Package Information

- **Package Name**: apache-airflow-providers-apache-hdfs
- **Package Type**: pypi
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-apache-hdfs`

## Core Imports

```python
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook
from airflow.providers.apache.hdfs.sensors.web_hdfs import WebHdfsSensor, MultipleFilesWebHdfsSensor
```

For logging integration:

```python
from airflow.providers.apache.hdfs.log.hdfs_task_handler import HdfsTaskHandler
```

## Basic Usage

```python
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook
from airflow.providers.apache.hdfs.sensors.web_hdfs import WebHdfsSensor
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def upload_to_hdfs():
    # Create WebHDFS hook
    hook = WebHDFSHook(webhdfs_conn_id='webhdfs_default')

    # Upload a file
    hook.load_file(
        source='/local/path/data.csv',
        destination='/hdfs/path/data.csv',
        overwrite=True
    )

    # Check if file exists
    exists = hook.check_for_path('/hdfs/path/data.csv')
    print(f"File exists: {exists}")

# Define DAG
dag = DAG(
    'hdfs_example',
    start_date=datetime(2024, 1, 1),
    schedule=None
)

# Sensor to wait for file
file_sensor = WebHdfsSensor(
    task_id='wait_for_file',
    filepath='/hdfs/input/data_ready.flag',
    webhdfs_conn_id='webhdfs_default',
    poke_interval=30,
    timeout=300,
    dag=dag
)

# Upload task
upload_task = PythonOperator(
    task_id='upload_to_hdfs',
    python_callable=upload_to_hdfs,
    dag=dag
)

file_sensor >> upload_task
```

## Architecture

The provider follows Airflow's standard architecture patterns:

- **Hooks**: Connection-based classes for interacting with external systems (WebHDFSHook)
- **Sensors**: Long-running tasks that wait for conditions to be met (WebHdfsSensor, MultipleFilesWebHdfsSensor)
- **Handlers**: Infrastructure components for logging and task management (HdfsTaskHandler)
- **Version Compatibility**: Abstraction layer for supporting multiple Airflow versions

The package integrates with Airflow's connection system for credential management and supports both insecure and Kerberos-authenticated connections to HDFS clusters.
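
As a rough illustration of that split, the sketch below inspects the client returned by `get_conn()`. It assumes a working `webhdfs_default` connection, and the tie between Kerberos-secured Airflow deployments and `KerberosClient` is an assumption about provider internals rather than documented API:

```python
from hdfs import InsecureClient
from hdfs.ext.kerberos import KerberosClient

from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook

# Assumes a reachable 'webhdfs_default' connection is already configured.
hook = WebHDFSHook(webhdfs_conn_id="webhdfs_default")
client = hook.get_conn()

# The concrete client type reflects how the connection is secured:
# a plain InsecureClient for simple setups, a KerberosClient when the
# deployment uses Kerberos authentication (assumed behaviour).
if isinstance(client, KerberosClient):
    print("Connected with Kerberos authentication")
elif isinstance(client, InsecureClient):
    print("Connected without Kerberos (insecure client)")
```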

## Capabilities

### WebHDFS Operations

Core functionality for interacting with HDFS through the WebHDFS REST API, including file operations, path checking, and connection management with support for SSL, authentication, and high availability configurations.

```python { .api }
class WebHDFSHook:
    def __init__(self, webhdfs_conn_id: str = "webhdfs_default", proxy_user: str | None = None): ...
    def get_conn(self) -> Any: ...
    def check_for_path(self, hdfs_path: str) -> bool: ...
    def load_file(self, source: str, destination: str, overwrite: bool = True, parallelism: int = 1, **kwargs) -> None: ...
    def read_file(self, filename: str) -> bytes: ...
```

[WebHDFS Operations](./webhdfs-operations.md)
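
A minimal sketch of reading a file back through the hook, using only the methods listed above; the connection ID is the default and the HDFS path is a placeholder:

```python
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook

# Assumes a working 'webhdfs_default' connection; the path is a placeholder.
hook = WebHDFSHook(webhdfs_conn_id="webhdfs_default")

path = "/data/exports/report.csv"
if hook.check_for_path(path):
    # read_file returns the raw file contents as bytes.
    content: bytes = hook.read_file(path)
    print(f"Read {len(content)} bytes from {path}")
else:
    print(f"{path} does not exist yet")
```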

### File System Monitoring

Sensors for monitoring HDFS file system states, including single file detection and multiple file monitoring capabilities for workflow coordination and data pipeline triggering.

```python { .api }
class WebHdfsSensor:
    def __init__(self, *, filepath: str, webhdfs_conn_id: str = "webhdfs_default", **kwargs): ...
    def poke(self, context) -> bool: ...

class MultipleFilesWebHdfsSensor:
    def __init__(self, *, directory_path: str, expected_filenames, webhdfs_conn_id: str = "webhdfs_default", **kwargs): ...
    def poke(self, context) -> bool: ...
```

[File System Monitoring](./monitoring-sensors.md)
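
Basic Usage above shows the single-file sensor; the sketch below waits for several files in one directory using `MultipleFilesWebHdfsSensor`. The DAG ID, directory, and filenames are placeholders:

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.apache.hdfs.sensors.web_hdfs import MultipleFilesWebHdfsSensor

with DAG("hdfs_multi_file_example", start_date=datetime(2024, 1, 1), schedule=None) as dag:
    # Succeeds only once every expected file exists under the directory.
    wait_for_partitions = MultipleFilesWebHdfsSensor(
        task_id="wait_for_partitions",
        directory_path="/hdfs/input/2024-01-01",            # placeholder path
        expected_filenames=["orders.csv", "customers.csv"],  # placeholder names
        webhdfs_conn_id="webhdfs_default",
        poke_interval=60,
        timeout=3600,
    )
```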

### HDFS Logging Integration

Task handlers for storing Airflow task logs in HDFS, enabling centralized log management and integration with Hadoop ecosystem logging infrastructure.

```python { .api }
class HdfsTaskHandler:
    def __init__(self, base_log_folder: str, hdfs_log_folder: str, **kwargs): ...
    def set_context(self, ti, *, identifier: str | None = None) -> None: ...
    def close(self) -> None: ...
```

[HDFS Logging Integration](./logging-integration.md)
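
Task handlers are normally wired up through Airflow's logging configuration rather than instantiated in DAG code. Purely as a sketch of the constructor listed above, with placeholder folders (the remote-folder format is an assumption, not taken from this spec):

```python
from airflow.providers.apache.hdfs.log.hdfs_task_handler import HdfsTaskHandler

# Normally constructed by Airflow's logging config, not by user code.
# Both folder values below are placeholders.
handler = HdfsTaskHandler(
    base_log_folder="/opt/airflow/logs",              # local folder for in-flight logs
    hdfs_log_folder="hdfs://namenode/airflow/logs",   # remote HDFS folder (assumed format)
)

# Airflow is expected to call set_context() with the running task instance
# and close() at task completion; see the API listing above.
```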

## Connection Configuration

The package uses Airflow's connection system with the following configuration (a sample connection definition follows the list):

- **Connection Type**: `webhdfs`
- **Default Connection ID**: `webhdfs_default`
- **Host**: HDFS namenode(s), comma-separated for high-availability (HA) setups
- **Port**: WebHDFS HTTP port (typically 9870 on Hadoop 3.x, 50070 on Hadoop 2.x)
- **Login**: Username for authentication
- **Password**: Password for basic authentication (optional)
- **Schema**: WebHDFS path prefix (optional)
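
For example, a minimal `webhdfs_default` connection can be supplied through Airflow's standard `AIRFLOW_CONN_<CONN_ID>` environment variable; the host, port, and credentials below are placeholders:

```python
import os

# Equivalent to creating a 'webhdfs_default' connection in the Airflow UI.
# Hostname, port, and credentials are placeholders.
os.environ["AIRFLOW_CONN_WEBHDFS_DEFAULT"] = (
    "webhdfs://hdfs_user:hdfs_password@namenode.example.com:9870"
)
```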

### Extra Configuration Options

```python { .api }
# Connection extras support the following options:
{
    "use_ssl": bool,       # Enable HTTPS connections
    "verify": bool | str,  # SSL certificate verification
    "cert": str,           # Client certificate path for mTLS
    "key": str,            # Client key path for mTLS
    "cookies": dict,       # Custom cookies
    "headers": dict        # Custom headers
}
```
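
As a sketch, these extras can be attached to a programmatically defined `Connection`; every value below is a placeholder:

```python
import json

from airflow.models import Connection

# Placeholder connection demonstrating the extras listed above.
conn = Connection(
    conn_id="webhdfs_secure",
    conn_type="webhdfs",
    host="namenode.example.com",
    login="hdfs_user",
    extra=json.dumps({
        "use_ssl": True,
        "verify": "/etc/ssl/certs/hdfs-ca.pem",     # path to CA bundle
        "headers": {"X-Requested-By": "airflow"},   # custom header example
    }),
)
```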

## Types

```python { .api }
from airflow.utils.context import Context
from airflow.models.taskinstance import TaskInstance
from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI
from airflow.utils.log.file_task_handler import LogMessages, LogSourceInfo
from collections.abc import Sequence
from typing import Any
import os
from hdfs import InsecureClient
from hdfs.ext.kerberos import KerberosClient

# Connection client types
Client = InsecureClient | KerberosClient

# Task handler return types
LogMessages = list[str]
LogSourceInfo = list[str]
```