tessl/maven-org-apache-pulsar--pulsar-io-core

Core interfaces and abstractions for building Apache Pulsar IO connectors

- **Workspace**: tessl
- **Visibility**: Public
- **Describes**: mavenpkg:maven/org.apache.pulsar/pulsar-io-core@4.0.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-pulsar--pulsar-io-core@4.0.0

# Pulsar IO Core

Apache Pulsar IO Core provides the foundational interfaces and abstractions for building Pulsar IO connectors that enable data integration between Pulsar and external systems. It includes core interfaces for data ingestion (Sources) and data egress (Sinks), batch processing capabilities, and metadata annotations for connector discovery and configuration.

## Package Information

- **Package Name**: pulsar-io-core
- **Package Type**: maven
- **Language**: Java
- **Group ID**: org.apache.pulsar
- **Artifact ID**: pulsar-io-core
- **Installation**: `<dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-io-core</artifactId><version>4.0.6</version></dependency>`

## Core Imports

```java
import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.BatchSource;
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.KeyValue;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.FieldDoc;
import org.apache.pulsar.io.core.annotations.IOType;
```

## Basic Usage

### Simple Source Connector

```java
import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.functions.api.Record;
import java.util.Map;

public class MySource implements Source<String> {
    @Override
    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
        // Initialize your source connector with configuration
    }

    @Override
    public Record<String> read() throws Exception {
        // Read and return the next message from your external system
        // This method should block if no data is available
        return null; // Return actual Record<String> object
    }

    @Override
    public void close() throws Exception {
        // Clean up resources
    }
}
```

### Simple Sink Connector

```java
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.functions.api.Record;
import java.util.Map;

public class MySink implements Sink<String> {
    @Override
    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
        // Initialize your sink connector with configuration
    }

    @Override
    public void write(Record<String> record) throws Exception {
        // Write the record to your external system
    }

    @Override
    public void close() throws Exception {
        // Clean up resources
    }
}
```
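
The `write` method above only says to write the record to the external system; a sink usually also has to report whether that write succeeded. The sketch below shows one possible shape for that, using JDK types only. `AckableSketch` and `AckingSinkSketch` are hypothetical stand-ins written for this example, not Pulsar types; in a real sink the record passed to `write` is a `Record<T>`, and the assumption here is that it offers comparable acknowledge and fail callbacks.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the acknowledge/fail part of a record; not the Pulsar Record type.
interface AckableSketch {
    String value();
    void ack();
    void fail();
}

// Hypothetical sink body: write to a target, then report the outcome on the record.
class AckingSinkSketch {
    // In-memory list standing in for the external system.
    final List<String> written = new ArrayList<>();

    void write(AckableSketch record) {
        try {
            if (record.value() == null) {
                throw new IllegalArgumentException("null payload");
            }
            written.add(record.value()); // stands in for the external write
            record.ack();                // success: the message can be acknowledged
        } catch (RuntimeException e) {
            record.fail();               // failure: signal that the record was not delivered
        }
    }
}
```

Reporting the outcome per record keeps delivery tracking in the hands of the caller, which is why the sketch never swallows a failure silently.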

## Architecture

Pulsar IO Core separates its concerns as follows:

- **Connector Interfaces**: Core abstractions (`Source`, `Sink`, `BatchSource`) define the contract for data movement
- **Context Objects**: Runtime environment (`SourceContext`, `SinkContext`) provides access to Pulsar capabilities
- **Push vs Pull Patterns**: Support for both traditional pull-based (`Source`) and push-based (`PushSource`) patterns
- **Batch Processing**: Specialized interfaces for efficient batch data processing
- **Lifecycle Management**: Consistent initialization through `open()` and cleanup through `close()` (`AutoCloseable`)
- **Metadata Annotations**: Declarative connector configuration and documentation

## Capabilities

### Core Source Interfaces

Primary interfaces for reading data from external systems and publishing to Pulsar topics.

```java { .api }
// Basic pull-based source interface
public interface Source<T> extends AutoCloseable {
    void open(Map<String, Object> config, SourceContext sourceContext) throws Exception;
    Record<T> read() throws Exception;
}

// Batch processing source interface
public interface BatchSource<T> extends AutoCloseable {
    void open(Map<String, Object> config, SourceContext context) throws Exception;
    void discover(Consumer<byte[]> taskEater) throws Exception;
    void prepare(byte[] task) throws Exception;
    Record<T> readNext() throws Exception;
}
```

[Source Interfaces](./source-interfaces.md)
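
The `BatchSource` signatures suggest a three-phase cycle: `discover` emits opaque `byte[]` task descriptors, `prepare` receives one descriptor, and `readNext` drains the records of that task. The sketch below imitates the cycle with JDK types only. `BatchCycleSketch` and its `runOnce` driver are hypothetical, the comma-separated task encoding is invented for the example, and returning `null` from `readNext` to mark an exhausted task is an assumed convention that should be checked against the interface's Javadoc.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical class that mimics the BatchSource method shapes without the Pulsar dependency.
class BatchCycleSketch {
    private final Deque<String> pending = new ArrayDeque<>();

    // Phase 1: announce units of work as opaque byte[] task descriptors.
    void discover(Consumer<byte[]> taskEater) {
        for (String task : List.of("a,b", "c")) {
            taskEater.accept(task.getBytes(StandardCharsets.UTF_8));
        }
    }

    // Phase 2: decode one descriptor and stage its records.
    void prepare(byte[] task) {
        pending.clear();
        pending.addAll(Arrays.asList(new String(task, StandardCharsets.UTF_8).split(",")));
    }

    // Phase 3: return the next staged record, or null once the task is drained (assumed convention).
    String readNext() {
        return pending.poll();
    }

    // Drives the three phases in order and collects everything that was read.
    static List<String> runOnce() {
        BatchCycleSketch source = new BatchCycleSketch();
        List<byte[]> tasks = new ArrayList<>();
        source.discover(tasks::add);
        List<String> out = new ArrayList<>();
        for (byte[] task : tasks) {
            source.prepare(task);
            for (String r = source.readNext(); r != null; r = source.readNext()) {
                out.add(r);
            }
        }
        return out;
    }
}
```

In a deployed connector the framework, not the connector itself, would call these methods; `runOnce` exists only to make the ordering visible.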

### Core Sink Interfaces

Primary interfaces for writing data from Pulsar to external systems.

```java { .api }
// Basic sink interface
public interface Sink<T> extends AutoCloseable {
    void open(Map<String, Object> config, SinkContext sinkContext) throws Exception;
    void write(Record<T> record) throws Exception;
}
```

[Sink Interfaces](./sink-interfaces.md)

### Push-Based Sources

Abstract classes providing queue-based push source functionality for asynchronous data ingestion.

```java { .api }
// Push-based source using consumer callback pattern
public abstract class PushSource<T> extends AbstractPushSource<T> implements Source<T> {
    // Inherits push mechanism functionality
}

// Base class for push sources with internal queue
public abstract class AbstractPushSource<T> {
    static final int DEFAULT_QUEUE_LENGTH = 1000;
    public AbstractPushSource();
    public void consume(Record<T> record);
    public void notifyError(Exception ex);
    public int getQueueLength();
    protected Record<T> readNext() throws Exception;
}
```

[Push Source Classes](./push-sources.md)
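
A push source inverts control: a listener thread hands records to `consume`, while the reading side drains an internal queue through `readNext`. The sketch below models that hand-off with a bounded `LinkedBlockingQueue`. `PushQueueSketch` is a hypothetical, simplified model written for this example and is not the `AbstractPushSource` implementation; error notification is left out.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, simplified model of a queue-backed push source.
class PushQueueSketch<T> {
    private final LinkedBlockingQueue<T> queue;

    PushQueueSketch(int queueLength) {
        this.queue = new LinkedBlockingQueue<>(queueLength);
    }

    // Producer side: blocks when the queue is full, which applies back-pressure.
    void consume(T record) throws InterruptedException {
        queue.put(record);
    }

    // Consumer side: blocks until a record has been pushed.
    T readNext() throws InterruptedException {
        return queue.take();
    }
}
```

Constructing the sketch with a length of 1000 would match the `DEFAULT_QUEUE_LENGTH` constant listed above; a smaller bound makes the producer block sooner when the reader falls behind.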

### Runtime Context

Context interfaces providing connector runtime environment and Pulsar platform capabilities.

```java { .api }
// Source runtime context
public interface SourceContext extends BaseContext {
    String getSourceName();
    String getOutputTopic();
    SourceConfig getSourceConfig();
    <T> TypedMessageBuilder<T> newOutputMessage(String topicName, Schema<T> schema) throws PulsarClientException;
    <T> ConsumerBuilder<T> newConsumerBuilder(Schema<T> schema) throws PulsarClientException;

    // BaseContext methods for tenant/namespace info, logging, state, counters, metrics
    String getTenant();
    String getNamespace();
    Logger getLogger();
    void putState(String key, ByteBuffer value);
    ByteBuffer getState(String key);
    void incrCounter(String key, long amount);
    long getCounter(String key);
    void recordMetric(String metricName, double value);
}

// Sink runtime context
public interface SinkContext extends BaseContext {
    String getSinkName();
    Collection<String> getInputTopics();
    SinkConfig getSinkConfig();
    default SubscriptionType getSubscriptionType();
    default void seek(String topic, int partition, MessageId messageId) throws PulsarClientException;
    default void pause(String topic, int partition) throws PulsarClientException;
    default void resume(String topic, int partition) throws PulsarClientException;

    // BaseContext methods for tenant/namespace info, logging, state, counters, metrics
    String getTenant();
    String getNamespace();
    Logger getLogger();
    void putState(String key, ByteBuffer value);
    ByteBuffer getState(String key);
    void incrCounter(String key, long amount);
    long getCounter(String key);
    void recordMetric(String metricName, double value);
}
```

[Context Interfaces](./context-interfaces.md)
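
Because `putState` and `getState` exchange `ByteBuffer` values, a connector that checkpoints something like a numeric offset has to encode and decode it itself. The sketch below shows one way to do that. `OffsetStateSketch` is hypothetical, and its `HashMap` merely stands in for the state store behind the context; only the two method shapes are taken from the listing above.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; the map stands in for the state store behind the context.
class OffsetStateSketch {
    private final Map<String, ByteBuffer> store = new HashMap<>();

    // Same shapes as the putState/getState methods in the listing.
    void putState(String key, ByteBuffer value) {
        store.put(key, value);
    }

    ByteBuffer getState(String key) {
        return store.get(key);
    }

    // Encodes a long offset into an 8-byte buffer and stores it.
    void saveOffset(String key, long offset) {
        ByteBuffer buf = ByteBuffer.allocate(Long.BYTES);
        buf.putLong(offset);
        buf.flip(); // switch from writing to reading before handing the buffer over
        putState(key, buf);
    }

    // Decodes the stored offset, or returns the fallback when nothing was stored.
    long loadOffset(String key, long fallback) {
        ByteBuffer buf = getState(key);
        // duplicate() leaves the stored buffer's position untouched for later reads
        return buf == null ? fallback : buf.duplicate().getLong();
    }
}
```

Reading through `duplicate()` is a defensive choice: it keeps repeated loads from advancing the position of a shared buffer.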

### Connector Annotations

Annotation-based metadata system for connector discovery, configuration, and documentation.

```java { .api }
// Connector metadata annotation
@Target(TYPE)
@Retention(RUNTIME)
public @interface Connector {
    String name();
    IOType type();
    String help();
    Class configClass();
}

// Configuration field documentation
@Target(FIELD)
@Retention(RUNTIME)
public @interface FieldDoc {
    boolean required() default false;
    String defaultValue();
    boolean sensitive() default false;
    String help();
}
```

[Connector Annotations](./connector-annotations.md)
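
Since both annotations are retained at runtime, tooling can read them reflectively to discover a connector and document its configuration. The sketch below illustrates the idea with local stand-ins that mirror the signatures listed above, so that it compiles without the Pulsar dependency; a real connector would import the annotations from `org.apache.pulsar.io.core.annotations` instead. `DemoSink`, `DemoSinkConfig`, the `requiredFields` helper, and the `IOType` constants shown are assumptions made for illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

// Local stand-ins mirroring the listed signatures; the enum constants are assumed.
enum IOType { SOURCE, SINK }

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface Connector {
    String name();
    IOType type();
    String help();
    Class<?> configClass();
}

@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@interface FieldDoc {
    boolean required() default false;
    String defaultValue();
    boolean sensitive() default false;
    String help();
}

// Hypothetical configuration class documented with @FieldDoc.
class DemoSinkConfig {
    @FieldDoc(required = true, defaultValue = "", help = "Endpoint of the target system")
    String endpoint;

    @FieldDoc(defaultValue = "", sensitive = true, help = "Access token")
    String token;
}

@Connector(name = "demo", type = IOType.SINK, help = "Demo sink", configClass = DemoSinkConfig.class)
class DemoSink {
    // Lists the required config fields by reading the runtime-retained annotations.
    static List<String> requiredFields() {
        Class<?> config = DemoSink.class.getAnnotation(Connector.class).configClass();
        List<String> names = new ArrayList<>();
        for (Field f : config.getDeclaredFields()) {
            FieldDoc doc = f.getAnnotation(FieldDoc.class);
            if (doc != null && doc.required()) {
                names.add(f.getName());
            }
        }
        return names;
    }
}
```

Marking `token` as `sensitive` is the kind of hint a UI or log formatter could use to mask the value, though how any given tool treats the flag is outside this sketch.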

### Utility Classes

Helper classes for common data structures and operations.

```java { .api }
// Generic key-value pair container
public class KeyValue<K, V> {
    public KeyValue(K key, V value);
    public K getKey();
    public V getValue();
    public void setKey(K key);
    public void setValue(V value);
}
```

[Utility Classes](./utility-classes.md)