# Bucket Assignment

Bucket assignment determines how streaming data elements are organized into logical buckets (directories) in the file system. The package provides flexible bucket assignment strategies and supports custom implementations.

## Capabilities

### BucketAssigner Interface

Core interface for implementing bucket assignment logic.

```java { .api }
/**
 * Interface for determining bucket assignment of streaming elements
 * @param <IN> The type of input elements
 * @param <BucketID> The type of bucket identifier (must have correct hashCode() and equals())
 */
public interface BucketAssigner<IN, BucketID> extends Serializable {
    /**
     * Returns the identifier of the bucket the provided element should be put into
     * @param element The current element being processed
     * @param context The context used by the current bucket assigner
     * @return Bucket identifier for the element
     */
    BucketID getBucketId(IN element, BucketAssigner.Context context);

    /**
     * Returns a serializer capable of serializing/deserializing bucket IDs
     * @return SimpleVersionedSerializer for bucket IDs
     */
    SimpleVersionedSerializer<BucketID> getSerializer();
}
```

### BucketAssigner Context

Provides contextual information for bucket assignment decisions.

```java { .api }
/**
 * Context that BucketAssigner can use for getting additional data about input records
 * Context is only valid for the duration of a getBucketId() call
 */
public interface Context {
    /** Returns the current processing time */
    long currentProcessingTime();

    /** Returns the current event-time watermark */
    long currentWatermark();

    /**
     * Returns the timestamp of the current input record
     * @return timestamp in milliseconds or null if element has no assigned timestamp
     */
    Long timestamp();
}
```
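
Because `timestamp()` may return `null`, an assigner that buckets by event time needs a fallback. The following is a minimal sketch of one way to handle that, assuming a fallback to processing time is acceptable for the job. `AssignerContext` is a hypothetical stand-in that mirrors the three methods of `BucketAssigner.Context` so the example runs without Flink on the classpath; `EventTimeBucketSketch` and `fixedContext` are likewise illustrative names, not part of the package.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical stand-in mirroring the three methods of BucketAssigner.Context. */
interface AssignerContext {
    long currentProcessingTime();
    long currentWatermark();
    Long timestamp(); // may be null when the record has no assigned timestamp
}

final class EventTimeBucketSketch {
    // Hourly pattern in UTC; both are assumptions chosen for the example.
    private static final DateTimeFormatter HOURLY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd--HH").withZone(ZoneOffset.UTC);

    /** Buckets by the record's own timestamp, falling back to processing time when it is null. */
    static String bucketId(AssignerContext context) {
        Long eventTime = context.timestamp();
        long millis = (eventTime != null) ? eventTime : context.currentProcessingTime();
        return HOURLY.format(Instant.ofEpochMilli(millis));
    }

    /** Builds a context with fixed values, for demonstration only. */
    static AssignerContext fixedContext(long processingTime, Long timestamp) {
        return new AssignerContext() {
            @Override public long currentProcessingTime() { return processingTime; }
            @Override public long currentWatermark() { return Long.MIN_VALUE; }
            @Override public Long timestamp() { return timestamp; }
        };
    }
}
```

In a real assigner the same null check would sit inside `getBucketId()`, reading from the `BucketAssigner.Context` passed in by the sink.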

### DateTimeBucketAssigner

Built-in bucket assigner that creates buckets based on system time using date/time patterns.

```java { .api }
/**
 * BucketAssigner that assigns to buckets based on current system time
 * Creates directories of the form: /{basePath}/{dateTimePath}/
 * Uses DateTimeFormatter with configurable format string and timezone
 */
public class DateTimeBucketAssigner<IN> implements BucketAssigner<IN, String> {
    /** Creates DateTimeBucketAssigner with default format "yyyy-MM-dd--HH" */
    public DateTimeBucketAssigner();

    /**
     * Creates DateTimeBucketAssigner with custom date/time format string
     * @param formatString Format string for DateTimeFormatter to determine bucket path
     */
    public DateTimeBucketAssigner(String formatString);

    /**
     * Creates DateTimeBucketAssigner with default format using specified timezone
     * @param zoneId The timezone for DateTimeFormatter
     */
    public DateTimeBucketAssigner(ZoneId zoneId);

    /**
     * Creates DateTimeBucketAssigner with custom format and timezone
     * @param formatString Format string for DateTimeFormatter
     * @param zoneId The timezone for DateTimeFormatter
     */
    public DateTimeBucketAssigner(String formatString, ZoneId zoneId);

    @Override
    public String getBucketId(IN element, BucketAssigner.Context context);

    @Override
    public SimpleVersionedSerializer<String> getSerializer();
}
```

**Usage Examples:**

```java
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import java.time.ZoneId;

// Default hourly bucketing: "yyyy-MM-dd--HH"
BucketAssigner<String, String> hourlyAssigner = new DateTimeBucketAssigner<>();

// Daily bucketing
BucketAssigner<String, String> dailyAssigner =
    new DateTimeBucketAssigner<>("yyyy-MM-dd");

// Hourly bucketing with custom timezone
BucketAssigner<String, String> timezoneAssigner =
    new DateTimeBucketAssigner<>("yyyy-MM-dd--HH", ZoneId.of("America/New_York"));

// Minute-level bucketing
BucketAssigner<String, String> minuteAssigner =
    new DateTimeBucketAssigner<>("yyyy-MM-dd/HH/mm");

// Example bucket paths generated:
// "2023-12-31--14" (hourly)
// "2023-12-31" (daily)
// "2023-12-31/14/30" (minute-level)
```
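
To preview which bucket ID a given format string and timezone would produce, the formatting step can be reproduced with plain `java.time`. This is a sketch of the pattern-to-path mapping only, not the class's actual implementation; `DateTimeBucketSketch` and its `bucketId` helper are hypothetical names introduced for illustration.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

final class DateTimeBucketSketch {
    /** Formats an epoch-millisecond timestamp into a bucket ID using the given pattern and zone. */
    static String bucketId(long epochMillis, String formatString, ZoneId zoneId) {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
        return formatter.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        // Same instant, three different bucket layouts.
        System.out.println(bucketId(now, "yyyy-MM-dd--HH", ZoneId.of("UTC")));
        System.out.println(bucketId(now, "yyyy-MM-dd--HH", ZoneId.of("America/New_York")));
        System.out.println(bucketId(now, "yyyy-MM-dd/HH/mm", ZoneId.of("UTC")));
    }
}
```

Note that a `/` in the format string yields nested directories, and that the same instant can land in different buckets depending on the timezone.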

### BasePathBucketAssigner

Simple bucket assigner that writes all files to the base path without additional bucketing.

```java { .api }
/**
 * BucketAssigner that does not perform any bucketing of files
 * All files are written to the base path
 */
public class BasePathBucketAssigner<T> implements BucketAssigner<T, String> {
    @Override
    public String getBucketId(T element, BucketAssigner.Context context);

    @Override
    public SimpleVersionedSerializer<String> getSerializer();
}
```

142

143

**Usage Example:**

144

145

```java
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;

// No bucketing - all files in base directory
BucketAssigner<MyEvent, String> noBucketing = new BasePathBucketAssigner<>();
```
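
The relationship between a bucket ID and the directory it maps to can be sketched as follows. This is a simplified illustration, not the sink's actual path handling: it assumes that an empty bucket ID (which, as far as we understand, is what `BasePathBucketAssigner` yields) resolves to the base path itself, and that any other ID becomes a child path. `BucketPathSketch` and `bucketPath` are hypothetical names.

```java
final class BucketPathSketch {
    /**
     * Resolves the directory for a bucket: the base path itself for an empty ID,
     * otherwise the ID's string form appended as a child path.
     */
    static String bucketPath(String basePath, Object bucketId) {
        String child = bucketId.toString();
        if (child.isEmpty()) {
            return basePath; // no bucketing: files go directly under the base path
        }
        // Drop a trailing slash so the separator is not doubled.
        String base = basePath.endsWith("/")
                ? basePath.substring(0, basePath.length() - 1)
                : basePath;
        return base + "/" + child;
    }
}
```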

### SimpleVersionedStringSerializer

Utility serializer for string-based bucket identifiers.

```java { .api }
/**
 * SimpleVersionedSerializer implementation for Strings
 */
public final class SimpleVersionedStringSerializer implements SimpleVersionedSerializer<String> {
    /** Singleton instance */
    public static final SimpleVersionedStringSerializer INSTANCE;

    @Override
    public int getVersion();

    @Override
    public byte[] serialize(String value);

    @Override
    public String deserialize(int version, byte[] serialized) throws IOException;
}
```

## Custom Bucket Assignment

You can implement custom bucket assignment logic by implementing the `BucketAssigner` interface:

```java
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

// MyEvent and Priority are application-defined types
public class CustomBucketAssigner implements BucketAssigner<MyEvent, String> {
    @Override
    public String getBucketId(MyEvent element, BucketAssigner.Context context) {
        // Custom logic based on element properties
        if (element.getPriority() == Priority.HIGH) {
            return "high-priority/" + element.getCategory();
        } else {
            return "normal/" + element.getCategory();
        }
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        // String bucket IDs can reuse the built-in string serializer
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}
```
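
When the bucket ID is not a `String`, the assigner also has to supply its own ID type and serializer. The sketch below shows what such a pair might look like. `TierCategoryBucketId` and `TierCategoryBucketIdSerializer` are hypothetical names, and the serializer is a standalone stand-in that only mirrors the `getVersion()` / `serialize()` / `deserialize()` shape of `SimpleVersionedSerializer` so that it runs without Flink on the classpath. The byte layout is an arbitrary choice for the example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Objects;

/** Hypothetical composite bucket ID: a priority tier plus a category. */
final class TierCategoryBucketId {
    final String tier;
    final String category;

    TierCategoryBucketId(String tier, String category) {
        this.tier = Objects.requireNonNull(tier);
        this.category = Objects.requireNonNull(category);
    }

    // Value-based equality so that equal IDs always map to the same bucket.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof TierCategoryBucketId)) return false;
        TierCategoryBucketId other = (TierCategoryBucketId) o;
        return tier.equals(other.tier) && category.equals(other.category);
    }

    @Override
    public int hashCode() {
        return Objects.hash(tier, category);
    }

    /** The string form doubles as the bucket's relative path. */
    @Override
    public String toString() {
        return tier + "/" + category;
    }
}

/** Stand-in for a versioned serializer: a version number plus byte-level (de)serialization. */
final class TierCategoryBucketIdSerializer {
    static final int VERSION = 1;

    int getVersion() {
        return VERSION;
    }

    byte[] serialize(TierCategoryBucketId id) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(id.tier);
            out.writeUTF(id.category);
        }
        return bytes.toByteArray();
    }

    TierCategoryBucketId deserialize(int version, byte[] serialized) throws IOException {
        // Reject data written by a version this serializer does not know how to read.
        if (version != VERSION) {
            throw new IOException("Unsupported bucket ID version: " + version);
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            return new TierCategoryBucketId(in.readUTF(), in.readUTF());
        }
    }
}
```

In a real job the serializer would implement `SimpleVersionedSerializer<TierCategoryBucketId>` and be returned from `getSerializer()`.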

## Error Handling

- `getBucketId()` should not return null
- Bucket IDs must implement proper `hashCode()` and `equals()` methods
- The `toString()` of the bucket ID becomes part of the file path
- Serialization failures will cause job failures and require job restart
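
Since the bucket ID ends up in the file path and should never be null, it can help to normalize element-derived values before concatenating them. The helper below is one possible sketch: `BucketIdGuards`, `safeSegment`, the allowed character set, and the `"unknown"` fallback are all assumptions made for illustration, not part of the package.

```java
final class BucketIdGuards {
    /**
     * Makes a value safe to use as a single path segment: null or empty values
     * become the fallback, and characters outside [A-Za-z0-9._-] become '_'.
     */
    static String safeSegment(String raw, String fallback) {
        if (raw == null || raw.isEmpty()) {
            return fallback;
        }
        String cleaned = raw.replaceAll("[^A-Za-z0-9._-]", "_");
        // "." and ".." would point at the current or parent directory.
        return (cleaned.equals(".") || cleaned.equals("..")) ? fallback : cleaned;
    }

    /** Builds a two-level bucket ID that is never null and contains exactly one separator. */
    static String bucketId(String tier, String category) {
        return safeSegment(tier, "unknown") + "/" + safeSegment(category, "unknown");
    }
}
```

A custom `getBucketId()` could call `BucketIdGuards.bucketId(...)` instead of concatenating raw field values.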