tessl/maven-org-apache-pulsar--pulsar-io-http

Apache Pulsar HTTP Sink Connector that streams topic data to HTTP endpoints as JSON webhooks

Describes
mavenpkg:maven/org.apache.pulsar/pulsar-io-http@4.0.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-pulsar--pulsar-io-http@4.0.0

Apache Pulsar HTTP Sink Connector

Apache Pulsar HTTP Sink Connector streams data from Pulsar topics to external HTTP endpoints through webhook-style POST requests. The connector transforms Pulsar records into JSON payloads and enriches HTTP requests with metadata headers, bridging Pulsar's pub-sub messaging and HTTP-based services.

Package Information

  • Package Name: pulsar-io-http
  • Package Type: maven
  • Language: Java
  • Maven Coordinates: org.apache.pulsar:pulsar-io-http:4.0.6
  • Installation: Include as dependency in Maven/Gradle or deploy as Pulsar connector NAR
  • License: Apache License 2.0

Core Imports

import org.apache.pulsar.io.http.HttpSink;
import org.apache.pulsar.io.http.HttpSinkConfig;
import org.apache.pulsar.io.http.JsonConverter;

Basic Usage

Maven Dependency

<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-io-http</artifactId>
    <version>4.0.6</version>
</dependency>

Connector Configuration

# http-sink-config.yaml
url: "https://example.com/webhook"
headers:
  Authorization: "Bearer your-token"
  Content-Type: "application/json"
  X-Custom-Header: "custom-value"

Pulsar Admin Configuration

# Create sink connector
bin/pulsar-admin sinks create \
    --archive pulsar-io-http-4.0.6.nar \
    --inputs my-topic \
    --name http-sink \
    --sink-config-file http-sink-config.yaml

Programmatic Usage

import java.util.HashMap;
import java.util.Map;
import org.apache.pulsar.io.http.HttpSink;
import org.apache.pulsar.io.http.HttpSinkConfig;

// Create and configure the sink
HttpSink httpSink = new HttpSink();
Map<String, Object> config = new HashMap<>();
config.put("url", "https://example.com/webhook");
Map<String, String> headers = new HashMap<>();
headers.put("Authorization", "Bearer token");
config.put("headers", headers);

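// Initialize the sink (sinkContext is an existing SinkContext, normally supplied by the Pulsar runtime)
httpSink.open(config, sinkContext);

// Process records (typically handled by the Pulsar runtime; pulsarRecord is a Record<GenericObject>)
httpSink.write(pulsarRecord);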

// Clean up
httpSink.close();

Architecture

The HTTP Sink Connector follows the Pulsar IO framework architecture:

  • HttpSink: Main connector class implementing the Sink<GenericObject> interface
  • HttpSinkConfig: Configuration model handling URL and custom headers
  • JsonConverter: Utility for converting AVRO records to JSON format
  • HTTP Client: Java 11 HttpClient for reliable HTTP request handling
  • Schema Support: Built-in support for primitives, AVRO, JSON, and KEY_VALUE schemas

Capabilities

HTTP Sink Implementation

Main connector class that processes Pulsar records and sends them as HTTP POST requests.

public class HttpSink implements Sink<GenericObject> {
    /**
     * Initialize the HTTP sink with configuration
     * @param config - Configuration map with "url" and "headers" 
     * @param sinkContext - Pulsar sink context for runtime information
     * @throws Exception - Configuration errors or URI parsing failures
     */
    void open(Map<String, Object> config, SinkContext sinkContext) throws Exception;
    
    /**
     * Process a Pulsar record and send via HTTP POST
     * @param record - Pulsar record to be processed and sent via HTTP
     * @throws Exception - JSON conversion errors or HTTP failures (non-2xx status)
     */
    void write(Record<GenericObject> record) throws Exception;
    
    /**
     * Clean up resources (no-op implementation)
     */
    void close();
}

Internal Components:

  • HttpSinkConfig httpSinkConfig: Loaded configuration object
  • HttpClient httpClient: Java 11 HTTP client for requests
  • ObjectMapper mapper: Jackson mapper with JavaTimeModule for JSON serialization
  • URI uri: Parsed target URL for HTTP requests
  • toJsonSerializable(Schema, Object): Private method handling schema-specific JSON conversion

Behavior:

  • Converts record value to JSON payload using schema-aware conversion
  • Adds metadata headers (topic, key, timestamps, message ID, properties)
  • Sends HTTP POST request to configured URL with application/json content type
  • Throws IOException for HTTP errors (status codes outside 200-299 range)
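
The behavior listed above can be approximated with nothing but the JDK's `java.net.http` API. The following is a minimal sketch, not the connector's source: the class name `WebhookRequestSketch`, the `build` method, and its parameters are hypothetical, and skipping absent metadata with null checks is an assumption made for illustration.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;

/** Illustrative only: mirrors the listed behavior using the JDK HTTP client. */
class WebhookRequestSketch {

    /** Builds a POST request carrying a JSON body plus Pulsar-style metadata headers. */
    static HttpRequest build(URI uri, String jsonBody, String topic, String key,
                             byte[] messageId, Map<String, String> properties,
                             Map<String, String> configuredHeaders) {
        HttpRequest.Builder builder = HttpRequest.newBuilder()
                .uri(uri)
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody, StandardCharsets.UTF_8));
        // User-configured headers from the sink configuration.
        configuredHeaders.forEach(builder::header);
        // Message properties are forwarded with a "PulsarProperties-" prefix.
        properties.forEach((name, value) -> builder.header("PulsarProperties-" + name, value));
        // Assumption: metadata headers are skipped when the value is missing.
        if (topic != null) builder.header("PulsarTopic", topic);
        if (key != null) builder.header("PulsarKey", key);
        if (messageId != null) {
            builder.header("PulsarMessageId", Base64.getEncoder().encodeToString(messageId));
        }
        builder.header("Content-Type", "application/json");
        return builder.build();
    }

    public static void main(String[] args) {
        HttpRequest request = build(URI.create("https://example.com/webhook"), "\"hello world\"",
                "persistent://public/default/events", "user-123", new byte[] {1, 2, 3},
                Map.of("source", "demo"), Map.of("Authorization", "Bearer token"));
        System.out.println(request.method() + " " + request.uri());
        request.headers().map().forEach((name, values) -> System.out.println(name + ": " + values));
    }
}
```

The sketch only builds the request; sending it with `HttpClient.send` and checking the status code would be a separate step.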

Configuration Management

Configuration class for HTTP sink connector settings with validation and loading capabilities.

@Data
@Accessors(chain = true)
public class HttpSinkConfig implements Serializable {
    @FieldDoc(defaultValue = "http://localhost", help = "The URL of the HTTP server")
    private String url = "http://localhost";
    
    @FieldDoc(defaultValue = "", help = "The list of default headers added to each request")
    private Map<String, String> headers = new HashMap<>();
    
    /**
     * Load configuration from YAML file
     * @param yamlFile - Path to YAML configuration file
     * @return HttpSinkConfig instance
     * @throws IOException - File reading or parsing errors
     */
    static HttpSinkConfig load(String yamlFile) throws IOException;
    
    /**
     * Load configuration from Map object  
     * @param map - Configuration map with "url" and "headers" keys
     * @return HttpSinkConfig instance
     * @throws IOException - Map conversion or validation errors
     */
    static HttpSinkConfig load(Map<String, Object> map) throws IOException;
    
    // Lombok @Data generates getter and setter methods
    String getUrl();
    HttpSinkConfig setUrl(String url);
    Map<String, String> getHeaders();  
    HttpSinkConfig setHeaders(Map<String, String> headers);
}

Annotations:

  • @Data: Generates getters, setters, toString, equals, and hashCode methods
  • @Accessors(chain = true): Enables fluent method chaining (setters return this)
  • @FieldDoc: Pulsar IO annotation for configuration field documentation
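
To make the effect of `@Accessors(chain = true)` concrete, here is a hand-written stand-in for the kind of accessors Lombok would generate. `ChainedConfigSketch` is a hypothetical class used only for illustration; it is not `HttpSinkConfig` and does not depend on Lombok or Pulsar.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in written by hand; the real class relies on Lombok for these methods. */
class ChainedConfigSketch {
    private String url = "http://localhost";
    private Map<String, String> headers = new HashMap<>();

    String getUrl() { return url; }
    Map<String, String> getHeaders() { return headers; }

    // With chain = true, each setter returns `this` instead of void.
    ChainedConfigSketch setUrl(String url) { this.url = url; return this; }
    ChainedConfigSketch setHeaders(Map<String, String> headers) { this.headers = headers; return this; }

    public static void main(String[] args) {
        ChainedConfigSketch config = new ChainedConfigSketch()
                .setUrl("https://example.com/webhook")
                .setHeaders(Map.of("Authorization", "Bearer token"));
        System.out.println(config.getUrl()); // prints https://example.com/webhook
    }
}
```

Because each setter returns the instance, a configuration can be assembled in a single expression rather than one statement per field.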

Configuration Fields:

  • url: Target HTTP endpoint URL (default: "http://localhost")
  • headers: Custom headers to include in HTTP requests (default: empty map)

Loading Methods:

  • load(String yamlFile): Load configuration from YAML file using Jackson YAML factory
  • load(Map<String, Object> map): Load configuration from Map object via JSON conversion

JSON Conversion Utilities

Utility class for converting AVRO GenericRecord objects to Jackson JsonNode format.

public class JsonConverter {
    /**
     * Convert an AVRO GenericRecord to a JsonNode
     * @param genericRecord - AVRO generic record to convert
     * @return JsonNode representation, or null if input is null
     */
    static JsonNode toJson(GenericRecord genericRecord);
    
    /**
     * Convert typed value with schema to JsonNode
     * @param schema - AVRO schema for the value
     * @param value - Value to convert (may be null)
     * @return JsonNode representation
     */
    static JsonNode toJson(Schema schema, Object value);
    
    /**
     * Merge two JSON objects at top level
     * @param n1 - First JsonNode
     * @param n2 - Second JsonNode  
     * @return Merged ObjectNode with fields from both inputs
     */
    static JsonNode topLevelMerge(JsonNode n1, JsonNode n2);
    
    /**
     * Convert JSON object to array with specified field values
     * @param jsonNode - Source JSON object
     * @param fields - List of field names to include in array
     * @return ArrayNode containing values for the specified fields
     */
    static ArrayNode toJsonArray(JsonNode jsonNode, List<String> fields);
}

Supported AVRO Types:

  • Primitives: NULL, INT, LONG, DOUBLE, FLOAT, BOOLEAN, BYTES, STRING
  • Complex Types: ARRAY, MAP, RECORD, UNION, ENUM, FIXED
  • Logical Types: decimal, date, time-millis, time-micros, timestamp-millis, timestamp-micros, uuid
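
For readers unfamiliar with Avro logical types, the sketch below shows how a few of the raw encodings defined by the Avro specification map to Java values: `date` is a count of days since the epoch, `time-millis` is milliseconds after midnight, `timestamp-millis` is milliseconds since the epoch, and `decimal` is a two's-complement unscaled integer with a scale taken from the schema. The class `AvroLogicalTypesSketch` and its method names are hypothetical, and this is not the connector's `JsonConverter`; how the connector renders these values in JSON is not covered here.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;

/** Hypothetical helper illustrating Avro logical-type encodings with JDK types only. */
class AvroLogicalTypesSketch {

    /** date: int, days since 1970-01-01. */
    static LocalDate date(int daysSinceEpoch) {
        return LocalDate.ofEpochDay(daysSinceEpoch);
    }

    /** time-millis: int, milliseconds after midnight. */
    static LocalTime timeMillis(int millisOfDay) {
        return LocalTime.ofNanoOfDay(millisOfDay * 1_000_000L);
    }

    /** timestamp-millis: long, milliseconds since the epoch (UTC). */
    static Instant timestampMillis(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis);
    }

    /** decimal: big-endian two's-complement unscaled value plus the schema's scale. */
    static BigDecimal decimal(byte[] unscaledTwosComplement, int scale) {
        return new BigDecimal(new BigInteger(unscaledTwosComplement), scale);
    }

    public static void main(String[] args) {
        System.out.println(date(19723));                                   // prints 2024-01-01
        System.out.println(decimal(new byte[] {0x0B, (byte) 0xB7}, 2));    // prints 29.99
    }
}
```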

HTTP Request Format

Request Structure

All HTTP requests are sent as POST requests with the following characteristics:

  • Method: POST
  • Content-Type: application/json
  • Body: JSON representation of the Pulsar record value

Metadata Headers

The connector adds the following headers to each HTTP request. The Pulsar metadata headers depend on what the record carries:

// Standard headers
"Content-Type" -> "application/json"

// Pulsar metadata headers (the Record getters return Optional, so a header only applies when the value is present)
"PulsarTopic" -> record.getTopicName()
"PulsarKey" -> record.getKey()
"PulsarEventTime" -> record.getEventTime(), rendered with toString()
"PulsarPublishTime" -> String.valueOf(message.getPublishTime())
"PulsarMessageId" -> Base64.getEncoder().encodeToString(messageId.toByteArray())

// Custom message properties (prefixed)
"PulsarProperties-<property-name>" -> property-value

// User-configured custom headers
<header-name> -> <header-value>

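On the receiving side, these headers have to be read back out of the incoming request. The following is a minimal sketch of such a reader, assuming the service exposes headers as a `Map<String, List<String>>`. `PulsarHeaderParser` and its methods are hypothetical and not part of the connector. Because HTTP header names are case-insensitive and may arrive lower-cased, the sketch normalizes property names to lower case rather than relying on their original spelling.

```java
import java.util.Base64;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical receiver-side helper; not part of the connector. */
class PulsarHeaderParser {
    private static final String PROPERTY_PREFIX = "pulsarproperties-";

    /** Collects "PulsarProperties-<name>" headers into a map keyed by lower-cased property name. */
    static Map<String, String> properties(Map<String, List<String>> headers) {
        Map<String, String> result = new TreeMap<>();
        headers.forEach((name, values) -> {
            String lower = name.toLowerCase(Locale.ROOT);
            if (lower.startsWith(PROPERTY_PREFIX) && !values.isEmpty()) {
                result.put(lower.substring(PROPERTY_PREFIX.length()), values.get(0));
            }
        });
        return result;
    }

    /** Decodes the Base64 "PulsarMessageId" header into raw bytes, or returns null if absent. */
    static byte[] messageIdBytes(Map<String, List<String>> headers) {
        for (Map.Entry<String, List<String>> entry : headers.entrySet()) {
            if (entry.getKey().equalsIgnoreCase("PulsarMessageId") && !entry.getValue().isEmpty()) {
                return Base64.getDecoder().decode(entry.getValue().get(0));
            }
        }
        return null;
    }
}
```

If the receiver also has the Pulsar client library available, the decoded bytes could presumably be turned back into a message ID with `MessageId.fromByteArray`; that step is left out here to keep the sketch free of dependencies.
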
JSON Payload Examples

Primitive Value:

"hello world"

KEY_VALUE Schema:

{
  "key": "user-123",
  "value": {
    "name": "John Doe",
    "age": 30,
    "active": true
  }
}

AVRO Record:

{
  "id": 123,
  "name": "Product A",
  "price": 29.99,
  "category": {
    "id": 1,
    "name": "Electronics"
  },
  "tags": ["gadget", "mobile"]
}

Schema Support

Supported Schema Types

The connector supports primitive, AVRO, JSON, and KEY_VALUE schemas, with automatic JSON conversion:

// Primitive schemas - direct JSON representation
Schema.STRING, Schema.INT8, Schema.INT16, Schema.INT32, Schema.INT64
Schema.BOOL, Schema.FLOAT, Schema.DOUBLE, Schema.BYTES

// Date/time schemas
Schema.DATE, Schema.TIME, Schema.TIMESTAMP, Schema.INSTANT
Schema.LOCAL_DATE, Schema.LOCAL_TIME, Schema.LOCAL_DATE_TIME

// Complex schemas
Schema.JSON(Class)           // Passed through as-is
Schema.AVRO(Class)          // Converted via JsonConverter
Schema.KeyValue(keySchema, valueSchema)  // Converted to {"key": ..., "value": ...}

Schema Conversion Examples

// Primitive values become the corresponding JSON literal in the request body
// Schema.STRING value "hello" -> "hello"
// Schema.INT32 value 42       -> 42
// Schema.BOOL value true      -> true

// Key-Value pairs (Schema.KeyValue returns Schema<KeyValue<K, V>>)
Schema<KeyValue<String, User>> kvSchema = Schema.KeyValue(Schema.STRING, Schema.AVRO(User.class));
// Produces: {"key": "user-id", "value": {"name": "John", "age": 30}}

// AVRO records automatically converted to JSON objects
// JSON schema values passed through unchanged

Error Handling

HTTP Response Handling

// Successful responses (200-299 status codes)
// - Request completes successfully
// - No exception thrown
// - Record processing continues

// Error responses (all other status codes)
// - Throws IOException with status code
// - Stops record processing
// - Requires manual intervention or retry logic

Exception Types

// HTTP request failures (write method)
IOException: HTTP request failed with non-2xx status code
// Example: "HTTP call to https://example.com/webhook failed with status code 404"

// Configuration errors (open method)  
Exception: Configuration loading failures (invalid YAML, missing required fields)
URISyntaxException: Invalid URL format in configuration

// JSON conversion errors (write method)
UnsupportedOperationException: Unsupported schema type (not AVRO, JSON, KEY_VALUE, or primitive)
IllegalArgumentException: Invalid logical type values (e.g., non-BigDecimal for decimal type)

// HTTP client errors (write method)
IOException: Network connectivity issues, request timeout, or other HTTP client failures

Specific Error Conditions:

  • Status Code Validation: Any HTTP response with status < 200 or >= 300 throws IOException
  • Schema Type Support: Only AVRO, JSON, KEY_VALUE, and primitive schemas supported
  • Logical Type Validation: Strict type checking for AVRO logical types (decimal, date, time, uuid)
  • URI Parsing: Malformed URLs in configuration cause URISyntaxException during open()
  • Network Failures: Connection timeouts, DNS resolution failures throw IOException
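
The status code rule above is simple enough to express directly. Below is a minimal sketch of that check; `StatusCheckSketch`, `isSuccess`, and `requireSuccess` are hypothetical names, and the message format follows the example error text quoted earlier rather than any guaranteed contract.

```java
import java.io.IOException;

/** Illustrative status validation: anything outside 200-299 is treated as a failure. */
class StatusCheckSketch {

    static boolean isSuccess(int statusCode) {
        return statusCode >= 200 && statusCode < 300;
    }

    /** Throws an IOException naming the URL and status code when the response is not 2xx. */
    static void requireSuccess(String url, int statusCode) throws IOException {
        if (!isSuccess(statusCode)) {
            throw new IOException(
                    String.format("HTTP call to %s failed with status code %d", url, statusCode));
        }
    }

    public static void main(String[] args) throws IOException {
        requireSuccess("https://example.com/webhook", 204); // passes silently
        try {
            requireSuccess("https://example.com/webhook", 404);
        } catch (IOException e) {
            // prints: HTTP call to https://example.com/webhook failed with status code 404
            System.out.println(e.getMessage());
        }
    }
}
```

Under this rule a redirect (3xx) counts as a failure too, so the configured URL should point at the final endpoint rather than one that redirects.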

Error Recovery

The connector does not implement automatic retry logic. Error handling strategies:

  1. Pulsar Framework: Configure dead letter topic and retry policies at the Pulsar level
  2. Monitoring: Monitor connector status and HTTP endpoint availability
  3. Configuration: Ensure target HTTP endpoints are reliable and properly configured

Configuration Examples

Basic Webhook Configuration

url: "https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX"
headers:
  Content-Type: "application/json"

Authenticated API Integration

url: "https://api.example.com/v1/events"
headers:
  Authorization: "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
  X-API-Key: "your-api-key"
  X-Source: "pulsar-connector"

Custom Webhook with Routing

url: "https://webhook.example.com/pulsar-events"
headers:
  X-Event-Source: "pulsar"
  X-Environment: "production"
  X-Team: "data-platform"

Deployment

NAR Package Deployment

# Download or build the NAR file
wget https://archive.apache.org/dist/pulsar/pulsar-4.0.6/connectors/pulsar-io-http-4.0.6.nar

# Copy to Pulsar connectors directory
cp pulsar-io-http-4.0.6.nar $PULSAR_HOME/connectors/

# Create sink instance
bin/pulsar-admin sinks create \
    --archive pulsar-io-http-4.0.6.nar \
    --inputs persistent://public/default/events \
    --name http-webhook-sink \
    --sink-config '{
        "url": "https://example.com/webhook",
        "headers": {
            "Authorization": "Bearer token"
        }
    }'

Docker Deployment

# Using Pulsar Docker image (assumes conf/client.conf in the mounted directory points pulsar-admin at a running cluster)
docker run -it \
    -v /path/to/config:/pulsar/conf \
    -v /path/to/connectors:/pulsar/connectors \
    apachepulsar/pulsar:4.0.6 \
    bin/pulsar-admin sinks create \
    --archive /pulsar/connectors/pulsar-io-http-4.0.6.nar \
    --inputs my-topic \
    --name http-sink \
    --sink-config-file /pulsar/conf/http-sink.yaml

Management Operations

# List running sinks
bin/pulsar-admin sinks list

# Get sink status
bin/pulsar-admin sinks status --name http-sink

# Update sink configuration
bin/pulsar-admin sinks update \
    --name http-sink \
    --sink-config '{
        "url": "https://new-endpoint.com/webhook",
        "headers": {"Authorization": "Bearer new-token"}
    }'

# Stop and delete sink
bin/pulsar-admin sinks stop --name http-sink
bin/pulsar-admin sinks delete --name http-sink