Core interfaces and abstractions for building Apache Pulsar IO connectors
—
Helper classes for common data structures and operations in Pulsar IO connectors.
Simple generic key-value pair container for representing paired data.
package org.apache.pulsar.io.core;
/**
* Generic container that pairs a key of type {@code K} with a value of type {@code V}.
* Both halves can be read and replaced after construction through the accessors below.
*
* NOTE(review): members are listed as signatures only (no bodies), so this reads as an
* API summary rather than compilable source.
*
* @param <K> type of the key
* @param <V> type of the value
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class KeyValue<K, V> {
/**
* Creates a key-value pair from the given key and value.
*
* @param key the key
* @param value the value
*/
public KeyValue(K key, V value);
/**
* Returns the key of this pair.
*
* @return the key
*/
K getKey();
/**
* Returns the value of this pair.
*
* @return the value
*/
V getValue();
/**
* Replaces the key of this pair.
*
* @param key the key to set
*/
void setKey(K key);
/**
* Replaces the value of this pair.
*
* @param value the value to set
*/
void setValue(V value);
}// Create key-value pairs
// One pair keyed by a String with an Integer value, one keyed by a Long timestamp with a String value
KeyValue<String, Integer> userScore = new KeyValue<>("user123", 850);
KeyValue<Long, String> timestampMessage = new KeyValue<>(System.currentTimeMillis(), "Hello World");
// Read the key and the value back through the getters
String userId = userScore.getKey();
Integer score = userScore.getValue();
// Replace the stored value in place through the setter
userScore.setValue(900);
timestampMessage.setKey(System.currentTimeMillis());public class DatabaseKeyValueSource implements Source<KeyValue<String, Map<String, Object>>> {
private Connection connection;
private SourceContext context;
/**
 * Keeps the supplied context and opens the JDBC connection used by {@code read()}.
 *
 * @param config connector configuration; the {@code "jdbc.url"} entry is cast to a
 *               String and passed to {@code DriverManager.getConnection}
 * @param sourceContext context object stored on this instance
 * @throws Exception if the connection cannot be obtained
 */
@Override
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
    context = sourceContext;
    final Object url = config.get("jdbc.url");
    connection = DriverManager.getConnection((String) url);
}
/**
 * Polls the {@code users} table until a row is available and returns it as a record.
 *
 * <p>The key is the row's {@code id}; the value is a map holding {@code name},
 * {@code email} and {@code created_at}. When the query returns no row the method
 * sleeps for five seconds and queries again.
 *
 * <p>NOTE(review): the query carries no cursor or offset, so every call selects the
 * same oldest row. Tracking the last emitted {@code created_at}/{@code id} would be
 * needed to advance through the table — confirm the intended semantics.
 *
 * @return a record wrapping the key-value pair built from the selected row
 * @throws Exception if the query fails or the waiting thread is interrupted
 */
@Override
public Record<KeyValue<String, Map<String, Object>>> read() throws Exception {
    // Iterate rather than recurse: an empty table would otherwise add one stack frame
    // per poll and eventually overflow the stack.
    while (true) {
        // try-with-resources releases the statement and result set on every poll.
        try (PreparedStatement stmt = connection.prepareStatement(
                "SELECT id, name, email, created_at FROM users ORDER BY created_at LIMIT 1");
             ResultSet rs = stmt.executeQuery()) {
            if (rs.next()) {
                String id = rs.getString("id");
                Map<String, Object> userData = new HashMap<>();
                userData.put("name", rs.getString("name"));
                userData.put("email", rs.getString("email"));
                userData.put("created_at", rs.getTimestamp("created_at"));
                KeyValue<String, Map<String, Object>> keyValue = new KeyValue<>(id, userData);
                return new SimpleRecord<>(null, keyValue);
            }
        }
        // Wait before checking again
        Thread.sleep(5000);
    }
}
/**
 * Closes the JDBC connection if one was opened; does nothing otherwise.
 *
 * @throws Exception if closing the connection fails
 */
