or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

connector-annotations.mdcontext-interfaces.mdindex.mdpush-sources.mdsink-interfaces.mdsource-interfaces.mdutility-classes.md

context-interfaces.mddocs/

0

# Context Interfaces

1

2

Context interfaces provide connector runtime environment and access to Pulsar platform capabilities.

3

4

## SourceContext

5

6

Context interface providing source runtime environment and capabilities for publishing data to Pulsar topics.

7

8

```java { .api }

9

package org.apache.pulsar.io.core;

10

11

@InterfaceAudience.Public

12

@InterfaceStability.Stable

13

public interface SourceContext extends BaseContext {

14

/**

15

* Get the name of the source.

16

*

17

* @return source name

18

*/

19

String getSourceName();

20

21

/**

22

* Get the output topic name where the source publishes messages.

23

*

24

* @return output topic name

25

*/

26

String getOutputTopic();

27

28

/**

29

* Get the source configuration.

30

*

31

* @return source configuration object

32

*/

33

SourceConfig getSourceConfig();

34

35

/**

36

* Create a new output message builder for publishing to a specific topic.

37

*

38

* @param topicName name of the topic to publish to

39

* @param schema schema for message serialization

40

* @return typed message builder for constructing messages

41

* @throws PulsarClientException if unable to create message builder

42

*/

43

<T> TypedMessageBuilder<T> newOutputMessage(String topicName, Schema<T> schema) throws PulsarClientException;

44

45

/**

46

* Create a new consumer builder for reading from topics.

47

* This is useful for sources that need to consume from other Pulsar topics.

48

*

49

* @param schema schema for message deserialization

50

* @return consumer builder for creating consumers

51

* @throws PulsarClientException if unable to create consumer builder

52

*/

53

<T> ConsumerBuilder<T> newConsumerBuilder(Schema<T> schema) throws PulsarClientException;

54

55

// BaseContext inherited methods

56

String getTenant();

57

String getNamespace();

58

int getInstanceId();

59

int getNumInstances();

60

Logger getLogger();

61

String getSecret(String secretName);

62

default <X extends StateStore> X getStateStore(String name);

63

default <X extends StateStore> X getStateStore(String tenant, String ns, String name);

64

void putState(String key, ByteBuffer value);

65

CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);

66

ByteBuffer getState(String key);

67

CompletableFuture<ByteBuffer> getStateAsync(String key);

68

void deleteState(String key);

69

CompletableFuture<Void> deleteStateAsync(String key);

70

void incrCounter(String key, long amount);

71

CompletableFuture<Void> incrCounterAsync(String key, long amount);

72

long getCounter(String key);

73

CompletableFuture<Long> getCounterAsync(String key);

74

void recordMetric(String metricName, double value);

75

default PulsarClient getPulsarClient();

76

default ClientBuilder getPulsarClientBuilder();

77

void fatal(Throwable t);

78

}

79

```

80

81

### Usage Example

82

83

```java

84

public class DatabaseSource implements Source<Map<String, Object>> {

85

private SourceContext context;

86

private Connection connection;

87

88

@Override

89

public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {

90

this.context = sourceContext;

91

92

// Access source configuration

93

String sourceName = context.getSourceName();

94

String outputTopic = context.getOutputTopic();

95

96

// Get source-specific config

97

SourceConfig sourceConfig = context.getSourceConfig();

98

99

// Initialize database connection

100

String jdbcUrl = (String) config.get("jdbc.url");

101

this.connection = DriverManager.getConnection(jdbcUrl);

102

}

103

104

@Override

105

public Record<Map<String, Object>> read() throws Exception {

106

// Read data from database

107

Map<String, Object> data = readFromDatabase();

108

109

// Create output message with specific schema

110

TypedMessageBuilder<Map<String, Object>> messageBuilder =

111

context.newOutputMessage(context.getOutputTopic(), Schema.JSON(Map.class));

112

113

messageBuilder.value(data);

114

messageBuilder.property("source", context.getSourceName());

115

116

// Send message and return record

117

MessageId messageId = messageBuilder.send();

118

return new SimpleRecord<>(messageId.toString(), data);

119

}

120

}

121

```

