# Storage Factory and Configuration

Factory for creating filesystem-based changelog storage instances, including storage on distributed file systems. The factory is discovered through Java's ServiceLoader mechanism and provides a static helper method for configuring changelog storage.

## Capabilities

### FsStateChangelogStorageFactory

Main factory class implementing the `StateChangelogStorageFactory` interface for filesystem-based changelog storage.

```java { .api }
/**
 * Factory for creating FsStateChangelogStorage instances.
 * Registered as a service for automatic discovery by Flink's StateChangelogStorageFactory loading mechanism.
 */
public class FsStateChangelogStorageFactory implements StateChangelogStorageFactory {

    /** Identifier for filesystem-based changelog storage */
    public static final String IDENTIFIER = "filesystem";

    /**
     * Returns the identifier for this storage factory
     * @return "filesystem" identifier
     */
    public String getIdentifier();

    /**
     * Creates a new changelog storage instance for write operations
     * @param jobID Job identifier for the storage
     * @param configuration Flink configuration containing storage settings
     * @param metricGroup Metric group for collecting storage metrics
     * @param localRecoveryConfig Configuration for local recovery features
     * @return FsStateChangelogStorage instance for the job
     * @throws IOException if storage initialization fails
     */
    public StateChangelogStorage<?> createStorage(
        JobID jobID,
        Configuration configuration,
        TaskManagerJobMetricGroup metricGroup,
        LocalRecoveryConfig localRecoveryConfig
    ) throws IOException;

    /**
     * Creates a storage view for recovery operations (read-only)
     * @param configuration Flink configuration
     * @return FsStateChangelogStorageForRecovery instance for reading persisted changelogs
     */
    public StateChangelogStorageView<?> createStorageView(Configuration configuration);

    /**
     * Static helper method to configure changelog storage settings
     * @param configuration Configuration object to modify
     * @param newFolder Base directory for changelog files
     * @param uploadTimeout Timeout for upload operations
     * @param maxUploadAttempts Maximum number of retry attempts for failed uploads
     */
    public static void configure(
        Configuration configuration,
        File newFolder,
        Duration uploadTimeout,
        int maxUploadAttempts
    );
}
```

**Usage Examples:**

```java
import java.io.File;
import java.time.Duration;

import org.apache.flink.api.common.JobID;
import org.apache.flink.changelog.fs.FsStateChangelogStorage;
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import org.apache.flink.changelog.fs.FsStateChangelogStorageForRecovery;
import org.apache.flink.configuration.Configuration;

// Configure storage using the static helper
Configuration config = new Configuration();
FsStateChangelogStorageFactory.configure(
    config,
    new File("/hdfs/changelog"),
    Duration.ofSeconds(30),
    5
);

// Create factory instance
FsStateChangelogStorageFactory factory = new FsStateChangelogStorageFactory();

// Verify identifier
assert "filesystem".equals(factory.getIdentifier());

// Create storage for write operations.
// taskManagerJobMetricGroup and localRecoveryConfig are assumed to be supplied
// by the surrounding runtime; createStorage declares IOException.
FsStateChangelogStorage storage = (FsStateChangelogStorage) factory.createStorage(
    new JobID(),
    config,
    taskManagerJobMetricGroup,
    localRecoveryConfig
);

// Create storage view for recovery
FsStateChangelogStorageForRecovery recoveryStorage =
    (FsStateChangelogStorageForRecovery) factory.createStorageView(config);
```
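The `configure(...)` call in the usage example passes an upload timeout and a maximum number of upload attempts. As an illustration of how two such settings can interact, here is a minimal bounded-retry sketch in plain Java. It is not Flink's upload code: the class `BoundedRetrySketch`, the method `callWithRetry`, and the simulated upload are hypothetical, and the sketch assumes the timeout applies to each attempt separately.

```java
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical helper for illustration only; not part of Flink. */
class BoundedRetrySketch {

    /**
     * Runs the action up to maxAttempts times. Each attempt gets its own
     * timeout (an assumption of this sketch). Fails once all attempts are used.
     */
    static <T> T callWithRetry(Supplier<T> action, Duration timeout, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            Exception lastFailure = null;
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                Callable<T> task = action::get;
                Future<T> future = executor.submit(task);
                try {
                    return future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
                } catch (TimeoutException | ExecutionException e) {
                    // The attempt timed out or threw: cancel it and try again.
                    future.cancel(true);
                    lastFailure = e;
                } catch (InterruptedException e) {
                    future.cancel(true);
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted during attempt " + attempt, e);
                }
            }
            throw new IllegalStateException("Gave up after " + maxAttempts + " attempts", lastFailure);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Simulated upload that fails twice before succeeding.
        String result = callWithRetry(() -> {
            if (calls.incrementAndGet() < 3) {
                throw new IllegalStateException("simulated upload failure");
            }
            return "uploaded";
        }, Duration.ofSeconds(1), 5);
        System.out.println(result + " after " + calls.get() + " attempts");
    }
}
```

Whether a timed-out attempt counts toward the limit, and whether attempts are separated by a delay, are policy choices that this sketch does not try to settle.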

### Service Registration

The factory is registered through a provider-configuration file so that Java's ServiceLoader mechanism can discover it:

```java { .api }
# META-INF/services/org.apache.flink.runtime.state.changelog.StateChangelogStorageFactory
org.apache.flink.changelog.fs.FsStateChangelogStorageFactory
```

This lets Flink's changelog storage loading mechanism discover the factory and select it when the `filesystem` identifier is specified in the configuration.
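The selection step described above can be sketched in plain Java. This is a simplified illustration, not Flink's loader: `StorageFactory` and `FsFactory` are hypothetical stand-ins for `StateChangelogStorageFactory` and `FsStateChangelogStorageFactory`, and `find` is an assumed helper that picks the first candidate whose identifier matches.

```java
import java.util.List;
import java.util.Optional;
import java.util.ServiceLoader;

/** Hypothetical illustration of identifier-based factory lookup; not Flink code. */
class FactoryLookupSketch {

    /** Stand-in for StateChangelogStorageFactory; only the identifier matters here. */
    public interface StorageFactory {
        String getIdentifier();
    }

    /** Stand-in for FsStateChangelogStorageFactory. */
    public static class FsFactory implements StorageFactory {
        @Override
        public String getIdentifier() {
            return "filesystem";
        }
    }

    /** Returns the first candidate whose identifier equals the requested one. */
    static Optional<StorageFactory> find(
            Iterable<? extends StorageFactory> candidates, String identifier) {
        for (StorageFactory candidate : candidates) {
            if (candidate.getIdentifier().equals(identifier)) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // ServiceLoader yields the implementations listed in META-INF/services
        // files on the classpath; without such a file it yields nothing.
        ServiceLoader<StorageFactory> discovered = ServiceLoader.load(StorageFactory.class);
        System.out.println("via ServiceLoader: " + find(discovered, "filesystem").isPresent());

        // An explicit list exercises the same lookup without a services file.
        List<FsFactory> explicit = List.of(new FsFactory());
        System.out.println("via explicit list: " + find(explicit, "filesystem").isPresent());
    }
}
```

Accepting an `Iterable` keeps the lookup independent of how candidates are found, so it works with a `ServiceLoader` in production and a plain list in tests.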

### Configuration Integration

The factory integrates with Flink's configuration system using the options defined in `FsStateChangelogOptions`:

```java
import java.time.Duration;

import org.apache.flink.configuration.Configuration;

import static org.apache.flink.changelog.fs.FsStateChangelogOptions.*;
import static org.apache.flink.configuration.StateChangelogOptions.STATE_CHANGE_LOG_STORAGE;

// Manual configuration
Configuration config = new Configuration();
config.set(STATE_CHANGE_LOG_STORAGE, "filesystem");
config.set(BASE_PATH, "/path/to/changelog/storage");
config.set(UPLOAD_TIMEOUT, Duration.ofSeconds(10));
config.set(RETRY_MAX_ATTEMPTS, 3);
config.set(COMPRESSION_ENABLED, true);
```

### Error Handling

The factory methods can fail during initialization for several reasons:

- **IOException** from `createStorage()` when filesystem access fails or the configuration is invalid
- **IllegalArgumentException** when required configuration options are missing or invalid
- **Filesystem connectivity issues** during storage initialization
- **Permission errors** when accessing the configured base path

```java
// factory, jobId, config, metricGroup, localRecoveryConfig, and log
// are assumed to be in scope.
try {
    StateChangelogStorage<?> storage = factory.createStorage(
        jobId, config, metricGroup, localRecoveryConfig
    );
} catch (IOException e) {
    // Handle storage initialization failure
    log.error("Failed to initialize changelog storage", e);
    throw new RuntimeException("Changelog storage setup failed", e);
}
```