@Override
public void close() throws Exception {
    if (connection == null) {
        return;
    }
    connection.close();
}
}public class FileKeyValueSource implements Source<KeyValue<String, String>> {
private BufferedReader reader;
private int lineNumber = 0;
/**
 * Opens the file named by the {@code "file.path"} configuration entry for line reading.
 *
 * <p>The file is decoded as UTF-8 explicitly, so the result does not depend on the
 * platform default charset of the JVM running the connector.
 *
 * @param config connector configuration; must contain a String {@code "file.path"}
 * @param sourceContext context supplied by the runtime (not used by this method)
 * @throws IllegalArgumentException if {@code "file.path"} is absent
 * @throws Exception if the file cannot be opened
 */
@Override
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
    String filePath = (String) config.get("file.path");
    if (filePath == null) {
        // Fail with a descriptive message instead of an NPE from the stream constructor.
        throw new IllegalArgumentException("Missing required configuration entry 'file.path'");
    }
    this.reader = new BufferedReader(new java.io.InputStreamReader(
            new java.io.FileInputStream(filePath), java.nio.charset.StandardCharsets.UTF_8));
}
/**
 * Returns the next line of the file as a key-value record.
 *
 * <p>The key is {@code "line-"} followed by the 1-based count of lines returned so
 * far; the value is the line text. At end of file the method sleeps for one second
 * and tries again, so it blocks until another line can be read.
 *
 * @return a record wrapping the pair for the line that was read
 * @throws Exception if reading fails or the waiting thread is interrupted
 */
@Override
public Record<KeyValue<String, String>> read() throws Exception {
    String line;
    // Poll in a loop instead of recursing, so waiting at end of file cannot
    // grow the call stack without bound.
    while ((line = reader.readLine()) == null) {
        // End of file reached
        Thread.sleep(1000);
    }
    lineNumber++;
    String key = "line-" + lineNumber;
    KeyValue<String, String> keyValue = new KeyValue<>(key, line);
    return new SimpleRecord<>(null, keyValue);
}
/**
 * Closes the underlying reader if the file was opened; does nothing otherwise.
 *
 * @throws Exception if closing the reader fails
 */
@Override
public void close() throws Exception {
    if (reader == null) {
        return;
    }
    reader.close();
}
}public class KeyValueProcessingSink implements Sink<KeyValue<String, Map<String, Object>>> {
private Map<String, Object> cache = new ConcurrentHashMap<>();
private SinkContext context;
/**
 * Stores the sink context on this instance; the configuration map is not read.
 *
 * @param config connector configuration (unused here)
 * @param sinkContext context object kept for later use
 * @throws Exception declared by the interface; not thrown by this implementation
 */
@Override
public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
    context = sinkContext;
}
/**
 * Routes a record to a handler chosen by its key prefix, caches the value, and
 * reports the outcome back on the record.
 *
 * <p>Keys starting with {@code "user-"} go to {@code processUserData}, keys starting
 * with {@code "order-"} go to {@code processOrderData}, and everything else goes to
 * {@code processGenericData}. After successful processing the value is stored in the
 * cache under its key and the record is acknowledged. If anything fails, the record
 * is marked as failed and the exception is rethrown.
 *
 * @param record record whose value is the key-value pair to process
 * @throws IllegalArgumentException if the record carries no pair, or the pair has a
 *         null key or value, or a handler rejects the data
 * @throws Exception any other failure raised while processing
 */
@Override
public void write(Record<KeyValue<String, Map<String, Object>>> record) throws Exception {
    try {
        KeyValue<String, Map<String, Object>> keyValue = record.getValue();
        // Reject nulls up front: startsWith would NPE on a null key and
        // ConcurrentHashMap does not accept null keys or values.
        if (keyValue == null || keyValue.getKey() == null || keyValue.getValue() == null) {
            throw new IllegalArgumentException("Record must carry a non-null key and value");
        }
        String key = keyValue.getKey();
        Map<String, Object> value = keyValue.getValue();
        // Process based on key
        if (key.startsWith("user-")) {
            processUserData(key, value);
        } else if (key.startsWith("order-")) {
            processOrderData(key, value);
        } else {
            processGenericData(key, value);
        }
        // Cache for later use
        cache.put(key, value);
    } catch (Exception e) {
        // Signal the failure so the record is not left unsettled.
        record.fail();
        throw e;
    }
    // Acknowledge only after processing and caching both succeeded.
    record.ack();
}
/**
 * Logs the user key to standard output and checks that the data has an {@code "email"} entry.
 *
 * @param key key of the record being processed
 * @param userData user attributes to validate
 * @throws IllegalArgumentException if {@code userData} has no {@code "email"} key
 */
