# File Sinks

File sinks provide unified writing capabilities with exactly-once semantics, bucketing, rolling policies, and optional file compaction for improved performance.

## Capabilities

### FileSink Class

Main entry point for creating file sinks that write to distributed file systems with exactly-once guarantees.

```java { .api }
/**
 * A unified sink that emits its input elements to FileSystem files within buckets. This
 * sink achieves exactly-once semantics for both BATCH and STREAMING.
 *
 * When creating the sink a basePath must be specified. The base directory contains one
 * directory for every bucket. The bucket directories themselves contain several part files, with at
 * least one for each parallel subtask of the sink which is writing data to that bucket. These part
 * files contain the actual output data.
 *
 * The sink uses a BucketAssigner to determine in which bucket directory each element
 * should be written inside the base directory. The BucketAssigner can, for example, roll
 * on every checkpoint or use time or a property of the element to determine the bucket directory.
 * The default BucketAssigner is a DateTimeBucketAssigner which will create one new
 * bucket every hour.
 */
@Experimental
public class FileSink<IN>
        implements Sink<IN>,
                SupportsWriterState<IN, FileWriterBucketState>,
                SupportsCommitter<FileSinkCommittable>,
                SupportsWriterState.WithCompatibleState,
                SupportsPreCommitTopology<FileSinkCommittable, FileSinkCommittable>,
                SupportsConcurrentExecutionAttempts {

    /**
     * Creates a FileSink for row-wise writing using encoders.
     * The created sink will write each record on a separate line, separated by line delimiters.
     */
    public static <IN> DefaultRowFormatBuilder<IN> forRowFormat(
            final Path basePath, final Encoder<IN> encoder);

    /**
     * Creates a FileSink for bulk writing using BulkWriter factories.
     * This is suitable for formats such as Parquet or ORC.
     */
    public static <IN> DefaultBulkFormatBuilder<IN> forBulkFormat(
            final Path basePath, final BulkWriter.Factory<IN> bulkWriterFactory);
}
```
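To make the javadoc's layout description concrete, here is a sketch of the directory tree the sink produces under the default hourly `DateTimeBucketAssigner` (whose default pattern is `yyyy-MM-dd--HH`); the part-file names are illustrative, not the exact naming scheme:

```
/output/                      <- basePath
├── 2024-05-01--10/           <- one directory per bucket
│   ├── part-0-0              <- part files, at least one per parallel subtask
│   └── part-1-0
└── 2024-05-01--11/
    └── part-0-0
```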

### FileSink.DefaultRowFormatBuilder

Builder for configuring row-format file sinks with encoders.

```java { .api }
/**
 * Builder for row-format file sinks.
 */
public static class DefaultRowFormatBuilder<IN> {
    /**
     * Sets a custom bucketing strategy for organizing output files.
     * @param bucketAssigner Strategy for assigning records to buckets
     * @return Builder instance for chaining
     */
    public DefaultRowFormatBuilder<IN> withBucketAssigner(
            BucketAssigner<IN, String> bucketAssigner);

    /**
     * Sets the rolling policy that controls when new files are started.
     * @param rollingPolicy Policy controlling file rolling behavior
     * @return Builder instance for chaining
     */
    public DefaultRowFormatBuilder<IN> withRollingPolicy(
            RollingPolicy<IN, String> rollingPolicy);

    /**
     * Sets the output file configuration for part-file naming.
     * @param outputFileConfig Configuration for output file properties
     * @return Builder instance for chaining
     */
    public DefaultRowFormatBuilder<IN> withOutputFileConfig(
            OutputFileConfig outputFileConfig);

    /**
     * Enables file compaction to merge small files.
     * @param compactStrategy Strategy for triggering compaction
     * @param compactor Implementation for compacting files
     * @return Builder instance for chaining
     */
    public DefaultRowFormatBuilder<IN> enableCompact(
            FileCompactStrategy compactStrategy, FileCompactor compactor);

    /**
     * Builds the final FileSink instance.
     * @return Configured FileSink
     */
    public FileSink<IN> build();
}
```

### FileSink.DefaultBulkFormatBuilder

Builder for configuring bulk-format file sinks.

```java { .api }
/**
 * Builder for bulk-format file sinks.
 */
public static class DefaultBulkFormatBuilder<IN> {
    /**
     * Sets a custom bucketing strategy for organizing output files.
     * @param bucketAssigner Strategy for assigning records to buckets
     * @return Builder instance for chaining
     */
    public DefaultBulkFormatBuilder<IN> withBucketAssigner(
            BucketAssigner<IN, String> bucketAssigner);

    /**
     * Sets the rolling policy that controls when new files are started.
     * @param rollingPolicy Policy controlling file rolling behavior
     * @return Builder instance for chaining
     */
    public DefaultBulkFormatBuilder<IN> withRollingPolicy(
            RollingPolicy<IN, String> rollingPolicy);

    /**
     * Sets the output file configuration for part-file naming.
     * @param outputFileConfig Configuration for output file properties
     * @return Builder instance for chaining
     */
    public DefaultBulkFormatBuilder<IN> withOutputFileConfig(
            OutputFileConfig outputFileConfig);

    /**
     * Enables file compaction to merge small files.
     * @param compactStrategy Strategy for triggering compaction
     * @param compactor Implementation for compacting files
     * @return Builder instance for chaining
     */
    public DefaultBulkFormatBuilder<IN> enableCompact(
            FileCompactStrategy compactStrategy, FileCompactor compactor);

    /**
     * Disables writing to the local file system, for HDFS compatibility.
     * @return Builder instance for chaining
     */
    public DefaultBulkFormatBuilder<IN> disableLocalWriting();

    /**
     * Builds the final FileSink instance.
     * @return Configured FileSink
     */
    public FileSink<IN> build();
}
```
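Bulk formats such as Parquet or ORC write block-structured files with a footer, so a closed file cannot be re-opened for appending; this is why bulk sinks roll parts on checkpoints rather than on size alone. A toy, self-contained sketch of the writer contract (the interface mirrors the shape of Flink's `BulkWriter`, but all names here are illustrative):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class BulkWriterDemo {
    /** Simplified bulk-writer contract: elements are added, then the file is finished exactly once. */
    interface ToyBulkWriter<T> {
        void addElement(T element) throws IOException;
        void finish() throws IOException; // writes the footer; no appends afterwards
    }

    /** A toy "bulk format" that buffers rows and finishes with a footer carrying the row count. */
    static class CountingFooterWriter implements ToyBulkWriter<String> {
        private final ByteArrayOutputStream out = new ByteArrayOutputStream();
        private int rows = 0;

        public void addElement(String element) throws IOException {
            out.write((element + "\n").getBytes(StandardCharsets.UTF_8));
            rows++;
        }

        public void finish() throws IOException {
            // The footer makes the file self-describing -- and unappendable,
            // which is the property that forces roll-on-checkpoint.
            out.write(("FOOTER rows=" + rows + "\n").getBytes(StandardCharsets.UTF_8));
        }

        String contents() {
            return out.toString(StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws IOException {
        CountingFooterWriter writer = new CountingFooterWriter();
        writer.addElement("a");
        writer.addElement("b");
        writer.finish();
        System.out.print(writer.contents()); // a, b, then the footer line
    }
}
```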

**Usage Examples:**

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.sink.compactor.ConcatFileCompactor;
import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;

// Basic file sink for text output
FileSink<String> basicSink = FileSink
    .forRowFormat(new Path("/output"), new SimpleStringEncoder<String>("UTF-8"))
    .build();

// Sink with rolling policy and bucketing
FileSink<String> advancedSink = FileSink
    .forRowFormat(new Path("/output"), new SimpleStringEncoder<String>("UTF-8"))
    .withRollingPolicy(DefaultRollingPolicy.builder()
        .withMaxPartSize(MemorySize.ofMebiBytes(128))
        .withRolloverInterval(Duration.ofMinutes(15))
        .withInactivityInterval(Duration.ofMinutes(5))
        .build())
    .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd/HH"))
    .build();

// Sink with file compaction
FileSink<String> compactingSink = FileSink
    .forRowFormat(new Path("/output"), new SimpleStringEncoder<String>("UTF-8"))
    .enableCompact(
        FileCompactStrategy.Builder.newBuilder()
            .setSizeThreshold(MemorySize.ofMebiBytes(64).getBytes())
            .enableCompactionOnCheckpoint(3)
            .build(),
        new ConcatFileCompactor())
    .build();

// Use with the DataStream API
stream.sinkTo(advancedSink);
```
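The `yyyy-MM-dd/HH` pattern passed to `DateTimeBucketAssigner` above is an ordinary `DateTimeFormatter` pattern; the bucket path it yields can be sketched self-contained, without a Flink dependency (the `/output` base path matches the example):

```java
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

public class BucketPathDemo {
    // Derives the bucket directory for a record from its processing time,
    // using the same pattern string as the advancedSink example.
    static String bucketPath(String basePath, ZonedDateTime processingTime) {
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd/HH");
        return basePath + "/" + fmt.format(processingTime);
    }

    public static void main(String[] args) {
        ZonedDateTime ts = ZonedDateTime.of(2024, 5, 1, 10, 30, 0, 0, ZoneOffset.UTC);
        System.out.println(bucketPath("/output", ts)); // -> /output/2024-05-01/10
    }
}
```

All records processed within the same hour land in the same bucket directory, which is what makes time-partitioned downstream queries cheap.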

### File Writers and Buckets

Internal components for managing file writing operations.

```java { .api }
/**
 * Writer that manages file buckets and handles the writing process.
 */
public class FileWriter<IN> implements SinkWriter<IN> {
    // Internal implementation - not directly used by applications
}

/**
 * Factory for creating FileWriterBucket instances.
 */
public interface FileWriterBucketFactory<IN> {
    FileWriterBucket<IN> getWriterBucket(String bucketId) throws IOException;
}

/**
 * Default implementation of FileWriterBucketFactory.
 */
public class DefaultFileWriterBucketFactory<IN> implements FileWriterBucketFactory<IN> {
    public DefaultFileWriterBucketFactory(
            Path basePath,
            Encoder<IN> encoder,
            RollingPolicy<IN, String> rollingPolicy,
            OutputFileConfig outputFileConfig);
}
```

### Committable Types

Types representing committable file operations for exactly-once semantics.

```java { .api }
/**
 * Represents a committable file operation.
 */
public class FileSinkCommittable {
    public FileSinkCommittable(String bucketId, Path path, long creationTime);

    public String getBucketId();
    public Path getPath();
    public long getCreationTime();
}

/**
 * Serializer for FileSinkCommittable instances.
 */
public class FileSinkCommittableSerializer
        implements SimpleVersionedSerializer<FileSinkCommittable> {
    public int getVersion();
    public byte[] serialize(FileSinkCommittable committable) throws IOException;
    public FileSinkCommittable deserialize(int version, byte[] serialized) throws IOException;
}
```
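A committable carries everything needed to finish a pending part file when a checkpoint completes. The underlying write-then-rename idea behind exactly-once file commits can be sketched self-contained (the `.inprogress` naming here is illustrative, not Flink's exact scheme):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class CommitDemo {
    // Phase 1: write into a hidden in-progress file that readers ignore.
    static Path write(Path bucketDir, String partName, String data) throws IOException {
        Path inProgress = bucketDir.resolve("." + partName + ".inprogress");
        Files.writeString(inProgress, data);
        return inProgress;
    }

    // Phase 2: on checkpoint completion, atomically rename to the final name.
    // If the job fails before this point, the hidden file is simply discarded on recovery.
    static Path commit(Path inProgress) throws IOException {
        String finalName = inProgress.getFileName().toString()
                .replaceAll("^\\.", "")
                .replaceAll("\\.inprogress$", "");
        return Files.move(inProgress, inProgress.resolveSibling(finalName),
                StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        Path bucket = Files.createTempDirectory("bucket");
        Path pending = write(bucket, "part-0-0", "hello");
        Path committed = commit(pending);
        System.out.println(committed.getFileName()); // part-0-0
        System.out.println(Files.readString(committed)); // hello
    }
}
```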

**Advanced Configuration Examples:**

```java
// Custom output file configuration
OutputFileConfig fileConfig = OutputFileConfig.builder()
    .withPartPrefix("data-")
    .withPartSuffix(".txt")
    .build();

// Custom bucket assigner for organizing by key
BucketAssigner<MyRecord, String> keyBucketAssigner = new BucketAssigner<MyRecord, String>() {
    @Override
    public String getBucketId(MyRecord record, Context context) {
        // Math.floorMod keeps the bucket index non-negative even when hashCode() is negative
        return "bucket-" + Math.floorMod(record.getKey().hashCode(), 10);
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
};

FileSink<MyRecord> customSink = FileSink
    .forRowFormat(new Path("/output"), new MyRecordEncoder())
    .withBucketAssigner(keyBucketAssigner)
    .withOutputFileConfig(fileConfig)
    .build();
```

## Error Handling

File sinks handle various error conditions during writing:

- **IOException**: file system write failures, disk-full conditions
- **IllegalArgumentException**: invalid configuration or paths
- **RuntimeException**: encoding errors, compaction failures

```java
try {
    FileSink<String> sink = FileSink
        .forRowFormat(new Path("/invalid/output"), new SimpleStringEncoder<>("UTF-8"))
        .build();
    stream.sinkTo(sink);
} catch (Exception e) {
    // Configuration problems (e.g. an invalid base path) surface here at build time;
    // write errors at runtime instead fail the job and trigger checkpoint recovery.
}
```

## Performance Considerations

- Use appropriate rolling policies to balance file size against file count
- Configure bucketing to distribute load and improve query performance
- Enable compaction for workloads that produce many small files
- Consider bulk formats for high-throughput scenarios
- Monitor checkpoint intervals to balance consistency and performance
- Use `disableLocalWriting()` for HDFS deployments to avoid local filesystem usage
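The first point, the size/time trade-off that a rolling policy evaluates, can be made concrete with a plain-Java sketch (the thresholds mirror the earlier example's configuration and are illustrative, not Flink defaults):

```java
public class RollingDecisionDemo {
    static final long MAX_PART_SIZE_BYTES = 128L * 1024 * 1024; // 128 MiB
    static final long ROLLOVER_INTERVAL_MS = 15L * 60 * 1000;   // 15 minutes

    // Roll when either threshold is crossed: oversized parts slow down recovery
    // and compaction, while long-open parts delay data visibility downstream.
    static boolean shouldRoll(long partSizeBytes, long partAgeMs) {
        return partSizeBytes >= MAX_PART_SIZE_BYTES || partAgeMs >= ROLLOVER_INTERVAL_MS;
    }

    public static void main(String[] args) {
        System.out.println(shouldRoll(200L * 1024 * 1024, 1_000));  // size threshold hit
        System.out.println(shouldRoll(1_024, 20L * 60 * 1000));     // time threshold hit
        System.out.println(shouldRoll(1_024, 1_000));               // neither: keep writing
    }
}
```

Lowering either threshold produces more, smaller files, which is exactly the situation the compaction support is designed to clean up.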