# Bucketing Strategies

Bucketers determine how streaming data is organized into directory structures within the base path. They control when new bucket directories are created and how elements are routed to appropriate buckets.

## Bucketer Interface

The modern bucketing interface used with BucketingSink.

```java { .api }
public interface Bucketer<T> extends Serializable
```

### Core Method

```java { .api }
Path getBucketPath(org.apache.flink.streaming.connectors.fs.Clock clock, org.apache.hadoop.fs.Path basePath, T element)
```

Returns the complete bucket path for the provided element.

**Parameters:**

- `clock` - Clock implementation for getting current time
- `basePath` - Base directory containing all buckets
- `element` - Current element being processed

**Returns:** Complete Path where the element should be written, including basePath and subtask index

## DateTimeBucketer

Creates buckets based on date and time patterns, organizing files into time-based directory structures.

### Constructors

```java { .api }
public DateTimeBucketer()
```

Creates a DateTimeBucketer with the default format `"yyyy-MM-dd--HH"` (hourly buckets).

```java { .api }
public DateTimeBucketer(String formatString)
```

Creates a DateTimeBucketer with a custom date format pattern.

**Parameters:**

- `formatString` - Java SimpleDateFormat pattern string

### Usage Examples

```java
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

// Hourly buckets (default): /basePath/2023-12-25--14/
DateTimeBucketer<String> hourly = new DateTimeBucketer<>();

// Daily buckets: /basePath/2023-12-25/
DateTimeBucketer<String> daily = new DateTimeBucketer<>("yyyy-MM-dd");

// Minute-level buckets: /basePath/2023/12/25/14/30/
DateTimeBucketer<String> minutely = new DateTimeBucketer<>("yyyy/MM/dd/HH/mm");

// Attach the chosen bucketer to the sink
BucketingSink<String> sink = new BucketingSink<>("/tmp/output");
sink.setBucketer(hourly);
```

### Common Date Format Patterns

| Pattern | Example Output | Description |
|---------|----------------|-------------|
| `yyyy-MM-dd--HH` | `2023-12-25--14` | Default: hourly buckets |
| `yyyy-MM-dd` | `2023-12-25` | Daily buckets |
| `yyyy/MM/dd/HH` | `2023/12/25/14` | Hierarchical hourly |
| `yyyy-MM-dd/HH/mm` | `2023-12-25/14/30` | Minute-level buckets |
| `yyyy/MM` | `2023/12` | Monthly buckets |
| `'year='yyyy'/month='MM'/day='dd` | `year=2023/month=12/day=25` | Hive-style partitioning |
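
The patterns in the table can be previewed with plain `SimpleDateFormat`, without a Flink job. The following is a minimal sketch, not part of the connector: `BucketNamePreview` and `bucketName` are hypothetical names, and the time zone is pinned to UTC and the locale to `Locale.ROOT` only to make the result reproducible. A running sink may use the JVM defaults instead, so directory names can differ by time zone.

```java
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

// Hypothetical helper (not part of Flink): previews the bucket directory
// name that a SimpleDateFormat pattern produces for a given timestamp.
class BucketNamePreview {

    static String bucketName(String pattern, long epochMillis) {
        SimpleDateFormat format = new SimpleDateFormat(pattern, Locale.ROOT);
        // Assumption: UTC is used here purely for reproducible output.
        format.setTimeZone(TimeZone.getTimeZone("UTC"));
        return format.format(new Date(epochMillis));
    }

    public static void main(String[] args) {
        long ts = Instant.parse("2023-12-25T14:30:00Z").toEpochMilli();
        String[] patterns = {
            "yyyy-MM-dd--HH",
            "yyyy-MM-dd",
            "yyyy/MM/dd/HH",
            "yyyy-MM-dd/HH/mm",
            "yyyy/MM",
            "'year='yyyy'/month='MM'/day='dd"
        };
        for (String pattern : patterns) {
            System.out.println(pattern + " -> " + bucketName(pattern, ts));
        }
    }
}
```

Text inside single quotes is copied literally, which is what allows Hive-style keys such as `year=` in a pattern; unquoted letters are interpreted as date fields.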

## BasePathBucketer

Uses the base path as the bucket directory without creating subdirectories. All files are written directly to the base path.

### Constructor

```java { .api }
public BasePathBucketer()
```

Creates a BasePathBucketer that writes all files to the base directory.

### Usage Example

```java
import org.apache.flink.streaming.connectors.fs.bucketing.BasePathBucketer;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

// All files written directly to /tmp/output/
BasePathBucketer<String> bucketer = new BasePathBucketer<>();

BucketingSink<String> sink = new BucketingSink<>("/tmp/output");
sink.setBucketer(bucketer);
```

## Legacy Bucketer Interface (Deprecated)

The original bucketing interface used with RollingSink.

```java { .api }
@Deprecated
public interface Bucketer extends Serializable
```

### Methods

```java { .api }
@Deprecated
boolean shouldStartNewBucket(org.apache.hadoop.fs.Path basePath, org.apache.hadoop.fs.Path currentBucketPath)
```

Determines if a new bucket should be started.

```java { .api }
@Deprecated
org.apache.hadoop.fs.Path getNextBucketPath(org.apache.hadoop.fs.Path basePath)
```

Returns the path for the next bucket.

### Legacy Implementations

#### DateTimeBucketer (Legacy)

```java { .api }
@Deprecated
public class DateTimeBucketer implements Bucketer
```

Legacy time-based bucketing for RollingSink.
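
To make the two-method legacy contract more concrete, here is a minimal sketch of how a time-based bucketer could decide when to roll to a new bucket. It is not Flink's implementation: `LegacyRollSketch` is a hypothetical class, plain strings stand in for Hadoop `Path` objects, and the current time is passed in as a parameter (the interface methods above take no time argument) so that the behavior is reproducible.

```java
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

// Hypothetical stand-in for a legacy time-based bucketer.
// Strings replace Hadoop Path objects so the sketch runs on the JDK alone.
class LegacyRollSketch {

    private final SimpleDateFormat format;

    LegacyRollSketch(String pattern) {
        format = new SimpleDateFormat(pattern, Locale.ROOT);
        // Assumption: UTC is used here purely for reproducible output.
        format.setTimeZone(TimeZone.getTimeZone("UTC"));
    }

    // Analogue of getNextBucketPath: base path plus the formatted time.
    String nextBucketPath(String basePath, long nowMillis) {
        return basePath + "/" + format.format(new Date(nowMillis));
    }

    // Analogue of shouldStartNewBucket: roll when the path derived from
    // the current time no longer matches the bucket being written.
    boolean shouldStartNewBucket(String basePath, String currentBucketPath, long nowMillis) {
        return !nextBucketPath(basePath, nowMillis).equals(currentBucketPath);
    }

    public static void main(String[] args) {
        LegacyRollSketch bucketer = new LegacyRollSketch("yyyy-MM-dd--HH");
        long start = Instant.parse("2023-12-25T14:30:00Z").toEpochMilli();
        long sameHour = Instant.parse("2023-12-25T14:59:59Z").toEpochMilli();
        long nextHour = Instant.parse("2023-12-25T15:00:00Z").toEpochMilli();

        String current = bucketer.nextBucketPath("/tmp/output", start);
        System.out.println("current bucket: " + current);
        System.out.println("roll at 14:59:59? " + bucketer.shouldStartNewBucket("/tmp/output", current, sameHour));
        System.out.println("roll at 15:00:00? " + bucketer.shouldStartNewBucket("/tmp/output", current, nextHour));
    }
}
```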

#### NonRollingBucketer (Legacy)

```java { .api }
@Deprecated
public class NonRollingBucketer implements Bucketer
```

Legacy single-bucket strategy for RollingSink.

## Custom Bucketer Implementation

You can create custom bucketing strategies by implementing the Bucketer interface:

```java
import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer;
import org.apache.flink.streaming.connectors.fs.Clock;
import org.apache.hadoop.fs.Path;

public class CustomBucketer<T> implements Bucketer<T> {

    @Override
    public Path getBucketPath(Clock clock, Path basePath, T element) {
        // Custom bucketing logic based on element properties
        String bucketName = determineBucketName(element);
        return new Path(basePath, bucketName);
    }

    private String determineBucketName(T element) {
        // Example: bucket by string length
        if (element instanceof String) {
            String str = (String) element;
            return "length-" + str.length();
        }
        return "default";
    }
}
```
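
The routing effect of such a bucketer can be checked without Flink on the classpath. The sketch below is an illustration under assumptions: `BucketRoutingSketch` and `route` are hypothetical names, strings stand in for Hadoop `Path` objects, and the naming rule is copied from `determineBucketName` above. It groups a handful of elements by the bucket path they would be assigned.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Flink-free illustration of element-to-bucket routing.
class BucketRoutingSketch {

    // Same rule as determineBucketName in the custom bucketer above.
    static String determineBucketName(Object element) {
        if (element instanceof String) {
            return "length-" + ((String) element).length();
        }
        return "default";
    }

    // Groups elements by the bucket path they would be written to.
    static Map<String, List<Object>> route(String basePath, List<Object> elements) {
        Map<String, List<Object>> buckets = new TreeMap<>();
        for (Object element : elements) {
            String bucketPath = basePath + "/" + determineBucketName(element);
            buckets.computeIfAbsent(bucketPath, k -> new ArrayList<>()).add(element);
        }
        return buckets;
    }

    public static void main(String[] args) {
        List<Object> elements = Arrays.asList("flink", "sink", "spark", 42);
        route("/tmp/output", elements)
            .forEach((path, items) -> System.out.println(path + " <- " + items));
    }
}
```

Because the bucket path depends only on the element, the same element always lands in the same bucket, regardless of when it arrives.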

## Bucketing Best Practices

### Performance Considerations

1. **Avoid Too Many Small Buckets**: Fine-grained bucketing produces many small files, which adds file system metadata overhead and slows downstream reads.
2. **Balance Bucket Size**: Choose the bucket granularity based on your data volume and on how downstream jobs read the data.
3. **HDFS Block Size**: Aim for file sizes that are multiples of the HDFS block size (typically 128MB).
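
As a rough illustration of points 1 and 2, the sketch below estimates how bucket granularity affects file count and average file size. It is back-of-the-envelope arithmetic rather than connector behavior: `BucketSizingSketch` is a hypothetical helper, and it assumes data is spread evenly over time and that every parallel sink subtask writes exactly one part file into every bucket.

```java
// Hypothetical sizing helper; assumes evenly distributed data and one
// part file per bucket per parallel sink subtask.
class BucketSizingSketch {

    // Lower bound on part files created per day.
    static long minFilesPerDay(int bucketsPerDay, int parallelism) {
        return (long) bucketsPerDay * parallelism;
    }

    // Average part file size in MiB (integer division, rounded down).
    static long averageFileMiB(long dailyMiB, int bucketsPerDay, int parallelism) {
        return dailyMiB / minFilesPerDay(bucketsPerDay, parallelism);
    }

    public static void main(String[] args) {
        long dailyMiB = 100L * 1024; // 100 GiB of output per day
        int parallelism = 4;

        // Hourly buckets: 24 per day.
        System.out.println("hourly:   " + minFilesPerDay(24, parallelism) + " files, ~"
            + averageFileMiB(dailyMiB, 24, parallelism) + " MiB each");

        // Minute-level buckets: 1440 per day.
        System.out.println("minutely: " + minFilesPerDay(1440, parallelism) + " files, ~"
            + averageFileMiB(dailyMiB, 1440, parallelism) + " MiB each");
    }
}
```

With these example numbers, hourly buckets give files well above a 128MB block, while minute-level buckets give files far below it.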

### Time-based Bucketing Guidelines

```java
// High-volume streams: Use larger time windows
DateTimeBucketer<String> hourly = new DateTimeBucketer<>("yyyy-MM-dd--HH");

// Medium-volume streams: Smaller windows acceptable
// (this pattern creates one flat directory per minute)
DateTimeBucketer<String> perMinuteFlat = new DateTimeBucketer<>("yyyy-MM-dd--HH-mm");

// Low-volume streams: Very granular bucketing is possible, but each
// bucket then holds little data, so watch for small files
// (this pattern creates nested per-minute directories)
DateTimeBucketer<String> perMinuteNested = new DateTimeBucketer<>("yyyy/MM/dd/HH/mm");
```

### Combining with Batch Size

```java
// Large buckets with smaller batch sizes for faster file rotation
BucketingSink<String> sink = new BucketingSink<>("/tmp/output");
sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd")) // Daily buckets
    .setBatchSize(64 * 1024 * 1024); // 64MB files
```
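
The effect of the batch size on file count can be estimated with a ceiling division. This is a sketch under assumptions, not the sink's exact rolling logic: `BatchSizeSketch` is a hypothetical helper, it treats the batch size as a hard per-file limit, and it ignores any other condition that might close a part file early.

```java
// Hypothetical estimate of part files per bucket for one sink subtask.
class BatchSizeSketch {

    // Ceiling division: files needed to hold the bytes a subtask writes
    // into one bucket when each file holds at most batchSizeBytes.
    static long partFilesPerBucket(long bytesPerSubtaskPerBucket, long batchSizeBytes) {
        return (bytesPerSubtaskPerBucket + batchSizeBytes - 1) / batchSizeBytes;
    }

    public static void main(String[] args) {
        long batchSize = 64L * 1024 * 1024;     // 64 MiB, as in the example above
        long perBucket = 1024L * 1024 * 1024;   // 1 GiB written by one subtask per day
        System.out.println("part files per daily bucket: "
            + partFilesPerBucket(perBucket, batchSize));
    }
}
```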

### Hive-Compatible Partitioning

```java
// Create Hive-compatible partition structure
DateTimeBucketer<String> hiveBucketer = new DateTimeBucketer<>(
    "'year='yyyy'/month='MM'/day='dd'/hour='HH"
);

// Results in: /basePath/year=2023/month=12/day=25/hour=14/
BucketingSink<String> sink = new BucketingSink<>("/tmp/output");
sink.setBucketer(hiveBucketer);
```