# Pulsar IO Core

Apache Pulsar IO Core provides the foundational interfaces and abstractions for building Pulsar IO connectors that enable data integration between Pulsar and external systems. It includes core interfaces for data ingestion (Sources) and data egress (Sinks), batch processing capabilities, and metadata annotations for connector discovery and configuration.

## Package Information

- **Package Name**: pulsar-io-core
- **Package Type**: maven
- **Language**: Java
- **Group ID**: org.apache.pulsar
- **Artifact ID**: pulsar-io-core
- **Installation**: `<dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-io-core</artifactId><version>4.0.6</version></dependency>`

## Core Imports

```java
import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.BatchSource;
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.KeyValue;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.FieldDoc;
import org.apache.pulsar.io.core.annotations.IOType;
// Record appears in every Source and Sink signature; it lives in the Pulsar Functions API package
import org.apache.pulsar.functions.api.Record;
```

## Basic Usage

### Simple Source Connector

```java
import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.functions.api.Record;
import java.util.Map;

public class MySource implements Source<String> {
    @Override
    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
        // Initialize your source connector with configuration
    }

    @Override
    public Record<String> read() throws Exception {
        // Read and return the next message from your external system
        // This method should block if no data is available
        return null; // Return actual Record<String> object
    }

    @Override
    public void close() throws Exception {
        // Clean up resources
    }
}
```

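
The skeleton above returns `null` as a placeholder. As a minimal sketch of a source that actually produces data, the hypothetical `SequenceSource` below emits an increasing sequence of numbers. The `intervalMs` configuration key is an assumption made for this example. Each record is built with a lambda, which relies on `getValue()` being the only abstract method of `Record` (its other accessors, `ack()`, and `fail()` are assumed to have default implementations).

```java
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.SourceContext;

import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical example source: emits 0, 1, 2, ... as Long values.
class SequenceSource implements Source<Long> {
    private final AtomicLong next = new AtomicLong();
    private long intervalMs;

    @Override
    public void open(Map<String, Object> config, SourceContext sourceContext) {
        // "intervalMs" is an assumed config key for this sketch; 0 disables throttling.
        intervalMs = Long.parseLong(String.valueOf(config.getOrDefault("intervalMs", 0L)));
    }

    @Override
    public Record<Long> read() throws Exception {
        if (intervalMs > 0) {
            Thread.sleep(intervalMs); // simple throttle between records
        }
        long value = next.getAndIncrement();
        // A lambda is enough if getValue() is Record's only abstract method (assumption).
        return () -> value;
    }

    @Override
    public void close() {
        // Nothing to release in this sketch.
    }
}
```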
### Simple Sink Connector

```java
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.functions.api.Record;
import java.util.Map;

public class MySink implements Sink<String> {
    @Override
    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
        // Initialize your sink connector with configuration
    }

    @Override
    public void write(Record<String> record) throws Exception {
        // Write the record to your external system
    }

    @Override
    public void close() throws Exception {
        // Clean up resources
    }
}
```

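
A sink typically calls `record.ack()` once a record has been handled and `record.fail()` when it cannot be, so that, depending on the processing guarantees, the runtime can acknowledge or redeliver the underlying message. The sketch below assumes a hypothetical `InMemorySink` whose list stands in for the external system. The private `store(...)` helper is also hypothetical; a real sink would replace it with a call to its client.

```java
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical sink: the list below stands in for an external system.
class InMemorySink implements Sink<String> {
    final List<String> written = new CopyOnWriteArrayList<>();

    @Override
    public void open(Map<String, Object> config, SinkContext sinkContext) {
        // A real sink would create its client or connection here.
    }

    @Override
    public void write(Record<String> record) {
        try {
            store(record.getValue());
            record.ack();  // handled successfully
        } catch (RuntimeException e) {
            record.fail(); // report the failure instead of silently dropping the record
        }
    }

    // Hypothetical stand-in for the external write; rejects null to show the failure path.
    private void store(String value) {
        if (value == null) {
            throw new IllegalArgumentException("null value");
        }
        written.add(value);
    }

    @Override
    public void close() {
        // Release the client or connection here.
    }
}
```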
## Architecture

Pulsar IO Core separates a connector's concerns as follows:

- **Connector Interfaces**: Core abstractions (`Source`, `Sink`, `BatchSource`) define the contract for moving data into and out of Pulsar
- **Context Objects**: `SourceContext` and `SinkContext` expose the runtime environment, such as tenant and namespace information, logging, state, counters, and metrics
- **Push vs Pull Patterns**: `Source` is pull-based (the runtime calls `read()`), while `PushSource` is push-based (the connector calls `consume(...)`)
- **Batch Processing**: `BatchSource` splits work into tasks that are discovered, prepared, and then read record by record
- **Lifecycle Management**: Every connector is initialized with `open(...)` and released with `close()`, inherited from `AutoCloseable`
- **Metadata Annotations**: `@Connector` and `@FieldDoc` declare connector metadata and document configuration fields

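
The pull pattern and the shared `AutoCloseable` lifecycle can be illustrated with a small local harness. The hypothetical `LocalPump` helper below is a sketch, not how the Pulsar runtime drives connectors. It passes `null` contexts and treats a `null` record as end of input; both are assumptions that only hold for this example.

```java
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.Source;

import java.util.Map;

// Hypothetical local harness (not the Pulsar runtime) that pulls from a Source
// and writes to a Sink, relying on AutoCloseable for cleanup.
final class LocalPump {
    private LocalPump() {
    }

    // Copies at most maxRecords records and returns how many were copied.
    static <T> int pump(Source<T> source, Sink<T> sink, Map<String, Object> config, int maxRecords)
            throws Exception {
        // try-with-resources closes the sink, then the source, even if open/read/write throws.
        try (Source<T> src = source; Sink<T> snk = sink) {
            src.open(config, null); // the real runtime supplies a SourceContext
            snk.open(config, null); // ... and a SinkContext
            int copied = 0;
            while (copied < maxRecords) {
                Record<T> record = src.read(); // pull model: the caller asks for each record
                if (record == null) {
                    break; // end-of-input convention of this sketch only
                }
                snk.write(record);
                copied++;
            }
            return copied;
        }
    }
}
```

Because resources in a try-with-resources statement are closed in reverse declaration order, the sink is closed before the source.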
## Capabilities

### Core Source Interfaces

Primary interfaces for reading data from external systems and publishing to Pulsar topics.

```java { .api }
// Basic pull-based source interface
public interface Source<T> extends AutoCloseable {
    void open(Map<String, Object> config, SourceContext sourceContext) throws Exception;
    // Returns the next record, blocking until one is available
    Record<T> read() throws Exception;
}

// Batch processing source interface
public interface BatchSource<T> extends AutoCloseable {
    void open(Map<String, Object> config, SourceContext context) throws Exception;
    // Discovery phase: hands serialized task descriptors to taskEater
    void discover(Consumer<byte[]> taskEater) throws Exception;
    // Prepares this instance to read the given task
    void prepare(byte[] task) throws Exception;
    // Returns the next record of the current task, or null when the task has no more records
    Record<T> readNext() throws Exception;
}
```

[Source Interfaces](./source-interfaces.md)

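
As a sketch of the `discover` / `prepare` / `readNext` flow, the hypothetical `RangeBatchSource` below splits the numbers `[0, total)` into ranges. Task descriptors are plain `start:end` strings encoded as bytes, and the `total` and `chunkSize` configuration keys are assumptions. Returning `null` from `readNext()` is assumed to mark the end of the current task.

```java
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.BatchSource;
import org.apache.pulsar.io.core.SourceContext;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical batch source that emits the numbers [0, total) in chunks.
class RangeBatchSource implements BatchSource<Long> {
    private long total;
    private long chunkSize;
    private long next; // next value of the current task
    private long end;  // exclusive upper bound of the current task

    @Override
    public void open(Map<String, Object> config, SourceContext context) {
        // "total" and "chunkSize" are assumed config keys for this sketch.
        total = Long.parseLong(String.valueOf(config.getOrDefault("total", 10L)));
        chunkSize = Long.parseLong(String.valueOf(config.getOrDefault("chunkSize", 4L)));
    }

    @Override
    public void discover(Consumer<byte[]> taskEater) {
        // Emit one serialized task ("start:end") per chunk.
        for (long start = 0; start < total; start += chunkSize) {
            long stop = Math.min(start + chunkSize, total);
            taskEater.accept((start + ":" + stop).getBytes(StandardCharsets.UTF_8));
        }
    }

    @Override
    public void prepare(byte[] task) {
        // Decode the task and move the cursor to the start of its range.
        String[] bounds = new String(task, StandardCharsets.UTF_8).split(":");
        next = Long.parseLong(bounds[0]);
        end = Long.parseLong(bounds[1]);
    }

    @Override
    public Record<Long> readNext() {
        if (next >= end) {
            return null; // assumed to signal that the current task is exhausted
        }
        long value = next++;
        return () -> value;
    }

    @Override
    public void close() {
        // Nothing to release in this sketch.
    }
}
```

In a deployment, the runtime decides when discovery runs and which instance prepares each task. Keeping tasks as small, self-describing byte arrays lets any instance pick them up.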
### Core Sink Interfaces

Primary interfaces for writing data from Pulsar to external systems.

```java { .api }
// Basic sink interface
public interface Sink<T> extends AutoCloseable {
    void open(Map<String, Object> config, SinkContext sinkContext) throws Exception;
    void write(Record<T> record) throws Exception;
}
```

[Sink Interfaces](./sink-interfaces.md)

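
Because `write` receives records one at a time, a sink that talks to a batch-oriented system often buffers records and acknowledges them only after a batch has been flushed. The hypothetical `BufferingSink` below sketches that pattern. The `batchSize` key and the in-memory `flushedBatches` list are assumptions standing in for real configuration and an external system.

```java
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sink that buffers records and writes them in batches.
class BufferingSink implements Sink<String> {
    private final List<Record<String>> buffer = new ArrayList<>();
    final List<List<String>> flushedBatches = new ArrayList<>(); // stand-in for the external system
    private int batchSize;

    @Override
    public void open(Map<String, Object> config, SinkContext sinkContext) {
        // "batchSize" is an assumed config key for this sketch.
        batchSize = Integer.parseInt(String.valueOf(config.getOrDefault("batchSize", 2)));
    }

    @Override
    public synchronized void write(Record<String> record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Sends the buffered values as one batch, then acknowledges every record in it.
    private void flush() {
        List<String> batch = new ArrayList<>();
        for (Record<String> r : buffer) {
            batch.add(r.getValue());
        }
        flushedBatches.add(batch);   // "send" the batch
        buffer.forEach(Record::ack); // ack only after the batch has been sent
        buffer.clear();
    }

    @Override
    public synchronized void close() {
        if (!buffer.isEmpty()) {
            flush(); // avoid dropping a partial batch on shutdown
        }
    }
}
```

Acknowledging after the flush, rather than inside `write`, means records still sitting in the buffer are not acknowledged if the instance crashes. A real implementation would usually also flush on a timer so that a slow trickle of records does not wait indefinitely.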
### Push-Based Sources

Abstract classes providing queue-based push source functionality for asynchronous data ingestion.

```java { .api }
// Push-based source using consumer callback pattern
public abstract class PushSource<T> extends AbstractPushSource<T> implements Source<T> {
    // Inherits push mechanism functionality: connector code calls consume(), read() returns queued records
}

// Base class for push sources with internal queue
public abstract class AbstractPushSource<T> {
    static final int DEFAULT_QUEUE_LENGTH = 1000;
    public AbstractPushSource();
    // Hands a record to the internal queue
    public void consume(Record<T> record);
    // Reports an asynchronous error to the reading side
    public void notifyError(Exception ex);
    // Capacity of the internal queue (DEFAULT_QUEUE_LENGTH unless overridden)
    public int getQueueLength();
    // Returns the next queued record
    protected Record<T> readNext() throws Exception;
}
```

[Push Source Classes](./push-sources.md)

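
With `PushSource`, the connector does not implement `read()` itself. It hands records to `consume(...)` from its own thread or callback, and the inherited `read()` returns them from the internal queue. The hypothetical `FixedCountPushSource` below sketches this with a background thread that pushes a fixed number of values. The `count` key is an assumption, and routing producer failures through `notifyError(...)` assumes that it makes a subsequent read fail, as its name suggests.

```java
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.SourceContext;

import java.util.Map;

// Hypothetical push source: a background thread pushes 0 .. count-1.
class FixedCountPushSource extends PushSource<Integer> {
    private Thread producer;

    @Override
    public void open(Map<String, Object> config, SourceContext sourceContext) {
        // "count" is an assumed config key for this sketch.
        int count = Integer.parseInt(String.valueOf(config.getOrDefault("count", 3)));
        producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    final int value = i;
                    consume(() -> value); // hand the record to the internal queue
                }
            } catch (RuntimeException e) {
                notifyError(e); // surface the failure to whoever reads from this source
            }
        }, "fixed-count-producer");
        producer.setDaemon(true);
        producer.start();
    }

    @Override
    public void close() throws InterruptedException {
        if (producer != null) {
            producer.join(1_000); // wait briefly for the producer to finish
        }
    }
}
```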
### Runtime Context

Context interfaces providing connector runtime environment and Pulsar platform capabilities.

```java { .api }
// Source runtime context
public interface SourceContext extends BaseContext {
    String getSourceName();
    String getOutputTopic();
    SourceConfig getSourceConfig();
    <T> TypedMessageBuilder<T> newOutputMessage(String topicName, Schema<T> schema) throws PulsarClientException;
    <T> ConsumerBuilder<T> newConsumerBuilder(Schema<T> schema) throws PulsarClientException;

    // BaseContext methods for tenant/namespace info, logging, state, counters, metrics
    String getTenant();
    String getNamespace();
    Logger getLogger();
    void putState(String key, ByteBuffer value);
    ByteBuffer getState(String key);
    void incrCounter(String key, long amount);
    long getCounter(String key);
    void recordMetric(String metricName, double value);
}

// Sink runtime context
public interface SinkContext extends BaseContext {
    String getSinkName();
    Collection<String> getInputTopics();
    SinkConfig getSinkConfig();
    default SubscriptionType getSubscriptionType();
    default void seek(String topic, int partition, MessageId messageId) throws PulsarClientException;
    default void pause(String topic, int partition) throws PulsarClientException;
    default void resume(String topic, int partition) throws PulsarClientException;

    // BaseContext methods for tenant/namespace info, logging, state, counters, metrics
    String getTenant();
    String getNamespace();
    Logger getLogger();
    void putState(String key, ByteBuffer value);
    ByteBuffer getState(String key);
    void incrCounter(String key, long amount);
    long getCounter(String key);
    void recordMetric(String metricName, double value);
}
```

[Context Interfaces](./context-interfaces.md)

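
The sketch below shows how a sink might use its `SinkContext`. It keeps the context passed to `open(...)`, logs through `getLogger()`, and reports a counter and a metric for each record. `CountingSink` and the `records-written` and `value-length` names are hypothetical. The `incrCounter` call assumes state storage is enabled for the connector, on the assumption that counters share the storage used by `putState`/`getState`. Without it, that call may fail at runtime.

```java
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.slf4j.Logger;

import java.util.Map;

// Hypothetical sink that reports its activity through the SinkContext.
class CountingSink implements Sink<String> {
    private SinkContext context;
    private Logger log;

    @Override
    public void open(Map<String, Object> config, SinkContext sinkContext) {
        this.context = sinkContext; // keep the context for use in write()
        this.log = sinkContext.getLogger();
        log.info("Sink {} reading from {}", sinkContext.getSinkName(), sinkContext.getInputTopics());
    }

    @Override
    public void write(Record<String> record) {
        String value = record.getValue();
        // ... write value to the external system here ...
        context.incrCounter("records-written", 1);            // assumes state storage is enabled
        context.recordMetric("value-length", value.length()); // custom per-record metric
        record.ack();
    }

    @Override
    public void close() {
        if (log != null) {
            log.info("Closing sink");
        }
    }
}
```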
### Connector Annotations

Annotation-based metadata system for connector discovery, configuration, and documentation.

```java { .api }
// Connector metadata annotation
@Target(TYPE)
@Retention(RUNTIME)
public @interface Connector {
    String name();
    IOType type();
    String help();
    Class configClass();
}

// Configuration field documentation
@Target(FIELD)
@Retention(RUNTIME)
public @interface FieldDoc {
    boolean required() default false;
    String defaultValue();
    boolean sensitive() default false;
    String help();
}
```

[Connector Annotations](./connector-annotations.md)

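
The sketch below shows how the two annotations fit together. `@FieldDoc` documents each field of a configuration class, and `@Connector` links the connector class to its name, type, help text, and that configuration class. `WebhookSink`, `WebhookSinkConfig`, and their fields are hypothetical. Because `defaultValue` and `help` have no defaults in the listing above, every `@FieldDoc` sets them. Both annotations have runtime retention, so tools can read them through reflection, for example with `WebhookSink.class.getAnnotation(Connector.class)`.

```java
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.FieldDoc;
import org.apache.pulsar.io.core.annotations.IOType;

import java.util.Map;

// Hypothetical configuration class; each field carries @FieldDoc metadata.
class WebhookSinkConfig {
    @FieldDoc(required = true, defaultValue = "", help = "Endpoint URL that receives each record")
    String url;

    @FieldDoc(defaultValue = "5000", help = "Request timeout in milliseconds")
    int timeoutMs = 5000;

    @FieldDoc(defaultValue = "", sensitive = true, help = "Bearer token sent with each request")
    String token;
}

// Hypothetical connector class linked to its metadata and configuration class.
@Connector(
        name = "webhook",
        type = IOType.SINK,
        help = "Posts each record to an HTTP endpoint",
        configClass = WebhookSinkConfig.class)
class WebhookSink implements Sink<String> {
    @Override
    public void open(Map<String, Object> config, SinkContext sinkContext) {
        // Map `config` onto WebhookSinkConfig and create the HTTP client here.
    }

    @Override
    public void write(Record<String> record) {
        // POST record.getValue() to the endpoint, then ack() on success or fail() on error.
        record.ack();
    }

    @Override
    public void close() {
        // Shut down the HTTP client here.
    }
}
```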
### Utility Classes

Helper classes for common data structures and operations.

```java { .api }
// Generic key-value pair container
public class KeyValue<K, V> {
    public KeyValue(K key, V value);
    K getKey();
    V getValue();
    void setKey(K key);
    void setValue(V value);
}
```

[Utility Classes](./utility-classes.md)
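
`KeyValue` is a plain mutable pair. As a minimal sketch, the hypothetical `KeyValueStoreSink` below upserts `KeyValue<String, String>` records into a map that stands in for an external key-value store. A later record with the same key overwrites the earlier value.

```java
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.KeyValue;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sink: the map stands in for an external key-value store.
class KeyValueStoreSink implements Sink<KeyValue<String, String>> {
    final Map<String, String> store = new ConcurrentHashMap<>();

    @Override
    public void open(Map<String, Object> config, SinkContext sinkContext) {
        // A real sink would connect to the store here.
    }

    @Override
    public void write(Record<KeyValue<String, String>> record) {
        KeyValue<String, String> pair = record.getValue();
        // Upsert: a later record with the same key replaces the earlier value.
        store.put(pair.getKey(), pair.getValue());
        record.ack();
    }

    @Override
    public void close() {
        // Disconnect from the store here.
    }
}
```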