or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

bucket-assignment.md · configuration.md · file-writers.md · index.md · rolling-policies.md

docs/configuration.md

0

# Configuration

1

2

Configuration classes provide options for customizing file naming patterns, writer properties, and behavior of the file sink components.

3

4

## Capabilities

5

6

### OutputFileConfig

7

8

Configuration class for customizing part file naming patterns.

9

10

```java { .api }

11

/**

12

* Part file name configuration

13

* Allows defining a prefix and suffix to the part file name

14

*/

15

public class OutputFileConfig implements Serializable {

16

/**

17

* Initiates the OutputFileConfig with values passed as parameters

18

* @param partPrefix the beginning of part file name

19

* @param partSuffix the ending of part file name

20

*/

21

public OutputFileConfig(String partPrefix, String partSuffix);

22

23

/**

24

* The prefix for the part name

25

* @return the part file prefix

26

*/

27

public String getPartPrefix();

28

29

/**

30

* The suffix for the part name

31

* @return the part file suffix

32

*/

33

public String getPartSuffix();

34

35

/**

36

* Creates a builder to create the part file configuration

37

* @return new OutputFileConfigBuilder instance

38

*/

39

public static OutputFileConfigBuilder builder();

40

}

41

```

42

43

### OutputFileConfigBuilder

44

45

Builder for creating OutputFileConfig instances with a fluent API.

46

47

```java { .api }

48

/**

49

* Builder to create the part file configuration

50

*/

51

public static class OutputFileConfigBuilder {

52

/** Default part prefix: "part" */

53

private static final String DEFAULT_PART_PREFIX = "part";

54

55

/** Default part suffix: "" (empty string) */

56

private static final String DEFAULT_PART_SUFFIX = "";

57

58

/**

59

* Sets the prefix for part files

60

* @param prefix the desired prefix

61

* @return this builder instance

62

*/

63

public OutputFileConfigBuilder withPartPrefix(String prefix);

64

65

/**

66

* Sets the suffix for part files

67

* @param suffix the desired suffix

68

* @return this builder instance

69

*/

70

public OutputFileConfigBuilder withPartSuffix(String suffix);

71

72

/**

73

* Creates the OutputFileConfig instance

74

* @return configured OutputFileConfig

75

*/

76

public OutputFileConfig build();

77

}

78

```

79

80

**Usage Examples:**

81

82

```java

83

import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;

84

85

// Default configuration: prefix="part", suffix=""

86

OutputFileConfig defaultConfig = OutputFileConfig.builder().build();

87

// Generates files like: part-0-0, part-0-1, part-1-0 (part-<subtaskIndex>-<partCounter>)

88

89

// Custom prefix and suffix

90

OutputFileConfig customConfig = OutputFileConfig.builder()

91

.withPartPrefix("data")

92

.withPartSuffix(".txt")

93

.build();

94

// Generates files like: data-0-0.txt, data-0-1.txt, data-1-0.txt (data-<subtaskIndex>-<partCounter>.txt)

95

96

// JSON files with timestamp prefix

97

OutputFileConfig jsonConfig = OutputFileConfig.builder()

98

.withPartPrefix("events-" + System.currentTimeMillis())

99

.withPartSuffix(".json")

100

.build();

101

// Generates files like: events-1640995200000-0-0.json (<prefix>-<subtaskIndex>-<partCounter>.json)

102

103

// Log files with descriptive naming

104

OutputFileConfig logConfig = OutputFileConfig.builder()

105

.withPartPrefix("application-logs")

106

.withPartSuffix(".log")

107

.build();

108

// Generates files like: application-logs-0-0.log (<prefix>-<subtaskIndex>-<partCounter>.log)

109

```

110

111

### WriterProperties

112

113

Configuration class describing the properties and capabilities of a BucketWriter.

114

115

