**Tile:** `tessl/maven-org-apache-flink--flink-file-sink-common`

Common utilities and interfaces for file sink functionality in Apache Flink stream processing applications.

- **Workspace**: tessl
- **Visibility**: Public
- **Describes**: `mavenpkg:maven/org.apache.flink/flink-file-sink-common@2.1.x`

To install, run `npx @tessl/cli install tessl/maven-org-apache-flink--flink-file-sink-common@2.1.0`. You can also run `npx @tessl/cli init`.

# Apache Flink File Sink Common

Apache Flink File Sink Common provides foundational utilities and interfaces for implementing file sink functionality in Apache Flink stream processing applications. It contains the core abstractions (bucket writers, part file writers, rolling policies, and bucket assigners) that enable efficient and reliable writing of streaming data to file systems.

## Package Information

- **Package Name**: org.apache.flink:flink-file-sink-common
- **Package Type**: maven
- **Language**: Java
- **Installation**: Add the following dependency to your Maven `pom.xml`:

```xml

12

<dependency>

13

<groupId>org.apache.flink</groupId>

14

<artifactId>flink-file-sink-common</artifactId>

15

<version>2.1.0</version>

16

</dependency>

17

```

## Core Imports

```java

22

import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;

23

import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;

24

import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;

25

import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

26

import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

27

```

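## Basic Usage

The snippet below declares variables of type `BucketAssigner` and `RollingPolicy`, so it also needs those two imports from the Core Imports section above.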

```java

32

import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

33

import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

34

import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;

35

import org.apache.flink.configuration.MemorySize;

36

import java.time.Duration;

37

38

// Create a date-time based bucket assigner

39

BucketAssigner<String, String> bucketAssigner = new DateTimeBucketAssigner<>("yyyy-MM-dd/HH");

40

41

// Create a rolling policy with custom settings

42

RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.<String, String>builder()

43

.withMaxPartSize(MemorySize.ofMebiBytes(256))

44

.withRolloverInterval(Duration.ofMinutes(15))

45

.withInactivityInterval(Duration.ofMinutes(5))

46

.build();

47

48

// Configure output file naming

49

OutputFileConfig outputFileConfig = OutputFileConfig.builder()

50

.withPartPrefix("data")

51

.withPartSuffix(".txt")

52

.build();

53

```

## Architecture

Apache Flink File Sink Common is built around several key architectural components:

- **Bucket Assignment**: Determines how streaming data is organized into logical buckets (directories).
- **Rolling Policies**: Define when to close the current file and start a new one based on size, time, or other criteria.
- **File Writers**: Handle the actual writing of data to files, supporting both row-wise and bulk writing patterns.
- **Recovery Support**: Provides serializable state for fault tolerance and exactly-once processing guarantees.
- **Configuration**: Offers flexible configuration options for file naming, writer properties, and behavior.

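## Capabilities

### Bucket Assignment

Organizes streaming data into logical buckets using pluggable assignment strategies. Supported strategies include time-based bucketing (for example `DateTimeBucketAssigner`) and custom bucketing logic.

- **Assigning elements**: `getBucketId` maps each element to a bucket ID. It receives a `BucketAssigner.Context` that exposes the current processing time, the current watermark, and the element's timestamp. The timestamp is a boxed `Long` and may be `null` when the element has no assigned timestamp.
- **Serializing bucket IDs**: Bucket IDs must be serializable through the `SimpleVersionedSerializer` returned by `getSerializer()`.
- **Nested `Context`**: `Context` is nested inside `BucketAssigner`. It is listed separately below only for readability.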

```java { .api }

72

public interface BucketAssigner<IN, BucketID> extends Serializable {

73

BucketID getBucketId(IN element, BucketAssigner.Context context);

74

SimpleVersionedSerializer<BucketID> getSerializer();

75

}

76

77

public interface Context {

78

long currentProcessingTime();

79

long currentWatermark();

80

Long timestamp();

81

}

82

```

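[Bucket Assignment](./bucket-assignment.md)

### Rolling Policies

Controls when to close the current part file and start a new one. A `RollingPolicy` is consulted at three points:

- on each checkpoint (`shouldRollOnCheckpoint`);
- on each incoming element (`shouldRollOnEvent`);
- on processing-time checks (`shouldRollOnProcessingTime`).

It can base its decision on the part file's bucket ID, size, creation time, and last-update time, all of which are exposed by `PartFileInfo`.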

```java { .api }

91

public interface RollingPolicy<IN, BucketID> extends Serializable {

92

boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) throws IOException;

93

boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) throws IOException;

94

boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) throws IOException;

95

}

96

97

public interface PartFileInfo<BucketID> {

98

BucketID getBucketId();

99

long getCreationTime();

100

long getSize() throws IOException;

101

long getLastUpdateTime();

102

}

103

```

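[Rolling Policies](./rolling-policies.md)

### File Writers

Provides abstractions for writing data to files, supporting both row-wise encoding and bulk writing patterns.

- **`BucketWriter`**: Opens new in-progress files, or resumes them from a recoverable snapshot.
- **`InProgressFileWriter`**: The writer returned by `BucketWriter`. It writes elements, persists its state for recovery (`persist()`), and closes the file for commit (`closeForCommit()`).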

```java { .api }

112

public interface BucketWriter<IN, BucketID> {

113

InProgressFileWriter<IN, BucketID> openNewInProgressFile(

114

BucketID bucketID, Path path, long creationTime) throws IOException;

115

116

InProgressFileWriter<IN, BucketID> resumeInProgressFileFrom(

117

BucketID bucketID,

118

InProgressFileWriter.InProgressFileRecoverable inProgressFileSnapshot,

119

long creationTime) throws IOException;

120

121

WriterProperties getProperties();

122

}

123

124

public interface InProgressFileWriter<IN, BucketID> extends PartFileInfo<BucketID> {

125

void write(IN element, long currentTime) throws IOException;

126

InProgressFileRecoverable persist() throws IOException;

127

PendingFileRecoverable closeForCommit() throws IOException;

128

void dispose();

129

}

130

```

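[File Writers](./file-writers.md)

### Configuration

Options for part-file naming and writer behavior.

- **`OutputFileConfig`**: Sets the prefix and suffix of generated part-file names.
- **`WriterProperties`**: Bundles the serializers for in-progress and pending file recoverables. It also states whether the writer can resume in-progress files (`supportsResume()`).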

```java { .api }

139

public class OutputFileConfig implements Serializable {

140

public OutputFileConfig(String partPrefix, String partSuffix);

141

public String getPartPrefix();

142

public String getPartSuffix();

143

public static OutputFileConfigBuilder builder();

144

}

145

146

public class WriterProperties {

147

public WriterProperties(

148

SimpleVersionedSerializer<InProgressFileWriter.InProgressFileRecoverable> inProgressFileRecoverableSerializer,

149

SimpleVersionedSerializer<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverableSerializer,

150

boolean supportsResume);

151

152

public boolean supportsResume();

153

}

154

```

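[Configuration](./configuration.md)

## Types

Nested types referenced by the interfaces above. Each outer `interface` in this listing stands for the enclosing type, and only its nested members are shown. For example, the full `InProgressFileWriter` signature appears under File Writers.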

```java { .api }

161

public interface InProgressFileWriter {

162

interface InProgressFileRecoverable extends PendingFileRecoverable {}

163

164

interface PendingFileRecoverable {

165

Path getPath();

166

long getSize();

167

}

168

}

169

170

public interface CompactingFileWriter {

171

PendingFileRecoverable closeForCommit() throws IOException;

172

173

enum Type {

174

RECORD_WISE,

175

OUTPUT_STREAM

176

}

177

}

178

179

public interface BucketWriter {

180

interface PendingFile {

181

void commit() throws IOException;

182

void commitAfterRecovery() throws IOException;

183

}

184

}

185

```