or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

bucket-assignment.mdconfiguration.mdfile-writers.mdindex.mdrolling-policies.md

rolling-policies.mddocs/

0

# Rolling Policies

1

2

Rolling policies determine when the file sink closes the current in-progress part file and starts a new one. The package provides flexible policies based on file size, time intervals, processing events, and checkpoints.

3

4

## Capabilities

5

6

### RollingPolicy Interface

7

8

Core interface for implementing file rolling logic.

9

10

```java { .api }

11

/**

12

* Policy for determining when a Bucket in the Filesystem Sink rolls its currently open part file

13

* @param <IN> The type of input elements

14

* @param <BucketID> The type of bucket identifier

15

*/

16

public interface RollingPolicy<IN, BucketID> extends Serializable {

17

/**

18

* Determines if the in-progress part file should roll on every checkpoint

19

* @param partFileState the state of the currently open part file

20

* @return true if the part file should roll, false otherwise

21

*/

22

boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) throws IOException;

23

24

/**

25

* Determines if the in-progress part file should roll based on its current state

26

* @param element the element being processed

27

* @param partFileState the state of the currently open part file

28

* @return true if the part file should roll, false otherwise

29

*/

30

boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) throws IOException;

31

32

/**

33

* Determines if the in-progress part file should roll based on a time condition

34

* @param partFileState the state of the currently open part file

35

* @param currentTime the current processing time

36

* @return true if the part file should roll, false otherwise

37

*/

38

boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) throws IOException;

39

}

40

```

41

42

### PartFileInfo Interface

43

44

Provides information about the current part file for rolling policy decisions.

45

46

```java { .api }

47

/**
 * Interface exposing information about the current (open) part file.
 * Used by {@link RollingPolicy} to determine if it should roll the part file.
 *
 * @param <BucketID> The type of bucket identifier
 */
public interface PartFileInfo<BucketID> {

    /**
     * @return The identifier of the bucket the currently open part file belongs to
     */
    BucketID getBucketId();

    /**
     * @return The creation time (in ms) of the currently open part file
     */
    long getCreationTime();

    /**
     * @return The size of the currently open part file, in bytes
     * @throws IOException if the size of the part file cannot be determined
     */
    long getSize() throws IOException;

    /**
     * @return The last time (in ms) the currently open part file was written to
     */
    long getLastUpdateTime();
}

72

```

73

74

### DefaultRollingPolicy

75

76

Comprehensive rolling policy implementation with configurable size, time, and inactivity thresholds.

77

78

```java { .api }

79

/**

80

* Default implementation of RollingPolicy

81

* Rolls a part file if:

82

* 1. there is no open part file

83

* 2. current file has reached maximum size (default 128MB)

84

* 3. current file is older than rollover interval (default 60 sec)

85

* 4. current file has not been written to for more than inactivity time (default 60 sec)

86

*/

87

public final class DefaultRollingPolicy<IN, BucketID> implements RollingPolicy<IN, BucketID> {

88

/** Default inactivity interval: 60 seconds */

89

private static final long DEFAULT_INACTIVITY_INTERVAL = 60L * 1000L;

90

91

/** Default rollover interval: 60 seconds */

92

private static final long DEFAULT_ROLLOVER_INTERVAL = 60L * 1000L;

93

94

/** Default maximum part size: 128MB */

95

private static final long DEFAULT_MAX_PART_SIZE = 1024L * 1024L * 128L;

96

97

@Override

98

public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) throws IOException;

99

100

@Override

101

public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) throws IOException;

102

103

@Override

104

public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime);

105

106

/**

107

* Returns the maximum part file size before rolling

108

* @return Max size in bytes

109

*/

110

public long getMaxPartSize();

111

112

/**

113

* Returns the maximum time duration a part file can stay open before rolling

114

* @return Time duration in milliseconds

115

*/

116

public long getRolloverInterval();

117

118

/**

119

* Returns time duration of allowed inactivity after which a part file will roll

120

* @return Time duration in milliseconds

121

*/

122

public long getInactivityInterval();

123

124

/**

125

* Creates a new PolicyBuilder for configuring DefaultRollingPolicy

126

*/

127

public static DefaultRollingPolicy.PolicyBuilder builder();

128

}

129

```

130

131

### DefaultRollingPolicy.PolicyBuilder

132

133

Builder for configuring DefaultRollingPolicy instances.

134

135

```java { .api }

136

/**

137

* Builder class for configuring DefaultRollingPolicy

138

*/

139

public static final class PolicyBuilder {

140

/**

141

* Sets the part size above which a part file will have to roll

142

* @param size the allowed part size

143

*/

144

public DefaultRollingPolicy.PolicyBuilder withMaxPartSize(MemorySize size);

145

146

/**

147

* Sets the interval of allowed inactivity after which a part file will roll

148

* @param interval the allowed inactivity interval

149

*/

150

public DefaultRollingPolicy.PolicyBuilder withInactivityInterval(Duration interval);

151

152

/**

153

* Sets the max time a part file can stay open before having to roll

154

* @param interval the desired rollover interval

155

*/

156

public DefaultRollingPolicy.PolicyBuilder withRolloverInterval(Duration interval);

157

158

/** Creates the actual policy */

159

public <IN, BucketID> DefaultRollingPolicy<IN, BucketID> build();

160

}

161

```

162

163

**Usage Examples:**

164

165

```java

166

import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

167

import org.apache.flink.configuration.MemorySize;

168

import java.time.Duration;

169

170

// Default policy (128MB, 60s rollover, 60s inactivity)

171

RollingPolicy<String, String> defaultPolicy = DefaultRollingPolicy.<String, String>builder().build();

172

173

// Custom size and timing

174

RollingPolicy<String, String> customPolicy = DefaultRollingPolicy.<String, String>builder()

175

.withMaxPartSize(MemorySize.ofMebiBytes(256))

176

.withRolloverInterval(Duration.ofMinutes(15))

177

.withInactivityInterval(Duration.ofMinutes(5))

178

.build();

179

180

// Large files with longer intervals

181

RollingPolicy<String, String> largeBatchPolicy = DefaultRollingPolicy.<String, String>builder()

182

.withMaxPartSize(MemorySize.ofGibiBytes(1))

183

.withRolloverInterval(Duration.ofHours(1))

184

.withInactivityInterval(Duration.ofMinutes(30))

185

.build();

186

```

187

188

### CheckpointRollingPolicy

189

190

Abstract base class for policies that roll on every checkpoint. Bulk-encoded formats (e.g. Parquet, ORC) only accept rolling policies derived from this class.

191

192

```java { .api }

193

/**

194

* Abstract RollingPolicy which rolls on every checkpoint

195

*/

196

public abstract class CheckpointRollingPolicy<IN, BucketID> implements RollingPolicy<IN, BucketID> {

197

/** Always returns true - rolls on every checkpoint */

198

public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState);

199

200

/** Subclasses define event-based rolling behavior */

201

public abstract boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) throws IOException;

202

203

/** Subclasses define time-based rolling behavior */

204

public abstract boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) throws IOException;

205

}

206

```

207

208

### OnCheckpointRollingPolicy

209

210

Simple policy that rolls files only on checkpoints.

211

212

```java { .api }

213

/**

214

* RollingPolicy which rolls ONLY on every checkpoint

215

* Does not roll based on events or processing time

216

*/

217

public final class OnCheckpointRollingPolicy<IN, BucketID> extends CheckpointRollingPolicy<IN, BucketID> {

218

@Override

219

public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element);

220

221

@Override

222

public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime);

223

224

/** Creates an instance of OnCheckpointRollingPolicy */

225

public static <IN, BucketID> OnCheckpointRollingPolicy<IN, BucketID> build();

226

}

227

```

228

229

**Usage Example:**

230

231

```java

232

import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

233

234

// Roll only on checkpoints - useful for exactly-once guarantees

235

RollingPolicy<MyEvent, String> checkpointOnly = OnCheckpointRollingPolicy.<MyEvent, String>build();

236

```

237

238

## Custom Rolling Policies

239

240

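You can implement custom rolling logic by implementing the `RollingPolicy` interface. The same policy instance is consulted for every bucket handled by a sink subtask, so any per-part-file state (such as a record count) must be tracked per bucket and reset when a new part file starts: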

241

242

```java

243

import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;

244

import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;

245

246

public class RecordCountRollingPolicy<IN, BucketID> implements RollingPolicy<IN, BucketID> {

247

private final long maxRecords;

248

private long recordCount = 0;

249

250

public RecordCountRollingPolicy(long maxRecords) {

251

this.maxRecords = maxRecords;

252

}

253

254

@Override

255

public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) {

256

return recordCount >= maxRecords;

257

}

258

259

@Override

260

public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) {

261

recordCount++;

262

return recordCount >= maxRecords;

263

}

264

265

@Override

266

public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) {

267

return false; // Don't roll based on time

268

}

269

}

270

```

271

272

## Error Handling

273

274

- `PartFileInfo.getSize()` can throw `IOException`; the `RollingPolicy` methods declare `throws IOException`, so a policy may either propagate it or handle it itself

275

- An exception thrown from a rolling policy method propagates out of the sink and fails the job

276

- Rolling policy methods are invoked frequently (`shouldRollOnEvent` sits on the per-record write path), so keep them cheap and avoid expensive operations

277

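- `shouldRollOnProcessingTime` is only evaluated periodically, at the sink's bucket check interval (e.g. `withBucketCheckInterval(...)` on the sink builder), so time-based rolls can happen up to one check interval after the configured threshold is reached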