# Rolling Policies

Rolling policies determine when to close the current part file and start a new one in file sink operations. The package provides flexible policies based on file size, time intervals, processing events, and checkpoints.

## Capabilities

### RollingPolicy Interface

Core interface for implementing file rolling logic.

```java { .api }
/**
 * Policy for determining when a Bucket in the Filesystem Sink rolls its currently open part file
 * @param <IN> The type of input elements
 * @param <BucketID> The type of bucket identifier
 */
public interface RollingPolicy<IN, BucketID> extends Serializable {
    /**
     * Determines if the in-progress part file should roll on every checkpoint
     * @param partFileState the state of the currently open part file
     * @return true if the part file should roll, false otherwise
     */
    boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) throws IOException;

    /**
     * Determines if the in-progress part file should roll based on its current state
     * @param partFileState the state of the currently open part file
     * @param element the element being processed
     * @return true if the part file should roll, false otherwise
     */
    boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) throws IOException;

    /**
     * Determines if the in-progress part file should roll based on a time condition
     * @param partFileState the state of the currently open part file
     * @param currentTime the current processing time
     * @return true if the part file should roll, false otherwise
     */
    boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) throws IOException;
}
```

### PartFileInfo Interface

Provides information about the current part file for rolling policy decisions.

```java { .api }
/**
 * Interface exposing information about the current (open) part file
 * Used by RollingPolicy to determine if it should roll the part file
 */
public interface PartFileInfo<BucketID> {
    /**
     * @return The bucket identifier of the current buffer
     */
    BucketID getBucketId();

    /**
     * @return The creation time (in ms) of the currently open part file
     */
    long getCreationTime();

    /**
     * @return The size of the currently open part file
     */
    long getSize() throws IOException;

    /**
     * @return The last time (in ms) the currently open part file was written to
     */
    long getLastUpdateTime();
}
```

### DefaultRollingPolicy

Comprehensive rolling policy implementation with configurable size, time, and inactivity thresholds.

```java { .api }
/**
 * Default implementation of RollingPolicy
 * Rolls a part file if:
 * 1. there is no open part file
 * 2. current file has reached maximum size (default 128MB)
 * 3. current file is older than rollover interval (default 60 sec)
 * 4. current file has not been written to for more than inactivity time (default 60 sec)
 */
public final class DefaultRollingPolicy<IN, BucketID> implements RollingPolicy<IN, BucketID> {
    /** Default inactivity interval: 60 seconds */
    private static final long DEFAULT_INACTIVITY_INTERVAL = 60L * 1000L;

    /** Default rollover interval: 60 seconds */
    private static final long DEFAULT_ROLLOVER_INTERVAL = 60L * 1000L;

    /** Default maximum part size: 128MB */
    private static final long DEFAULT_MAX_PART_SIZE = 1024L * 1024L * 128L;

    @Override
    public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) throws IOException;

    @Override
    public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) throws IOException;

    @Override
    public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime);

    /**
     * Returns the maximum part file size before rolling
     * @return Max size in bytes
     */
    public long getMaxPartSize();

    /**
     * Returns the maximum time duration a part file can stay open before rolling
     * @return Time duration in milliseconds
     */
    public long getRolloverInterval();

    /**
     * Returns time duration of allowed inactivity after which a part file will roll
     * @return Time duration in milliseconds
     */
    public long getInactivityInterval();

    /**
     * Creates a new PolicyBuilder for configuring DefaultRollingPolicy
     */
    public static DefaultRollingPolicy.PolicyBuilder builder();
}
```
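
The rolling conditions listed in the Javadoc above can be sketched without Flink on the classpath. The snippet below is an illustration only, not Flink's source: `RollingDecisionSketch` and `PartFileSnapshot` are hypothetical stand-ins, the constants reuse the documented defaults, and the exact comparison operators (`>` for size, `>=` for the two intervals) are assumptions.

```java
import java.time.Duration;

/** Stand-alone sketch of the documented size and time thresholds; not Flink source code. */
final class RollingDecisionSketch {

    /** Hypothetical stand-in for the PartFileInfo values a policy reads. */
    static final class PartFileSnapshot {
        final long creationTime;   // ms since epoch
        final long lastUpdateTime; // ms since epoch
        final long size;           // bytes

        PartFileSnapshot(long creationTime, long lastUpdateTime, long size) {
            this.creationTime = creationTime;
            this.lastUpdateTime = lastUpdateTime;
            this.size = size;
        }
    }

    // Defaults as documented above: 128 MB, 60 s rollover, 60 s inactivity.
    static final long MAX_PART_SIZE = 128L * 1024L * 1024L;
    static final long ROLLOVER_INTERVAL = Duration.ofSeconds(60).toMillis();
    static final long INACTIVITY_INTERVAL = Duration.ofSeconds(60).toMillis();

    /** Size check, assumed to back both the event and the checkpoint decision. */
    static boolean exceedsMaxSize(PartFileSnapshot file) {
        return file.size > MAX_PART_SIZE;
    }

    /** Time check, assumed to back the processing-time decision. */
    static boolean isTooOldOrIdle(PartFileSnapshot file, long currentTime) {
        boolean tooOld = currentTime - file.creationTime >= ROLLOVER_INTERVAL;
        boolean idle = currentTime - file.lastUpdateTime >= INACTIVITY_INTERVAL;
        return tooOld || idle;
    }

    public static void main(String[] args) {
        // Created at t=0, last written at t=50s, 10 MiB in size.
        PartFileSnapshot file = new PartFileSnapshot(0L, 50_000L, 10L * 1024L * 1024L);
        System.out.println(exceedsMaxSize(file));          // false
        System.out.println(isTooOldOrIdle(file, 55_000L)); // false
        System.out.println(isTooOldOrIdle(file, 61_000L)); // true (older than 60 s)
    }
}
```

Keeping the size check and the time check in separate methods mirrors how the interface separates `shouldRollOnEvent` from `shouldRollOnProcessingTime`.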

### DefaultRollingPolicy.PolicyBuilder

Builder for configuring DefaultRollingPolicy instances.

```java { .api }
/**
 * Builder class for configuring DefaultRollingPolicy
 */
public static final class PolicyBuilder {
    /**
     * Sets the part size above which a part file will have to roll
     * @param size the allowed part size
     */
    public DefaultRollingPolicy.PolicyBuilder withMaxPartSize(MemorySize size);

    /**
     * Sets the interval of allowed inactivity after which a part file will roll
     * @param interval the allowed inactivity interval
     */
    public DefaultRollingPolicy.PolicyBuilder withInactivityInterval(Duration interval);

    /**
     * Sets the max time a part file can stay open before having to roll
     * @param interval the desired rollover interval
     */
    public DefaultRollingPolicy.PolicyBuilder withRolloverInterval(Duration interval);

    /** Creates the actual policy */
    public <IN, BucketID> DefaultRollingPolicy<IN, BucketID> build();
}
```

**Usage Examples:**

```java
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.configuration.MemorySize;
import java.time.Duration;

// Default policy (128MB, 60s rollover, 60s inactivity).
// builder() is not generic; build() infers <IN, BucketID> from the assignment target.
RollingPolicy<String, String> defaultPolicy = DefaultRollingPolicy.builder().build();

// Custom size and timing
RollingPolicy<String, String> customPolicy = DefaultRollingPolicy.builder()
    .withMaxPartSize(MemorySize.ofMebiBytes(256))
    .withRolloverInterval(Duration.ofMinutes(15))
    .withInactivityInterval(Duration.ofMinutes(5))
    .build();

// Large files with longer intervals
RollingPolicy<String, String> largeBatchPolicy = DefaultRollingPolicy.builder()
    .withMaxPartSize(MemorySize.ofMebiBytes(1024)) // 1 GiB
    .withRolloverInterval(Duration.ofHours(1))
    .withInactivityInterval(Duration.ofMinutes(30))
    .build();
```

### CheckpointRollingPolicy

Abstract base class for policies that roll on every checkpoint.

```java { .api }
/**
 * Abstract RollingPolicy which rolls on every checkpoint
 */
public abstract class CheckpointRollingPolicy<IN, BucketID> implements RollingPolicy<IN, BucketID> {
    /** Always returns true - rolls on every checkpoint */
    public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState);

    /** Subclasses define event-based rolling behavior */
    public abstract boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) throws IOException;

    /** Subclasses define time-based rolling behavior */
    public abstract boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) throws IOException;
}
```
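
The subclassing pattern can be illustrated with stand-in types that compile without Flink. This is a sketch under stated assumptions: `PartFile`, `CheckpointPolicy`, and `SizeCappedPolicy` are hypothetical names that only imitate the shape of the API above. The subclass inherits the fixed checkpoint behavior and adds a size cap for event-based rolling.

```java
import java.io.IOException;

/** Stand-alone sketch of the checkpoint-rolling base class pattern; not Flink source code. */
final class CheckpointPatternSketch {

    /** Hypothetical stand-in for PartFileInfo; only the size is modeled. */
    interface PartFile {
        long getSize() throws IOException;
    }

    /** Imitates the documented base class: checkpoint rolling is fixed, the rest is abstract. */
    abstract static class CheckpointPolicy<IN> {
        boolean shouldRollOnCheckpoint(PartFile file) {
            return true;
        }

        abstract boolean shouldRollOnEvent(PartFile file, IN element) throws IOException;

        abstract boolean shouldRollOnProcessingTime(PartFile file, long currentTime) throws IOException;
    }

    /** Hypothetical subclass: rolls on every checkpoint and once a size cap is exceeded. */
    static final class SizeCappedPolicy<IN> extends CheckpointPolicy<IN> {
        private final long maxBytes;

        SizeCappedPolicy(long maxBytes) {
            this.maxBytes = maxBytes;
        }

        @Override
        boolean shouldRollOnEvent(PartFile file, IN element) throws IOException {
            return file.getSize() > maxBytes;
        }

        @Override
        boolean shouldRollOnProcessingTime(PartFile file, long currentTime) {
            return false; // no time-based rolling in this sketch
        }
    }

    public static void main(String[] args) throws IOException {
        SizeCappedPolicy<String> policy = new SizeCappedPolicy<>(1024L);
        PartFile small = () -> 512L;
        PartFile large = () -> 2048L;
        System.out.println(policy.shouldRollOnCheckpoint(small)); // true
        System.out.println(policy.shouldRollOnEvent(small, "a")); // false
        System.out.println(policy.shouldRollOnEvent(large, "b")); // true
    }
}
```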

### OnCheckpointRollingPolicy

Simple policy that rolls files only on checkpoints.

```java { .api }
/**
 * RollingPolicy which rolls ONLY on every checkpoint
 * Does not roll based on events or processing time
 */
public final class OnCheckpointRollingPolicy<IN, BucketID> extends CheckpointRollingPolicy<IN, BucketID> {
    @Override
    public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element);

    @Override
    public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime);

    /** Creates an instance of OnCheckpointRollingPolicy */
    public static <IN, BucketID> OnCheckpointRollingPolicy<IN, BucketID> build();
}
```

**Usage Example:**

```java
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

// Roll only on checkpoints - useful for exactly-once guarantees.
// MyEvent is a placeholder for the application's own element type.
RollingPolicy<MyEvent, String> checkpointOnly = OnCheckpointRollingPolicy.<MyEvent, String>build();
```

## Custom Rolling Policies

You can implement custom rolling logic by implementing the `RollingPolicy` interface:

```java
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;

public class RecordCountRollingPolicy<IN, BucketID> implements RollingPolicy<IN, BucketID> {
    private static final long serialVersionUID = 1L;

    private final long maxRecords;
    // Elements seen since this policy instance last requested a roll. The counter is
    // plain in-memory state: it is not checkpointed, and it is shared by every part
    // file that consults this instance, so treat the limit as approximate.
    private long recordCount = 0;

    public RecordCountRollingPolicy(long maxRecords) {
        this.maxRecords = maxRecords;
    }

    @Override
    public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) {
        return false; // Rolling is driven by the record count only
    }

    @Override
    public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) {
        recordCount++;
        if (recordCount >= maxRecords) {
            recordCount = 0; // Start counting again for the next part file
            return true;
        }
        return false;
    }

    @Override
    public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) {
        return false; // Don't roll based on time
    }
}
```

## Error Handling

- `PartFileInfo.getSize()` can throw `IOException`; a policy method must either propagate it (the `RollingPolicy` methods declare `throws IOException`) or handle it explicitly
- An exception thrown from a rolling policy causes the job to fail
- Rolling decisions are made frequently, so avoid expensive operations in policy methods
- How often time-based rolling is evaluated is controlled by the sink's bucket check interval setting
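
The first three points can be sketched with a stand-in type. In the snippet below, `PartFile`, `shouldRoll`, and `shouldRollLenient` are hypothetical names, not part of the Flink API. The strict variant does a single size lookup and lets the `IOException` propagate; the lenient variant keeps the current part file open when the size cannot be read. Whether swallowing the error is acceptable depends on the job, so the lenient form is shown only as one possible design choice.

```java
import java.io.IOException;

/** Stand-alone sketch of two ways to treat a failing size lookup; not Flink source code. */
final class RollingErrorHandlingSketch {

    /** Hypothetical stand-in for PartFileInfo; only the size is modeled. */
    interface PartFile {
        long getSize() throws IOException;
    }

    /** Strict: one cheap lookup, and any IOException propagates to the caller. */
    static boolean shouldRoll(PartFile file, long maxBytes) throws IOException {
        return file.getSize() > maxBytes;
    }

    /** Lenient: an unreadable size is treated as "do not roll yet". */
    static boolean shouldRollLenient(PartFile file, long maxBytes) {
        try {
            return file.getSize() > maxBytes;
        } catch (IOException e) {
            return false; // keep the current part file; the error is not surfaced
        }
    }

    public static void main(String[] args) {
        PartFile broken = () -> {
            throw new IOException("size unavailable");
        };
        System.out.println(shouldRollLenient(broken, 1024L)); // false
        try {
            shouldRoll(broken, 1024L);
        } catch (IOException e) {
            System.out.println("propagated: " + e.getMessage()); // propagated: size unavailable
        }
    }
}
```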