0
# Context Interfaces
1
2
Context interfaces provide connector runtime environment and access to Pulsar platform capabilities.
3
4
## SourceContext
5
6
Context interface providing source runtime environment and capabilities for publishing data to Pulsar topics.
7
8
```java { .api }
9
package org.apache.pulsar.io.core;
10
11
@InterfaceAudience.Public
12
@InterfaceStability.Stable
13
public interface SourceContext extends BaseContext {
14
/**
15
* Get the name of the source.
16
*
17
* @return source name
18
*/
19
String getSourceName();
20
21
/**
22
* Get the output topic name where the source publishes messages.
23
*
24
* @return output topic name
25
*/
26
String getOutputTopic();
27
28
/**
29
* Get the source configuration.
30
*
31
* @return source configuration object
32
*/
33
SourceConfig getSourceConfig();
34
35
/**
36
* Create a new output message builder for publishing to a specific topic.
37
*
38
* @param topicName name of the topic to publish to
39
* @param schema schema for message serialization
40
* @return typed message builder for constructing messages
41
* @throws PulsarClientException if unable to create message builder
42
*/
43
<T> TypedMessageBuilder<T> newOutputMessage(String topicName, Schema<T> schema) throws PulsarClientException;
44
45
/**
46
* Create a new consumer builder for reading from topics.
47
* This is useful for sources that need to consume from other Pulsar topics.
48
*
49
* @param schema schema for message deserialization
50
* @return consumer builder for creating consumers
51
* @throws PulsarClientException if unable to create consumer builder
52
*/
53
<T> ConsumerBuilder<T> newConsumerBuilder(Schema<T> schema) throws PulsarClientException;
54
55
// BaseContext inherited methods
56
String getTenant();
57
String getNamespace();
58
int getInstanceId();
59
int getNumInstances();
60
Logger getLogger();
61
String getSecret(String secretName);
62
default <X extends StateStore> X getStateStore(String name);
63
default <X extends StateStore> X getStateStore(String tenant, String ns, String name);
64
void putState(String key, ByteBuffer value);
65
CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
66
ByteBuffer getState(String key);
67
CompletableFuture<ByteBuffer> getStateAsync(String key);
68
void deleteState(String key);
69
CompletableFuture<Void> deleteStateAsync(String key);
70
void incrCounter(String key, long amount);
71
CompletableFuture<Void> incrCounterAsync(String key, long amount);
72
long getCounter(String key);
73
CompletableFuture<Long> getCounterAsync(String key);
74
void recordMetric(String metricName, double value);
75
default PulsarClient getPulsarClient();
76
default ClientBuilder getPulsarClientBuilder();
77
void fatal(Throwable t);
78
}
79
```
80
81
### Usage Example
82
83
```java
84
public class DatabaseSource implements Source<Map<String, Object>> {
85
private SourceContext context;
86
private Connection connection;
87
88
@Override
89
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
90
this.context = sourceContext;
91
92
// Access source configuration
93
String sourceName = context.getSourceName();
94
String outputTopic = context.getOutputTopic();
95
96
// Get source-specific config
97
SourceConfig sourceConfig = context.getSourceConfig();
98
99
// Initialize database connection
100
String jdbcUrl = (String) config.get("jdbc.url");
101
this.connection = DriverManager.getConnection(jdbcUrl);
102
}
103
104
@Override
105
public Record<Map<String, Object>> read() throws Exception {
106
// Read data from database
107
Map<String, Object> data = readFromDatabase();
108
109
// Create output message with specific schema
110
TypedMessageBuilder<Map<String, Object>> messageBuilder =
111
context.newOutputMessage(context.getOutputTopic(), Schema.JSON(Map.class));
112
113
messageBuilder.value(data);
114
messageBuilder.property("source", context.getSourceName());
115
116
// Send message and return record
117
MessageId messageId = messageBuilder.send();
118
return new SimpleRecord<>(messageId.toString(), data);
119
}
120
}
121
```
122
123
## SinkContext
124
125
Context interface providing sink runtime environment and capabilities for consuming data from Pulsar topics.
126
127
```java { .api }
128
package org.apache.pulsar.io.core;
129
130
@InterfaceAudience.Public
131
@InterfaceStability.Stable
132
public interface SinkContext extends BaseContext {
133
/**
134
* Get the name of the sink.
135
*
136
* @return sink name
137
*/
138
String getSinkName();
139
140
/**
141
* Get the input topics that the sink consumes from.
142
*
143
* @return collection of input topic names
144
*/
145
Collection<String> getInputTopics();
146
147
/**
148
* Get the sink configuration.
149
*
150
* @return sink configuration object
151
*/
152
SinkConfig getSinkConfig();
153
154
/**
155
* Get the subscription type used by the sink.
156
* Default implementation throws UnsupportedOperationException.
157
*
158
* @return subscription type
159
* @throws UnsupportedOperationException if not supported
160
*/
161
default SubscriptionType getSubscriptionType() {
162
throw new UnsupportedOperationException("getSubscriptionType not implemented");
163
}
164
165
/**
166
* Reset subscription position to a specific message ID.
167
*
168
* @param topic topic name
169
* @param partition partition number
170
* @param messageId message ID to seek to
171
* @throws PulsarClientException if seek operation fails
172
*/
173
default void seek(String topic, int partition, MessageId messageId) throws PulsarClientException {
174
throw new UnsupportedOperationException("seek not implemented");
175
}
176
177
/**
178
* Pause message consumption from a specific topic partition.
179
*
180
* @param topic topic name
181
* @param partition partition number
182
* @throws PulsarClientException if pause operation fails
183
*/
184
default void pause(String topic, int partition) throws PulsarClientException {
185
throw new UnsupportedOperationException("pause not implemented");
186
}
187
188
/**
189
* Resume message consumption from a specific topic partition.
190
*
191
* @param topic topic name
192
* @param partition partition number
193
* @throws PulsarClientException if resume operation fails
194
*/
195
default void resume(String topic, int partition) throws PulsarClientException {
196
throw new UnsupportedOperationException("resume not implemented");
197
}
198
199
// BaseContext inherited methods
200
String getTenant();
201
String getNamespace();
202
int getInstanceId();
203
int getNumInstances();
204
Logger getLogger();
205
String getSecret(String secretName);
206
default <X extends StateStore> X getStateStore(String name);
207
default <X extends StateStore> X getStateStore(String tenant, String ns, String name);
208
void putState(String key, ByteBuffer value);
209
CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
210
ByteBuffer getState(String key);
211
CompletableFuture<ByteBuffer> getStateAsync(String key);
212
void deleteState(String key);
213
CompletableFuture<Void> deleteStateAsync(String key);
214
void incrCounter(String key, long amount);
215
CompletableFuture<Void> incrCounterAsync(String key, long amount);
216
long getCounter(String key);
217
CompletableFuture<Long> getCounterAsync(String key);
218
void recordMetric(String metricName, double value);
219
default PulsarClient getPulsarClient();
220
default ClientBuilder getPulsarClientBuilder();
221
void fatal(Throwable t);
222
}
223
```
224
225
### Usage Example
226
227
```java
228
public class ElasticsearchSink implements Sink<Map<String, Object>> {
229
private SinkContext context;
230
private ElasticsearchClient client;
231
232
@Override
233
public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
234
this.context = sinkContext;
235
236
// Access sink configuration
237
String sinkName = context.getSinkName();
238
Collection<String> inputTopics = context.getInputTopics();
239
SinkConfig sinkConfig = context.getSinkConfig();
240
241
// Log subscription type if available
242
try {
243
SubscriptionType subType = context.getSubscriptionType();
244
System.out.println("Using subscription type: " + subType);
245
} catch (UnsupportedOperationException e) {
246
System.out.println("Subscription type not available");
247
}
248
249
// Initialize Elasticsearch client
250
String esUrl = (String) config.get("elasticsearch.url");
251
this.client = new ElasticsearchClient(esUrl);
252
}
253
254
@Override
255
public void write(Record<Map<String, Object>> record) throws Exception {
256
Map<String, Object> document = record.getValue();
257
String indexName = determineIndex(record);
258
259
// Index document in Elasticsearch
260
client.index(indexName, document);
261
262
// Optionally seek or pause/resume based on processing results
263
if (shouldPauseProcessing(document)) {
264
String topic = record.getTopicName().orElse("unknown");
265
context.pause(topic, 0); // Pause partition 0
266
}
267
}
268
269
private void handleProcessingError(Record<Map<String, Object>> record, Exception error) {
270
// Example: seek back to retry failed message
271
try {
272
String topic = record.getTopicName().orElse("unknown");
273
MessageId messageId = MessageId.fromByteArray(record.getKey().toString().getBytes());
274
context.seek(topic, 0, messageId);
275
} catch (Exception e) {
276
System.err.println("Failed to seek: " + e.getMessage());
277
}
278
}
279
}
280
```
281
282
## Flow Control Example
283
284
```java
285
public class RateLimitedSink implements Sink<String> {
286
private SinkContext context;
287
private RateLimiter rateLimiter;
288
private Map<String, Boolean> topicPausedState = new ConcurrentHashMap<>();
289
290
@Override
291
public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
292
this.context = sinkContext;
293
double maxRate = (Double) config.get("max.rate.per.second");
294
this.rateLimiter = RateLimiter.create(maxRate);
295
}
296
297
@Override
298
public void write(Record<String> record) throws Exception {
299
// Acquire rate limit permit
300
if (!rateLimiter.tryAcquire(1, TimeUnit.SECONDS)) {
301
// Rate limit exceeded, pause all input topics
302
pauseAllTopics();
303
304
// Wait for permit
305
rateLimiter.acquire();
306
307
// Resume topics after rate limit allows
308
resumeAllTopics();
309
}
310
311
// Process the record
312
processRecord(record);
313
}
314
315
private void pauseAllTopics() {
316
for (String topic : context.getInputTopics()) {
317
try {
318
context.pause(topic, 0);
319
topicPausedState.put(topic, true);
320
} catch (Exception e) {
321
System.err.println("Failed to pause topic " + topic + ": " + e.getMessage());
322
}
323
}
324
}
325
326
private void resumeAllTopics() {
327
for (String topic : topicPausedState.keySet()) {
328
try {
329
context.resume(topic, 0);
330
topicPausedState.remove(topic);
331
} catch (Exception e) {
332
System.err.println("Failed to resume topic " + topic + ": " + e.getMessage());
333
}
334
}
335
}
336
}
337
```
338
339
## Types
340
341
```java { .api }
342
// Required imports
343
import java.nio.ByteBuffer;
344
import java.util.Collection;
345
import java.util.concurrent.CompletableFuture;
346
import org.apache.pulsar.client.api.ClientBuilder;
347
import org.apache.pulsar.client.api.ConsumerBuilder;
348
import org.apache.pulsar.client.api.MessageId;
349
import org.apache.pulsar.client.api.PulsarClient;
350
import org.apache.pulsar.client.api.PulsarClientException;
351
import org.apache.pulsar.client.api.Schema;
352
import org.apache.pulsar.client.api.SubscriptionType;
353
import org.apache.pulsar.client.api.TypedMessageBuilder;
354
import org.apache.pulsar.common.classification.InterfaceAudience;
355
import org.apache.pulsar.common.classification.InterfaceStability;
356
import org.apache.pulsar.functions.api.BaseContext;
357
import org.apache.pulsar.functions.api.StateStore;
358
import org.apache.pulsar.io.core.SinkConfig;
359
import org.apache.pulsar.io.core.SourceConfig;
360
import org.slf4j.Logger;
361
```