tessl/maven-org-apache-flink--flink-gs-fs-hadoop

Google Cloud Storage FileSystem plugin for Apache Flink providing gs:// URI support with recoverable writers

Describes: `mavenpkg:maven/org.apache.flink/flink-gs-fs-hadoop@2.1.x`

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-gs-fs-hadoop@2.1.0

# Flink GS FileSystem Hadoop

Flink GS FileSystem Hadoop provides a Google Cloud Storage (GCS) filesystem plugin for Apache Flink that enables reading from and writing to GCS buckets using the `gs://` URI scheme. The plugin integrates with Flink's FileSystem interface and provides fault-tolerant streaming through recoverable writers, making it well suited for checkpointing, state storage, and data processing workflows.

## Package Information

- **Package Name**: flink-gs-fs-hadoop
- **Package Type**: Maven
- **Group ID**: org.apache.flink
- **Artifact ID**: flink-gs-fs-hadoop
- **Language**: Java
- **Version**: 2.1.0
- **License**: Apache License 2.0
- **Installation**: Add to Maven dependencies or use Flink's plugin system
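For Maven-based builds, the dependency can be declared with the coordinates listed above. Note that for cluster deployments, Flink loads filesystem implementations as plugins from its `plugins/` directory rather than from the application classpath:

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-gs-fs-hadoop</artifactId>
  <version>2.1.0</version>
</dependency>
```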

## Core Usage

The filesystem is automatically registered with Flink through the service provider interface and can be used directly with `gs://` URIs:

```java
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.Path;

// The FileSystem is automatically discovered and configured
Path gcsPath = new Path("gs://my-bucket/data/file.txt");
FileSystem fs = gcsPath.getFileSystem();

// Use it for reading and writing files
FSDataInputStream inputStream = fs.open(gcsPath);
FSDataOutputStream outputStream = fs.create(gcsPath, WriteMode.OVERWRITE);
```

## Basic Configuration

Configure the filesystem through Flink's configuration. Plugin-specific options use the `gs.` prefix, while options forwarded to the Hadoop GCS connector (such as authentication) use the `fs.gs.` prefix:

```properties
# Authentication via service account
fs.gs.auth.service.account.enable=true
fs.gs.auth.service.account.json.keyfile=/path/to/service-account.json

# Performance tuning
gs.writer.chunk.size=8MB
gs.filesink.entropy.enabled=true

# Retry configuration
gs.retry.max-attempt=10
gs.retry.total-timeout=300s
```

## Architecture

The plugin is built on several key components:

- **GSFileSystemFactory**: Main entry point that creates and configures filesystem instances
- **GSFileSystem**: Core filesystem implementation extending Hadoop FileSystem with recoverable writer support
- **GSRecoverableWriter**: Fault-tolerant writer system for streaming applications with exactly-once guarantees
- **GSBlobStorage**: Abstraction layer over Google Cloud Storage operations
- **Configuration System**: Comprehensive options for performance tuning, authentication, and retry behavior

The implementation leverages Google's Cloud Storage SDK and the GCS Connector for Hadoop, providing enterprise-grade reliability and performance optimizations.

## Capabilities

### FileSystem Factory and Configuration

Core filesystem factory and configuration management for integrating GCS with Flink applications.

```java { .api }
// Main factory class registered via META-INF services
public class GSFileSystemFactory implements FileSystemFactory {
    public static final String SCHEME = "gs";

    public void configure(Configuration flinkConfig);
    public String getScheme();
    public FileSystem create(URI fsUri) throws IOException;
}

// Configuration options container
public class GSFileSystemOptions {
    public Optional<String> getWriterTemporaryBucketName();
    public Optional<MemorySize> getWriterChunkSize();
    public Boolean isFileSinkEntropyEnabled();
    public Optional<Integer> getHTTPConnectionTimeout();
    public Optional<Integer> getHTTPReadTimeout();
    // ... retry configuration methods
}
```

[FileSystem Configuration](./filesystem-configuration.md)
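Discovery works through Java's `ServiceLoader` mechanism: the jar ships a provider configuration file under `META-INF/services/` that names the factory class. The file contents below are shown for illustration, following Flink's `FileSystemFactory` SPI:

```
# META-INF/services/org.apache.flink.core.fs.FileSystemFactory
org.apache.flink.fs.gs.GSFileSystemFactory
```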

### Recoverable Writer System

Fault-tolerant streaming write system providing exactly-once guarantees for Flink streaming applications.

```java { .api }
// Main recoverable writer implementation
public class GSRecoverableWriter implements RecoverableWriter {
    public boolean requiresCleanupOfRecoverableState();
    public boolean supportsResume();
    public RecoverableFsDataOutputStream open(Path path) throws IOException;
    public RecoverableFsDataOutputStream recover(ResumeRecoverable resumable);
    public RecoverableFsDataOutputStream.Committer recoverForCommit(CommitRecoverable resumable);
    public SimpleVersionedSerializer<CommitRecoverable> getCommitRecoverableSerializer();
    public SimpleVersionedSerializer<ResumeRecoverable> getResumeRecoverableSerializer();
}

// State objects for recovery
public class GSCommitRecoverable implements RecoverableWriter.CommitRecoverable {
    public final GSBlobIdentifier finalBlobIdentifier;
    public final List<UUID> componentObjectIds;
}

public class GSResumeRecoverable extends GSCommitRecoverable
        implements RecoverableWriter.ResumeRecoverable {
    public final long position;
    public final boolean closed;
}
```

[Recoverable Writer](./recoverable-writer.md)
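In application code the writer is obtained from the filesystem rather than constructed directly. A typical persist-then-commit cycle looks roughly like this (sketch using Flink's `RecoverableWriter` API; error handling omitted, bucket and path are placeholders):

```java
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;

Path target = new Path("gs://my-bucket/output/part-0");
RecoverableWriter writer = target.getFileSystem().createRecoverableWriter();

RecoverableFsDataOutputStream out = writer.open(target);
out.write("example".getBytes());

// Persist a resume point (e.g. on checkpoint), then finish and commit
RecoverableWriter.ResumeRecoverable resumable = out.persist();
RecoverableFsDataOutputStream.Committer committer = out.closeForCommit();
committer.commit();
```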

### Storage Operations

Low-level Google Cloud Storage operations abstraction for blob management and data operations.

```java { .api }
// Storage abstraction interface
public interface GSBlobStorage {
    WriteChannel writeBlob(GSBlobIdentifier blobIdentifier);
    WriteChannel writeBlob(GSBlobIdentifier blobIdentifier, MemorySize chunkSize);
    void createBlob(GSBlobIdentifier blobIdentifier);
    Optional<BlobMetadata> getMetadata(GSBlobIdentifier blobIdentifier);
    List<GSBlobIdentifier> list(String bucketName, String prefix);
    void copy(GSBlobIdentifier sourceBlobIdentifier, GSBlobIdentifier targetBlobIdentifier);
    void compose(List<GSBlobIdentifier> sourceBlobIdentifiers, GSBlobIdentifier targetBlobIdentifier);
    List<Boolean> delete(Iterable<GSBlobIdentifier> blobIdentifiers);
}

// Blob identifier abstraction
public class GSBlobIdentifier {
    public final String bucketName;
    public final String objectName;

    public GSBlobIdentifier(String bucketName, String objectName);
    public BlobId getBlobId();
    public static GSBlobIdentifier fromBlobId(BlobId blobId);
}
```

[Storage Operations](./storage-operations.md)
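Recoverable writes rely on the `compose` operation: data is staged as component blobs (tracked by `componentObjectIds` in `GSCommitRecoverable`) and stitched into the final object on commit. An illustrative fragment against the interface above, where `storage`, bucket names, and object names are placeholder assumptions:

```java
// Stage component blobs, then compose them into the final object
GSBlobIdentifier part0 = new GSBlobIdentifier("temp-bucket", "job/.inprogress/part-0");
GSBlobIdentifier part1 = new GSBlobIdentifier("temp-bucket", "job/.inprogress/part-1");
GSBlobIdentifier finalBlob = new GSBlobIdentifier("my-bucket", "output/part-0");

storage.compose(List.of(part0, part1), finalBlob);

// Temporary component blobs can be cleaned up after the compose succeeds
storage.delete(List.of(part0, part1));
```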

## Configuration Options

All configuration options use Flink's Configuration system with `gs.*` prefixes:

| Option | Type | Description |
|--------|------|-------------|
| `gs.writer.temporary.bucket.name` | String | Bucket for temporary files during recoverable writes |
| `gs.writer.chunk.size` | MemorySize | Upload chunk size (must be a multiple of 256KB) |
| `gs.filesink.entropy.enabled` | Boolean | Enable entropy injection to reduce hotspots (default: false) |
| `gs.http.connect-timeout` | Integer | HTTP connection timeout (milliseconds) |
| `gs.http.read-timeout` | Integer | HTTP read timeout (milliseconds) |
| `gs.retry.max-attempt` | Integer | Maximum retry attempts |
| `gs.retry.init-rpc-timeout` | Duration | Initial RPC timeout |
| `gs.retry.rpc-timeout-multiplier` | Double | RPC timeout multiplier |
| `gs.retry.max-rpc-timeout` | Duration | Maximum RPC timeout |
| `gs.retry.total-timeout` | Duration | Total timeout for retries |
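The 256KB alignment constraint on `gs.writer.chunk.size` comes from GCS resumable uploads. A quick way to sanity-check a candidate value before putting it in the configuration (an illustrative helper, not part of the plugin):

```java
public class ChunkSizeCheck {
    // GCS resumable uploads require chunk sizes in multiples of 256 KB
    private static final long ALIGNMENT = 256L * 1024;

    static boolean isValidChunkSize(long bytes) {
        return bytes > 0 && bytes % ALIGNMENT == 0;
    }

    public static void main(String[] args) {
        System.out.println(isValidChunkSize(8L * 1024 * 1024)); // 8MB, aligned
        System.out.println(isValidChunkSize(1_000_000));        // not aligned
    }
}
```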

## Common Use Cases

- **Checkpointing**: Store Flink application checkpoints in GCS for fault tolerance
- **State Backend**: Use GCS as a distributed state backend for large-state applications
- **Data Ingestion**: Read data files from GCS buckets for batch and streaming processing
- **Data Output**: Write processed results to GCS with exactly-once guarantees
- **File Sink**: Use the FileSink connector to write streaming data to GCS in various formats

## Error Handling

The plugin handles various error scenarios:

- **Authentication Failures**: Clear error messages for credential issues
- **Network Timeouts**: Configurable retry policies with exponential backoff
- **Storage Errors**: Proper exception propagation with context information
- **Recovery Scenarios**: Automatic cleanup and recovery for interrupted operations