# Bucketing Strategies

Bucketers determine how streaming data is organized into directories under the base path. They control when new bucket directories are created and which bucket each element is written to.

## Bucketer Interface

`Bucketer<T>` is the current bucketing interface, used with `BucketingSink`.

```java { .api }
public interface Bucketer<T> extends Serializable
```

### Core Method

```java { .api }
Path getBucketPath(org.apache.flink.streaming.connectors.fs.Clock clock, org.apache.hadoop.fs.Path basePath, T element)
```

Returns the complete bucket path for the provided element.

**Parameters:**
- `clock` - Clock implementation for getting current time
- `basePath` - Base directory containing all buckets
- `element` - Current element being processed

**Returns:** Complete Path where the element should be written, including basePath and subtask index

## DateTimeBucketer

Creates buckets based on date and time patterns, organizing files into time-based directory structures.

### Constructors

```java { .api }
public DateTimeBucketer()
```

Creates a `DateTimeBucketer` with the default format `"yyyy-MM-dd--HH"` (hourly buckets).

```java { .api }
public DateTimeBucketer(String formatString)
```

Creates a `DateTimeBucketer` with a custom date format pattern.

**Parameters:**
- `formatString` - Java `SimpleDateFormat` pattern string

### Usage Examples

```java
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

// Hourly buckets (default): /basePath/2023-12-25--14/
DateTimeBucketer<String> hourly = new DateTimeBucketer<>();

// Daily buckets: /basePath/2023-12-25/
DateTimeBucketer<String> daily = new DateTimeBucketer<>("yyyy-MM-dd");

// Minute-level buckets: /basePath/2023/12/25/14/30/
DateTimeBucketer<String> minutely = new DateTimeBucketer<>("yyyy/MM/dd/HH/mm");

BucketingSink<String> sink = new BucketingSink<>("/tmp/output");
sink.setBucketer(hourly);
```

### Common Date Format Patterns

| Pattern | Example Output | Description |
|---------|----------------|-------------|
| `yyyy-MM-dd--HH` | `2023-12-25--14` | Default: hourly buckets |
| `yyyy-MM-dd` | `2023-12-25` | Daily buckets |
| `yyyy/MM/dd/HH` | `2023/12/25/14` | Hierarchical hourly |
| `yyyy-MM-dd/HH/mm` | `2023-12-25/14/30` | Minute-level buckets |
| `yyyy/MM` | `2023/12` | Monthly buckets |
| `'year='yyyy'/month='MM'/day='dd` | `year=2023/month=12/day=25` | Hive-style partitioning |

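To check what directory name a pattern yields before wiring it into a sink, it can help to format a fixed timestamp with plain `SimpleDateFormat`. The sketch below is only a preview aid: `BucketNamePreview` and `bucketName` are hypothetical names, not part of Flink, and the UTC time zone and `Locale.ROOT` are assumptions made so the output is reproducible. The bucketer itself formats the time reported by its clock and may apply a different time zone.

```java
import java.text.SimpleDateFormat;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

// Hypothetical helper, not part of Flink: shows the directory name that a
// SimpleDateFormat pattern produces for a given timestamp.
final class BucketNamePreview {

    static String bucketName(String pattern, long epochMillis) {
        SimpleDateFormat format = new SimpleDateFormat(pattern, Locale.ROOT);
        // Assumption: UTC is pinned here only to make the preview reproducible.
        format.setTimeZone(TimeZone.getTimeZone("UTC"));
        return format.format(new Date(epochMillis));
    }

    public static void main(String[] args) {
        // Fixed sample instant: 2023-12-25 14:30 UTC
        long ts = ZonedDateTime.of(2023, 12, 25, 14, 30, 0, 0, ZoneOffset.UTC)
                .toInstant().toEpochMilli();
        System.out.println(bucketName("yyyy-MM-dd--HH", ts));                  // 2023-12-25--14
        System.out.println(bucketName("yyyy/MM/dd/HH/mm", ts));                // 2023/12/25/14/30
        System.out.println(bucketName("'year='yyyy'/month='MM'/day='dd", ts)); // year=2023/month=12/day=25
    }
}
```

Text inside single quotes is emitted literally, which is why the Hive-style pattern can contain `year=` and `/month=` without those letters being read as pattern symbols.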
## BasePathBucketer

Uses the base path itself as the bucket directory and creates no subdirectories. All files are written directly to the base path.

### Constructor

```java { .api }
public BasePathBucketer()
```

Creates a `BasePathBucketer` that writes all files to the base directory.

### Usage Example

```java
import org.apache.flink.streaming.connectors.fs.bucketing.BasePathBucketer;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

// All files written directly to /tmp/output/
BasePathBucketer<String> bucketer = new BasePathBucketer<>();

BucketingSink<String> sink = new BucketingSink<>("/tmp/output");
sink.setBucketer(bucketer);
```

## Legacy Bucketer Interface (Deprecated)

The original bucketing interface, used with `RollingSink`.

```java { .api }
@Deprecated
public interface Bucketer extends Serializable
```

### Methods

```java { .api }
@Deprecated
boolean shouldStartNewBucket(org.apache.hadoop.fs.Path basePath, org.apache.hadoop.fs.Path currentBucketPath)
```

Determines whether a new bucket should be started.

```java { .api }
@Deprecated
org.apache.hadoop.fs.Path getNextBucketPath(org.apache.hadoop.fs.Path basePath)
```

Returns the path for the next bucket.

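The two methods form a small protocol: the caller asks whether to roll and, if so, requests the next path. Neither method receives the element, so the legacy interface cannot route by element content. The following is a rough stand-alone sketch of that call pattern, not the actual sink code. It uses `String` paths instead of Hadoop `Path`, and every name in it (`LegacyBucketerSketch`, `LegacyBucketer`, `HourLabelBucketer`, `route`) is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for the legacy contract, using String paths instead of Hadoop Path.
// All names here are hypothetical and only mirror the two methods above.
final class LegacyBucketerSketch {

    interface LegacyBucketer {
        boolean shouldStartNewBucket(String basePath, String currentBucketPath);
        String getNextBucketPath(String basePath);
    }

    /** Rolls to a new bucket whenever the supplied hour label changes. */
    static final class HourLabelBucketer implements LegacyBucketer {
        private String currentHour; // set by the caller to simulate a clock

        void setHour(String hour) {
            this.currentHour = hour;
        }

        @Override
        public boolean shouldStartNewBucket(String basePath, String currentBucketPath) {
            return !getNextBucketPath(basePath).equals(currentBucketPath);
        }

        @Override
        public String getNextBucketPath(String basePath) {
            return basePath + "/" + currentHour;
        }
    }

    /** Records the bucket in effect for each write, the way a sink loop might ask for it. */
    static List<String> route(HourLabelBucketer bucketer, String basePath, String[] hours) {
        List<String> chosen = new ArrayList<>();
        String current = null; // no bucket open yet
        for (String hour : hours) {
            bucketer.setHour(hour);
            if (bucketer.shouldStartNewBucket(basePath, current)) {
                current = bucketer.getNextBucketPath(basePath);
            }
            chosen.add(current);
        }
        return chosen;
    }

    public static void main(String[] args) {
        // prints [/base/13, /base/13, /base/14]
        System.out.println(route(new HourLabelBucketer(), "/base", new String[] {"13", "13", "14"}));
    }
}
```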
### Legacy Implementations

#### DateTimeBucketer (Legacy)

```java { .api }
@Deprecated
public class DateTimeBucketer implements Bucketer
```

Legacy time-based bucketing for RollingSink.

#### NonRollingBucketer (Legacy)

```java { .api }
@Deprecated
public class NonRollingBucketer implements Bucketer
```

Legacy single-bucket strategy for RollingSink.

## Custom Bucketer Implementation

You can create custom bucketing strategies by implementing the `Bucketer<T>` interface:

```java
import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer;
import org.apache.flink.streaming.connectors.fs.Clock;
import org.apache.hadoop.fs.Path;

public class CustomBucketer<T> implements Bucketer<T> {

    // Bucketer extends Serializable, so declare a stable serial version
    private static final long serialVersionUID = 1L;

    @Override
    public Path getBucketPath(Clock clock, Path basePath, T element) {
        // Custom bucketing logic based on element properties
        String bucketName = determineBucketName(element);
        return new Path(basePath, bucketName);
    }

    private String determineBucketName(T element) {
        // Example: bucket by string length
        if (element instanceof String) {
            String str = (String) element;
            return "length-" + str.length();
        }
        return "default";
    }
}
```

## Bucketing Best Practices

### Performance Considerations

1. **Avoid Too Many Small Buckets**: Fine-grained bucketing can produce many small files, which degrades performance
2. **Balance Bucket Size**: Choose bucket granularity to match your data volume and processing patterns
3. **HDFS Block Size**: Aim for file sizes that are multiples of the HDFS block size (typically 128 MB)

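The points above can be turned into a quick back-of-the-envelope check. This is a minimal sketch under stated assumptions: data is spread evenly across parallel subtasks, each subtask writes its own files within a bucket, and the 2 MiB/s throughput and parallelism of 4 are made-up sample values. `BucketSizing` and its methods are hypothetical helpers, not part of Flink.

```java
// Hypothetical sizing helper, not part of Flink.
final class BucketSizing {

    static final long HDFS_BLOCK_BYTES = 128L * 1024 * 1024; // typical block size cited above

    /** Approximate bytes one parallel subtask writes into a single bucket. */
    static long bytesPerBucketPerSubtask(long bytesPerSecond, long bucketSeconds, int parallelism) {
        return bytesPerSecond * bucketSeconds / parallelism;
    }

    /** True when one subtask's share of a bucket reaches at least one block. */
    static boolean fillsABlock(long bytesPerSecond, long bucketSeconds, int parallelism) {
        return bytesPerBucketPerSubtask(bytesPerSecond, bucketSeconds, parallelism) >= HDFS_BLOCK_BYTES;
    }

    public static void main(String[] args) {
        long rate = 2L * 1024 * 1024; // assumed 2 MiB/s total throughput
        System.out.println(fillsABlock(rate, 3600, 4)); // hourly buckets: true (1800 MiB per subtask)
        System.out.println(fillsABlock(rate, 60, 4));   // per-minute buckets: false (30 MiB per subtask)
    }
}
```

With these sample numbers, per-minute buckets would leave each subtask with files well under one block, which is the small-file situation the first item warns about.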
### Time-based Bucketing Guidelines

```java
// High-volume streams: Use larger time windows
DateTimeBucketer<String> hourly = new DateTimeBucketer<>("yyyy-MM-dd--HH");

// Medium-volume streams: Smaller windows acceptable (this pattern yields one flat bucket per minute)
DateTimeBucketer<String> flatPerMinute = new DateTimeBucketer<>("yyyy-MM-dd--HH-mm");

// Low-volume streams: May use very granular bucketing (one nested directory per minute)
DateTimeBucketer<String> nestedPerMinute = new DateTimeBucketer<>("yyyy/MM/dd/HH/mm");
```

### Combining with Batch Size

```java
// Large buckets with smaller batch sizes for faster file rotation
BucketingSink<String> sink = new BucketingSink<>("/tmp/output");
sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd")) // Daily buckets
    .setBatchSize(64 * 1024 * 1024); // 64MB files
```

### Hive-Compatible Partitioning

```java
// Create Hive-compatible partition structure
DateTimeBucketer<String> hiveBucketer = new DateTimeBucketer<>(
    "'year='yyyy'/month='MM'/day='dd'/hour='HH"
);

// Results in: /basePath/year=2023/month=12/day=25/hour=14/
BucketingSink<String> sink = new BucketingSink<>("/tmp/output");
sink.setBucketer(hiveBucketer);
```