122

123

## SinkContext

124

125

Context interface providing sink runtime environment and capabilities for consuming data from Pulsar topics.

126

127

```java { .api }

128

package org.apache.pulsar.io.core;

129

130

@InterfaceAudience.Public

131

@InterfaceStability.Stable

132

public interface SinkContext extends BaseContext {

133

/**

134

* Get the name of the sink.

135

*

136

* @return sink name

137

*/

138

String getSinkName();

139

140

/**

141

* Get the input topics that the sink consumes from.

142

*

143

* @return collection of input topic names

144

*/

145

Collection<String> getInputTopics();

146

147

/**

148

* Get the sink configuration.

149

*

150

* @return sink configuration object

151

*/

152

SinkConfig getSinkConfig();

153

154

/**

155

* Get the subscription type used by the sink.

156

* Default implementation throws UnsupportedOperationException.

157

*

158

* @return subscription type

159

* @throws UnsupportedOperationException if not supported

160

*/

161

default SubscriptionType getSubscriptionType() {

162

throw new UnsupportedOperationException("getSubscriptionType not implemented");

163

}

164

165

/**

166

* Reset subscription position to a specific message ID.

167

*

168

* @param topic topic name

169

* @param partition partition number

170

* @param messageId message ID to seek to

171

* @throws PulsarClientException if seek operation fails

172

*/

173

default void seek(String topic, int partition, MessageId messageId) throws PulsarClientException {

174

throw new UnsupportedOperationException("seek not implemented");

175

}

176

177

/**

178

* Pause message consumption from a specific topic partition.

179

*

180

* @param topic topic name

181

* @param partition partition number

182

* @throws PulsarClientException if pause operation fails

183

*/

184

default void pause(String topic, int partition) throws PulsarClientException {

185

throw new UnsupportedOperationException("pause not implemented");

186

}

187

188

/**

189

* Resume message consumption from a specific topic partition.

190

*

191

* @param topic topic name

192

* @param partition partition number

193

* @throws PulsarClientException if resume operation fails

194

*/

195

default void resume(String topic, int partition) throws PulsarClientException {

196

throw new UnsupportedOperationException("resume not implemented");

197

}

198

199

// BaseContext inherited methods

200

String getTenant();

201

String getNamespace();

202

int getInstanceId();

203

int getNumInstances();

204

Logger getLogger();

205

String getSecret(String secretName);

206

default <X extends StateStore> X getStateStore(String name);

207

default <X extends StateStore> X getStateStore(String tenant, String ns, String name);

208

void putState(String key, ByteBuffer value);

209

CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);

210

ByteBuffer getState(String key);

211

CompletableFuture<ByteBuffer> getStateAsync(String key);

212

void deleteState(String key);

213

CompletableFuture<Void> deleteStateAsync(String key);

214

void incrCounter(String key, long amount);

215

CompletableFuture<Void> incrCounterAsync(String key, long amount);

216

long getCounter(String key);

217

CompletableFuture<Long> getCounterAsync(String key);

218

void recordMetric(String metricName, double value);

219

default PulsarClient getPulsarClient();

220

default ClientBuilder getPulsarClientBuilder();

221

void fatal(Throwable t);

222

}

223

```

224

225

### Usage Example

226

227

