CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-apache-pulsar--pulsar-io-core

Core interfaces and abstractions for building Apache Pulsar IO connectors

Pending
Overview
Eval results
Files

docs/utility-classes.md

Utility Classes

Helper classes for common data structures and operations in Pulsar IO connectors.

KeyValue<K, V>

Simple generic key-value pair container for representing paired data.

package org.apache.pulsar.io.core;

/**
 * Generic container that pairs one key of type {@code K} with one value of type {@code V}.
 *
 * <p>This listing is an API signature summary: method bodies are omitted. The pair is
 * mutable, since both components can be replaced through the setters below.
 *
 * @param <K> type of the key
 * @param <V> type of the value
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class KeyValue<K, V> {
    /**
     * Creates a key-value pair from the two given components.
     *
     * @param key the key of the pair
     * @param value the value associated with {@code key}
     */
    public KeyValue(K key, V value);

    /**
     * Returns the key of this pair.
     *
     * @return the key
     */
    K getKey();

    /**
     * Returns the value of this pair.
     *
     * @return the value
     */
    V getValue();

    /**
     * Replaces the key of this pair.
     *
     * @param key the key to set
     */
    void setKey(K key);

    /**
     * Replaces the value of this pair.
     *
     * @param value the value to set
     */
    void setValue(V value);
}

Usage Examples

Basic Key-Value Usage

// Build two pairs with different key/value type combinations
KeyValue<String, Integer> scoreEntry = new KeyValue<>("user123", 850);
long createdAt = System.currentTimeMillis();
KeyValue<Long, String> greetingEntry = new KeyValue<>(createdAt, "Hello World");

// Read the individual components back out of a pair
String userId = scoreEntry.getKey();
Integer score = scoreEntry.getValue();

// Replace components in place through the setters
scoreEntry.setValue(900);
long refreshedAt = System.currentTimeMillis();
greetingEntry.setKey(refreshedAt);

Database Source with Key-Value Records

/**
 * Example source that polls a {@code users} table over JDBC and emits a row as a
 * {@code KeyValue}: the key is the row's {@code id} column and the value is a map of the
 * {@code name}, {@code email} and {@code created_at} columns.
 */
public class DatabaseKeyValueSource implements Source<KeyValue<String, Map<String, Object>>> {
    private Connection connection;
    private SourceContext context;

    /**
     * Stores the source context and opens the JDBC connection.
     *
     * @param config connector configuration; the {@code jdbc.url} entry is passed to
     *               {@link DriverManager#getConnection(String)}
     * @param sourceContext context supplied by the runtime
     * @throws Exception if the connection cannot be established
     */
    @Override
    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
        this.context = sourceContext;
        String jdbcUrl = (String) config.get("jdbc.url");
        this.connection = DriverManager.getConnection(jdbcUrl);
    }

    /**
     * Blocks until the query yields a row, then returns that row as a record.
     *
     * <p>The table is polled every five seconds while it is empty. Polling is done in a
     * loop rather than by recursion so that a long idle period cannot exhaust the stack,
     * and the statement and result set are closed after every attempt.
     *
     * @return a record wrapping the key-value pair built from the row
     * @throws Exception if the query fails or the thread is interrupted while waiting
     */
    @Override
    public Record<KeyValue<String, Map<String, Object>>> read() throws Exception {
        while (true) {
            // NOTE(review): the query carries no cursor/offset, so successive calls return the
            // same first row; a real connector needs to track the last emitted position.
            try (PreparedStatement stmt = connection.prepareStatement(
                     "SELECT id, name, email, created_at FROM users ORDER BY created_at LIMIT 1");
                 ResultSet rs = stmt.executeQuery()) {
                if (rs.next()) {
                    String id = rs.getString("id");
                    Map<String, Object> userData = new HashMap<>();
                    userData.put("name", rs.getString("name"));
                    userData.put("email", rs.getString("email"));
                    userData.put("created_at", rs.getTimestamp("created_at"));

                    KeyValue<String, Map<String, Object>> keyValue = new KeyValue<>(id, userData);
                    return new SimpleRecord<>(null, keyValue);
                }
            }

            // Nothing to emit yet: the JDBC resources are already released, so wait and retry.
            Thread.sleep(5000);
        }
    }

    /**
     * Closes the JDBC connection if one was opened.
     *
     * @throws Exception if closing the connection fails
     */
    @Override
    public void close() throws Exception {
        if (connection != null) {
            connection.close();
        }
    }
}

File Processing with Key-Value

/**
 * Example source that reads a text file line by line and emits each line as a
 * {@code KeyValue} whose key is {@code "line-<n>"} (1-based line counter) and whose value
 * is the line's text.
 */
public class FileKeyValueSource implements Source<KeyValue<String, String>> {
    private BufferedReader reader;
    private int lineNumber = 0;