```java { .api }

116

/**

117

* Class describing the property of the BucketWriter

118

*/

119

public class WriterProperties {

120

/**

121

* Creates WriterProperties with serializers and resume support flag

122

* @param inProgressFileRecoverableSerializer serializer for in-progress file recoverables

123

* @param pendingFileRecoverableSerializer serializer for pending file recoverables

124

* @param supportsResume whether the BucketWriter supports appending data to restored files

125

*/

126

public WriterProperties(

127

SimpleVersionedSerializer<InProgressFileWriter.InProgressFileRecoverable> inProgressFileRecoverableSerializer,

128

SimpleVersionedSerializer<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverableSerializer,

129

boolean supportsResume);

130

131

/**

132

* @return Whether the BucketWriter supports appending data to the restored in-progress file

133

*/

134

public boolean supportsResume();

135

136

/**

137

* @return the serializer for the PendingFileRecoverable

138

*/

139

public SimpleVersionedSerializer<InProgressFileWriter.PendingFileRecoverable>

140

getPendingFileRecoverableSerializer();

141

142

/**

143

* @return the serializer for the InProgressFileRecoverable

144

*/

145

public SimpleVersionedSerializer<InProgressFileWriter.InProgressFileRecoverable>

146

getInProgressFileRecoverableSerializer();

147

}

148

```

149

150

**Usage Example:**

151

152

```java

153

import org.apache.flink.streaming.api.functions.sink.filesystem.WriterProperties;

154

import org.apache.flink.core.io.SimpleVersionedSerializer;

155

156

// Example of creating WriterProperties for a custom BucketWriter

157

public class MyBucketWriter<IN, BucketID> implements BucketWriter<IN, BucketID> {

158

private final WriterProperties properties;

159

160

public MyBucketWriter() {

161

// Define serializers for recovery state

162

SimpleVersionedSerializer<InProgressFileRecoverable> inProgressSerializer = // ... implementation

163

SimpleVersionedSerializer<PendingFileRecoverable> pendingSerializer = // ... implementation

164

165

// Configure properties

166

this.properties = new WriterProperties(

167

inProgressSerializer,

168

pendingSerializer,

169

true // This writer supports resume operations

170

);

171

}

172

173

@Override

174

public WriterProperties getProperties() {

175

return properties;

176

}

177

178

// ... other BucketWriter methods

179

}

180

```

181

182

## Configuration Patterns

183

184

### File Naming Strategies

185

186

```java

187

// Time-based naming

188

OutputFileConfig timeBasedConfig = OutputFileConfig.builder()

189

.withPartPrefix("data-" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss")))

190

.withPartSuffix(".avro")

191

.build();

192

193

// Environment-specific naming

194

String environment = System.getProperty("app.environment", "dev");

195

OutputFileConfig envConfig = OutputFileConfig.builder()

196

.withPartPrefix(environment + "-events")

197

.withPartSuffix(".parquet")

198

.build();

199

200

// Content-type specific naming

201

OutputFileConfig csvConfig = OutputFileConfig.builder()

202

.withPartPrefix("export")

203

.withPartSuffix(".csv")

204

.build();

205

```

206

207

### Writer Properties Configuration

208

209

```java

210

// High-performance writer with resume support

211

WriterProperties highPerfProperties = new WriterProperties(

212

customInProgressSerializer,

213

customPendingSerializer,

214

true // Supports resume for fault tolerance

215

);

216

217

// Simple writer without resume capability

218

WriterProperties simpleProperties = new WriterProperties(

219

basicInProgressSerializer,

220

basicPendingSerializer,

221

false // No resume support - simpler but less fault tolerant

222

);

223

```

224

225

## Integration with File Sink

226

227

These configuration classes are typically used when setting up file sinks:

228

229

```java

230

import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

231

import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

232

import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

233

234

// Complete file sink configuration

235

StreamingFileSink<String> sink = StreamingFileSink

236

.forRowFormat(new Path("/output"), new SimpleStringEncoder<String>("UTF-8"))

237

.withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd/HH"))

238

.withRollingPolicy(DefaultRollingPolicy.<String, String>builder()

239

.withRolloverInterval(Duration.ofMinutes(15))

240

.withInactivityInterval(Duration.ofMinutes(5))

241

.build())

242

.withOutputFileConfig(OutputFileConfig.builder()

243

.withPartPrefix("events")

244

.withPartSuffix(".txt")

245

.build())

246

.build();

247

```

248

249

## Error Handling

250

251

- Invalid prefix or suffix strings may cause file system errors

252

- Null values for prefix or suffix are not allowed (checked by Preconditions)

253

- Resume capability mismatches between properties and actual implementation will cause runtime failures

254

- Serializer failures during recovery will cause job restart

255

- File naming conflicts may occur if multiple sinks write to the same output path with the same prefix and suffix