0
# Utility Classes
1
2
Helper classes for common data structures and operations in Pulsar IO connectors.
3
4
## KeyValue<K, V>
5
6
Simple generic key-value pair container for representing paired data.
7
8
```java { .api }
9
package org.apache.pulsar.io.core;
10
11
@InterfaceAudience.Public
12
@InterfaceStability.Stable
13
public class KeyValue<K, V> {
14
/**
15
* Constructor to create a key-value pair.
16
*
17
* @param key the key
18
* @param value the value
19
*/
20
public KeyValue(K key, V value);
21
22
/**
23
* Get the key.
24
*
25
* @return the key
26
*/
27
K getKey();
28
29
/**
30
* Get the value.
31
*
32
* @return the value
33
*/
34
V getValue();
35
36
/**
37
* Set the key.
38
*
39
* @param key the key to set
40
*/
41
void setKey(K key);
42
43
/**
44
* Set the value.
45
*
46
* @param value the value to set
47
*/
48
void setValue(V value);
49
}
50
```
51
52
### Usage Examples
53
54
#### Basic Key-Value Usage
55
56
```java
57
// Create key-value pairs
58
KeyValue<String, Integer> userScore = new KeyValue<>("user123", 850);
59
KeyValue<Long, String> timestampMessage = new KeyValue<>(System.currentTimeMillis(), "Hello World");
60
61
// Access data
62
String userId = userScore.getKey();
63
Integer score = userScore.getValue();
64
65
// Update data
66
userScore.setValue(900);
67
timestampMessage.setKey(System.currentTimeMillis());
68
```
69
70
#### Database Source with Key-Value Records
71
72
```java
73
public class DatabaseKeyValueSource implements Source<KeyValue<String, Map<String, Object>>> {
74
private Connection connection;
75
private SourceContext context;
76
77
@Override
78
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
79
this.context = sourceContext;
80
String jdbcUrl = (String) config.get("jdbc.url");
81
this.connection = DriverManager.getConnection(jdbcUrl);
82
}
83
84
@Override
85
public Record<KeyValue<String, Map<String, Object>>> read() throws Exception {
86
PreparedStatement stmt = connection.prepareStatement(
87
"SELECT id, name, email, created_at FROM users ORDER BY created_at LIMIT 1"
88
);
89
90
ResultSet rs = stmt.executeQuery();
91
if (rs.next()) {
92
String id = rs.getString("id");
93
Map<String, Object> userData = new HashMap<>();
94
userData.put("name", rs.getString("name"));
95
userData.put("email", rs.getString("email"));
96
userData.put("created_at", rs.getTimestamp("created_at"));
97
98
KeyValue<String, Map<String, Object>> keyValue = new KeyValue<>(id, userData);
99
return new SimpleRecord<>(null, keyValue);
100
}
101
102
// Wait before checking again
103
Thread.sleep(5000);
104
return read();
105
}
106
107
@Override
108
public void close() throws Exception {
109
if (connection != null) {
110
connection.close();
111
}
112
}
113
}
114
```
115
116
#### File Processing with Key-Value
117
118
```java
119
public class FileKeyValueSource implements Source<KeyValue<String, String>> {
120
private BufferedReader reader;
121
private int lineNumber = 0;
122
123
@Override
124
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
125
String filePath = (String) config.get("file.path");
126
this.reader = new BufferedReader(new FileReader(filePath));
127
}
128
129
@Override
130
public Record<KeyValue<String, String>> read() throws Exception {
131
String line = reader.readLine();
132
if (line != null) {
133
lineNumber++;
134
String key = "line-" + lineNumber;
135
KeyValue<String, String> keyValue = new KeyValue<>(key, line);
136
return new SimpleRecord<>(null, keyValue);
137
}
138
139
// End of file reached
140
Thread.sleep(1000);
141
return read();
142
}
143
144
@Override
145
public void close() throws Exception {
146
if (reader != null) {
147
reader.close();
148
}
149
}
150
}
151
```
152
153
#### Key-Value Sink Processing
154
155
```java
156
public class KeyValueProcessingSink implements Sink<KeyValue<String, Map<String, Object>>> {
157
private Map<String, Object> cache = new ConcurrentHashMap<>();
158
private SinkContext context;
159
160
@Override
161
public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
162
this.context = sinkContext;
163
}
164
165
@Override
166
public void write(Record<KeyValue<String, Map<String, Object>>> record) throws Exception {
167
KeyValue<String, Map<String, Object>> keyValue = record.getValue();
168
String key = keyValue.getKey();
169
Map<String, Object> value = keyValue.getValue();
170
171
// Process based on key
172
if (key.startsWith("user-")) {
173
processUserData(key, value);
174
} else if (key.startsWith("order-")) {
175
processOrderData(key, value);
176
} else {
177
processGenericData(key, value);
178
}
179
180
// Cache for later use
181
cache.put(key, value);
182
}
183
184
private void processUserData(String key, Map<String, Object> userData) {
185
System.out.println("Processing user: " + key);
186
// Validate user data
187
if (!userData.containsKey("email")) {
188
throw new IllegalArgumentException("User data missing email field");
189
}
190
// Additional user-specific processing...
191
}
192
193
private void processOrderData(String key, Map<String, Object> orderData) {
194
System.out.println("Processing order: " + key);
195
// Validate order data
196
if (!orderData.containsKey("amount")) {
197
throw new IllegalArgumentException("Order data missing amount field");
198
}
199
// Additional order-specific processing...
200
}
201
202
private void processGenericData(String key, Map<String, Object> data) {
203
System.out.println("Processing generic data: " + key);
204
// Generic processing logic...
205
}
206
207
@Override
208
public void close() throws Exception {
209
// Clean up cache or flush pending data
210
cache.clear();
211
}
212
}
213
```
214
215
#### Key-Value Transformation Source
216
217
```java
218
public class TransformationSource implements Source<KeyValue<String, String>> {
219
private Source<String> wrappedSource;
220
private SourceContext context;
221
222
@Override
223
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
224
this.context = sourceContext;
225
226
// Initialize wrapped source
227
String sourceClass = (String) config.get("wrapped.source.class");
228
this.wrappedSource = (Source<String>) Class.forName(sourceClass).newInstance();
229
this.wrappedSource.open(config, sourceContext);
230
}
231
232
@Override
233
public Record<KeyValue<String, String>> read() throws Exception {
234
Record<String> originalRecord = wrappedSource.read();
235
String originalValue = originalRecord.getValue();
236
237
// Transform single value into key-value pair
238
String transformedKey = generateKey(originalValue);
239
String transformedValue = transformValue(originalValue);
240
241
KeyValue<String, String> keyValue = new KeyValue<>(transformedKey, transformedValue);
242
return new SimpleRecord<>(originalRecord.getKey(), keyValue);
243
}
244
245
private String generateKey(String value) {
246
// Generate key based on value content
247
return "transformed-" + value.hashCode();
248
}
249
250
private String transformValue(String value) {
251
// Apply transformations to value
252
return value.toUpperCase().trim();
253
}
254
255
@Override
256
public void close() throws Exception {
257
if (wrappedSource != null) {
258
wrappedSource.close();
259
}
260
}
261
}
262
```
263
264
#### Batch Key-Value Processing
265
266
```java
267
public class BatchKeyValueSource extends BatchPushSource<KeyValue<String, List<String>>> {
268
private Map<String, List<String>> batchData = new HashMap<>();
269
private SourceContext context;
270
271
@Override
272
public void open(Map<String, Object> config, SourceContext context) throws Exception {
273
this.context = context;
274
}
275
276
@Override
277
public void discover(Consumer<byte[]> taskEater) throws Exception {
278
// Discover available data sources
279
String[] dataSources = {"source1", "source2", "source3"};
280
for (String source : dataSources) {
281
taskEater.accept(source.getBytes());
282
}
283
}
284
285
@Override
286
public void prepare(byte[] task) throws Exception {
287
String sourceName = new String(task);
288
289
// Collect batch data for this source
290
List<String> batchItems = collectBatchData(sourceName);
291
292
// Create key-value pair and push to queue
293
KeyValue<String, List<String>> keyValue = new KeyValue<>(sourceName, batchItems);
294
this.consume(new SimpleRecord<>(null, keyValue));
295
}
296
297
private List<String> collectBatchData(String sourceName) {
298
// Simulate collecting batch data
299
List<String> items = new ArrayList<>();
300
for (int i = 1; i <= 10; i++) {
301
items.add(sourceName + "-item-" + i);
302
}
303
return items;
304
}
305
306
@Override
307
public void close() throws Exception {
308
batchData.clear();
309
}
310
}
311
```
312
313
## Types
314
315
```java { .api }
316
// Required imports
317
import org.apache.pulsar.common.classification.InterfaceAudience;
318
import org.apache.pulsar.common.classification.InterfaceStability;
319
```