# Bucket Assignment

Bucket assignment determines how streaming data elements are organized into logical buckets (directories) in the file system. The package ships two built-in assigners, `DateTimeBucketAssigner` and `BasePathBucketAssigner`, and supports custom implementations through the `BucketAssigner` interface.

## Capabilities

### BucketAssigner Interface

Core interface for implementing bucket assignment logic.

```java { .api }
/**
 * Interface for determining bucket assignment of streaming elements
 * @param <IN> The type of input elements
 * @param <BucketID> The type of bucket identifier (must have correct hashCode() and equals())
 */
public interface BucketAssigner<IN, BucketID> extends Serializable {
    /**
     * Returns the identifier of the bucket the provided element should be put into
     * @param element The current element being processed
     * @param context The context used by the current bucket assigner
     * @return Bucket identifier for the element
     */
    BucketID getBucketId(IN element, BucketAssigner.Context context);

    /**
     * Returns a serializer capable of serializing/deserializing bucket IDs
     * @return SimpleVersionedSerializer for bucket IDs
     */
    SimpleVersionedSerializer<BucketID> getSerializer();
}
```

### BucketAssigner Context

Provides contextual information for bucket assignment decisions.

```java { .api }
/**
 * Context that BucketAssigner can use for getting additional data about input records.
 * Context is only valid for the duration of a getBucketId() call.
 */
public interface Context {
    /** Returns the current processing time */
    long currentProcessingTime();

    /** Returns the current event-time watermark */
    long currentWatermark();

    /**
     * Returns the timestamp of the current input record
     * @return timestamp in milliseconds or null if element has no assigned timestamp
     */
    Long timestamp();
}
```

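
Because `timestamp()` may return `null`, an assigner that buckets by event time needs a fallback. The following is a minimal sketch of that decision, not Flink code: `AssignerContext` is a local stand-in that mirrors the three `Context` methods listed above so the example compiles without Flink on the classpath, and `EventTimeBucketing`, `bucketFor`, and `fixedContext` are hypothetical names chosen for illustration. The hourly UTC pattern is also an assumption.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Stand-in (assumption) mirroring the Context methods documented above.
interface AssignerContext {
    long currentProcessingTime();
    long currentWatermark();
    Long timestamp();
}

// Hypothetical helper: derives an hourly bucket ID from the record's
// event-time timestamp, falling back to processing time when it is null.
final class EventTimeBucketing {
    private static final DateTimeFormatter HOURLY_UTC =
            DateTimeFormatter.ofPattern("yyyy-MM-dd--HH").withZone(ZoneOffset.UTC);

    static String bucketFor(AssignerContext context) {
        Long eventTime = context.timestamp();
        long millis = (eventTime != null) ? eventTime : context.currentProcessingTime();
        return HOURLY_UTC.format(Instant.ofEpochMilli(millis));
    }

    // Builds a context with fixed values, for illustration and testing only.
    static AssignerContext fixedContext(Long timestamp, long processingTime) {
        return new AssignerContext() {
            @Override public long currentProcessingTime() { return processingTime; }
            @Override public long currentWatermark() { return Long.MIN_VALUE; }
            @Override public Long timestamp() { return timestamp; }
        };
    }
}
```

In a real job the same logic would live inside `getBucketId(element, context)` of a `BucketAssigner` implementation, reading from the `BucketAssigner.Context` that Flink passes in.
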
### DateTimeBucketAssigner

Built-in bucket assigner that creates buckets based on system time using date/time patterns.

```java { .api }
/**
 * BucketAssigner that assigns to buckets based on current system time
 * Creates directories of the form: /{basePath}/{dateTimePath}/
 * Uses DateTimeFormatter with configurable format string and timezone
 */
public class DateTimeBucketAssigner<IN> implements BucketAssigner<IN, String> {
    /** Creates DateTimeBucketAssigner with default format "yyyy-MM-dd--HH" */
    public DateTimeBucketAssigner();

    /**
     * Creates DateTimeBucketAssigner with custom date/time format string
     * @param formatString Format string for DateTimeFormatter to determine bucket path
     */
    public DateTimeBucketAssigner(String formatString);

    /**
     * Creates DateTimeBucketAssigner with default format using specified timezone
     * @param zoneId The timezone for DateTimeFormatter
     */
    public DateTimeBucketAssigner(ZoneId zoneId);

    /**
     * Creates DateTimeBucketAssigner with custom format and timezone
     * @param formatString Format string for DateTimeFormatter
     * @param zoneId The timezone for DateTimeFormatter
     */
    public DateTimeBucketAssigner(String formatString, ZoneId zoneId);

    @Override
    public String getBucketId(IN element, BucketAssigner.Context context);

    @Override
    public SimpleVersionedSerializer<String> getSerializer();
}
```

**Usage Examples:**

```java
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import java.time.ZoneId;

// Default hourly bucketing: "yyyy-MM-dd--HH"
BucketAssigner<String, String> hourlyAssigner = new DateTimeBucketAssigner<>();

// Daily bucketing
BucketAssigner<String, String> dailyAssigner =
    new DateTimeBucketAssigner<>("yyyy-MM-dd");

// Hourly bucketing with custom timezone
BucketAssigner<String, String> timezoneAssigner =
    new DateTimeBucketAssigner<>("yyyy-MM-dd--HH", ZoneId.of("America/New_York"));

// Minute-level bucketing (the "/" in the pattern yields nested directories)
BucketAssigner<String, String> minuteAssigner =
    new DateTimeBucketAssigner<>("yyyy-MM-dd/HH/mm");

// Example bucket paths generated:
// "2023-12-31--14" (hourly)
// "2023-12-31" (daily)
// "2023-12-31/14/30" (minute-level)
```

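
To preview which bucket path a given format string and timezone would produce, the formatting step can be reproduced with plain `java.time`. This is a sketch based only on the description above (a `DateTimeFormatter` with a configurable format string and timezone); `BucketPathPreview` and `bucketPath` are hypothetical names, and the code is not taken from the Flink implementation.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper for previewing date/time bucket paths outside a job.
final class BucketPathPreview {
    // Formats an epoch-millisecond timestamp with the given pattern and zone.
    static String bucketPath(String formatString, ZoneId zoneId, long epochMillis) {
        return DateTimeFormatter.ofPattern(formatString)
                .withZone(zoneId)
                .format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        long ts = Instant.parse("2023-12-31T14:30:00Z").toEpochMilli();
        // Same instant, different patterns and zones:
        System.out.println(bucketPath("yyyy-MM-dd--HH", ZoneId.of("UTC"), ts));              // 2023-12-31--14
        System.out.println(bucketPath("yyyy-MM-dd/HH/mm", ZoneId.of("UTC"), ts));            // 2023-12-31/14/30
        System.out.println(bucketPath("yyyy-MM-dd--HH", ZoneId.of("America/New_York"), ts)); // 2023-12-31--09
    }
}
```

The last line shows why the timezone matters: the same instant lands in a different hourly bucket once the zone changes.
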
### BasePathBucketAssigner

Simple bucket assigner that writes all files to the base path without additional bucketing.

```java { .api }
/**
 * BucketAssigner that does not perform any bucketing of files
 * All files are written to the base path
 */
public class BasePathBucketAssigner<T> implements BucketAssigner<T, String> {
    @Override
    public String getBucketId(T element, BucketAssigner.Context context);

    @Override
    public SimpleVersionedSerializer<String> getSerializer();
}
```

**Usage Example:**

```java
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;

// No bucketing - all files in base directory
// (MyEvent stands for your own element type)
BucketAssigner<MyEvent, String> noBucketing = new BasePathBucketAssigner<>();
```

### SimpleVersionedStringSerializer

Utility serializer for string-based bucket identifiers.

```java { .api }
/**
 * SimpleVersionedSerializer implementation for Strings
 */
public final class SimpleVersionedStringSerializer implements SimpleVersionedSerializer<String> {
    /** Singleton instance */
    public static final SimpleVersionedStringSerializer INSTANCE;

    @Override
    public int getVersion();

    @Override
    public byte[] serialize(String value);

    @Override
    public String deserialize(int version, byte[] serialized) throws IOException;
}
```

## Custom Bucket Assignment

You can implement custom bucket assignment logic by implementing the `BucketAssigner` interface:

```java
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

// MyEvent and Priority stand for your own application types.
public class CustomBucketAssigner implements BucketAssigner<MyEvent, String> {
    private static final long serialVersionUID = 1L;

    @Override
    public String getBucketId(MyEvent element, BucketAssigner.Context context) {
        // Custom logic based on element properties
        if (element.getPriority() == Priority.HIGH) {
            return "high-priority/" + element.getCategory();
        } else {
            return "normal/" + element.getCategory();
        }
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        // String bucket IDs can reuse the built-in string serializer
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}
```

## Error Handling

- `getBucketId()` should not return null
- Bucket IDs must implement proper `hashCode()` and `equals()` methods
- The `toString()` of the bucket ID becomes part of the file path
- Serialization failures will cause job failures and require job restart
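
The points above matter most when the bucket ID is not a plain `String`. The sketch below shows one way a composite bucket ID could satisfy them: null-checked fields, value-based `equals()`/`hashCode()`, a `toString()` that doubles as a relative path, and a versioned serializer that rejects unknown versions. Everything here is an assumption for illustration: `VersionedSerializer` is a local stand-in that mirrors the `SimpleVersionedSerializer` methods listed earlier so the code compiles without Flink, and `RegionDayBucketId` and its newline-delimited byte layout are hypothetical.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

// Stand-in (assumption) mirroring getVersion/serialize/deserialize.
interface VersionedSerializer<T> {
    int getVersion();
    byte[] serialize(T value) throws IOException;
    T deserialize(int version, byte[] serialized) throws IOException;
}

// Hypothetical composite bucket ID: region plus day.
final class RegionDayBucketId {
    private final String region;
    private final String day;

    RegionDayBucketId(String region, String day) {
        // Fail fast instead of producing a null or partial bucket ID.
        this.region = Objects.requireNonNull(region, "region");
        this.day = Objects.requireNonNull(day, "day");
    }

    String region() { return region; }
    String day() { return day; }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof RegionDayBucketId)) return false;
        RegionDayBucketId other = (RegionDayBucketId) o;
        return region.equals(other.region) && day.equals(other.day);
    }

    @Override
    public int hashCode() { return Objects.hash(region, day); }

    // Becomes part of the file path, so keep it path-safe.
    @Override
    public String toString() { return region + "/" + day; }
}

// Hypothetical serializer; assumes neither field contains a newline.
final class RegionDayBucketIdSerializer implements VersionedSerializer<RegionDayBucketId> {
    private static final int VERSION = 1;

    @Override
    public int getVersion() { return VERSION; }

    @Override
    public byte[] serialize(RegionDayBucketId value) {
        return (value.region() + "\n" + value.day()).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public RegionDayBucketId deserialize(int version, byte[] serialized) throws IOException {
        if (version != VERSION) {
            throw new IOException("Unsupported bucket ID version: " + version);
        }
        String[] parts = new String(serialized, StandardCharsets.UTF_8).split("\n", 2);
        if (parts.length != 2) {
            throw new IOException("Malformed bucket ID bytes");
        }
        return new RegionDayBucketId(parts[0], parts[1]);
    }
}
```

Checking the version on `deserialize` surfaces format mismatches as an explicit `IOException` rather than as silently corrupted bucket IDs.
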