
# Sink Implementations

The filesystem connector provides two sink implementations for writing streaming data to Hadoop-compatible filesystems. Both are fault tolerant and provide exactly-once semantics when checkpointing is enabled.

## BucketingSink

Sink implementation that replaces RollingSink. It can keep part files open in several buckets at the same time, so elements assigned to different buckets are written concurrently, and the bucketing strategy is pluggable through `setBucketer`.
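To make the benefit of several open buckets concrete, here is a simplified, self-contained model in plain Java (not Flink code). Each element's bucket ID stands in for the path a `Bucketer` would return, and each bucket keeps its own open part file. The single-file policy is a hypothetical contrast that must close its file whenever the bucket changes. The class, method names, and bucket IDs are illustrative, and size- or time-based rolling is ignored.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified model, not Flink code: counts the part files opened for a sequence
// of bucket IDs (one per element). Size- and time-based rolling are ignored.
final class BucketRoutingModel {

    // Hypothetical single-file policy: only one part file is open at a time, so
    // every change of bucket closes the current file and opens a new one.
    static int filesOpenedSingleActive(List<String> bucketPerElement) {
        int opened = 0;
        String active = null;
        for (String bucket : bucketPerElement) {
            if (!bucket.equals(active)) {
                opened++;
                active = bucket;
            }
        }
        return opened;
    }

    // BucketingSink-style policy: each bucket keeps its own open part file, so a
    // file is opened only for the first element routed to each bucket.
    static int filesOpenedPerBucket(List<String> bucketPerElement) {
        Set<String> openBuckets = new HashSet<>();
        int opened = 0;
        for (String bucket : bucketPerElement) {
            if (openBuckets.add(bucket)) { // true only if the bucket was not open yet
                opened++;
            }
        }
        return opened;
    }

    public static void main(String[] args) {
        // Hypothetical key-based buckets, with elements alternating between two keys.
        List<String> buckets = Arrays.asList("country=DE", "country=FR", "country=DE", "country=FR");
        System.out.println(filesOpenedSingleActive(buckets)); // prints 4
        System.out.println(filesOpenedPerBucket(buckets));    // prints 2
    }
}
```

Interleaved buckets like these are typical of a custom bucketer that buckets by a field of the element; keeping every bucket's file open avoids producing one small part file per switch.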

### Constructor

```java { .api }
public BucketingSink(String basePath)
```

Creates a new BucketingSink with the specified base path where bucket directories will be created.

**Parameters:**

- `basePath` - Base directory path where all buckets will be created

### Configuration Methods

```java { .api }
public BucketingSink<T> setBatchSize(long batchSize)
```

Sets the size threshold at which the sink rolls to a new part file (default: 384 MB).

**Parameters:**

- `batchSize` - Part file size, in bytes, that triggers rolling to a new file

**Returns:** The BucketingSink instance for method chaining
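The following simplified model in plain Java (not Flink's implementation) illustrates size-based rolling. It assumes the size check runs before each element is written and rolls once the open part file has grown beyond `batchSize`, so a finished part file can end up somewhat larger than the threshold. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model, not Flink code: size-based rolling of part files.
final class SizeRollingModel {

    // Returns the final size, in bytes, of each part file produced when elements
    // of the given sizes are written with the given batchSize threshold.
    static List<Long> partFileSizes(long[] elementSizes, long batchSize) {
        List<Long> files = new ArrayList<>();
        long open = -1; // size of the open part file; -1 means none is open yet
        for (long size : elementSizes) {
            // Assumed rule: check before writing, roll once the file exceeds batchSize.
            if (open < 0 || open > batchSize) {
                if (open >= 0) {
                    files.add(open); // roll: close the current part file
                }
                open = 0;            // start a new, empty part file
            }
            open += size;            // write the element
        }
        if (open >= 0) {
            files.add(open);         // the last part file, still open at the end
        }
        return files;
    }

    public static void main(String[] args) {
        // Five 40-byte elements with a 100-byte threshold.
        System.out.println(partFileSizes(new long[] {40, 40, 40, 40, 40}, 100)); // prints [120, 80]
    }
}
```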

```java { .api }
public BucketingSink<T> setBucketer(Bucketer<T> bucketer)
```

Sets the bucketing strategy, which decides the bucket directory each element is written to.

**Parameters:**

- `bucketer` - Bucketing strategy implementation (`org.apache.flink.streaming.connectors.fs.bucketing.Bucketer`)

**Returns:** The BucketingSink instance for method chaining

```java { .api }
public BucketingSink<T> setWriter(Writer<T> writer)
```

Sets the writer that encodes elements and writes them to part files.

**Parameters:**

- `writer` - Writer implementation for the specific data format

**Returns:** The BucketingSink instance for method chaining

```java { .api }
public BucketingSink<T> setPartPrefix(String partPrefix)
```

Sets the prefix for part file names (default: `"part"`).

**Parameters:**

- `partPrefix` - Prefix string for part files

**Returns:** The BucketingSink instance for method chaining

```java { .api }
public BucketingSink<T> setInactiveBucketCheckInterval(long interval)
```

Sets how often the sink checks its open buckets for inactivity (default: 60000 ms).

**Parameters:**

- `interval` - Check interval in milliseconds

**Returns:** The BucketingSink instance for method chaining

```java { .api }
public BucketingSink<T> setInactiveBucketThreshold(long threshold)
```

Sets how long a bucket may go without receiving elements before it is considered inactive and its open part file is closed (default: 60000 ms).

**Parameters:**

- `threshold` - Inactivity threshold in milliseconds

**Returns:** The BucketingSink instance for method chaining
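The two inactivity settings work together: a periodic check, run every `inactiveBucketCheckInterval`, closes the part file of each bucket that has been idle for longer than `inactiveBucketThreshold`. The sketch below is a simplified model in plain Java (not Flink code) of one such check; the strict "idle longer than" comparison, the bucket names, and the class name are assumptions for illustration. Under this model, because closing only happens at check time, an idle bucket's file can stay open for up to roughly threshold plus check interval (about 120 seconds with both defaults).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model, not Flink code: which buckets a single inactivity check would close.
final class InactiveBucketModel {

    // lastWriteMillis maps each bucket with an open part file to the processing
    // time (ms) of its most recent write. Returns the buckets whose part file
    // would be closed because they have been idle longer than the threshold.
    static List<String> bucketsToClose(Map<String, Long> lastWriteMillis,
                                       long nowMillis, long inactiveThresholdMillis) {
        List<String> toClose = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastWriteMillis.entrySet()) {
            if (nowMillis - entry.getValue() > inactiveThresholdMillis) {
                toClose.add(entry.getKey());
            }
        }
        Collections.sort(toClose); // deterministic order for printing
        return toClose;
    }

    public static void main(String[] args) {
        Map<String, Long> lastWrite = new TreeMap<>();
        lastWrite.put("bucket-a", 0L);      // idle for 70 s at the check below
        lastWrite.put("bucket-b", 50_000L); // idle for 20 s at the check below
        // One check at t = 70 s with a 60 s threshold.
        System.out.println(bucketsToClose(lastWrite, 70_000L, 60_000L)); // prints [bucket-a]
    }
}
```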

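### File State Configuration

Each part file passes through three states: in-progress while it is being written, pending after it has been closed, and finished once the checkpoint that covers it completes. The following methods set the prefixes and suffixes that mark the intermediate states in file names.

```java { .api }
public BucketingSink<T> setInProgressSuffix(String inProgressSuffix)
```

Sets the suffix for files that are currently being written to.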

```java { .api }
public BucketingSink<T> setInProgressPrefix(String inProgressPrefix)
```

Sets the prefix for files that are currently being written to.

```java { .api }
public BucketingSink<T> setPendingSuffix(String pendingSuffix)
```

Sets the suffix for files waiting for checkpoint confirmation.

```java { .api }
public BucketingSink<T> setPendingPrefix(String pendingPrefix)
```

Sets the prefix for files waiting for checkpoint confirmation.

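```java { .api }
public BucketingSink<T> setValidLengthSuffix(String validLengthSuffix)
```

Sets the suffix for valid-length files. On filesystems that do not support truncating files, the sink writes such a file next to a part file to record how many of its bytes are valid after recovery from a failure.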

```java { .api }
public BucketingSink<T> setValidLengthPrefix(String validLengthPrefix)
```

Sets the prefix for valid-length files.
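The three states and their naming can be sketched with a small model in plain Java (not Flink code). It assumes each marker is applied as `prefix + partFileName + suffix` and that finished files carry neither. The `_` prefix and the part file name `data-0-0` are hypothetical example values, not necessarily Flink's defaults; the suffixes match the ones used in the Custom File Naming example.

```java
// Simplified model, not Flink code: how prefixes and suffixes mark a part file's state.
final class PartFileNamingModel {

    enum State { IN_PROGRESS, PENDING, FINISHED }

    // Hypothetical example settings; not necessarily Flink's defaults.
    static final String IN_PROGRESS_PREFIX = "_";
    static final String IN_PROGRESS_SUFFIX = ".tmp";
    static final String PENDING_PREFIX = "_";
    static final String PENDING_SUFFIX = ".pending";

    // Assumed naming rule: prefix + part file name + suffix for intermediate states.
    static String fileName(String partFileName, State state) {
        switch (state) {
            case IN_PROGRESS:
                return IN_PROGRESS_PREFIX + partFileName + IN_PROGRESS_SUFFIX;
            case PENDING:
                return PENDING_PREFIX + partFileName + PENDING_SUFFIX;
            default:
                return partFileName; // finished: the plain part file name
        }
    }

    public static void main(String[] args) {
        // in-progress -> pending (file closed) -> finished (checkpoint completed)
        for (State state : State.values()) {
            System.out.println(state + ": " + fileName("data-0-0", state));
        }
    }
}
```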

### Filesystem Configuration

```java { .api }
public BucketingSink<T> setFSConfig(org.apache.flink.configuration.Configuration config)
```

Sets the filesystem configuration from a Flink `Configuration`.

```java { .api }
public BucketingSink<T> setFSConfig(org.apache.hadoop.conf.Configuration config)
```

Sets the filesystem configuration from a Hadoop `Configuration`.

### Advanced Configuration

```java { .api }
public BucketingSink<T> setAsyncTimeout(long timeout)
```

Sets the timeout for asynchronous operations.

**Parameters:**

- `timeout` - Timeout in milliseconds

**Returns:** The BucketingSink instance for method chaining

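## RollingSink (Deprecated)

Legacy sink implementation that keeps only one bucket active at a time: when its bucketer decides to start a new bucket, the current part file is closed and writing continues in the new bucket directory.

**Note:** RollingSink is deprecated. Use BucketingSink for new applications.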

### Constructor

```java { .api }
@Deprecated
public RollingSink(String basePath)
```

Creates a new RollingSink with the specified base path.

### Configuration Methods

RollingSink provides configuration methods similar to those of BucketingSink. The main difference is `setBucketer`, which takes the legacy, non-generic `org.apache.flink.streaming.connectors.fs.Bucketer`; that interface decides when to start a new bucket instead of assigning each element to a bucket:

```java { .api }
@Deprecated
public RollingSink<T> setBatchSize(long batchSize)

@Deprecated
public RollingSink<T> setBucketer(org.apache.flink.streaming.connectors.fs.Bucketer bucketer)

@Deprecated
public RollingSink<T> setWriter(Writer<T> writer)
```

## Usage Examples

### Basic String Output

```java
import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

// Assumes `dataStream` is an existing DataStream<String>.
BucketingSink<String> sink = new BucketingSink<>("/tmp/output");
sink.setWriter(new StringWriter<String>())
    .setBatchSize(1024L * 1024L * 64L); // roll part files at 64 MB

dataStream.addSink(sink);
```

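### Time-based Bucketing

```java
import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

// Assumes `dataStream` is an existing DataStream<String>.
// One bucket directory per hour, e.g. /tmp/output/2024-03-05/14
BucketingSink<String> sink = new BucketingSink<>("/tmp/output");
sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd/HH"))
    .setWriter(new StringWriter<String>());

dataStream.addSink(sink);
```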
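To see which directory an element lands in, the following sketch in plain Java (not Flink code) reproduces the idea behind the example: the bucket path is the base path plus the current time formatted with the bucketer's pattern. It uses `java.time.DateTimeFormatter`, whose letters in this pattern have the same meaning as in `SimpleDateFormat`, and fixes the instant and time zone so the result is deterministic; the class name and the fixed zone are assumptions. Note that `DateTimeBucketer` formats the sink's processing time (the wall clock when the element is written), not an event timestamp.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Simplified model, not Flink code: the bucket directory is the base path plus
// the given time formatted with the bucketer's pattern in the given zone.
final class DateTimeBucketModel {

    static String bucketPath(String basePath, String pattern, Instant time, ZoneId zone) {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern).withZone(zone);
        return basePath + "/" + formatter.format(time);
    }

    public static void main(String[] args) {
        Instant t = Instant.parse("2024-03-05T14:30:00Z");
        System.out.println(bucketPath("/tmp/output", "yyyy-MM-dd/HH", t, ZoneOffset.UTC)); // prints /tmp/output/2024-03-05/14
    }
}
```

Because the pattern stops at the hour, every element written during the same hour (in the chosen time zone) goes to the same bucket directory.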

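### Custom File Naming

```java
import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

// Assumes `dataStream` is an existing DataStream<String>.
BucketingSink<String> sink = new BucketingSink<>("/tmp/output");
sink.setPartPrefix("data")            // replaces the default "part" prefix
    .setInProgressSuffix(".tmp")      // suffix while a file is being written
    .setPendingSuffix(".pending")     // suffix while waiting for a checkpoint
    .setWriter(new StringWriter<String>());

dataStream.addSink(sink);
```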