or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

filesystem-configuration.md index.md recoverable-writer.md storage-operations.md

filesystem-configuration.md docs/

0

# FileSystem Configuration

1

2

The Flink GS FileSystem plugin provides comprehensive configuration management for integrating Google Cloud Storage with Flink applications. The system handles authentication, performance tuning, and filesystem creation through a factory pattern.

3

4

## Capabilities

5

6

### GSFileSystemFactory

7

8

Main factory class for creating and configuring GS filesystem instances. It is registered automatically with Flink's plugin system through the `META-INF/services/org.apache.flink.core.fs.FileSystemFactory` service file.

9

10

```java { .api }

11

/**

12

* Implementation of the Flink FileSystemFactory interface for Google Storage.

13

* Automatically registered via META-INF/services/org.apache.flink.core.fs.FileSystemFactory

14

*/

15

public class GSFileSystemFactory implements FileSystemFactory {

16

/** The scheme for the Google Storage file system */

17

public static final String SCHEME = "gs";

18

19

/** Constructs the Google Storage file system factory */

20

public GSFileSystemFactory();

21

22

/**

23

* Configure the factory with Flink configuration

24

* @param flinkConfig The Flink configuration

25

*/

26

public void configure(Configuration flinkConfig);

27

28

/**

29

* Get the filesystem scheme

30

* @return "gs"

31

*/

32

public String getScheme();

33

34

/**

35

* Create a filesystem instance for the given URI

36

* @param fsUri The filesystem URI (must have gs:// scheme)

37

* @return GSFileSystem instance

38

* @throws IOException If filesystem creation fails

39

*/

40

public FileSystem create(URI fsUri) throws IOException;

41

}

42

```

43

44

**Usage Example:**

45

46

```java

47

import org.apache.flink.configuration.Configuration;

48

import org.apache.flink.fs.gs.GSFileSystemFactory;

49

50

// Factory is automatically instantiated and configured by Flink

51

// Users typically don't interact with it directly

52

GSFileSystemFactory factory = new GSFileSystemFactory();

53

Configuration config = new Configuration();

54

config.setString("gs.auth.service.account.json.keyfile", "/path/to/key.json");

55

factory.configure(config);

56

57

URI gcsUri = URI.create("gs://my-bucket/path");

58

FileSystem fs = factory.create(gcsUri);

59

```

60

61

### GSFileSystemOptions

62

63

Configuration options container that manages all filesystem and writer settings.

64

65

```java { .api }

66

/**

67

* Configuration options for the GS filesystem and recoverable writer

68

*/

69

public class GSFileSystemOptions {

70

71

// Configuration option constants

72

public static final ConfigOption<String> WRITER_TEMPORARY_BUCKET_NAME;

73

public static final ConfigOption<MemorySize> WRITER_CHUNK_SIZE;

74

public static final ConfigOption<Boolean> ENABLE_FILESINK_ENTROPY;

75

public static final ConfigOption<Integer> GCS_HTTP_CONNECT_TIMEOUT;

76

public static final ConfigOption<Integer> GCS_HTTP_READ_TIMEOUT;

77

public static final ConfigOption<Integer> GCS_RETRY_MAX_ATTEMPT;

78

public static final ConfigOption<Duration> GCS_RETRY_INIT_RPC_TIMEOUT;

79

public static final ConfigOption<Double> GCS_RETRY_RPC_TIMEOUT_MULTIPLIER;

80

public static final ConfigOption<Duration> GCS_RETRY_MAX_RPC_TIMEOUT;

81

public static final ConfigOption<Duration> GCS_RETRY_TOTAL_TIMEOUT;

82

83

/**

84

* Constructs an options instance from Flink configuration

85

* @param flinkConfig The Flink configuration

86

*/

87

public GSFileSystemOptions(Configuration flinkConfig);

88

89

/**

90

* Get temporary bucket name for recoverable writes

91

* @return Optional temporary bucket name

92

*/

93

public Optional<String> getWriterTemporaryBucketName();

94

95

/**

96

* Get chunk size for writes to Google Storage

97

* @return Optional chunk size (must be multiple of 256KB)

98

*/

99

public Optional<MemorySize> getWriterChunkSize();

100

101

/**

102

* Check if entropy injection is enabled for FileSink paths

103

* @return true if entropy injection is enabled

104

*/

105

public Boolean isFileSinkEntropyEnabled();

106

107

/**

108

* Get HTTP connection timeout

109

* @return Optional connection timeout in milliseconds

110

*/

111

public Optional<Integer> getHTTPConnectionTimeout();

112

113

/**

114

* Get HTTP read timeout

115

* @return Optional read timeout in milliseconds

116

*/

117

public Optional<Integer> getHTTPReadTimeout();

118

119

/**

120

* Get maximum retry attempts for operations

121

* @return Optional maximum attempts

122

*/

123

public Optional<Integer> getMaxAttempts();

124

125

/**

126

* Get initial RPC timeout for retry operations

127

* @return Optional initial timeout duration

128

*/

129

public Optional<org.threeten.bp.Duration> getInitialRpcTimeout();

130

131

/**

132

* Get RPC timeout multiplier for retry backoff

133

* @return Optional timeout multiplier

134

*/

135

public Optional<Double> getRpcTimeoutMultiplier();

136

137

/**

138

* Get maximum RPC timeout for retry operations

139

* @return Optional maximum RPC timeout duration

140

*/

141

public Optional<org.threeten.bp.Duration> getMaxRpcTimeout();

142

143

/**

144

* Get total timeout for all retry operations

145

* @return Optional total timeout duration

146

*/

147

public Optional<org.threeten.bp.Duration> getTotalTimeout();

148

}

149

```

