# Apache Flink File Sink Common

Apache Flink File Sink Common provides foundational utilities and interfaces for implementing file sink functionality in Apache Flink stream processing applications. It contains core abstractions for bucket writers, part file writers, rolling policies, and bucket assigners that enable efficient and reliable writing of streaming data to file systems.

## Package Information

- **Package Name**: org.apache.flink:flink-file-sink-common
- **Package Type**: maven
- **Language**: Java
- **Installation**: Add to your Maven dependencies:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-file-sink-common</artifactId>
    <version>2.1.0</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
```

## Basic Usage

```java
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.configuration.MemorySize;
import java.time.Duration;

// Create a date-time based bucket assigner (one directory per day, one subdirectory per hour)
BucketAssigner<String, String> bucketAssigner = new DateTimeBucketAssigner<>("yyyy-MM-dd/HH");

// Create a rolling policy with custom settings
RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.<String, String>builder()
    .withMaxPartSize(MemorySize.ofMebiBytes(256))   // roll once a part file reaches 256 MiB
    .withRolloverInterval(Duration.ofMinutes(15))   // roll after a part file has been open for 15 minutes
    .withInactivityInterval(Duration.ofMinutes(5))  // roll after 5 minutes without writes
    .build();

// Configure output file naming
OutputFileConfig outputFileConfig = OutputFileConfig.builder()
    .withPartPrefix("data")
    .withPartSuffix(".txt")
    .build();
```

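To make the bucket format string above concrete, the following sketch reproduces the formatting step with plain `java.time`, outside of Flink. It assumes the assigner formats a millisecond timestamp with a `DateTimeFormatter` in a fixed time zone; the class and method names (`BucketPathSketch`, `bucketIdFor`) are hypothetical and exist only for illustration.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of Flink: shows how a pattern such as
// "yyyy-MM-dd/HH" turns a millisecond timestamp into a bucket path.
class BucketPathSketch {

    static String bucketIdFor(long epochMillis, String pattern, ZoneId zone) {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern).withZone(zone);
        return formatter.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        // Epoch start in UTC falls into the bucket "1970-01-01/00".
        System.out.println(bucketIdFor(0L, "yyyy-MM-dd/HH", ZoneId.of("UTC")));
    }
}
```

Because the pattern contains a `/`, each hour ends up in its own subdirectory below the directory for its day.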
## Architecture

Apache Flink File Sink Common is built around several key architectural components:

- **Bucket Assignment**: Determines how streaming data is organized into logical buckets (directories)
- **Rolling Policies**: Define when to close current files and start new ones based on size, time, or other criteria
- **File Writers**: Handle the actual writing of data to files, supporting both row-wise and bulk writing patterns
- **Recovery Support**: Provides serializable state for fault tolerance and exactly-once processing guarantees
- **Configuration**: Offers flexible configuration options for file naming, writer properties, and behavior

## Capabilities

### Bucket Assignment

Organizes streaming data into logical buckets using pluggable assignment strategies. Supports time-based bucketing, custom bucketing logic, and serializable bucket identifiers.

```java { .api }
public interface BucketAssigner<IN, BucketID> extends Serializable {
    BucketID getBucketId(IN element, BucketAssigner.Context context);
    SimpleVersionedSerializer<BucketID> getSerializer();
}

// Nested in BucketAssigner; referenced as BucketAssigner.Context
public interface Context {
    long currentProcessingTime();
    long currentWatermark();
    Long timestamp(); // may be null if the element has no timestamp
}
```

[Bucket Assignment](./bucket-assignment.md)

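As an illustration of custom bucketing logic, here is a minimal sketch of an assigner that groups comma-separated lines by their first field. To stay compilable without Flink on the classpath, it declares local stand-ins (`SketchContext`, `SketchBucketAssigner`) that only mirror the signatures listed above; these names and `EventTypeBucketAssigner` are hypothetical. A real implementation would implement `BucketAssigner` directly and would also have to return a serializer for the bucket ID from `getSerializer()`.

```java
// Local stand-ins that mirror the signatures listed above so the sketch
// compiles on its own. They are NOT the Flink interfaces.
interface SketchContext {
    long currentProcessingTime();
    long currentWatermark();
    Long timestamp(); // may be null when the element carries no timestamp
}

interface SketchBucketAssigner<IN, BucketID> {
    BucketID getBucketId(IN element, SketchContext context);
}

// Hypothetical assigner: one bucket per event type, where the event type is
// the text before the first comma of each line.
class EventTypeBucketAssigner implements SketchBucketAssigner<String, String> {

    @Override
    public String getBucketId(String element, SketchContext context) {
        int comma = element.indexOf(',');
        String type = comma < 0 ? element : element.substring(0, comma);
        // Fall back to a catch-all bucket when the first field is empty.
        return type.isEmpty() ? "type=unknown" : "type=" + type;
    }
}
```

This assigner ignores the context entirely; a time-based variant would instead derive the bucket from `currentProcessingTime()` or `timestamp()`.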
### Rolling Policies

Controls when to close current part files and start new ones based on configurable criteria including file size, time intervals, and processing events.

```java { .api }
public interface RollingPolicy<IN, BucketID> extends Serializable {
    boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) throws IOException;
    boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) throws IOException;
    boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) throws IOException;
}

public interface PartFileInfo<BucketID> {
    BucketID getBucketId();
    long getCreationTime();
    long getSize() throws IOException;
    long getLastUpdateTime();
}
```

[Rolling Policies](./rolling-policies.md)

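The three decision points can be sketched with a simple size-or-age policy. The example below is an illustration under stated assumptions, not Flink's own policy: `SketchPartFileInfo` is a local stand-in that mirrors the `PartFileInfo` signatures above so the code compiles without Flink, and `SizeOrAgeRollingPolicy` is a hypothetical class whose method names follow `RollingPolicy`.

```java
import java.io.IOException;

// Local stand-in mirroring the PartFileInfo signatures listed above.
interface SketchPartFileInfo<BucketID> {
    BucketID getBucketId();
    long getCreationTime();
    long getSize() throws IOException;
    long getLastUpdateTime();
}

// Hypothetical policy: rolls when a part file grows past maxBytes, or when it
// has been open for at least maxAgeMillis.
class SizeOrAgeRollingPolicy<IN, BucketID> {
    private final long maxBytes;
    private final long maxAgeMillis;

    SizeOrAgeRollingPolicy(long maxBytes, long maxAgeMillis) {
        this.maxBytes = maxBytes;
        this.maxAgeMillis = maxAgeMillis;
    }

    // Consulted when a checkpoint is taken: roll only oversized files.
    boolean shouldRollOnCheckpoint(SketchPartFileInfo<BucketID> part) throws IOException {
        return part.getSize() > maxBytes;
    }

    // Consulted for each incoming element: same size check.
    boolean shouldRollOnEvent(SketchPartFileInfo<BucketID> part, IN element) throws IOException {
        return part.getSize() > maxBytes;
    }

    // Consulted on processing-time ticks: roll files that have been open too long.
    boolean shouldRollOnProcessingTime(SketchPartFileInfo<BucketID> part, long currentTime) {
        return currentTime - part.getCreationTime() >= maxAgeMillis;
    }
}
```

An inactivity check could be added in the same way by comparing `currentTime` with `getLastUpdateTime()`.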
### File Writers

Provides abstractions for writing data to files with support for both row-wise encoding and bulk writing patterns. Handles file recovery and commit operations.

```java { .api }
public interface BucketWriter<IN, BucketID> {
    InProgressFileWriter<IN, BucketID> openNewInProgressFile(
        BucketID bucketID, Path path, long creationTime) throws IOException;

    InProgressFileWriter<IN, BucketID> resumeInProgressFileFrom(
        BucketID bucketID,
        InProgressFileWriter.InProgressFileRecoverable inProgressFileSnapshot,
        long creationTime) throws IOException;

    WriterProperties getProperties();
}

public interface InProgressFileWriter<IN, BucketID> extends PartFileInfo<BucketID> {
    void write(IN element, long currentTime) throws IOException;
    InProgressFileRecoverable persist() throws IOException;
    PendingFileRecoverable closeForCommit() throws IOException;
    void dispose();
}
```

[File Writers](./file-writers.md)

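Since `InProgressFileWriter` extends `PartFileInfo`, the writer itself reports the size and timestamps that rolling policies inspect. The sketch below mimics that bookkeeping with an in-memory buffer. `InMemoryPartWriter` is a hypothetical class, not a Flink type, and it deliberately leaves out `persist()`, `closeForCommit()`, and `dispose()`.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical in-memory writer: it only mimics how write(element, currentTime)
// feeds the size and time values exposed through PartFileInfo.
class InMemoryPartWriter {
    private final String bucketId;
    private final long creationTime;
    private final StringBuilder buffer = new StringBuilder();
    private long lastUpdateTime;
    private long sizeBytes;

    InMemoryPartWriter(String bucketId, long creationTime) {
        this.bucketId = bucketId;
        this.creationTime = creationTime;
        this.lastUpdateTime = creationTime; // assumption: starts at creation time
    }

    // Appends one line and records when the part file was last touched.
    void write(String element, long currentTime) {
        String line = element + "\n";
        buffer.append(line);
        sizeBytes += line.getBytes(StandardCharsets.UTF_8).length;
        lastUpdateTime = currentTime;
    }

    String getBucketId() { return bucketId; }
    long getCreationTime() { return creationTime; }
    long getSize() { return sizeBytes; }
    long getLastUpdateTime() { return lastUpdateTime; }
    String contents() { return buffer.toString(); }
}
```

After each `write`, a rolling policy can compare `getSize()` and `getLastUpdateTime()` against its thresholds to decide whether the part file should be closed.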
### Configuration

Configurable options for file naming patterns, writer properties, and behavior customization.

```java { .api }
public class OutputFileConfig implements Serializable {
    public OutputFileConfig(String partPrefix, String partSuffix);
    public String getPartPrefix();
    public String getPartSuffix();
    public static OutputFileConfigBuilder builder();
}

public class WriterProperties {
    public WriterProperties(
        SimpleVersionedSerializer<InProgressFileWriter.InProgressFileRecoverable> inProgressFileRecoverableSerializer,
        SimpleVersionedSerializer<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverableSerializer,
        boolean supportsResume);

    public boolean supportsResume();
}
```

[Configuration](./configuration.md)

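To show how a prefix and suffix could shape part file names, the sketch below assembles a name under an assumed scheme of `<prefix>-<writerId>-<counter><suffix>`. The scheme, the `writerId` parameter, and the `PartFileNameSketch` class are assumptions made for illustration; the actual naming is decided by the sink that consumes `OutputFileConfig`, not by this package alone.

```java
// Hypothetical helper, not a Flink API: combines the prefix and suffix held by
// an OutputFileConfig with an assumed "<prefix>-<writerId>-<counter><suffix>" scheme.
class PartFileNameSketch {

    static String partFileName(
            String partPrefix, String writerId, long partCounter, String partSuffix) {
        return partPrefix + "-" + writerId + "-" + partCounter + partSuffix;
    }

    public static void main(String[] args) {
        // With the prefix "data" and suffix ".txt" from Basic Usage:
        System.out.println(partFileName("data", "0", 3, ".txt")); // data-0-3.txt
    }
}
```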
## Types

```java { .api }
public interface InProgressFileWriter {
    interface InProgressFileRecoverable extends PendingFileRecoverable {}

    interface PendingFileRecoverable {
        Path getPath();
        long getSize();
    }
}

public interface CompactingFileWriter {
    PendingFileRecoverable closeForCommit() throws IOException;

    enum Type {
        RECORD_WISE,
        OUTPUT_STREAM
    }
}

public interface BucketWriter {
    interface PendingFile {
        void commit() throws IOException;
        void commitAfterRecovery() throws IOException;
    }
}
```