private void processUserData(String key, Map<String, Object> userData) {
    System.out.println("Processing user: " + key);
    // Validation: an email entry is mandatory for user records
    final boolean hasEmail = userData.containsKey("email");
    if (!hasEmail) {
        throw new IllegalArgumentException("User data missing email field");
    }
    // Additional user-specific processing...
}
/**
 * Logs the order key to standard output and checks that the data has an {@code "amount"} entry.
 *
 * @param key key of the record being processed
 * @param orderData order attributes to validate
 * @throws IllegalArgumentException if {@code orderData} has no {@code "amount"} key
 */
private void processOrderData(String key, Map<String, Object> orderData) {
    System.out.println("Processing order: " + key);
    // Validation: an amount entry is mandatory for order records
    final boolean hasAmount = orderData.containsKey("amount");
    if (!hasAmount) {
        throw new IllegalArgumentException("Order data missing amount field");
    }
    // Additional order-specific processing...
}
/**
 * Fallback handler: logs the key to standard output. The data map is not inspected.
 *
 * @param key key of the record being processed
 * @param data payload of the record (unused here)
 */
private void processGenericData(String key, Map<String, Object> data) {
    final String message = "Processing generic data: " + key;
    System.out.println(message);
    // Generic processing logic...
}
/**
 * Empties the in-memory cache.
 *
 * @throws Exception declared by the interface; not thrown by this implementation
 */
@Override
public void close() throws Exception {
    // Drop everything cached by write()
    this.cache.clear();
}
}public class TransformationSource implements Source<KeyValue<String, String>> {
private Source<String> wrappedSource;
private SourceContext context;
/**
 * Instantiates the wrapped source named in the configuration and opens it.
 *
 * <p>The class named by {@code "wrapped.source.class"} is loaded reflectively and
 * created through its no-argument constructor, then opened with the same
 * configuration and context that this source received.
 *
 * @param config connector configuration; must contain a String {@code "wrapped.source.class"}
 * @param sourceContext context object stored on this instance and passed to the wrapped source
 * @throws IllegalArgumentException if {@code "wrapped.source.class"} is absent or empty
 * @throws Exception if the class cannot be loaded or instantiated, or the wrapped
 *         source fails to open
 */
@Override
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
    this.context = sourceContext;
    // Initialize wrapped source
    String sourceClass = (String) config.get("wrapped.source.class");
    if (sourceClass == null || sourceClass.isEmpty()) {
        throw new IllegalArgumentException(
                "Missing required configuration entry 'wrapped.source.class'");
    }
    // Class.newInstance() is deprecated; go through the no-arg constructor instead.
    // The cast cannot be checked at runtime because of erasure; the configured class
    // is expected to be a Source<String>.
    @SuppressWarnings("unchecked")
    Source<String> delegate =
            (Source<String>) Class.forName(sourceClass).getDeclaredConstructor().newInstance();
    // Assign before opening so close() can still release it if open() fails.
    this.wrappedSource = delegate;
    this.wrappedSource.open(config, sourceContext);
}
/**
 * Reads one record from the wrapped source and re-emits it as a key-value record.
 *
 * <p>The pair's key comes from {@code generateKey} and its value from
 * {@code transformValue}, both applied to the upstream value. The returned record
 * reuses the upstream record's key.
 *
 * @return a record wrapping the derived key-value pair
 * @throws Exception if the wrapped source fails to read
 */
