# Sink Implementations

The filesystem connector provides two sink implementations for writing streaming data to Hadoop-compatible filesystems. Both are fault tolerant and provide exactly-once semantics when checkpointing is enabled.

## BucketingSink

Sink implementation that supersedes RollingSink. It can keep several buckets open and write to them concurrently, closing buckets that stop receiving data. This allows more flexible bucketing strategies than a single active bucket.

### Constructor

```java { .api }
public BucketingSink(String basePath)
```

Creates a new BucketingSink with the specified base path where bucket directories will be created.

**Parameters:**
- `basePath` - Base directory path where all buckets will be created

### Configuration Methods

```java { .api }
public BucketingSink<T> setBatchSize(long batchSize)
```

Sets the part file size, in bytes, at which the sink rolls over to a new part file (default: 384 MB). The size is checked before each write, so a part file can end up slightly larger than this value.

**Parameters:**
- `batchSize` - Part file size in bytes after which the sink rolls to a new file

**Returns:** The BucketingSink instance for method chaining

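The rolling rule can be modeled in a few lines of plain Java. This is a hypothetical sketch, not BucketingSink's source: `SizeBasedRoller` is an illustrative name, it counts bytes instead of writing files, and it assumes the size check runs before each write.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of size-based rolling; tracks byte counts, not real files.
final class SizeBasedRoller {
    private final long batchSize;
    private final List<Long> closedFileSizes = new ArrayList<>();
    private long currentSize = 0;
    private boolean fileOpen = false;

    SizeBasedRoller(long batchSize) {
        this.batchSize = batchSize;
    }

    // Assumed rule: before writing, roll if no file is open or the open file
    // already exceeds batchSize. A file can therefore overshoot by one record.
    void write(long recordBytes) {
        if (!fileOpen || currentSize > batchSize) {
            roll();
        }
        currentSize += recordBytes;
    }

    private void roll() {
        if (fileOpen) {
            closedFileSizes.add(currentSize); // finish the previous part file
        }
        currentSize = 0; // start a fresh part file
        fileOpen = true;
    }

    List<Long> closedFileSizes() {
        return closedFileSizes;
    }

    long currentSize() {
        return currentSize;
    }
}
```

With a `batchSize` of 10 bytes and seven 4-byte records, the model closes two 12-byte files and leaves 4 bytes in the open one, which shows how a file can overshoot the threshold by one record.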
```java { .api }
public BucketingSink<T> setBucketer(Bucketer<T> bucketer)
```

Sets the bucketing strategy that determines which bucket directory each element is written to (default: DateTimeBucketer with the format "yyyy-MM-dd--HH", one bucket per hour).

**Parameters:**
- `bucketer` - Bucketing strategy implementation

**Returns:** The BucketingSink instance for method chaining

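A time-based bucketer maps each element to a directory derived from a timestamp. The sketch below imitates that idea with `java.time`; `TimeBucketSketch` is a hypothetical class, and the timestamp and time zone are passed in explicitly so the output is deterministic, whereas DateTimeBucketer uses the current processing time.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical sketch of time-based bucketing in the spirit of DateTimeBucketer.
final class TimeBucketSketch {
    private final String basePath;
    private final DateTimeFormatter formatter;

    TimeBucketSketch(String basePath, String pattern, ZoneId zone) {
        this.basePath = basePath;
        // Non-letter characters such as '-' and '/' are copied literally,
        // so "yyyy-MM-dd/HH" yields nested day/hour directories.
        this.formatter = DateTimeFormatter.ofPattern(pattern).withZone(zone);
    }

    // Returns the bucket directory for an element handled at epochMillis.
    String bucketFor(long epochMillis) {
        return basePath + "/" + formatter.format(Instant.ofEpochMilli(epochMillis));
    }
}
```

In UTC, `bucketFor(0L)` with the pattern `yyyy-MM-dd--HH` gives `/tmp/output/1970-01-01--00`; a pattern containing `/`, such as `yyyy-MM-dd/HH`, produces nested directories.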
```java { .api }
public BucketingSink<T> setWriter(Writer<T> writer)
```

Sets the writer that encodes elements into part files and therefore determines the output format (default: StringWriter, which writes each element's `toString()` value on its own line).

**Parameters:**
- `writer` - Writer implementation for the specific data format

**Returns:** The BucketingSink instance for method chaining

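A writer turns elements into bytes and reports how far into the current file it has written, which is the value that size-based rolling compares against the batch size. The `SimpleWriter` interface and `LineWriter` class below are hypothetical simplifications for illustration; Flink's `Writer<T>` interface has more responsibilities, such as opening and closing files on the filesystem.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified writer contract (not Flink's Writer<T>).
interface SimpleWriter<T> {
    void write(T element) throws IOException;

    // Bytes written so far; a sink can compare this against its batch size.
    long getPos();
}

// Writes each element's toString() followed by '\n' in UTF-8.
final class LineWriter<T> implements SimpleWriter<T> {
    private final OutputStream out;
    private long pos = 0;

    LineWriter(OutputStream out) {
        this.out = out;
    }

    @Override
    public void write(T element) throws IOException {
        byte[] bytes = (element + "\n").getBytes(StandardCharsets.UTF_8);
        out.write(bytes);
        pos += bytes.length;
    }

    @Override
    public long getPos() {
        return pos;
    }
}
```

Writing `"ab"` and then `42` produces `ab\n42\n` and a position of 6 bytes.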
```java { .api }
public BucketingSink<T> setPartPrefix(String partPrefix)
```

Sets the prefix for part file names (default: "part").

**Parameters:**
- `partPrefix` - Prefix string for part files

**Returns:** The BucketingSink instance for method chaining

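The part prefix is the first component of each part file name. The sketch assumes a name of the form `<prefix>-<subtask index>-<counter>`; that pattern and the `PartFileNamer` class are assumptions for illustration, not a statement of the exact format BucketingSink produces.

```java
// Hypothetical part file namer; the "<prefix>-<subtask>-<counter>" pattern is an assumption.
final class PartFileNamer {
    private final String partPrefix;
    private final int subtaskIndex;
    private int partCounter = 0;

    PartFileNamer(String partPrefix, int subtaskIndex) {
        this.partPrefix = partPrefix;
        this.subtaskIndex = subtaskIndex;
    }

    // Returns the name for the next part file and advances the counter.
    String next() {
        return partPrefix + "-" + subtaskIndex + "-" + partCounter++;
    }
}
```

Embedding the subtask index keeps parallel sink instances from producing the same name when they write into the same bucket directory, and the counter distinguishes successive files from one instance.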
```java { .api }
public BucketingSink<T> setInactiveBucketCheckInterval(long interval)
```

Sets the interval for checking inactive buckets (default: 60000 ms).

**Parameters:**
- `interval` - Check interval in milliseconds

**Returns:** The BucketingSink instance for method chaining

```java { .api }
public BucketingSink<T> setInactiveBucketThreshold(long threshold)
```

Sets how long a bucket can go without receiving writes before it is considered inactive; the in-progress part file of an inactive bucket is closed and becomes pending (default: 60000 ms).

**Parameters:**
- `threshold` - Inactivity threshold in milliseconds

**Returns:** The BucketingSink instance for method chaining

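The check interval and the threshold work together: the interval controls how often the sink looks for idle buckets, and the threshold controls how idle a bucket must be to be closed. `InactiveBucketTracker` is a hypothetical model of that interplay, not BucketingSink's code; it uses explicit millisecond timestamps instead of a timer.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of inactive-bucket detection; not BucketingSink's code.
final class InactiveBucketTracker {
    private final long checkIntervalMs;
    private final long thresholdMs;
    private final Map<String, Long> lastWriteTime = new TreeMap<>(); // sorted for stable output
    private long lastCheckTime;

    InactiveBucketTracker(long checkIntervalMs, long thresholdMs, long startTimeMs) {
        this.checkIntervalMs = checkIntervalMs;
        this.thresholdMs = thresholdMs;
        this.lastCheckTime = startTimeMs;
    }

    void recordWrite(String bucket, long nowMs) {
        lastWriteTime.put(bucket, nowMs);
    }

    // Runs a check only once checkIntervalMs has elapsed since the previous one,
    // then closes every bucket not written to for more than thresholdMs.
    List<String> maybeCloseInactive(long nowMs) {
        List<String> closed = new ArrayList<>();
        if (nowMs - lastCheckTime < checkIntervalMs) {
            return closed;
        }
        lastCheckTime = nowMs;
        Iterator<Map.Entry<String, Long>> it = lastWriteTime.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMs - entry.getValue() > thresholdMs) {
                closed.add(entry.getKey()); // its part file would become pending
                it.remove();
            }
        }
        return closed;
    }
}
```

With both values at 60000 ms, a bucket last written at 0 is closed by the check at 70000 ms, while a bucket last written at 50000 ms survives that check and is closed at the next one, at 130000 ms. A bucket can therefore stay open for up to roughly the threshold plus the check interval after its last write.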
### File State Configuration

```java { .api }
public BucketingSink<T> setInProgressSuffix(String inProgressSuffix)
```

Sets the suffix for part files that are currently being written to (default: ".in-progress").

```java { .api }
public BucketingSink<T> setInProgressPrefix(String inProgressPrefix)
```

Sets the prefix for part files that are currently being written to (default: "_").

```java { .api }
public BucketingSink<T> setPendingSuffix(String pendingSuffix)
```

Sets the suffix for closed part files that are waiting for checkpoint confirmation before they are renamed to their final name (default: ".pending").

```java { .api }
public BucketingSink<T> setPendingPrefix(String pendingPrefix)
```

Sets the prefix for part files waiting for checkpoint confirmation (default: "_").

```java { .api }
public BucketingSink<T> setValidLengthSuffix(String validLengthSuffix)
```

Sets the suffix for valid-length files (default: ".valid-length"). When the filesystem does not support truncating files, the sink writes a valid-length file on recovery to record how many bytes of the corresponding part file are valid; readers must ignore any data past that length.

```java { .api }
public BucketingSink<T> setValidLengthPrefix(String validLengthPrefix)
```

Sets the prefix for valid-length files (default: "_").

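These prefixes and suffixes mark where a part file is in its lifecycle: it is written as in-progress, becomes pending when it is closed, and is renamed to its final name once a checkpoint that covers it completes. The `PartFileLifecycle` model below is hypothetical, and it assumes that a state's prefix and suffix are simply wrapped around the final file name.

```java
// Hypothetical model of a part file's lifecycle and on-disk name; not BucketingSink's code.
final class PartFileLifecycle {
    enum State { IN_PROGRESS, PENDING, FINISHED }

    private final String finalName;
    private final String inProgressPrefix;
    private final String inProgressSuffix;
    private final String pendingPrefix;
    private final String pendingSuffix;
    private State state = State.IN_PROGRESS;

    PartFileLifecycle(String finalName, String inProgressPrefix, String inProgressSuffix,
                      String pendingPrefix, String pendingSuffix) {
        this.finalName = finalName;
        this.inProgressPrefix = inProgressPrefix;
        this.inProgressSuffix = inProgressSuffix;
        this.pendingPrefix = pendingPrefix;
        this.pendingSuffix = pendingSuffix;
    }

    // The sink rolled to a new file or closed an inactive bucket.
    void close() {
        if (state != State.IN_PROGRESS) {
            throw new IllegalStateException("cannot close a file in state " + state);
        }
        state = State.PENDING;
    }

    // A checkpoint covering this pending file has been confirmed.
    void checkpointComplete() {
        if (state != State.PENDING) {
            throw new IllegalStateException("cannot finalize a file in state " + state);
        }
        state = State.FINISHED;
    }

    State state() {
        return state;
    }

    // Assumed naming rule: the state's prefix and suffix wrap the final name.
    String currentName() {
        switch (state) {
            case IN_PROGRESS:
                return inProgressPrefix + finalName + inProgressSuffix;
            case PENDING:
                return pendingPrefix + finalName + pendingSuffix;
            default:
                return finalName;
        }
    }
}
```

With the prefix `_` and the suffixes `.in-progress` and `.pending`, which are BucketingSink's defaults, `part-0-0` appears on disk as `_part-0-0.in-progress`, then `_part-0-0.pending`, and finally `part-0-0`.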
### Filesystem Configuration

```java { .api }
public BucketingSink<T> setFSConfig(org.apache.flink.configuration.Configuration config)
```

Sets a Flink `Configuration` whose entries are used when creating the filesystem that the sink writes to.

```java { .api }
public BucketingSink<T> setFSConfig(org.apache.hadoop.conf.Configuration config)
```

Sets a Hadoop `Configuration` whose entries are used when creating the filesystem that the sink writes to.

### Advanced Configuration

```java { .api }
public BucketingSink<T> setAsyncTimeout(long timeout)
```

Sets the timeout for asynchronous filesystem operations, such as lease recovery and file truncation during recovery (default: 60000 ms).

**Parameters:**
- `timeout` - Timeout in milliseconds

**Returns:** The BucketingSink instance for method chaining

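As a rough illustration of what an asynchronous-operation timeout does, the sketch runs a blocking call on a worker thread and gives up after a deadline. It is hypothetical: `TimeoutRunner` and `callWithTimeout` are invented names, and the code does not reflect how BucketingSink applies its timeout internally.

```java
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: bound a blocking call with a timeout; not BucketingSink's code.
final class TimeoutRunner {
    private TimeoutRunner() {}

    // Runs op on a worker thread. Returns the result wrapped in an Optional, or
    // Optional.empty() if op does not finish within timeoutMs (it is then interrupted).
    static <V> Optional<V> callWithTimeout(Callable<V> op, long timeoutMs)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<V> future = executor.submit(op);
            try {
                return Optional.ofNullable(future.get(timeoutMs, TimeUnit.MILLISECONDS));
            } catch (TimeoutException e) {
                future.cancel(true); // interrupt the worker thread
                return Optional.empty();
            }
        } finally {
            executor.shutdownNow();
        }
    }
}
```

A call that returns within the deadline yields its result; one that overruns, such as a 5-second sleep against a 50 ms timeout, yields `Optional.empty()` and is interrupted.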
## RollingSink (Deprecated)
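
Legacy sink implementation that writes to a single active bucket at a time: when its bucketer signals that a new bucket should start, the current part file is closed before writing continues in the new bucket.

**Note:** RollingSink is deprecated. Use BucketingSink for new applications.
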
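To see what a single active bucket implies, the hypothetical model below counts how many part files each design opens for a sequence of bucket assignments, ignoring size-based rolling and inactivity timeouts. For buckets that only advance, such as hourly time buckets, both designs open the same number of files; when assignments interleave, a single active bucket has to close and reopen a file on every switch, while a multi-bucket sink keeps one file per bucket open. The class and method names are illustrative only.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model: count the part files opened for a sequence of bucket assignments.
final class BucketModels {
    private BucketModels() {}

    // One active bucket: moving to a different bucket closes the current file
    // and opens a new one, even when returning to a bucket seen before.
    static int filesOpenedSingleActive(String... buckets) {
        int opened = 0;
        String current = null;
        for (String bucket : buckets) {
            if (!bucket.equals(current)) {
                current = bucket;
                opened++;
            }
        }
        return opened;
    }

    // Several open buckets: each bucket keeps its own part file open.
    static int filesOpenedMultiBucket(String... buckets) {
        Set<String> open = new HashSet<>();
        for (String bucket : buckets) {
            open.add(bucket); // a file is opened only the first time a bucket is seen
        }
        return open.size();
    }
}
```

For the sequence `a, b, a, b` the single-active model opens four files and the multi-bucket model opens two; for `10h, 10h, 11h, 11h` both open two.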
### Constructor

```java { .api }
@Deprecated
public RollingSink(String basePath)
```

Creates a new RollingSink with the specified base path.

### Configuration Methods

RollingSink provides configuration methods similar to BucketingSink's, but `setBucketer` takes the older, non-generic `org.apache.flink.streaming.connectors.fs.Bucketer` interface, which only decides when to start a new bucket rather than choosing a bucket for each element:

```java { .api }
@Deprecated
public RollingSink<T> setBatchSize(long batchSize)

@Deprecated
public RollingSink<T> setBucketer(org.apache.flink.streaming.connectors.fs.Bucketer bucketer)

@Deprecated
public RollingSink<T> setWriter(Writer<T> writer)
```

## Usage Examples

### Basic String Output

```java
import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

// dataStream is an existing DataStream<String>
BucketingSink<String> sink = new BucketingSink<>("/tmp/output");
sink.setWriter(new StringWriter<String>())
    .setBatchSize(1024L * 1024L * 64L); // roll part files at about 64 MB

dataStream.addSink(sink);
```

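### Time-based Bucketing

```java
import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

// One bucket per hour (processing time), nested as /tmp/output/<yyyy-MM-dd>/<HH>
BucketingSink<String> sink = new BucketingSink<>("/tmp/output");
sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd/HH"))
    .setWriter(new StringWriter<String>());

dataStream.addSink(sink);
```
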
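### Custom File Naming

```java
import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

BucketingSink<String> sink = new BucketingSink<>("/tmp/output");
sink.setPartPrefix("data")            // prefix for part file names
    .setInProgressSuffix(".tmp")      // files currently being written
    .setPendingSuffix(".pending")     // files awaiting checkpoint confirmation
    .setWriter(new StringWriter<String>());

dataStream.addSink(sink);
```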