# File Transfer Operations

The FileTransferOperator provides unified file copying across different storage systems, supporting local filesystems, cloud storage, and any fsspec-compatible storage backend. It streams transfers so that large files never need to fit into memory, and it integrates with Airflow's templating and OpenLineage systems.

## Capabilities

### File Transfer Operator

Copies a file from a source to a destination, with support for different storage systems, connection management, and overwrite protection.

```python { .api }
class FileTransferOperator(BaseOperator):
    """
    Copies a file from a source to a destination.

    This streams the file from the source to the destination if required,
    so it does not need to fit into memory.
    """

    template_fields: Sequence[str] = ("src", "dst")

    def __init__(
        self,
        *,
        src: str | ObjectStoragePath,
        dst: str | ObjectStoragePath,
        source_conn_id: str | None = None,
        dest_conn_id: str | None = None,
        overwrite: bool = False,
        **kwargs,
    ) -> None:
        """
        Initialize FileTransferOperator.

        Parameters:
        - src: The source file path or ObjectStoragePath object
        - dst: The destination file path or ObjectStoragePath object
        - source_conn_id: The optional source connection id
        - dest_conn_id: The optional destination connection id
        - overwrite: Whether to overwrite an existing destination file
        """

    def execute(self, context: Context) -> None:
        """
        Execute the file transfer operation.

        Parameters:
        - context: Airflow task execution context

        Raises:
        - ValueError: If the destination exists and overwrite is False
        """

    def get_openlineage_facets_on_start(self) -> OperatorLineage:
        """
        Get OpenLineage facets for data lineage tracking.

        Returns:
        - OperatorLineage: Input and output datasets for lineage
        """

    @staticmethod
    def _get_path(path: str | ObjectStoragePath, conn_id: str | None) -> ObjectStoragePath:
        """
        Convert a string path to an ObjectStoragePath with an optional connection.

        Parameters:
        - path: File path as string or ObjectStoragePath
        - conn_id: Optional Airflow connection ID

        Returns:
        - ObjectStoragePath: Resolved path object
        """
```
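
For orientation, here is a plausible sketch of the core `execute()` logic implied by the API above: resolve both paths, enforce the overwrite guard, then stream the copy. This is a simplified illustration, not the provider's exact source.

```python
# Illustrative only: a simplified sketch of what execute() does,
# not the provider's exact source.
def execute(self, context: Context) -> None:
    src = self._get_path(self.src, self.source_conn_id)
    dst = self._get_path(self.dst, self.dest_conn_id)

    # Overwrite guard: refuse to replace an existing file unless asked to.
    if not self.overwrite and dst.exists() and dst.is_file():
        raise ValueError(f"{dst} already exists")

    # ObjectStoragePath.copy() streams between storage backends, so the
    # file never needs to fit into memory.
    src.copy(dst)
```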

### Usage Examples

#### Basic File Transfer

```python
from airflow.providers.common.io.operators.file_transfer import FileTransferOperator
from airflow import DAG
from datetime import datetime

dag = DAG('file_transfer', start_date=datetime(2023, 1, 1))

# Simple local file copy
local_copy = FileTransferOperator(
    task_id='copy_local_file',
    src='/tmp/source.txt',
    dst='/tmp/destination.txt',
    overwrite=True,
    dag=dag,
)
```

#### Cloud Storage Transfer

```python
# Transfer between S3 buckets
s3_transfer = FileTransferOperator(
    task_id='s3_to_s3_transfer',
    src='s3://source-bucket/data/file.csv',
    dst='s3://dest-bucket/processed/file.csv',
    source_conn_id='aws_default',
    dest_conn_id='aws_default',
    overwrite=False,
    dag=dag,
)

# Transfer from local to cloud storage
upload_task = FileTransferOperator(
    task_id='upload_to_gcs',
    src='/local/path/data.json',
    dst='gs://my-bucket/data/data.json',
    dest_conn_id='google_cloud_default',
    dag=dag,
)
```

#### Using ObjectStoragePath Objects

```python
from airflow.io.path import ObjectStoragePath

# Create ObjectStoragePath objects directly
source_path = ObjectStoragePath('s3://bucket/source.txt', conn_id='aws_conn')
dest_path = ObjectStoragePath('gcs://bucket/dest.txt', conn_id='gcp_conn')

cross_cloud_transfer = FileTransferOperator(
    task_id='cross_cloud_transfer',
    src=source_path,
    dst=dest_path,
    dag=dag,
)
```

#### Template Support

```python
# Use Airflow templating in paths
templated_transfer = FileTransferOperator(
    task_id='templated_transfer',
    src='s3://bucket/data/{{ ds }}/input.csv',
    dst='s3://bucket/processed/{{ ds }}/output.csv',
    source_conn_id='aws_default',
    dest_conn_id='aws_default',
    dag=dag,
)
```

## Error Handling

The FileTransferOperator handles several error conditions:

- **Destination exists**: Raises `ValueError` if the destination file exists and `overwrite=False` (see the sketch after this list)
- **Connection errors**: Propagates connection-related exceptions from the underlying storage systems
- **Permission errors**: Propagates permission-related exceptions from storage backends
- **Path validation**: Uses ObjectStoragePath validation for path resolution
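
If a rerun may legitimately produce the same destination file, you can either set `overwrite=True` or clear the stale destination before the transfer runs. A minimal sketch using `ObjectStoragePath` (the path and connection id here are illustrative assumptions):

```python
from airflow.io.path import ObjectStoragePath

# Remove a stale destination so the transfer's overwrite=False guard
# only trips on genuinely unexpected collisions.
dst = ObjectStoragePath('s3://dest-bucket/processed/file.csv', conn_id='aws_default')
if dst.exists():
    dst.unlink()
```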

## Integration Features

### OpenLineage Integration

The operator automatically provides data lineage information through the `get_openlineage_facets_on_start()` method, creating input and output datasets for lineage tracking systems.
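
A rough sketch of what this method returns: one input dataset for the source and one output dataset for the destination, each identified by storage namespace and object key. The import paths and the `namespace`/`key` attributes below are assumptions; consult the provider source for the authoritative version.

```python
# Illustrative sketch, not the provider's source.
from airflow.providers.common.compat.openlineage.facet import Dataset
from airflow.providers.openlineage.extractors import OperatorLineage

def get_openlineage_facets_on_start(self) -> OperatorLineage:
    src = self._get_path(self.src, self.source_conn_id)
    dst = self._get_path(self.dst, self.dest_conn_id)
    # One input dataset (the source) and one output dataset (the
    # destination), identified by storage namespace and object key.
    return OperatorLineage(
        inputs=[Dataset(namespace=src.namespace, name=src.key)],
        outputs=[Dataset(namespace=dst.namespace, name=dst.key)],
    )
```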

### Airflow Templating

The `src` and `dst` fields support Airflow's Jinja2 templating, allowing dynamic path construction from execution-context variables such as `{{ ds }}` and `{{ task_instance_key_str }}`, as in the example below.
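
For example, `{{ task_instance_key_str }}` can produce a destination that is unique per task instance (the bucket name here is illustrative):

```python
unique_upload = FileTransferOperator(
    task_id='unique_upload',
    src='/tmp/report.csv',
    dst='s3://my-bucket/reports/{{ task_instance_key_str }}.csv',
    dest_conn_id='aws_default',
    dag=dag,
)
```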

### Version Compatibility

The operator works with both Airflow 2.x and 3.0+ through a version compatibility layer that automatically imports the correct BaseOperator and ObjectStoragePath classes.
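
A minimal sketch of what such a compatibility shim can look like, assuming the Airflow 3 classes are exposed under `airflow.sdk` (the provider's actual shim may differ):

```python
try:
    # Airflow 3.x: task-facing classes live in the Task SDK.
    from airflow.sdk import BaseOperator, ObjectStoragePath
except ImportError:
    # Airflow 2.x fallback locations.
    from airflow.models import BaseOperator
    from airflow.io.path import ObjectStoragePath
```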