0
# Flink Shaded Curator
1
2
Flink Shaded Curator is a shaded JAR library that provides Apache Curator's distributed coordination capabilities specifically designed for Apache Flink's distributed stream processing framework. It bundles Apache Curator dependencies with relocated classes to avoid classpath conflicts, enabling reliable ZooKeeper-based coordination services within Flink applications.
3
4
## Package Information
5
6
- **Package Name**: flink-shaded-curator
7
- **Package Type**: maven
8
- **Language**: Java
9
- **Group ID**: org.apache.flink
10
- **Artifact ID**: flink-shaded-curator
11
- **Version**: 1.10.3
12
- **Installation**: Add to Maven dependencies:
13
14
```xml
15
<dependency>
16
<groupId>org.apache.flink</groupId>
17
<artifactId>flink-shaded-curator</artifactId>
18
<version>1.10.3</version>
19
</dependency>
20
```
21
22
## Core Imports
23
24
```java
25
import org.apache.curator.framework.CuratorFramework;
26
import org.apache.curator.framework.CuratorFrameworkFactory;
27
import org.apache.curator.framework.recipes.leader.LeaderLatch;
28
import org.apache.curator.framework.recipes.cache.NodeCache;
29
import org.apache.curator.framework.recipes.shared.SharedCount;
30
import org.apache.curator.retry.ExponentialBackoffRetry;
31
```
32
33
**Important**: This library relocates certain Guava classes to avoid conflicts. If you need to use Guava functions, import from the shaded namespace:
34
35
```java
36
import org.apache.flink.curator.shaded.com.google.common.base.Function;
37
import org.apache.flink.curator.shaded.com.google.common.base.Predicate;
38
```
39
40
## Basic Usage
41
42
```java
43
import org.apache.curator.framework.CuratorFramework;
44
import org.apache.curator.framework.CuratorFrameworkFactory;
45
import org.apache.curator.framework.recipes.leader.LeaderLatch;
46
import org.apache.curator.retry.ExponentialBackoffRetry;
47
48
// Create a Curator client (typical Flink usage pattern)
49
CuratorFramework client = CuratorFrameworkFactory.newClient(
50
"localhost:2181",
51
new ExponentialBackoffRetry(1000, 3)
52
);
53
client.start();
54
55
// Leader election (primary use case in Flink)
56
LeaderLatch leaderLatch = new LeaderLatch(client, "/flink/leader", "jobmanager-1");
57
leaderLatch.start();
58
59
try {
60
// Wait to become leader
61
leaderLatch.await();
62
System.out.println("I am the leader!");
63
64
// Do leader work...
65
66
} finally {
67
leaderLatch.close();
68
}
69
70
client.close();
71
```
72
73
## Architecture
74
75
Flink Shaded Curator is built around Apache Curator's distributed coordination patterns:
76
77
- **Dependency Isolation**: Curator libraries are bundled with selected Guava classes relocated to prevent conflicts with other Flink dependencies
78
- **ZooKeeper Integration**: All coordination patterns rely on ZooKeeper for distributed consensus and state management
79
- **Flink-Specific Usage**: Primarily used for JobManager leader election, checkpoint ID coordination, and high availability services
80
- **Recipe Patterns**: Provides high-level abstractions for distributed coordination, though Flink focuses on leader election and caching capabilities
81
- **Version Compatibility**: Bundles Curator 2.12.0 for compatibility with Flink 1.10.3's distributed architecture
82
83
## Shading Details
84
85
This library is specifically designed for Apache Flink and relocates certain dependencies to avoid classpath conflicts:
86
87
- **Curator packages**: Remain in original `org.apache.curator.*` namespace and provide full functionality
88
- **Selected Guava classes**: Only essential classes are included and relocated:
89
- `com.google.common.base.Function` → `org.apache.flink.curator.shaded.com.google.common.base.Function`
90
- `com.google.common.base.Predicate` → `org.apache.flink.curator.shaded.com.google.common.base.Predicate`
91
- `com.google.common.reflect.TypeToken` → `org.apache.flink.curator.shaded.com.google.common.reflect.TypeToken`
92
93
**Note**: This library bundles Apache Curator 2.12.0 and is primarily used within Flink for JobManager leader election, checkpoint coordination, and configuration management.
94
95
## Capabilities
96
97
### Leader Election
98
99
Leader election capabilities for coordinating which process should act as the primary in a distributed system. This is the primary use case within Flink for JobManager coordination.
100
101
```java { .api }
102
public class LeaderLatch implements Closeable {
103
public LeaderLatch(CuratorFramework client, String latchPath);
104
public LeaderLatch(CuratorFramework client, String latchPath, String id);
105
public void start() throws Exception;
106
public boolean hasLeadership();
107
public void await() throws InterruptedException;
108
}
109
110
public class LeaderSelector implements Closeable {
111
public LeaderSelector(CuratorFramework client, String mutexPath, LeaderSelectorListener listener);
112
public void start() throws IOException;
113
public void requeue() throws InterruptedException;
114
}
115
```
116
117
[Leader Election](./leader-election.md)
118
119
### Path Caching
120
121
Caching mechanisms for ZooKeeper paths to improve performance and reduce network overhead. Used in Flink for monitoring configuration changes and coordinator state.
122
123
```java { .api }
124
public class NodeCache implements Closeable {
125
public NodeCache(CuratorFramework client, String path);
126
public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed);
127
public void start() throws Exception;
128
public void start(boolean buildInitial) throws Exception;
129
public ChildData getCurrentData();
130
}
131
132
public class PathChildrenCache implements Closeable {
133
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData);
134
public void start() throws Exception;
135
public void start(StartMode mode) throws Exception;
136
public List<ChildData> getCurrentData();
137
}
138
```
139
140
[Path Caching](./caching.md)
141
142
### Shared Counters and Values
143
144
Shared data structures for maintaining counters and values across distributed processes. Used in Flink for checkpoint ID coordination and configuration sharing.
145
146
```java { .api }
147
public class SharedCount implements Closeable {
148
public SharedCount(CuratorFramework client, String path, int seedValue);
149
public void start() throws Exception;
150
public int getCount();
151
public boolean trySetCount(VersionedValue<Integer> previous, int newCount) throws Exception;
152
}
153
154
public class SharedValue implements Closeable {
155
public SharedValue(CuratorFramework client, String path, byte[] seedValue);
156
public void start() throws Exception;
157
public VersionedValue<byte[]> getValue() throws Exception;
158
public boolean trySetValue(VersionedValue<byte[]> previous, byte[] newValue) throws Exception;
159
}
160
```
161
162
[Shared Counters and Values](./shared-values.md)
163
164
### Distributed Locking
165
166
Provides locking mechanisms for coordinating access to shared resources across distributed processes. Available but less commonly used in typical Flink deployments.
167
168
```java { .api }
169
public interface InterProcessLock {
170
void acquire() throws Exception;
171
boolean acquire(long time, TimeUnit unit) throws Exception;
172
void release() throws Exception;
173
}
174
175
public class InterProcessMutex implements InterProcessLock {
176
public InterProcessMutex(CuratorFramework client, String lockPath);
177
}
178
```
179
180
[Distributed Locking](./locking.md)
181
182
## Types
183
184
```java { .api }
185
// Core framework types
186
public interface CuratorFramework extends Closeable {
187
void start();
188
void close();
189
CuratorFramework.State getState();
190
void blockUntilConnected() throws InterruptedException;
191
}
192
193
// Cache data structures
194
public class ChildData {
195
public String getPath();
196
public Stat getStat();
197
public byte[] getData();
198
}
199
200
// Versioned value wrapper
201
public class VersionedValue<T> {
202
public T getValue();
203
public int getVersion();
204
}
205
206
// Connection and retry policies
207
public class ExponentialBackoffRetry implements RetryPolicy {
208
public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries);
209
public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs);
210
}
211
212
// Listener interfaces
213
public interface LeaderLatchListener {
214
void isLeader();
215
void notLeader();
216
}
217
218
public interface NodeCacheListener {
219
void nodeChanged() throws Exception;
220
}
221
222
public interface PathChildrenCacheListener {
223
void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception;
224
}
225
```
226
227
228
229
230