or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

connector-annotations.mdcontext-interfaces.mdindex.mdpush-sources.mdsink-interfaces.mdsource-interfaces.mdutility-classes.md

utility-classes.mddocs/

0

# Utility Classes

1

2

Helper classes for common data structures and operations in Pulsar IO connectors.

3

4

## KeyValue<K, V>

5

6

Simple generic key-value pair container for representing paired data.

7

8

```java { .api }

9

package org.apache.pulsar.io.core;

10

11

@InterfaceAudience.Public

12

@InterfaceStability.Stable

13

public class KeyValue<K, V> {

14

/**

15

* Constructor to create a key-value pair.

16

*

17

* @param key the key

18

* @param value the value

19

*/

20

public KeyValue(K key, V value);

21

22

/**

23

* Get the key.

24

*

25

* @return the key

26

*/

27

K getKey();

28

29

/**

30

* Get the value.

31

*

32

* @return the value

33

*/

34

V getValue();

35

36

/**

37

* Set the key.

38

*

39

* @param key the key to set

40

*/

41

void setKey(K key);

42

43

/**

44

* Set the value.

45

*

46

* @param value the value to set

47

*/

48

void setValue(V value);

49

}

50

```

51

52

### Usage Examples

53

54

#### Basic Key-Value Usage

55

56

```java

57

// Create key-value pairs

58

KeyValue<String, Integer> userScore = new KeyValue<>("user123", 850);

59

KeyValue<Long, String> timestampMessage = new KeyValue<>(System.currentTimeMillis(), "Hello World");

60

61

// Access data

62

String userId = userScore.getKey();

63

Integer score = userScore.getValue();

64

65

// Update data

66

userScore.setValue(900);

67

timestampMessage.setKey(System.currentTimeMillis());

68

```

69

70

#### Database Source with Key-Value Records

71

72

```java

73

public class DatabaseKeyValueSource implements Source<KeyValue<String, Map<String, Object>>> {

74

private Connection connection;

75

private SourceContext context;

76

77

@Override

78

public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {

79

this.context = sourceContext;

80

String jdbcUrl = (String) config.get("jdbc.url");

81

this.connection = DriverManager.getConnection(jdbcUrl);

82

}

83

84

@Override

85

public Record<KeyValue<String, Map<String, Object>>> read() throws Exception {

86

PreparedStatement stmt = connection.prepareStatement(

87

"SELECT id, name, email, created_at FROM users ORDER BY created_at LIMIT 1"

88

);

89

90

ResultSet rs = stmt.executeQuery();

91

if (rs.next()) {

92

String id = rs.getString("id");

93

Map<String, Object> userData = new HashMap<>();

94

userData.put("name", rs.getString("name"));

95

userData.put("email", rs.getString("email"));

96

userData.put("created_at", rs.getTimestamp("created_at"));

97

98

KeyValue<String, Map<String, Object>> keyValue = new KeyValue<>(id, userData);

99

return new SimpleRecord<>(null, keyValue);

100

}

101

102

// Wait before checking again

103

Thread.sleep(5000);

104

return read();

105

}

106

107

@Override

108

public void close() throws Exception {

109

if (connection != null) {

110

connection.close();

111

}

112

}

113

}

114

```

115

116

#### File Processing with Key-Value

117

118

```java

119

public class FileKeyValueSource implements Source<KeyValue<String, String>> {

120

private BufferedReader reader;

121

private int lineNumber = 0;

122

123

@Override

124

public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {

125

String filePath = (String) config.get("file.path");

126

this.reader = new BufferedReader(new FileReader(filePath));

127

}

128

129

@Override

130

public Record<KeyValue<String, String>> read() throws Exception {

131

String line = reader.readLine();

132

if (line != null) {

133

lineNumber++;

134

String key = "line-" + lineNumber;

135

KeyValue<String, String> keyValue = new KeyValue<>(key, line);

136

return new SimpleRecord<>(null, keyValue);

137

}

138

139

// End of file reached

140

Thread.sleep(1000);

141

return read();

142

}

143

144

@Override

145

public void close() throws Exception {

146

if (reader != null) {

147

reader.close();

148

}

149

}

150

}

151

```

152

153

#### Key-Value Sink Processing

154

155