@Override
public Record<KeyValue<String, String>> read() throws Exception {
    final Record<String> upstream = wrappedSource.read();
    final String payload = upstream.getValue();
    // Transform single value into key-value pair
    final KeyValue<String, String> pair =
            new KeyValue<>(generateKey(payload), transformValue(payload));
    return new SimpleRecord<>(upstream.getKey(), pair);
}
/**
 * Derives a key from the value's content.
 *
 * @param value source value; must not be null
 * @return {@code "transformed-"} followed by the value's {@code hashCode()}
 */
private String generateKey(String value) {
    final int contentHash = value.hashCode();
    return "transformed-" + contentHash;
}
/**
 * Upper-cases the value and strips leading and trailing whitespace.
 *
 * <p>Case conversion uses {@code Locale.ROOT} so the result is the same on every
 * JVM, whatever its default locale (for example, a Turkish default locale would
 * otherwise map {@code i} to a dotted capital I).
 *
 * @param value source value; must not be null
 * @return the upper-cased, trimmed value
 */
private String transformValue(String value) {
    // Apply transformations to value
    return value.toUpperCase(java.util.Locale.ROOT).trim();
}
/**
 * Closes the wrapped source if one was created; does nothing otherwise.
 *
 * @throws Exception if the wrapped source fails to close
 */
@Override
public void close() throws Exception {
    if (wrappedSource == null) {
        return;
    }
    wrappedSource.close();
}
}public class BatchKeyValueSource extends BatchPushSource<KeyValue<String, List<String>>> {
private Map<String, List<String>> batchData = new HashMap<>();
private SourceContext context;
/**
 * Stores the source context on this instance; the configuration map is not read.
 *
 * @param config connector configuration (unused here)
 * @param context context object kept in the {@code context} field
 * @throws Exception declared by the signature; nothing in this body throws
 */
@Override
public void open(Map<String, Object> config, SourceContext context) throws Exception {
// The parameter shadows the field, so the explicit this-qualifier is required.
this.context = context;
}
/**
 * Announces the available data sources by handing one task per source to the consumer.
 *
 * <p>Each task is the source name encoded as UTF-8 bytes. The encoding is stated
 * explicitly so the bytes do not depend on the JVM's default charset.
 *
 * @param taskEater consumer that receives one encoded task per data source
 * @throws Exception declared by the signature
 */
@Override
public void discover(Consumer<byte[]> taskEater) throws Exception {
    // Discover available data sources
    String[] dataSources = {"source1", "source2", "source3"};
    for (String source : dataSources) {
        taskEater.accept(source.getBytes(java.nio.charset.StandardCharsets.UTF_8));
    }
}
/**
 * Turns one discovered task into a key-value record and pushes it to the queue.
 *
 * <p>The task bytes are decoded as UTF-8 into the source name (stated explicitly so
 * decoding does not depend on the JVM's default charset). The name becomes the key
 * and the items returned by {@code collectBatchData} become the value.
 *
 * @param task encoded source name
 * @throws Exception declared by the signature
 */
@Override
public void prepare(byte[] task) throws Exception {
    String sourceName = new String(task, java.nio.charset.StandardCharsets.UTF_8);
    // Collect batch data for this source
    List<String> batchItems = collectBatchData(sourceName);
    // Create key-value pair and push to queue
    KeyValue<String, List<String>> keyValue = new KeyValue<>(sourceName, batchItems);
    this.consume(new SimpleRecord<>(null, keyValue));
}
/**
 * Builds a simulated batch of ten items for the given source.
 *
 * @param sourceName name used as the prefix of every item
 * @return a new list holding {@code sourceName + "-item-" + i} for i from 1 to 10
 */
private List<String> collectBatchData(String sourceName) {
    // Simulate collecting batch data
    final String prefix = sourceName + "-item-";
    final List<String> generated = new ArrayList<>();
    int index = 1;
    while (index <= 10) {
        generated.add(prefix + index);
        index++;
    }
    return generated;
}
/**
 * Empties the {@code batchData} map.
 *
 * @throws Exception declared by the signature; nothing in this body throws
 */
@Override
public void close() throws Exception {
    // Release whatever the map currently holds
    this.batchData.clear();
}
}// Required imports
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;

Install with Tessl CLI:
npx tessl i tessl/maven-org-apache-pulsar--pulsar-io-core