
# File Asset Handlers

The Common IO Provider includes asset and dataset handlers for file-based assets that integrate with Airflow's asset system and OpenLineage for data lineage tracking. These handlers support file URI validation, asset creation, and conversion for lineage systems.

## Capabilities

### Asset Creation

Creates file-based assets from file paths with proper URI normalization and validation.

```python { .api }
def create_asset(*, path: str | PosixPath, extra=None) -> Asset:
    """
    Create a file asset from a file path.

    Normalizes file:// URIs and handles various path formats for consistent
    asset creation across different file path representations.

    Parameters:
    - path: File path as string or PosixPath object
    - extra: Optional extra metadata for the asset

    Returns:
    - Asset: Airflow Asset object with normalized file:// URI
    """
```

### URI Validation

Validates file URI formats to ensure proper asset handling and prevent invalid asset creation.

```python { .api }
def sanitize_uri(uri: SplitResult) -> SplitResult:
    """
    Validate and sanitize file URI format.

    Ensures that file:// URIs contain valid, non-empty paths and meet
    the requirements for asset URI handling.

    Parameters:
    - uri: Parsed URI from urllib.parse.urlsplit

    Returns:
    - SplitResult: Validated URI structure

    Raises:
    - ValueError: If URI format is invalid or path is empty
    """
```
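
The contract described above can be approximated with the standard library alone. The sketch below is only an illustration: `check_file_uri` is a hypothetical stand-in, not the provider's implementation, and it models just the "non-empty path" rule.

```python
from urllib.parse import SplitResult, urlsplit


def check_file_uri(uri: SplitResult) -> SplitResult:
    """Hypothetical stand-in for the documented contract of sanitize_uri."""
    # Assumption: only the "path must not be empty" rule is modelled here.
    if not uri.path:
        raise ValueError("file:// URI must contain a non-empty path")
    return uri


# A URI with a path passes through unchanged.
valid = check_file_uri(urlsplit("file:///data/valid/path.txt"))
print(valid.path)

# A bare "file://" has an empty path and is rejected.
try:
    check_file_uri(urlsplit("file://"))
except ValueError as exc:
    print("rejected:", exc)
```

Because the function takes and returns a `SplitResult`, callers parse the string once with `urlsplit` and can reuse the parsed parts afterwards.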

### OpenLineage Conversion

Converts file assets to OpenLineage dataset format for data lineage tracking integration.

```python { .api }
def convert_asset_to_openlineage(asset: Asset, lineage_context) -> OpenLineageDataset:
    """
    Convert Airflow Asset to OpenLineage Dataset for lineage tracking.

    Translates Asset with valid AIP-60 URI to OpenLineage format with
    assistance from the lineage context. Handles various file path formats
    and network locations.

    Parameters:
    - asset: Airflow Asset object with file:// URI
    - lineage_context: OpenLineage context for conversion

    Returns:
    - OpenLineageDataset: OpenLineage dataset with namespace and name

    Note:
    - Windows paths are not standardized and can produce unexpected behaviour
    """
```
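
To make the "namespace and name" result more concrete, here is a minimal sketch of how a file URI could be split into the two parts. It assumes the namespace carries the scheme plus any network location and the name carries the path; `FileDataset` and `to_namespace_and_name` are hypothetical names used only for illustration, not part of the provider or of OpenLineage.

```python
from typing import NamedTuple
from urllib.parse import urlsplit


class FileDataset(NamedTuple):
    """Hypothetical stand-in for an OpenLineage dataset (namespace + name)."""

    namespace: str
    name: str


def to_namespace_and_name(uri: str) -> FileDataset:
    # Assumption: namespace = scheme and host, name = path component.
    parsed = urlsplit(uri)
    return FileDataset(namespace=f"file://{parsed.netloc}", name=parsed.path)


local = to_namespace_and_name("file:///data/warehouse/table.parquet")
remote = to_namespace_and_name("file://server/share/file.txt")
print(local)
print(remote)
```

Under this assumption a local file gets the bare `file://` namespace, while a network location keeps its host in the namespace.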

## Usage Examples

### Creating File Assets

```python
from airflow.providers.common.io.assets.file import create_asset
from pathlib import PosixPath

# Create asset from string path
asset1 = create_asset(path="/data/input/dataset.csv")

# Create asset from PosixPath
asset2 = create_asset(path=PosixPath("/data/output/results.json"))

# Create asset with extra metadata
asset3 = create_asset(
    path="/data/processed/analysis.parquet",
    extra={"format": "parquet", "schema_version": "1.0"}
)

# Handle various file URI formats
asset4 = create_asset(path="file:///absolute/path/data.txt")
asset5 = create_asset(path="file://relative/path/data.txt")
```

### Using Assets in DAGs

```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.common.io.assets.file import create_asset
from datetime import datetime

# Define file assets
input_asset = create_asset(path="/data/input/raw_data.csv")
output_asset = create_asset(path="/data/output/processed_data.csv")

dag = DAG(
    'asset_example',
    start_date=datetime(2023, 1, 1),
    schedule=[input_asset]  # Schedule on input asset updates
)

def process_data(**context):
    # Process input file and create output
    input_path = "/data/input/raw_data.csv"
    output_path = "/data/output/processed_data.csv"
    # ... processing logic ...
    return output_path

process_task = PythonOperator(
    task_id='process_data',
    python_callable=process_data,
    outlets=[output_asset],  # Declare output asset
    dag=dag
)
```

### Asset URI Validation

```python
from airflow.providers.common.io.assets.file import sanitize_uri
from urllib.parse import urlsplit

# Validate file URIs
try:
    uri = urlsplit("file:///data/valid/path.txt")
    validated_uri = sanitize_uri(uri)
    print("Valid URI:", validated_uri.geturl())
except ValueError as e:
    print("Invalid URI:", e)

# This will raise ValueError
try:
    empty_uri = urlsplit("file://")  # Empty path
    sanitize_uri(empty_uri)
except ValueError:
    print("Empty path not allowed")
```

### OpenLineage Integration

```python
from airflow.providers.common.io.assets.file import convert_asset_to_openlineage, create_asset

# Create file asset
asset = create_asset(path="/data/warehouse/table.parquet")

# Convert to OpenLineage dataset (typically handled automatically)
# This is usually called by Airflow's lineage system
def lineage_context_example():
    lineage_context = {}  # Provided by Airflow lineage system
    ol_dataset = convert_asset_to_openlineage(asset, lineage_context)

    return {
        "namespace": ol_dataset.namespace,
        "name": ol_dataset.name
    }
```

## Provider Registration

The asset handlers are automatically registered with Airflow through the provider mechanism and handle `file://` scheme URIs:

### Automatic Registration

```python
# Provider configuration (from provider.yaml)
asset_uris = [
    {
        "schemes": ["file"],
        "handler": "airflow.providers.common.io.assets.file.sanitize_uri",
        "to_openlineage_converter": "airflow.providers.common.io.assets.file.convert_asset_to_openlineage",
        "factory": "airflow.providers.common.io.assets.file.create_asset",
    }
]
```
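
To illustrate what scheme-based registration means in practice, the sketch below keeps a small mapping from URI scheme to validation handler and dispatches on the parsed scheme. This is not Airflow's provider machinery: `HANDLERS`, `register`, `require_path`, and `sanitize` are hypothetical names, and the pass-through for unregistered schemes is an assumption of the sketch.

```python
from typing import Callable, Dict, List
from urllib.parse import SplitResult, urlsplit

Handler = Callable[[SplitResult], SplitResult]

# Hypothetical registry: URI scheme -> validation handler.
HANDLERS: Dict[str, Handler] = {}


def register(schemes: List[str], handler: Handler) -> None:
    """Associate one handler with every scheme in the list."""
    for scheme in schemes:
        HANDLERS[scheme] = handler


def require_path(uri: SplitResult) -> SplitResult:
    # Stand-in handler modelling the documented "non-empty path" rule.
    if not uri.path:
        raise ValueError("URI must contain a non-empty path")
    return uri


def sanitize(uri: str) -> str:
    """Look up the handler by scheme and apply it."""
    parsed = urlsplit(uri)
    handler = HANDLERS.get(parsed.scheme)
    # Assumption: schemes without a handler pass through unchanged.
    return handler(parsed).geturl() if handler else uri


register(["file"], require_path)
print(sanitize("file:///data/input/raw_data.csv"))
print(sanitize("s3://bucket/key"))
```

Keying the registry on the scheme is what lets a single entry such as `"schemes": ["file"]` cover every `file://` URI.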

### Supported URI Schemes

- **file**: Local and network file paths
  - `file:///absolute/path/to/file.txt`
  - `file://server/share/file.txt` (network paths)
  - Automatic normalization of various file URI formats

## Integration with Airflow Assets

### Asset-Based Scheduling

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.common.io.assets.file import create_asset

# Define data pipeline assets
raw_data = create_asset(path="/data/raw/daily_data.csv")
clean_data = create_asset(path="/data/clean/daily_data.csv")
report_data = create_asset(path="/reports/daily_report.pdf")

# DAG triggered by raw data updates
process_dag = DAG(
    'data_processing',
    schedule=[raw_data],
    start_date=datetime(2023, 1, 1)
)

# DAG triggered by clean data updates
report_dag = DAG(
    'report_generation',
    schedule=[clean_data],
    start_date=datetime(2023, 1, 1)
)
```

### Asset Lineage Tracking

File assets automatically participate in Airflow's data lineage tracking:

- **Input assets**: Declared via DAG `schedule` parameter
- **Output assets**: Declared via task `outlets` parameter
- **OpenLineage**: Automatic conversion for external lineage systems
- **Asset graph**: Visual representation in Airflow UI

## Path Handling Details

### URI Normalization

The `create_asset` function handles various file path formats:

1. **Full file URIs**: `file:///absolute/path` → normalized
2. **Relative file URIs**: `file://relative/path` → converted to absolute
3. **Plain paths**: `/absolute/path` → converted to `file:///absolute/path`
4. **PosixPath objects**: Converted to string then processed
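
The path forms listed above can be explored with a small stdlib-only sketch. `to_file_uri` is a hypothetical normalizer written for illustration, not the provider's `create_asset`; it assumes plain paths are absolute and leaves existing `file://` strings untouched. One thing worth noticing is how standard URL parsing reads the two-slash form: the first segment after `file://` lands in the network location rather than in the path.

```python
from __future__ import annotations

from pathlib import PurePosixPath
from urllib.parse import urlsplit


def to_file_uri(path: str | PurePosixPath) -> str:
    """Hypothetical normalizer; not the provider's create_asset."""
    text = str(path)  # path objects are converted to strings first
    if text.startswith("file://"):
        return text  # already a file URI; left as-is in this sketch
    if not text.startswith("/"):
        # Assumption: only absolute plain paths are supported here.
        raise ValueError("this sketch only accepts absolute paths")
    return f"file://{text}"


candidates = ["/absolute/path", PurePosixPath("/absolute/path"), "file://relative/path"]
for candidate in candidates:
    parts = urlsplit(to_file_uri(candidate))
    # Show which portion ends up as network location and which as path.
    print(repr(parts.netloc), parts.path)
```

For this reason the three-slash form (`file:///...`) is the unambiguous way to spell a local absolute path as a URI.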

### Cross-Platform Considerations

- **Linux/macOS**: Standard POSIX path handling
- **Windows**: Limited support, may produce unexpected behavior
- **Network paths**: Support for UNC paths via `file://` URIs
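
One reason Windows-style locations need care can be seen from how the standard library parses them. The short sketch below uses two illustrative URIs (the file names are made up): for a drive-letter URI the drive ends up inside the path behind a leading slash, while for a UNC-style URI the server name ends up in the network location.

```python
from urllib.parse import urlsplit

# Illustrative URIs only; neither file is assumed to exist.
drive = urlsplit("file:///C:/data/report.csv")
unc = urlsplit("file://server/share/report.csv")

# Drive letter: empty network location, path starts with "/C:".
print(repr(drive.netloc), drive.path)

# UNC style: server in the network location, share in the path.
print(repr(unc.netloc), unc.path)
```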

## Version Compatibility

Asset handlers work with both Airflow 2.x and 3.0+:

- **Airflow 2.x**: Uses `airflow.datasets.Dataset`
- **Airflow 3.0+**: Uses `airflow.sdk.definitions.asset.Asset`
- **Automatic imports**: Version detection handles compatibility
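
User code that must run on both major versions can follow a similar fallback pattern. The sketch below is one common way to write it, using the two import paths listed above; the provider's own detection logic may differ. The final `None` fallback is an addition of this sketch so the snippet also runs where Airflow is not installed.

```python
try:
    # Airflow 3.0+ location
    from airflow.sdk.definitions.asset import Asset
except ImportError:
    try:
        # Airflow 2.x location, aliased so later code can use one name
        from airflow.datasets import Dataset as Asset
    except ImportError:
        # Airflow is not installed in this environment
        Asset = None

print("Asset class:", Asset)
```

Aliasing `Dataset` to `Asset` keeps the rest of the module free of version checks.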