
# Apache Pulsar HTTP Sink Connector

Apache Pulsar HTTP Sink Connector streams data from Pulsar topics to external HTTP endpoints as webhook-style POST requests. The connector converts each Pulsar record into a JSON payload and adds metadata headers to the HTTP request, bridging Pulsar's pub-sub messaging and HTTP-based services.

## Package Information

- **Package Name**: pulsar-io-http
- **Package Type**: maven
- **Language**: Java
- **Maven Coordinates**: `org.apache.pulsar:pulsar-io-http:4.0.6`
- **Installation**: Add as a Maven/Gradle dependency, or deploy as a Pulsar connector NAR
- **License**: Apache License 2.0

## Core Imports

```java
import org.apache.pulsar.io.http.HttpSink;
import org.apache.pulsar.io.http.HttpSinkConfig;
import org.apache.pulsar.io.http.JsonConverter;
```

## Basic Usage

### Maven Dependency

```xml
<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-io-http</artifactId>
    <version>4.0.6</version>
</dependency>
```

### Connector Configuration

```yaml
# http-sink-config.yaml (used with --sink-config-file)
# Connector-specific settings go under the top-level "configs" key
configs:
  url: "https://example.com/webhook"
  headers:
    Authorization: "Bearer your-token"
    Content-Type: "application/json"
    X-Custom-Header: "custom-value"
```

### Pulsar Admin Configuration

```bash
# Create the sink connector
bin/pulsar-admin sinks create \
  --archive pulsar-io-http-4.0.6.nar \
  --inputs my-topic \
  --name http-sink \
  --sink-config-file http-sink-config.yaml
```

### Programmatic Usage

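```java
import java.util.HashMap;
import java.util.Map;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.http.HttpSink;

public class HttpSinkExample {

    // In a deployed connector, the Pulsar IO runtime supplies sinkContext and
    // each record, and calls open/write/close itself.
    static void sendOne(SinkContext sinkContext, Record<GenericObject> record) throws Exception {
        // Connector configuration: target URL plus extra request headers
        Map<String, Object> config = new HashMap<>();
        config.put("url", "https://example.com/webhook");
        Map<String, String> headers = new HashMap<>();
        headers.put("Authorization", "Bearer token");
        config.put("headers", headers);

        HttpSink httpSink = new HttpSink();
        try {
            // Initialize the sink (loads the config and parses the URL)
            httpSink.open(config, sinkContext);
            // Send one record as an HTTP POST with a JSON body
            httpSink.write(record);
        } finally {
            // Clean up
            httpSink.close();
        }
    }
}
```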

## Architecture

The HTTP Sink Connector follows the Pulsar IO framework architecture:

- **HttpSink**: Main connector class implementing the `Sink<GenericObject>` interface
- **HttpSinkConfig**: Configuration model holding the target URL and custom headers
- **JsonConverter**: Utility for converting AVRO records to JSON
- **HTTP Client**: The JDK `java.net.http.HttpClient` (Java 11+) sends the HTTP requests
- **Schema Support**: Built-in handling of primitive, AVRO, JSON, and KEY_VALUE schemas

## Capabilities

### HTTP Sink Implementation

Main connector class that processes Pulsar records and sends them as HTTP POST requests.

```java { .api }
public class HttpSink implements Sink<GenericObject> {
    /**
     * Initialize the HTTP sink with configuration
     * @param config - Configuration map with "url" and "headers"
     * @param sinkContext - Pulsar sink context for runtime information
     * @throws Exception - Configuration errors or URI parsing failures
     */
    void open(Map<String, Object> config, SinkContext sinkContext) throws Exception;

    /**
     * Process a Pulsar record and send via HTTP POST
     * @param record - Pulsar record to be processed and sent via HTTP
     * @throws Exception - JSON conversion errors or HTTP failures (non-2xx status)
     */
    void write(Record<GenericObject> record) throws Exception;

    /**
     * Clean up resources (no-op implementation)
     */
    void close();
}
```

**Internal Components:**

- `HttpSinkConfig httpSinkConfig`: Loaded configuration object
- `HttpClient httpClient`: Java 11 HTTP client used to send requests
- `ObjectMapper mapper`: Jackson mapper with `JavaTimeModule` for JSON serialization
- `URI uri`: Parsed target URL for HTTP requests
- `toJsonSerializable(Schema, Object)`: Private method handling schema-specific JSON conversion

**Behavior:**

- Converts the record value to a JSON payload based on the record's schema
- Adds metadata headers (topic, key, event time, publish time, message ID, and message properties) when the values are available
- Sends an HTTP POST request with content type `application/json` to the configured URL
- Throws `IOException` if the response status code is outside the 200-299 range
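
The request assembly described above can be sketched with the JDK's `java.net.http` API. This is a hypothetical, simplified illustration, not the connector's source: the class `WebhookRequestSketch`, the `buildRequest` method, and its parameters are invented for the example, the JSON body is assumed to be already serialized, and the event-time, publish-time, and message-ID headers are left out for brevity.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch (not the connector's code): assembles a POST request
// carrying a JSON body plus Pulsar metadata headers.
class WebhookRequestSketch {

    static HttpRequest buildRequest(URI uri,
                                    byte[] jsonBody,
                                    Map<String, String> configuredHeaders,
                                    Map<String, String> messageProperties,
                                    Optional<String> topic,
                                    Optional<String> key) {
        HttpRequest.Builder builder = HttpRequest.newBuilder()
                .uri(uri)
                .POST(HttpRequest.BodyPublishers.ofByteArray(jsonBody));

        // Headers from the "headers" configuration setting
        configuredHeaders.forEach(builder::header);

        // One header per message property, using the PulsarProperties- prefix
        messageProperties.forEach((name, value) -> builder.header("PulsarProperties-" + name, value));

        // Optional metadata: only added when the record carries the value
        topic.ifPresent(t -> builder.header("PulsarTopic", t));
        key.ifPresent(k -> builder.header("PulsarKey", k));

        // setHeader replaces any earlier value, so exactly one Content-Type is sent
        builder.setHeader("Content-Type", "application/json");
        return builder.build();
    }
}
```

Passing `Optional` values mirrors Pulsar's `Record` interface, which exposes the topic name and key as optionals; in this sketch a missing value simply produces no header.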

### Configuration Management

Configuration class for HTTP sink connector settings with validation and loading capabilities.

```java { .api }
@Data
@Accessors(chain = true)
public class HttpSinkConfig implements Serializable {
    @FieldDoc(defaultValue = "http://localhost", help = "The URL of the HTTP server")
    private String url = "http://localhost";

    @FieldDoc(defaultValue = "", help = "The list of default headers added to each request")
    private Map<String, String> headers = new HashMap<>();

    /**
     * Load configuration from YAML file
     * @param yamlFile - Path to YAML configuration file
     * @return HttpSinkConfig instance
     * @throws IOException - File reading or parsing errors
     */
    static HttpSinkConfig load(String yamlFile) throws IOException;

    /**
     * Load configuration from Map object
     * @param map - Configuration map with "url" and "headers" keys
     * @return HttpSinkConfig instance
     * @throws IOException - Map conversion or validation errors
     */
    static HttpSinkConfig load(Map<String, Object> map) throws IOException;

    // Lombok @Data generates getter and setter methods;
    // @Accessors(chain = true) makes the setters return this
    String getUrl();
    HttpSinkConfig setUrl(String url);
    Map<String, String> getHeaders();
    HttpSinkConfig setHeaders(Map<String, String> headers);
}
```

**Lombok Annotations:**

- `@Data`: Generates getters, setters, `toString`, `equals`, and `hashCode` methods
- `@Accessors(chain = true)`: Enables fluent method chaining (setters return `this`)
- `@FieldDoc`: Pulsar IO annotation for configuration field documentation

**Configuration Fields:**

- `url`: Target HTTP endpoint URL (default: `http://localhost`)
- `headers`: Custom headers to include in every HTTP request (default: empty map)

**Loading Methods:**

- `load(String yamlFile)`: Loads configuration from a YAML file using Jackson's YAML factory
- `load(Map<String, Object> map)`: Loads configuration from a map via JSON conversion
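
Because the accessors are generated by Lombok, they do not appear in the source. The hypothetical class below writes out roughly what `@Data` plus `@Accessors(chain = true)` produce for the two fields: getters and `this`-returning setters (`equals`, `hashCode`, and `toString` are omitted). `HttpSinkConfigSketch` is an invented name used only for illustration.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical hand-written equivalent of the Lombok-generated accessors
// (@Data + @Accessors(chain = true)); equals/hashCode/toString are omitted.
class HttpSinkConfigSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private String url = "http://localhost";              // same default as HttpSinkConfig
    private Map<String, String> headers = new HashMap<>(); // empty by default

    public String getUrl() {
        return url;
    }

    // Chained setter: returns this instead of void
    public HttpSinkConfigSketch setUrl(String url) {
        this.url = url;
        return this;
    }

    public Map<String, String> getHeaders() {
        return headers;
    }

    public HttpSinkConfigSketch setHeaders(Map<String, String> headers) {
        this.headers = headers;
        return this;
    }
}
```

With the real class, the equivalent chain would be `new HttpSinkConfig().setUrl("https://example.com/webhook").setHeaders(headers)`.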

### JSON Conversion Utilities

Utility class for converting AVRO `GenericRecord` objects to Jackson `JsonNode` values.

```java { .api }
public class JsonConverter {
    /**
     * Convert an AVRO GenericRecord to a JsonNode
     * @param genericRecord - AVRO generic record to convert
     * @return JsonNode representation, or null if input is null
     */
    static JsonNode toJson(GenericRecord genericRecord);

    /**
     * Convert typed value with schema to JsonNode
     * @param schema - AVRO schema for the value
     * @param value - Value to convert (may be null)
     * @return JsonNode representation
     */
    static JsonNode toJson(Schema schema, Object value);

    /**
     * Merge two JSON objects at top level
     * @param n1 - First JsonNode
     * @param n2 - Second JsonNode
     * @return Merged ObjectNode with fields from both inputs
     */
    static JsonNode topLevelMerge(JsonNode n1, JsonNode n2);

    /**
     * Convert JSON object to array with specified field values
     * @param jsonNode - Source JSON object
     * @param fields - List of field names to include in array
     * @return ArrayNode containing values for the specified fields
     */
    static ArrayNode toJsonArray(JsonNode jsonNode, List<String> fields);
}
```

**Supported AVRO Types:**

- **Primitives**: NULL, BOOLEAN, INT, LONG, FLOAT, DOUBLE, BYTES, STRING
- **Complex Types**: RECORD, ENUM, ARRAY, MAP, UNION, FIXED
- **Logical Types**: decimal, date, time-millis, time-micros, timestamp-millis, timestamp-micros, uuid
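
`topLevelMerge` is described only as returning an object with fields from both inputs. The sketch below illustrates what a top-level (shallow) merge means, using plain `Map`s in place of Jackson nodes. It is hypothetical: `TopLevelMergeSketch` is an invented name, and it assumes that when both inputs share a field name the second input's value wins, which the description above does not specify.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of a shallow (top-level) merge on Map-based JSON objects.
class TopLevelMergeSketch {

    // Copies the fields of n1, then the fields of n2. On a name collision the
    // value from n2 replaces the one from n1 (an assumption, see above).
    // Nested objects are NOT merged recursively: a nested value is replaced whole.
    static Map<String, Object> topLevelMerge(Map<String, Object> n1, Map<String, Object> n2) {
        Map<String, Object> merged = new LinkedHashMap<>(n1);
        merged.putAll(n2);
        return merged;
    }
}
```

Neither input is modified; the result is a new map.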

## HTTP Request Format

### Request Structure

Each record is sent as a single HTTP request with the following characteristics:

- **Method**: POST
- **Content-Type**: application/json
- **Body**: JSON representation of the Pulsar record value
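
For local testing, a stand-in endpoint that accepts these requests can be written with the JDK's built-in `com.sun.net.httpserver` package (module `jdk.httpserver`). This is a hypothetical helper, not part of the connector: the class name `WebhookReceiverSketch`, the `/webhook` path, and the choice of 204 as the success status are assumptions. It rejects non-POST methods and records each request body.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical minimal webhook receiver for local testing (not part of the connector).
class WebhookReceiverSketch {

    // Bodies of accepted requests, kept so they can be inspected later
    static final List<String> received = new CopyOnWriteArrayList<>();

    // Starts a server on 127.0.0.1; port 0 lets the OS pick a free port
    static HttpServer start(int port) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", port), 0);
        server.createContext("/webhook", exchange -> {
            try {
                if (!"POST".equals(exchange.getRequestMethod())) {
                    // The sink only sends POST requests
                    exchange.sendResponseHeaders(405, -1);
                    return;
                }
                String body;
                try (InputStream in = exchange.getRequestBody()) {
                    body = new String(in.readAllBytes(), StandardCharsets.UTF_8);
                }
                received.add(body);
                // Any 2xx status counts as success for the sink; 204 means "no response body"
                exchange.sendResponseHeaders(204, -1);
            } finally {
                exchange.close();
            }
        });
        server.start();
        return server;
    }
}
```

Returning a status outside 200-299 from such an endpoint is a simple way to observe the connector's failure path.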

### Metadata Headers

Besides the user-configured headers, the connector sets `Content-Type` on every request and adds Pulsar metadata headers when the corresponding values are available on the record:

```java { .api }
// Standard headers
"Content-Type" -> "application/json"

// Pulsar metadata headers (added only when the value is present)
"PulsarTopic" -> record.getTopicName()
"PulsarKey" -> record.getKey()
"PulsarEventTime" -> record.getEventTime().toString()
// message = the underlying Pulsar message from record.getMessage()
"PulsarPublishTime" -> String.valueOf(message.getPublishTime())
"PulsarMessageId" -> Base64.getEncoder().encodeToString(message.getMessageId().toByteArray())

// Custom message properties (one prefixed header per property)
"PulsarProperties-<property-name>" -> <property-value>

// User-configured custom headers
<header-name> -> <header-value>
```
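
On the receiving side, these headers can be turned back into structured data. The helper below is a hypothetical sketch (`PulsarHeaderParser` and its methods are invented names, and it assumes the server exposes headers as a single-valued `Map<String, String>`): it collects the `PulsarProperties-` headers into a property map and Base64-decodes the `PulsarMessageId` header.

```java
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical receiver-side helper (not part of the connector)
class PulsarHeaderParser {

    static final String PROPERTY_PREFIX = "PulsarProperties-";

    // Collects message properties from headers named PulsarProperties-<name>.
    // The prefix is matched case-insensitively because HTTP header names are
    // case-insensitive and some servers normalize their case.
    static Map<String, String> messageProperties(Map<String, String> headers) {
        Map<String, String> props = new LinkedHashMap<>();
        headers.forEach((name, value) -> {
            if (name.regionMatches(true, 0, PROPERTY_PREFIX, 0, PROPERTY_PREFIX.length())) {
                props.put(name.substring(PROPERTY_PREFIX.length()), value);
            }
        });
        return props;
    }

    // Decodes the Base64 PulsarMessageId header back into the serialized message ID bytes
    static byte[] messageIdBytes(String headerValue) {
        return Base64.getDecoder().decode(headerValue);
    }
}
```

If the receiver has the Pulsar client library available, the decoded bytes can be passed to `MessageId.fromByteArray(...)` to recover a `MessageId`. If a server lowercases header names, the recovered property names are lowercased as well.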

### JSON Payload Examples

**Primitive Value:**

```json
"hello world"
```

**KEY_VALUE Schema:**

```json
{
  "key": "user-123",
  "value": {
    "name": "John Doe",
    "age": 30,
    "active": true
  }
}
```

**AVRO Record:**

```json
{
  "id": 123,
  "name": "Product A",
  "price": 29.99,
  "category": {
    "id": 1,
    "name": "Electronics"
  },
  "tags": ["gadget", "mobile"]
}
```

## Schema Support

### Supported Schema Types

The connector converts the following Pulsar schema types to JSON automatically; other schema types are rejected with an `UnsupportedOperationException`:

```java { .api }
// Primitive schemas - direct JSON representation
Schema.STRING, Schema.INT8, Schema.INT16, Schema.INT32, Schema.INT64
Schema.BOOL, Schema.FLOAT, Schema.DOUBLE, Schema.BYTES

// Date/time schemas
Schema.DATE, Schema.TIME, Schema.TIMESTAMP, Schema.INSTANT
Schema.LOCAL_DATE, Schema.LOCAL_TIME, Schema.LOCAL_DATE_TIME

// Complex schemas
Schema.JSON(Class)                      // Passed through as-is
Schema.AVRO(Class)                      // Converted via JsonConverter
Schema.KeyValue(keySchema, valueSchema) // Converted to {"key": ..., "value": ...}
```

### Schema Conversion Examples

```java
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;

// Primitive values map directly to JSON scalars
// Schema.STRING value "hello" -> "hello"
// Schema.INT32  value 42      -> 42
// Schema.BOOL   value true    -> true

// Key-value pairs (User is an application-defined AVRO POJO)
Schema<KeyValue<String, User>> kvSchema =
        Schema.KeyValue(Schema.STRING, Schema.AVRO(User.class));
// A record with key "user-id" and value User{name="John", age=30} produces:
// {"key": "user-id", "value": {"name": "John", "age": 30}}

// AVRO records are converted to JSON objects
// JSON schema values are passed through unchanged
```

## Error Handling

### HTTP Response Handling

```java { .api }
// Successful responses (200-299 status codes)
// - write() returns normally; no exception is thrown
// - Record processing continues

// Error responses (all other status codes)
// - write() throws an IOException that includes the status code
// - The write fails; the connector itself does not retry
// - Recovery relies on Pulsar-level retry handling or manual intervention
```

### Exception Types

```java { .api }
// HTTP response failures (write method)
IOException: HTTP request returned a non-2xx status code
// Example: "HTTP call to https://example.com/webhook failed with status code 404"

// Configuration errors (open method)
Exception: Configuration loading failures (e.g., values that cannot be mapped onto HttpSinkConfig)
URISyntaxException: Invalid URL format in configuration

// JSON conversion errors (write method)
UnsupportedOperationException: Unsupported schema type (not AVRO, JSON, KEY_VALUE, or primitive)
IllegalArgumentException: Invalid logical type values (e.g., non-BigDecimal for decimal type)

// HTTP client errors (write method)
IOException: Network connectivity issues, request timeout, or other HTTP client failures
```

**Specific Error Conditions:**

- **Status Code Validation**: Any HTTP response with status < 200 or >= 300 causes an `IOException`
- **Schema Type Support**: Only AVRO, JSON, KEY_VALUE, and primitive schemas are supported
- **Logical Type Validation**: AVRO logical types (decimal, date, time, uuid) are type-checked strictly
- **URI Parsing**: A malformed URL in the configuration causes a `URISyntaxException` during `open()`
- **Network Failures**: Connection timeouts and DNS resolution failures surface as `IOException`
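
The status-code rule can be expressed as a small check. This hypothetical helper (`StatusCheckSketch.checkStatus` is an invented name) reproduces the 200-299 range test and the error-message format shown in the example under Exception Types; it is an illustration, not the connector's source.

```java
import java.io.IOException;
import java.net.URI;

// Hypothetical sketch of the 2xx status check (not the connector's code)
class StatusCheckSketch {

    static void checkStatus(URI uri, int statusCode) throws IOException {
        // Anything outside 200-299 is a failure, including 1xx and 3xx responses
        if (statusCode < 200 || statusCode >= 300) {
            throw new IOException(String.format(
                    "HTTP call to %s failed with status code %d", uri, statusCode));
        }
    }
}
```

A strict 2xx rule means redirects count as failures unless the HTTP client follows them. The JDK `HttpClient` does not follow redirects by default (`HttpClient.Redirect.NEVER`), so if the connector uses a default-configured client, an endpoint that answers with a redirect would cause write failures.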

### Error Recovery

The connector does not implement automatic retry logic. Strategies for handling failures:

1. **Pulsar Framework**: Configure retry and dead-letter handling at the Pulsar level
2. **Monitoring**: Monitor connector status (for example, with `pulsar-admin sinks status`) and the availability of the HTTP endpoint
3. **Configuration**: Make sure the target HTTP endpoint is reachable and returns a 2xx status for accepted requests

## Configuration Examples

These snippets show the connector settings (`url` and `headers`). When passing a YAML file with `--sink-config-file`, nest them under a top-level `configs:` key.

### Basic Webhook Configuration

```yaml
url: "https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX"
headers:
  Content-Type: "application/json"
```

### Authenticated API Integration

```yaml
url: "https://api.example.com/v1/events"
headers:
  Authorization: "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
  X-API-Key: "your-api-key"
  X-Source: "pulsar-connector"
```

### Custom Webhook with Routing

```yaml
url: "https://webhook.example.com/pulsar-events"
headers:
  X-Event-Source: "pulsar"
  X-Environment: "production"
  X-Team: "data-platform"
```

## Deployment

### NAR Package Deployment

```bash
# Download or build the NAR file
wget https://archive.apache.org/dist/pulsar/pulsar-4.0.6/connectors/pulsar-io-http-4.0.6.nar

# Copy to Pulsar connectors directory
cp pulsar-io-http-4.0.6.nar $PULSAR_HOME/connectors/

# Create sink instance
bin/pulsar-admin sinks create \
  --archive pulsar-io-http-4.0.6.nar \
  --inputs persistent://public/default/events \
  --name http-webhook-sink \
  --sink-config '{
    "url": "https://example.com/webhook",
    "headers": {
      "Authorization": "Bearer token"
    }
  }'
```

### Docker Deployment

```bash
# Run pulsar-admin from the Pulsar image against an existing cluster.
# <broker-host> is a placeholder for your broker's address. The sink config
# is mounted in its own directory so the image's /pulsar/conf stays intact.
docker run -it --rm \
  -v /path/to/sink-config:/pulsar/sink-conf \
  -v /path/to/connectors:/pulsar/connectors \
  apachepulsar/pulsar:4.0.6 \
  bin/pulsar-admin --admin-url http://<broker-host>:8080 sinks create \
    --archive /pulsar/connectors/pulsar-io-http-4.0.6.nar \
    --inputs my-topic \
    --name http-sink \
    --sink-config-file /pulsar/sink-conf/http-sink.yaml
```

### Management Operations

```bash
# List running sinks
bin/pulsar-admin sinks list

# Get sink status
bin/pulsar-admin sinks status --name http-sink

# Update sink configuration
bin/pulsar-admin sinks update \
  --name http-sink \
  --sink-config '{
    "url": "https://new-endpoint.com/webhook",
    "headers": {"Authorization": "Bearer new-token"}
  }'

# Stop and delete sink
bin/pulsar-admin sinks stop --name http-sink
bin/pulsar-admin sinks delete --name http-sink
```