```java

156

public class KeyValueProcessingSink implements Sink<KeyValue<String, Map<String, Object>>> {

157

private Map<String, Object> cache = new ConcurrentHashMap<>();

158

private SinkContext context;

159

160

@Override

161

public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {

162

this.context = sinkContext;

163

}

164

165

@Override

166

public void write(Record<KeyValue<String, Map<String, Object>>> record) throws Exception {

167

KeyValue<String, Map<String, Object>> keyValue = record.getValue();

168

String key = keyValue.getKey();

169

Map<String, Object> value = keyValue.getValue();

170

171

// Process based on key

172

if (key.startsWith("user-")) {

173

processUserData(key, value);

174

} else if (key.startsWith("order-")) {

175

processOrderData(key, value);

176

} else {

177

processGenericData(key, value);

178

}

179

180

// Cache for later use

181

cache.put(key, value);

182

}

183

184

private void processUserData(String key, Map<String, Object> userData) {

185

System.out.println("Processing user: " + key);

186

// Validate user data

187

if (!userData.containsKey("email")) {

188

throw new IllegalArgumentException("User data missing email field");

189

}

190

// Additional user-specific processing...

191

}

192

193

private void processOrderData(String key, Map<String, Object> orderData) {

194

System.out.println("Processing order: " + key);

195

// Validate order data

196

if (!orderData.containsKey("amount")) {

197

throw new IllegalArgumentException("Order data missing amount field");

198

}

199

// Additional order-specific processing...

200

}

201

202

private void processGenericData(String key, Map<String, Object> data) {

203

System.out.println("Processing generic data: " + key);

204

// Generic processing logic...

205

}

206

207

@Override

208

public void close() throws Exception {

209

// Clean up cache or flush pending data

210

cache.clear();

211

}

212

}

213

```

214

215

#### Key-Value Transformation Source

216

217

```java

218

public class TransformationSource implements Source<KeyValue<String, String>> {

219

private Source<String> wrappedSource;

220

private SourceContext context;

221

222

@Override

223

public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {

224

this.context = sourceContext;

225

226

// Initialize wrapped source

227

String sourceClass = (String) config.get("wrapped.source.class");

228

this.wrappedSource = (Source<String>) Class.forName(sourceClass).newInstance();

229

this.wrappedSource.open(config, sourceContext);

230

}

231

232

@Override

233

public Record<KeyValue<String, String>> read() throws Exception {

234

Record<String> originalRecord = wrappedSource.read();

235

String originalValue = originalRecord.getValue();

236

237

// Transform single value into key-value pair

238

String transformedKey = generateKey(originalValue);

239

String transformedValue = transformValue(originalValue);

240

241

KeyValue<String, String> keyValue = new KeyValue<>(transformedKey, transformedValue);

242

return new SimpleRecord<>(originalRecord.getKey(), keyValue);

243

}

244

245

private String generateKey(String value) {

246

// Generate key based on value content

247

return "transformed-" + value.hashCode();

248

}

249

250

private String transformValue(String value) {

251

// Apply transformations to value

252

return value.toUpperCase().trim();

253

}

254

255

@Override

256

public void close() throws Exception {

257

if (wrappedSource != null) {

258

wrappedSource.close();

259

}

260

}

261

}

262

```

263

264

#### Batch Key-Value Processing

265

266

```java

267

public class BatchKeyValueSource extends BatchPushSource<KeyValue<String, List<String>>> {

268

private Map<String, List<String>> batchData = new HashMap<>();

269

private SourceContext context;

270

271

@Override

272

public void open(Map<String, Object> config, SourceContext context) throws Exception {

273

this.context = context;

274

}

275

276

@Override

277

public void discover(Consumer<byte[]> taskEater) throws Exception {

278

// Discover available data sources

279

String[] dataSources = {"source1", "source2", "source3"};

280

for (String source : dataSources) {

281

taskEater.accept(source.getBytes());

282

}

283

}

284

285

@Override

286

public void prepare(byte[] task) throws Exception {

287

String sourceName = new String(task);

288

289

// Collect batch data for this source

290

List<String> batchItems = collectBatchData(sourceName);

291

292

// Create key-value pair and push to queue

293

KeyValue<String, List<String>> keyValue = new KeyValue<>(sourceName, batchItems);

294

this.consume(new SimpleRecord<>(null, keyValue));

295

}

296

297

private List<String> collectBatchData(String sourceName) {

298

// Simulate collecting batch data

299

List<String> items = new ArrayList<>();

300

for (int i = 1; i <= 10; i++) {

301

items.add(sourceName + "-item-" + i);

302

}

303

return items;

304

}

305

306

@Override

307

public void close() throws Exception {

308

batchData.clear();

309

}

310

}

311

```

312

313

## Types

314

315

```java { .api }

316

// Required imports

317

import org.apache.pulsar.common.classification.InterfaceAudience;

318

import org.apache.pulsar.common.classification.InterfaceStability;

319

```