    /**
     * Opens the file named by the {@code file.path} config entry for reading.
     *
     * @param config connector configuration; must contain {@code file.path}
     * @param sourceContext context supplied by the runtime (not used by this example)
     * @throws IllegalArgumentException if {@code file.path} is not configured
     * @throws Exception if the file cannot be opened
     */
    @Override
    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
        String filePath = (String) config.get("file.path");
        if (filePath == null) {
            // Fail with a descriptive message instead of an NPE from the FileReader constructor.
            throw new IllegalArgumentException("Missing required config entry: file.path");
        }
        this.reader = new BufferedReader(new FileReader(filePath));
    }

    /**
     * Blocks until a line is available, then returns it as a record.
     *
     * <p>At end of file the reader is re-polled once per second. The wait is a loop rather
     * than a recursive call so that sitting at end of file cannot overflow the stack.
     *
     * @return a record wrapping the {@code line-<n>} key and the line text
     * @throws Exception if reading fails or the thread is interrupted while waiting
     */
    @Override
    public Record<KeyValue<String, String>> read() throws Exception {
        String line;
        while ((line = reader.readLine()) == null) {
            // End of file reached; wait before checking for more data
            Thread.sleep(1000);
        }

        lineNumber++;
        String key = "line-" + lineNumber;
        KeyValue<String, String> keyValue = new KeyValue<>(key, line);
        return new SimpleRecord<>(null, keyValue);
    }

    /**
     * Closes the underlying reader if it was opened.
     *
     * @throws Exception if closing the reader fails
     */
    @Override
    public void close() throws Exception {
        if (reader != null) {
            reader.close();
        }
    }
}

Key-Value Sink Processing

/**
 * Example sink that routes each incoming key-value record to a handler chosen by the key's
 * prefix ({@code user-}, {@code order-}, or anything else) and then caches the value by key.
 */
public class KeyValueProcessingSink implements Sink<KeyValue<String, Map<String, Object>>> {
    private final Map<String, Object> cache = new ConcurrentHashMap<>();
    private SinkContext context;

    /**
     * Stores the sink context for later use.
     *
     * @param config connector configuration (not read by this example)
     * @param sinkContext context supplied by the runtime
     */
    @Override
    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
        this.context = sinkContext;
    }

    /**
     * Processes one record and reports the outcome back to the framework.
     *
     * <p>The record is acknowledged after successful processing and marked as failed when
     * processing throws; the original exception is then rethrown unchanged.
     *
     * @param record record whose value is the key-value pair to process
     * @throws IllegalArgumentException if the pair, its key or its value is {@code null},
     *         or if a handler rejects the data
     * @throws Exception any other failure raised while processing
     */
    @Override
    public void write(Record<KeyValue<String, Map<String, Object>>> record) throws Exception {
        try {
            KeyValue<String, Map<String, Object>> keyValue = record.getValue();
            if (keyValue == null || keyValue.getKey() == null || keyValue.getValue() == null) {
                // The prefix dispatch and the ConcurrentHashMap cache both reject nulls;
                // surface that as a clear validation error instead of a bare NPE.
                throw new IllegalArgumentException("Record must carry a non-null key and value");
            }
            String key = keyValue.getKey();
            Map<String, Object> value = keyValue.getValue();

            // Process based on key
            if (key.startsWith("user-")) {
                processUserData(key, value);
            } else if (key.startsWith("order-")) {
                processOrderData(key, value);
            } else {
                processGenericData(key, value);
            }

            // Cache for later use
            cache.put(key, value);
        } catch (Exception e) {
            // Tell the framework this record was not processed before propagating the error.
            record.fail();
            throw e;
        }

        // Only acknowledge once processing and caching have both succeeded.
        record.ack();
    }

    /**
     * Handles records whose key starts with {@code user-}.
     *
     * @throws IllegalArgumentException if the data has no {@code email} entry
     */
    private void processUserData(String key, Map<String, Object> userData) {
        System.out.println("Processing user: " + key);
        // Validate user data
        if (!userData.containsKey("email")) {
            throw new IllegalArgumentException("User data missing email field");
        }
        // Additional user-specific processing...
    }

    /**
     * Handles records whose key starts with {@code order-}.
     *
     * @throws IllegalArgumentException if the data has no {@code amount} entry
     */
    private void processOrderData(String key, Map<String, Object> orderData) {
        System.out.println("Processing order: " + key);
        // Validate order data
        if (!orderData.containsKey("amount")) {
            throw new IllegalArgumentException("Order data missing amount field");
        }
        // Additional order-specific processing...
    }

    /** Handles records whose key matches none of the known prefixes. */
    private void processGenericData(String key, Map<String, Object> data) {
        System.out.println("Processing generic data: " + key);
        // Generic processing logic...
    }

    /** Releases the cached values. */
    @Override
    public void close() throws Exception {
        // Clean up cache or flush pending data
        cache.clear();
    }
}

Key-Value Transformation Source

/**
 * Example source that wraps another {@code Source<String>} and converts each of its string
 * values into a {@code KeyValue<String, String>}: the key is derived from the original
 * value's hash code and the value is the upper-cased, trimmed original.
 */
