# Apache Airflow Common IO Provider

A provider package for Apache Airflow that enables unified I/O operations and file transfer capabilities across different storage systems. This package provides operators for file transfers, XCom backends for object storage, asset/dataset handlers for file-based assets, and configuration options for storage integration.

## Package Information

- **Package Name**: apache-airflow-providers-common-io
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-common-io`
- **Airflow Version**: Requires Apache Airflow 2.10.0+

## Core Imports

```python
from airflow.providers.common.io.operators.file_transfer import FileTransferOperator
from airflow.providers.common.io.xcom.backend import XComObjectStorageBackend
from airflow.providers.common.io.assets.file import create_asset, sanitize_uri, convert_asset_to_openlineage
```

## Basic Usage

```python
from airflow.providers.common.io.operators.file_transfer import FileTransferOperator
from airflow import DAG
from datetime import datetime

# Create a DAG for file transfer
dag = DAG(
    'file_transfer_example',
    start_date=datetime(2023, 1, 1),
    schedule=None,
)

# Transfer a file from source to destination
transfer_task = FileTransferOperator(
    task_id='transfer_file',
    src='/path/to/source/file.txt',
    dst='/path/to/destination/file.txt',
    overwrite=True,
    dag=dag,
)

# Use with object storage paths and connections
s3_transfer = FileTransferOperator(
    task_id='s3_transfer',
    src='s3://source-bucket/file.txt',
    dst='s3://dest-bucket/file.txt',
    source_conn_id='aws_default',
    dest_conn_id='aws_default',
    dag=dag,
)
```

## Architecture

The Common IO Provider follows Apache Airflow's provider pattern and integrates with multiple Airflow subsystems:

- **Operators**: File transfer operations using ObjectStoragePath for unified storage access
- **XCom Backend**: Configurable object storage backend for XCom data with size-based routing
- **Asset Handlers**: File asset creation, URI validation, and OpenLineage integration
- **Configuration**: Centralized settings for object storage paths, thresholds, and compression

The package provides version compatibility handling for both Airflow 2.x and 3.0+, ensuring consistent behavior across versions.

## Capabilities

### File Transfer Operations

File transfer operator for copying files between different storage systems, with support for the local filesystem, cloud storage (S3, GCS, Azure), and other fsspec-compatible storage backends.

```python { .api }
class FileTransferOperator(BaseOperator):
    def __init__(
        self,
        *,
        src: str | ObjectStoragePath,
        dst: str | ObjectStoragePath,
        source_conn_id: str | None = None,
        dest_conn_id: str | None = None,
        overwrite: bool = False,
        **kwargs
    ): ...

    def execute(self, context: Context) -> None: ...

    def get_openlineage_facets_on_start(self) -> OperatorLineage: ...
```

[File Transfer Operations](./file-transfer.md)
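
Since `src` and `dst` accept either strings or `ObjectStoragePath` objects, transfers can also be expressed with explicit path objects. A minimal sketch, assuming Airflow 2.x imports and illustrative bucket names and connection IDs:

```python
from airflow.io.path import ObjectStoragePath  # on Airflow 3.0+: from airflow.sdk import ObjectStoragePath
from airflow.providers.common.io.operators.file_transfer import FileTransferOperator

# Illustrative bucket names and connection IDs
src_path = ObjectStoragePath("s3://source-bucket/data/report.csv", conn_id="aws_default")
dst_path = ObjectStoragePath("gs://dest-bucket/archive/report.csv", conn_id="google_cloud_default")

cross_store_copy = FileTransferOperator(
    task_id="cross_store_copy",
    src=src_path,
    dst=dst_path,
    overwrite=True,
)
```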

### XCom Object Storage Backend

XCom backend that stores data in object storage or the metadata database based on a configurable size threshold, with compression support and automatic cleanup.

```python { .api }
class XComObjectStorageBackend(BaseXCom):
    @staticmethod
    def serialize_value(
        value: T,
        *,
        key: str | None = None,
        task_id: str | None = None,
        dag_id: str | None = None,
        run_id: str | None = None,
        map_index: int | None = None,
    ) -> bytes | str: ...

    @staticmethod
    def deserialize_value(result) -> Any: ...

    @staticmethod
    def purge(xcom: XComResult, session: Session | None = None) -> None: ...
```

[XCom Object Storage Backend](./xcom-backend.md)
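
The size-based routing described above can be pictured with a small illustrative sketch; this is not the provider's implementation, only the threshold semantics listed under Configuration below. Activating the backend itself goes through Airflow's standard `[core] xcom_backend` option, shown here in its environment-variable form.

```python
import json
import os

# Point Airflow's core XCom backend at the provider class
# (environment-variable form of the `[core] xcom_backend` option).
os.environ["AIRFLOW__CORE__XCOM_BACKEND"] = (
    "airflow.providers.common.io.xcom.backend.XComObjectStorageBackend"
)


def routes_to_object_storage(value, threshold: int) -> bool:
    """Illustrative only: decide whether a serialized XCom value is offloaded."""
    if threshold < 0:
        return False  # -1: always keep XComs in the metadata database
    if threshold == 0:
        return True   # 0: always write XComs to object storage
    payload = json.dumps(value).encode("utf-8")
    return len(payload) > threshold  # positive: offload payloads above the threshold


print(routes_to_object_storage({"rows": list(range(10_000))}, threshold=1024))  # True
```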

### File Asset Handlers

Asset and dataset handlers for file-based assets with URI validation, asset creation, and OpenLineage conversion for data lineage tracking.

```python { .api }
def create_asset(*, path: str | PosixPath, extra=None) -> Asset: ...
def sanitize_uri(uri: SplitResult) -> SplitResult: ...
def convert_asset_to_openlineage(asset: Asset, lineage_context) -> OpenLineageDataset: ...
```

[File Asset Handlers](./asset-handlers.md)
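
A minimal usage sketch for `create_asset`, following the signature above; the paths and `extra` metadata are illustrative, and `path` may be given as a string or a `PosixPath`:

```python
from pathlib import PosixPath

from airflow.providers.common.io.assets.file import create_asset

# Illustrative local paths; `extra` carries arbitrary user metadata
csv_asset = create_asset(path="/data/exports/report.csv")
dir_asset = create_asset(path=PosixPath("/data/exports"), extra={"team": "analytics"})
```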

## Configuration

The Common IO Provider supports configuration options that control XCom object storage behavior:

### XCom Object Storage Settings

- `common.io.xcom_objectstorage_path`: Path to the object storage location used for XComs (e.g., "s3://conn_id@bucket/path")
- `common.io.xcom_objectstorage_threshold`: Size threshold in bytes (-1: always use the database, 0: always use object storage, a positive value: store payloads larger than that threshold in object storage)
- `common.io.xcom_objectstorage_compression`: Compression algorithm applied to stored payloads (gz, bz2, lzma, snappy, zip)

These settings are configured in `airflow.cfg` or via environment variables following Airflow's configuration patterns.
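
As a hedged illustration of the `common.io` section named above, the values can also be read programmatically through Airflow's configuration API (the fallbacks here are illustrative, not the provider's documented defaults):

```python
from airflow.configuration import conf

# Read the XCom object storage settings (section/option names as listed above)
xcom_path = conf.get("common.io", "xcom_objectstorage_path", fallback="")
xcom_threshold = conf.getint("common.io", "xcom_objectstorage_threshold", fallback=-1)
xcom_compression = conf.get("common.io", "xcom_objectstorage_compression", fallback=None)
```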

## Types

Core types used throughout the Common IO Provider API:

```python { .api }
# Airflow Context type for task execution
from airflow.sdk import Context  # Airflow 3.0+
# or from airflow.utils.context import Context  # Airflow 2.x

# Object storage path handling
if AIRFLOW_V_3_0_PLUS:
    from airflow.sdk import ObjectStoragePath
else:
    from airflow.io.path import ObjectStoragePath

# Asset/Dataset types (version-dependent)
if AIRFLOW_V_3_0_PLUS:
    from airflow.sdk.definitions.asset import Asset
else:
    from airflow.datasets import Dataset as Asset

# XCom result type
from airflow.sdk.execution_time.comms import XComResult  # Airflow 3.0+
# or from airflow.models.xcom import XCom as XComResult  # Airflow 2.x

# Database session type
from sqlalchemy.orm import Session

# Standard library types
from pathlib import PosixPath
from urllib.parse import SplitResult
from typing import Any, TypeVar

# OpenLineage integration
from airflow.providers.common.compat.openlineage.facet import Dataset as OpenLineageDataset
from airflow.providers.openlineage.extractors import OperatorLineage

# Generic type variable
T = TypeVar("T")
```