# Storage Factory and Configuration

Factory for creating filesystem-based state changelog storage. Changelog files are written through Flink's `FileSystem` abstraction, so the base path can point to a distributed file system such as HDFS or S3. The factory is discovered through Java's service loading mechanism and offers a static helper for populating the relevant configuration options.

## Capabilities

### FsStateChangelogStorageFactory

Main factory class implementing the StateChangelogStorageFactory interface for filesystem-based changelog storage.

```java { .api }
/**
 * Factory for creating FsStateChangelogStorage instances.
 * Registered as a service for automatic discovery by Flink's StateChangelogStorageFactory loading mechanism.
 */
public class FsStateChangelogStorageFactory implements StateChangelogStorageFactory {

    /** Identifier for filesystem-based changelog storage */
    public static final String IDENTIFIER = "filesystem";

    /**
     * Returns the identifier for this storage factory.
     * @return the "filesystem" identifier
     */
    public String getIdentifier();

    /**
     * Creates a new changelog storage instance for write operations.
     * @param jobID Job identifier for the storage
     * @param configuration Flink configuration containing storage settings
     * @param metricGroup Metric group for collecting storage metrics
     * @param localRecoveryConfig Configuration for local recovery features
     * @return FsStateChangelogStorage instance for the job
     * @throws IOException if storage initialization fails
     */
    public StateChangelogStorage<?> createStorage(
            JobID jobID,
            Configuration configuration,
            TaskManagerJobMetricGroup metricGroup,
            LocalRecoveryConfig localRecoveryConfig) throws IOException;

    /**
     * Creates a read-only storage view for recovery operations.
     * @param configuration Flink configuration
     * @return FsStateChangelogStorageForRecovery instance for reading persisted changelogs
     */
    public StateChangelogStorageView<?> createStorageView(Configuration configuration);

    /**
     * Static helper that writes changelog storage settings into a configuration.
     * @param configuration Configuration object to modify
     * @param newFolder Base directory for changelog files
     * @param uploadTimeout Timeout for upload operations
     * @param maxUploadAttempts Maximum number of upload attempts, including the initial one
     */
    public static void configure(
            Configuration configuration,
            File newFolder,
            Duration uploadTimeout,
            int maxUploadAttempts);
}
```

**Usage Examples:**

```java
import java.io.File;
import java.time.Duration;

import org.apache.flink.api.common.JobID;
import org.apache.flink.changelog.fs.FsStateChangelogStorage;
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import org.apache.flink.changelog.fs.FsStateChangelogStorageForRecovery;
import org.apache.flink.configuration.Configuration;

// Configure storage using the static helper.
// configure() takes a java.io.File, so it is most natural for local paths (for example in tests);
// for a distributed file system, set BASE_PATH directly to a URI instead.
Configuration config = new Configuration();
FsStateChangelogStorageFactory.configure(
        config,
        new File("/tmp/flink-changelog"),
        Duration.ofSeconds(30),
        5);

// Create the factory and verify its identifier
FsStateChangelogStorageFactory factory = new FsStateChangelogStorageFactory();
assert FsStateChangelogStorageFactory.IDENTIFIER.equals(factory.getIdentifier());

// Create storage for write operations (throws IOException on failure).
// taskManagerJobMetricGroup and localRecoveryConfig are provided by the TaskManager runtime.
FsStateChangelogStorage storage = (FsStateChangelogStorage) factory.createStorage(
        new JobID(),
        config,
        taskManagerJobMetricGroup,
        localRecoveryConfig);

// Create a read-only storage view for recovery
FsStateChangelogStorageForRecovery recoveryStorage =
        (FsStateChangelogStorageForRecovery) factory.createStorageView(config);
```

### Service Registration

The factory is registered through Java's `ServiceLoader` mechanism, using the following provider-configuration file:

```text
# META-INF/services/org.apache.flink.runtime.state.changelog.StateChangelogStorageFactory
org.apache.flink.changelog.fs.FsStateChangelogStorageFactory
```

This enables automatic discovery by Flink's changelog storage loading system when `state.changelog.storage` (the `STATE_CHANGE_LOG_STORAGE` option) is set to `filesystem`.
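
The selection step can be pictured as "load every registered factory, then keep the one whose identifier matches the configured value". The sketch below is a simplified illustration of that idea only, not Flink's actual loader: `IdentifiedFactory` and `FactorySelector` are hypothetical stand-ins, and only `ServiceLoader.load(Class)` is a real JDK call.

```java
import java.util.Optional;
import java.util.ServiceLoader;

/** Hypothetical stand-in for StateChangelogStorageFactory; only the identifier matters here. */
interface IdentifiedFactory {
    String getIdentifier();
}

/** Hypothetical helper that mirrors identifier-based factory selection. */
final class FactorySelector {

    /** Returns the first candidate whose identifier equals the configured value, if any. */
    static <F extends IdentifiedFactory> Optional<F> select(Iterable<F> candidates, String configured) {
        for (F candidate : candidates) {
            if (candidate.getIdentifier().equals(configured)) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    /** Reads META-INF/services/<binary name of type> from the classpath, then selects a match. */
    static <F extends IdentifiedFactory> Optional<F> discover(Class<F> type, String configured) {
        // ServiceLoader<F> implements Iterable<F>, so it can be passed straight to select().
        return select(ServiceLoader.load(type), configured);
    }
}
```

Flink's real loading code is more involved (it also deals with plugin class loaders); the sketch only shows why the provider file and `getIdentifier()` returning `"filesystem"` are enough for the factory to be picked up.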

### Configuration Integration

The factory integrates with Flink's configuration system using the options defined in `FsStateChangelogOptions`:

```java
import static org.apache.flink.changelog.fs.FsStateChangelogOptions.*;
import static org.apache.flink.configuration.StateChangelogOptions.STATE_CHANGE_LOG_STORAGE;

import java.time.Duration;

import org.apache.flink.configuration.Configuration;

// Manual configuration
Configuration config = new Configuration();
config.set(STATE_CHANGE_LOG_STORAGE, "filesystem");   // select this factory by its identifier
config.set(BASE_PATH, "/path/to/changelog/storage");  // may also be a URI on a distributed file system
config.set(UPLOAD_TIMEOUT, Duration.ofSeconds(10));   // timeout for an upload
config.set(RETRY_MAX_ATTEMPTS, 3);                    // upload attempts, including the first
config.set(COMPRESSION_ENABLED, true);                // compress changelog data when serializing
```
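
To make the interaction between `UPLOAD_TIMEOUT` and `RETRY_MAX_ATTEMPTS` concrete, here is a minimal synchronous sketch. It assumes that the timeout bounds a single attempt and that the attempt count includes the initial try. `BoundedRetry` is a hypothetical name; this is not how Flink implements upload retries internally (its upload path is asynchronous and policy-driven).

```java
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper: bounded attempts, each limited by a timeout. Not Flink's retry code. */
final class BoundedRetry {

    static <T> T call(Callable<T> task, int maxAttempts, Duration timeout) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            Exception lastFailure = null;
            // maxAttempts counts the initial try: 3 means one try plus up to two retries.
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                Future<T> future = executor.submit(task);
                try {
                    return future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
                } catch (TimeoutException e) {
                    future.cancel(true); // interrupt the slow attempt before trying again
                    lastFailure = e;
                } catch (ExecutionException e) {
                    // Unwrap so the caller sees the task's own exception where possible.
                    Throwable cause = e.getCause();
                    lastFailure = (cause instanceof Exception) ? (Exception) cause : e;
                }
            }
            throw lastFailure;
        } finally {
            executor.shutdownNow();
        }
    }
}
```

With `RETRY_MAX_ATTEMPTS` set to 3, a task that fails twice and then succeeds completes; with 2 attempts, the same task surfaces its last failure to the caller.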

### Error Handling

The factory methods can fail with the following initialization errors, which callers should handle:

- **IOException** from `createStorage()` when the file system cannot be accessed or the configuration is invalid
- **IllegalArgumentException** when required configuration options are missing or invalid
- **File system connectivity problems** during storage initialization, typically reported as an `IOException`
- **Permission errors** when accessing the configured base path; depending on the file system, these may only surface once uploads begin

```java
// factory, jobId, config, metricGroup, localRecoveryConfig and log are assumed to be in scope
try {
    StateChangelogStorage<?> storage = factory.createStorage(
            jobId, config, metricGroup, localRecoveryConfig);
} catch (IOException e) {
    // Handle storage initialization failure
    log.error("Failed to initialize changelog storage", e);
    throw new RuntimeException("Changelog storage setup failed", e);
}
```
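
When the base path is a local directory (for example, the `java.io.File` passed to `configure()` in tests), a preflight check can surface path and permission problems before `createStorage()` is called. The sketch below assumes a local path only; `ChangelogPathPreflight` is a hypothetical helper, not part of Flink, and remote file systems such as HDFS or S3 would need their own client-side checks.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical fail-fast check for a local changelog base directory. */
final class ChangelogPathPreflight {

    static void ensureWritable(Path baseDir) throws IOException {
        // Creates missing parents; throws FileAlreadyExistsException if baseDir is a regular file.
        Files.createDirectories(baseDir);
        if (!Files.isWritable(baseDir)) {
            throw new IOException("Changelog base path is not writable: " + baseDir);
        }
        // Probe with a real file, since permission bits alone may not reflect ACLs or quotas.
        Path probe = Files.createTempFile(baseDir, "changelog-probe", ".tmp");
        Files.delete(probe);
    }
}
```

Calling such a check during job setup turns a later upload failure into an immediate, clearly worded `IOException`.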