public class TransformationSource implements Source<KeyValue<String, String>> {
    private Source<String> wrappedSource;
    private SourceContext context;

    /**
     * Instantiates and opens the wrapped source named by {@code wrapped.source.class}.
     *
     * @param config connector configuration; must contain {@code wrapped.source.class},
     *               and is also passed on to the wrapped source
     * @param sourceContext context supplied by the runtime, shared with the wrapped source
     * @throws IllegalArgumentException if {@code wrapped.source.class} is not configured
     * @throws Exception if the class cannot be loaded, instantiated or opened
     */
    @Override
    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
        this.context = sourceContext;

        // Initialize wrapped source
        String sourceClass = (String) config.get("wrapped.source.class");
        if (sourceClass == null) {
            throw new IllegalArgumentException("Missing required config entry: wrapped.source.class");
        }

        // Class.newInstance() is deprecated and rethrows checked constructor exceptions
        // undeclared; go through the no-arg constructor instead.
        // The cast cannot be checked at runtime: the configured class is trusted to be a
        // Source<String>.
        @SuppressWarnings("unchecked")
        Source<String> source =
            (Source<String>) Class.forName(sourceClass).getDeclaredConstructor().newInstance();
        this.wrappedSource = source;
        this.wrappedSource.open(config, sourceContext);
    }

    /**
     * Reads one record from the wrapped source and returns its transformed counterpart.
     *
     * @return a record that reuses the original record's key and carries the new pair
     * @throws Exception if the wrapped source fails to read
     */
    @Override
    public Record<KeyValue<String, String>> read() throws Exception {
        Record<String> originalRecord = wrappedSource.read();
        String originalValue = originalRecord.getValue();

        // Transform single value into key-value pair
        String transformedKey = generateKey(originalValue);
        String transformedValue = transformValue(originalValue);

        KeyValue<String, String> keyValue = new KeyValue<>(transformedKey, transformedValue);
        return new SimpleRecord<>(originalRecord.getKey(), keyValue);
    }

    /** Builds the key {@code "transformed-<hashCode>"} from the value's hash code. */
    private String generateKey(String value) {
        // Generate key based on value content
        return "transformed-" + value.hashCode();
    }

    /** Upper-cases and trims the value. */
    private String transformValue(String value) {
        // Locale.ROOT keeps the casing identical regardless of the JVM's default locale.
        return value.toUpperCase(java.util.Locale.ROOT).trim();
    }

    /**
     * Closes the wrapped source if it was created.
     *
     * @throws Exception if closing the wrapped source fails
     */
    @Override
    public void close() throws Exception {
        if (wrappedSource != null) {
            wrappedSource.close();
        }
    }
}

Batch Key-Value Processing

/**
 * Example batch source that discovers a fixed set of named data sources, and for each one
 * pushes a {@code KeyValue} whose key is the source name and whose value is the list of
 * items collected for it.
 */
public class BatchKeyValueSource extends BatchPushSource<KeyValue<String, List<String>>> {
    private Map<String, List<String>> batchData = new HashMap<>();
    private SourceContext context;

    /**
     * Stores the source context for later use.
     *
     * @param config connector configuration (not read by this example)
     * @param context context supplied by the runtime
     */
    @Override
    public void open(Map<String, Object> config, SourceContext context) throws Exception {
        this.context = context;
    }

    /**
     * Emits one task per known data source, encoding the source name as UTF-8 bytes.
     *
     * @param taskEater receiver of the encoded task payloads
     */
    @Override
    public void discover(Consumer<byte[]> taskEater) throws Exception {
        // Discover available data sources
        String[] dataSources = {"source1", "source2", "source3"};
        for (String source : dataSources) {
            // Encode with an explicit charset so prepare() decodes the same bytes
            // even when it runs on a JVM with a different platform default.
            taskEater.accept(source.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        }
    }

    /**
     * Decodes a task produced by {@link #discover}, collects its batch and pushes it as a record.
     *
     * @param task UTF-8 encoded data source name
     */
    @Override
    public void prepare(byte[] task) throws Exception {
        String sourceName = new String(task, java.nio.charset.StandardCharsets.UTF_8);

        // Collect batch data for this source
        List<String> batchItems = collectBatchData(sourceName);

        // Create key-value pair and push to queue
        KeyValue<String, List<String>> keyValue = new KeyValue<>(sourceName, batchItems);
        this.consume(new SimpleRecord<>(null, keyValue));
    }

    /** Builds ten placeholder items named {@code <sourceName>-item-1} through {@code -item-10}. */
    private List<String> collectBatchData(String sourceName) {
        // Simulate collecting batch data
        List<String> items = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            items.add(sourceName + "-item-" + i);
        }
        return items;
    }

    /** Clears the batch data map. */
    @Override
    public void close() throws Exception {
        batchData.clear();
    }
}

Types

// Required imports
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-pulsar--pulsar-io-core

docs

connector-annotations.md

context-interfaces.md

index.md

push-sources.md

sink-interfaces.md

source-interfaces.md

utility-classes.md

tile.json