# File Asset Handlers

The Common IO Provider ships handlers for file-based assets (called datasets in Airflow 2.x) that integrate with Airflow's asset system and with OpenLineage for data lineage tracking. The handlers cover file URI validation, asset creation, and conversion to OpenLineage datasets.

## Capabilities

### Asset Creation

Creates a file-based asset from a file path, normalizing the path into a `file://` URI.

```python { .api }
def create_asset(*, path: str | PosixPath, extra=None) -> Asset:
    """
    Create a file asset from a file path.

    Normalizes file:// URIs and handles various path formats for consistent
    asset creation across different file path representations.

    Parameters:
    - path: File path as string or PosixPath object
    - extra: Optional extra metadata for the asset

    Returns:
    - Asset: Airflow Asset object with normalized file:// URI
    """
```

### URI Validation

Validates `file://` URIs so that malformed values are rejected before an asset is created from them.

```python { .api }
def sanitize_uri(uri: SplitResult) -> SplitResult:
    """
    Validate and sanitize file URI format.

    Ensures that file:// URIs contain valid, non-empty paths and meet
    the requirements for asset URI handling.

    Parameters:
    - uri: Parsed URI from urllib.parse.urlsplit

    Returns:
    - SplitResult: Validated URI structure

    Raises:
    - ValueError: If URI format is invalid or path is empty
    """
```
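
The non-empty-path rule from the docstring can be illustrated with the standard library alone. This is a minimal sketch, not the provider's implementation: `check_file_uri` is a hypothetical stand-in that enforces only the documented rule that the path must not be empty.

```python
from urllib.parse import SplitResult, urlsplit


def check_file_uri(uri: SplitResult) -> SplitResult:
    """Hypothetical stand-in: reject file URIs whose path component is empty."""
    if not uri.path:
        raise ValueError("file:// URI must contain a non-empty path")
    return uri


# A URI with a path passes through unchanged.
ok = check_file_uri(urlsplit("file:///data/valid/path.txt"))

# "file://" alone parses to an empty path, so it is rejected.
try:
    check_file_uri(urlsplit("file://"))
    rejected = False
except ValueError:
    rejected = True
```

Taking an already-parsed `SplitResult` rather than a string keeps parsing and validation separate, which matches the signature shown above.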

### OpenLineage Conversion

Converts file assets to OpenLineage datasets so that they can be reported to data lineage tracking systems.

```python { .api }
def convert_asset_to_openlineage(asset: Asset, lineage_context) -> OpenLineageDataset:
    """
    Convert Airflow Asset to OpenLineage Dataset for lineage tracking.

    Translates Asset with valid AIP-60 URI to OpenLineage format with
    assistance from the lineage context. Handles various file path formats
    and network locations.

    Parameters:
    - asset: Airflow Asset object with file:// URI
    - lineage_context: OpenLineage context for conversion

    Returns:
    - OpenLineageDataset: OpenLineage dataset with namespace and name

    Note:
    - Windows paths are not standardized and can produce unexpected behaviour
    """
```
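
To make the "namespace and name" pair concrete, the sketch below splits a file URI with the standard library. It rests on an assumption that is not confirmed by this document: that the namespace is the scheme plus host and the name is the path. `split_for_lineage` is a hypothetical helper, not part of the provider.

```python
from urllib.parse import urlsplit


def split_for_lineage(asset_uri: str) -> tuple[str, str]:
    """Hypothetical helper: derive a (namespace, name) pair from a file URI.

    Assumption: namespace = scheme plus host, name = path component.
    """
    parsed = urlsplit(asset_uri)
    namespace = f"file://{parsed.netloc}"
    name = parsed.path
    return namespace, name


# Local file: the host is empty, so the namespace is just the scheme prefix.
local = split_for_lineage("file:///data/warehouse/table.parquet")

# Network location: the host becomes part of the namespace.
remote = split_for_lineage("file://server/share/table.parquet")
```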

## Usage Examples

### Creating File Assets

```python
from airflow.providers.common.io.assets.file import create_asset
from pathlib import PosixPath

# Create asset from string path
asset1 = create_asset(path="/data/input/dataset.csv")

# Create asset from PosixPath
asset2 = create_asset(path=PosixPath("/data/output/results.json"))

# Create asset with extra metadata
asset3 = create_asset(
    path="/data/processed/analysis.parquet",
    extra={"format": "parquet", "schema_version": "1.0"}
)

# Handle paths that are already written as file:// URIs
asset4 = create_asset(path="file:///absolute/path/data.txt")
# Note: under standard URI parsing, "relative" below is read as the host
# (authority), and the path component is "/path/data.txt".
asset5 = create_asset(path="file://relative/path/data.txt")
```

### Using Assets in DAGs

```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.common.io.assets.file import create_asset
from datetime import datetime

# Define file assets
input_asset = create_asset(path="/data/input/raw_data.csv")
output_asset = create_asset(path="/data/output/processed_data.csv")

dag = DAG(
    'asset_example',
    start_date=datetime(2023, 1, 1),
    schedule=[input_asset]  # Schedule on input asset updates
)

def process_data(**context):
    # Process input file and create output
    input_path = "/data/input/raw_data.csv"
    output_path = "/data/output/processed_data.csv"
    # ... processing logic ...
    return output_path

process_task = PythonOperator(
    task_id='process_data',
    python_callable=process_data,
    outlets=[output_asset],  # Declare output asset
    dag=dag
)
```

### Asset URI Validation

```python
from airflow.providers.common.io.assets.file import sanitize_uri
from urllib.parse import urlsplit

# Validate file URIs
try:
    uri = urlsplit("file:///data/valid/path.txt")
    validated_uri = sanitize_uri(uri)
    print("Valid URI:", validated_uri.geturl())
except ValueError as e:
    print("Invalid URI:", e)

# This will raise ValueError
try:
    empty_uri = urlsplit("file://")  # Empty path
    sanitize_uri(empty_uri)
except ValueError:
    print("Empty path not allowed")
```

### OpenLineage Integration

```python
from airflow.providers.common.io.assets.file import convert_asset_to_openlineage, create_asset

# Create file asset
asset = create_asset(path="/data/warehouse/table.parquet")

# Conversion is usually invoked by Airflow's lineage system rather than
# called directly; this function only illustrates the call shape.
def lineage_context_example():
    lineage_context = {}  # Placeholder; the real context is provided by Airflow's lineage system
    ol_dataset = convert_asset_to_openlineage(asset, lineage_context)

    return {
        "namespace": ol_dataset.namespace,
        "name": ol_dataset.name
    }
```

## Provider Registration

The asset handlers are automatically registered with Airflow through the provider mechanism and handle `file://` scheme URIs:

### Automatic Registration

```python
# Provider configuration (from provider.yaml), shown as a Python structure
asset_uris = [
    {
        "schemes": ["file"],
        "handler": "airflow.providers.common.io.assets.file.sanitize_uri",
        "to_openlineage_converter": "airflow.providers.common.io.assets.file.convert_asset_to_openlineage",
        "factory": "airflow.providers.common.io.assets.file.create_asset",
    }
]
```
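
As an illustration of what a scheme-keyed registration enables, the sketch below dispatches a URI to a handler chosen by its scheme. It is not Airflow's loading code: `require_path`, `entries`, `handlers`, and `sanitize` are hypothetical names, and the handler is stored as a callable here rather than as a dotted import path.

```python
from typing import Callable
from urllib.parse import SplitResult, urlsplit


def require_path(uri: SplitResult) -> SplitResult:
    """Hypothetical handler standing in for a registered sanitizer."""
    if not uri.path:
        raise ValueError("empty path")
    return uri


# Hypothetical registry built from entries shaped like the configuration above:
# every scheme listed in an entry maps to that entry's handler.
entries = [{"schemes": ["file"], "handler": require_path}]
handlers: dict[str, Callable[[SplitResult], SplitResult]] = {
    scheme: entry["handler"] for entry in entries for scheme in entry["schemes"]
}


def sanitize(uri_text: str) -> str:
    """Look up the handler by scheme; unregistered schemes pass through unchanged."""
    parsed = urlsplit(uri_text)
    handler = handlers.get(parsed.scheme)
    return (handler(parsed) if handler else parsed).geturl()


file_result = sanitize("file:///data/input/raw_data.csv")
other_result = sanitize("s3://bucket/key")
```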

### Supported URI Schemes

- **file**: Local and network file paths
  - `file:///absolute/path/to/file.txt`
  - `file://server/share/file.txt` (network paths)
  - Automatic normalization of various file URI formats
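
The difference between the two URI forms listed above is visible in how Python's standard `urllib.parse.urlsplit` breaks them into components. This short sketch uses only the standard library and does not call the provider.

```python
from urllib.parse import urlsplit

# Three slashes: the authority (host) is empty and the path is absolute.
local = urlsplit("file:///absolute/path/to/file.txt")

# Two slashes: the first segment is parsed as the host, the rest as the path.
network = urlsplit("file://server/share/file.txt")

print(repr(local.netloc), local.path)
print(repr(network.netloc), network.path)
```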

## Integration with Airflow Assets

### Asset-Based Scheduling

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.common.io.assets.file import create_asset

# Define data pipeline assets
raw_data = create_asset(path="/data/raw/daily_data.csv")
clean_data = create_asset(path="/data/clean/daily_data.csv")
report_data = create_asset(path="/reports/daily_report.pdf")

# DAG triggered by raw data updates
process_dag = DAG(
    'data_processing',
    schedule=[raw_data],
    start_date=datetime(2023, 1, 1)
)

# DAG triggered by clean data updates
report_dag = DAG(
    'report_generation',
    schedule=[clean_data],
    start_date=datetime(2023, 1, 1)
)
```

### Asset Lineage Tracking

File assets automatically participate in Airflow's data lineage tracking:

- **Input assets**: Declared via the DAG `schedule` parameter
- **Output assets**: Declared via the task `outlets` parameter
- **OpenLineage**: Automatic conversion for external lineage systems
- **Asset graph**: Visual representation in the Airflow UI

## Path Handling Details

### URI Normalization

The `create_asset` function handles various file path formats:

1. **Full file URIs**: `file:///absolute/path` → normalized
2. **Relative file URIs**: `file://relative/path` → converted to absolute
3. **Plain paths**: `/absolute/path` → converted to `file:///absolute/path`
4. **PosixPath objects**: Converted to string then processed
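
The plain-path and PosixPath cases above can be sketched in a few lines. This is an illustration under stated assumptions, not the provider's code: `to_file_uri` is a hypothetical helper, it returns existing `file://` URIs unchanged, it does not percent-encode special characters, and it does not attempt the relative-URI conversion from item 2.

```python
from pathlib import PurePosixPath
from typing import Union


def to_file_uri(path: Union[str, PurePosixPath]) -> str:
    """Hypothetical helper: turn an absolute POSIX path into a file:// URI."""
    text = str(path)  # Path objects are converted to strings first (item 4)
    if text.startswith("file://"):
        # Already a file URI; returned unchanged in this sketch.
        return text
    if not text.startswith("/"):
        raise ValueError(f"expected an absolute path, got {text!r}")
    # "file://" plus a path beginning with "/" yields the three-slash form.
    return f"file://{text}"


from_plain = to_file_uri("/absolute/path")
from_object = to_file_uri(PurePosixPath("/absolute/path"))
from_uri = to_file_uri("file:///absolute/path")
```

All three inputs end up as the same URI string, which is the point of normalizing before an asset is created.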

### Cross-Platform Considerations

- **Linux/macOS**: Standard POSIX path handling
- **Windows**: Limited support; Windows paths are not standardized and may produce unexpected behavior
- **Network paths**: UNC paths are supported via `file://` URIs
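
One reason Windows paths are fragile can be seen with the standard library alone: a bare drive-letter path looks like a URI whose scheme is the drive letter. The sketch below only demonstrates `urllib.parse.urlsplit` behavior and makes no claim about how the provider treats such input.

```python
from urllib.parse import urlsplit

# A bare Windows-style path: the drive letter is parsed as the URI scheme.
bare = urlsplit("C:/data/file.txt")

# The same location written as a file URI keeps the drive letter in the path.
as_uri = urlsplit("file:///C:/data/file.txt")

print(bare.scheme, bare.path)
print(as_uri.scheme, as_uri.path)
```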

## Version Compatibility

The asset handlers work with both Airflow 2.x and 3.0+:

- **Airflow 2.x**: Uses `airflow.datasets.Dataset`
- **Airflow 3.0+**: Uses `airflow.sdk.definitions.asset.Asset`
- **Automatic imports**: Version detection selects the matching import
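
User code that must run on both major versions can follow a similar idea. The sketch below uses an import fallback, a common pattern that is swapped in here for illustration and is not necessarily the version-detection mechanism the provider uses. The final fallback to `None` is an addition so that the snippet also runs where Airflow is not installed.

```python
try:
    # Airflow 3.0+ location named above.
    from airflow.sdk.definitions.asset import Asset
except ImportError:
    try:
        # Airflow 2.x location named above, aliased to the newer name.
        from airflow.datasets import Dataset as Asset
    except ImportError:
        # Airflow is not installed; keep the sketch importable.
        Asset = None

# Report which class was resolved, if any.
asset_class_name = Asset.__name__ if Asset is not None else None
print("Resolved asset class:", asset_class_name)
```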