150

151

**Configuration Examples:**

152

153

```java

154

import org.apache.flink.configuration.Configuration;

155

import org.apache.flink.configuration.MemorySize;

156

import org.apache.flink.fs.gs.GSFileSystemOptions;

157

158

// Create configuration

159

Configuration config = new Configuration();

160

161

// Writer configuration

162

config.setString("gs.writer.temporary.bucket.name", "temp-bucket");

163

config.set(GSFileSystemOptions.WRITER_CHUNK_SIZE, MemorySize.ofMebiBytes(8));

164

config.set(GSFileSystemOptions.ENABLE_FILESINK_ENTROPY, true);

165

166

// HTTP timeouts

167

config.setInteger("gs.http.connect-timeout", 30000);

168

config.setInteger("gs.http.read-timeout", 60000);

169

170

// Retry configuration

171

config.setInteger("gs.retry.max-attempt", 10);

172

config.setString("gs.retry.init-rpc-timeout", "5s");

173

config.setDouble("gs.retry.rpc-timeout-multiplier", 2.0);

174

config.setString("gs.retry.max-rpc-timeout", "60s");

175

config.setString("gs.retry.total-timeout", "300s");

176

177

// Create options instance

178

GSFileSystemOptions options = new GSFileSystemOptions(config);

179

```

180

181

### GSFileSystem

182

183

Core filesystem implementation that extends `HadoopFileSystem`, wrapping `GoogleHadoopFileSystem`, and adds recoverable writer support.

184

185

```java { .api }

186

/**

187

* FileSystem implementation that wraps GoogleHadoopFileSystem and supports RecoverableWriter

188

* Package-private - users interact through standard Flink FileSystem APIs

189

*/

190

class GSFileSystem extends HadoopFileSystem {

191

192

/**

193

* Create a recoverable writer for fault-tolerant streaming writes

194

* @return GSRecoverableWriter instance

195

*/

196

public RecoverableWriter createRecoverableWriter();

197

}

198

```

199

200

**Usage Example:**

201

202

```java

203

import org.apache.flink.core.fs.FileSystem;

204

import org.apache.flink.core.fs.Path;

205

import org.apache.flink.core.fs.RecoverableWriter;

206

207

// Get filesystem instance (automatically created by factory)

208

Path gcsPath = new Path("gs://my-bucket/data/");

209

FileSystem fs = gcsPath.getFileSystem();

210

211

// Use recoverable writer for streaming applications

212

RecoverableWriter recoverableWriter = fs.createRecoverableWriter();

213

RecoverableFsDataOutputStream outputStream = recoverableWriter.open(

214

new Path("gs://my-bucket/output/part-0")

215

);

216

```

217

218

## Configuration Properties

219

220

### Authentication Configuration

221

222

Configure authentication through Hadoop configuration properties:

223

224

```properties

225

# Enable service account authentication

226

google.cloud.auth.service.account.enable=true

227

228

# Path to service account JSON key file

229

google.cloud.auth.service.account.json.keyfile=/path/to/service-account.json

230

231

# Alternative: use environment variable GOOGLE_APPLICATION_CREDENTIALS

232

```

233

234

### Writer Configuration

235

236

Configure recoverable writer behavior:

237

238

```properties

239

# Temporary bucket for multi-part uploads (optional)

240

gs.writer.temporary.bucket.name=my-temp-bucket

241

242

# Upload chunk size - must be multiple of 256KB

243

gs.writer.chunk.size=8MB

244

245

# Enable entropy injection to reduce hotspots

246

gs.filesink.entropy.enabled=true

247

```

248

249

### Network Configuration

250

251

Configure HTTP client behavior:

252

253

```properties

254

# Connection timeout in milliseconds

255

gs.http.connect-timeout=30000

256

257

# Read timeout in milliseconds

258

gs.http.read-timeout=60000

259

```

260

261

### Retry Configuration

262

263

Configure retry behavior for transient failures:

264

265

```properties

266

# Maximum number of retry attempts

267

gs.retry.max-attempt=10

268

269

# Initial RPC timeout

270

gs.retry.init-rpc-timeout=5s

271

272

# Timeout multiplier for exponential backoff

273

gs.retry.rpc-timeout-multiplier=2.0

274

275

# Maximum RPC timeout

276

gs.retry.max-rpc-timeout=60s

277

278

# Total timeout for all retries

279

gs.retry.total-timeout=300s

280

```

281

282

## Integration with Flink

283

284

### Service Discovery

285

286

The filesystem factory is automatically registered with Flink through the service provider interface:

287

288

```

289

META-INF/services/org.apache.flink.core.fs.FileSystemFactory

290

```

291

292

This file contains:

293

```

294

org.apache.flink.fs.gs.GSFileSystemFactory

295

```

296

297

### Configuration Loading

298

299

The factory integrates with Flink's configuration system to load settings from:

300

301

1. **Flink Configuration**: Properties with `gs.*` prefixes

302

2. **Hadoop Configuration**: Properties with `fs.gs.*` prefixes from Hadoop config files

303

3. **Environment Variables**: `HADOOP_CONF_DIR`, `GOOGLE_APPLICATION_CREDENTIALS`

304

305

### Hadoop Integration

306

307

The implementation leverages Hadoop's configuration system and integrates with:

308

309

- **HadoopConfigLoader**: For loading Hadoop-style configurations

310

- **GoogleHadoopFileSystem**: Underlying GCS Hadoop connector

311

- **Configuration overlays**: Flink config takes precedence over Hadoop config