```java

228

public class ElasticsearchSink implements Sink<Map<String, Object>> {

229

private SinkContext context;

230

private ElasticsearchClient client;

231

232

@Override

233

public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {

234

this.context = sinkContext;

235

236

// Access sink configuration

237

String sinkName = context.getSinkName();

238

Collection<String> inputTopics = context.getInputTopics();

239

SinkConfig sinkConfig = context.getSinkConfig();

240

241

// Log subscription type if available

242

try {

243

SubscriptionType subType = context.getSubscriptionType();

244

System.out.println("Using subscription type: " + subType);

245

} catch (UnsupportedOperationException e) {

246

System.out.println("Subscription type not available");

247

}

248

249

// Initialize Elasticsearch client

250

String esUrl = (String) config.get("elasticsearch.url");

251

this.client = new ElasticsearchClient(esUrl);

252

}

253

254

@Override

255

public void write(Record<Map<String, Object>> record) throws Exception {

256

Map<String, Object> document = record.getValue();

257

String indexName = determineIndex(record);

258

259

// Index document in Elasticsearch

260

client.index(indexName, document);

261

262

// Optionally seek or pause/resume based on processing results

263

if (shouldPauseProcessing(document)) {

264

String topic = record.getTopicName().orElse("unknown");

265

context.pause(topic, 0); // Pause partition 0

266

}

267

}

268

269

private void handleProcessingError(Record<Map<String, Object>> record, Exception error) {

270

// Example: seek back to retry failed message

271

try {

272

String topic = record.getTopicName().orElse("unknown");

273

MessageId messageId = MessageId.fromByteArray(record.getKey().toString().getBytes());

274

context.seek(topic, 0, messageId);

275

} catch (Exception e) {

276

System.err.println("Failed to seek: " + e.getMessage());

277

}

278

}

279

}

280

```

281

282

## Flow Control Example

283

284

```java

285

public class RateLimitedSink implements Sink<String> {

286

private SinkContext context;

287

private RateLimiter rateLimiter;

288

private Map<String, Boolean> topicPausedState = new ConcurrentHashMap<>();

289

290

@Override

291

public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {

292

this.context = sinkContext;

293

double maxRate = (Double) config.get("max.rate.per.second");

294

this.rateLimiter = RateLimiter.create(maxRate);

295

}

296

297

@Override

298

public void write(Record<String> record) throws Exception {

299

// Acquire rate limit permit

300

if (!rateLimiter.tryAcquire(1, TimeUnit.SECONDS)) {

301

// Rate limit exceeded, pause all input topics

302

pauseAllTopics();

303

304

// Wait for permit

305

rateLimiter.acquire();

306

307

// Resume topics after rate limit allows

308

resumeAllTopics();

309

}

310

311

// Process the record

312

processRecord(record);

313

}

314

315

private void pauseAllTopics() {

316

for (String topic : context.getInputTopics()) {

317

try {

318

context.pause(topic, 0);

319

topicPausedState.put(topic, true);

320

} catch (Exception e) {

321

System.err.println("Failed to pause topic " + topic + ": " + e.getMessage());

322

}

323

}

324

}

325

326

private void resumeAllTopics() {

327

for (String topic : topicPausedState.keySet()) {

328

try {

329

context.resume(topic, 0);

330

topicPausedState.remove(topic);

331

} catch (Exception e) {

332

System.err.println("Failed to resume topic " + topic + ": " + e.getMessage());

333

}

334

}

335

}

336

}

337

```

338

339

## Types

340

341

```java { .api }

342

// Required imports

343

import java.nio.ByteBuffer;

344

import java.util.Collection;

345

import java.util.concurrent.CompletableFuture;

346

import org.apache.pulsar.client.api.ClientBuilder;

347

import org.apache.pulsar.client.api.ConsumerBuilder;

348

import org.apache.pulsar.client.api.MessageId;

349

import org.apache.pulsar.client.api.PulsarClient;

350

import org.apache.pulsar.client.api.PulsarClientException;

351

import org.apache.pulsar.client.api.Schema;

352

import org.apache.pulsar.client.api.SubscriptionType;

353

import org.apache.pulsar.client.api.TypedMessageBuilder;

354

import org.apache.pulsar.common.classification.InterfaceAudience;

355

import org.apache.pulsar.common.classification.InterfaceStability;

356

import org.apache.pulsar.functions.api.BaseContext;

357

import org.apache.pulsar.functions.api.StateStore;

358

import org.apache.pulsar.io.core.SinkConfig;

359

import org.apache.pulsar.io.core.SourceConfig;

360

import org.slf4j.Logger;

361

```