or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

connector-annotations.mdcontext-interfaces.mdindex.mdpush-sources.mdsink-interfaces.mdsource-interfaces.mdutility-classes.md

connector-annotations.mddocs/

0

# Connector Annotations

1

2

Annotation-based metadata system for connector discovery, configuration, and documentation.

3

4

## @Connector

5

6

Annotation for documenting connector metadata, providing essential information for Pulsar to discover and manage connectors.

7

8

```java { .api }

9

package org.apache.pulsar.io.core.annotations;

10

11

@Target(ElementType.TYPE)

12

@Retention(RetentionPolicy.RUNTIME)

13

@InterfaceAudience.Public

14

@InterfaceStability.Stable

15

public @interface Connector {

16

/**

17

* Name of the connector.

18

*

19

* @return connector name

20

*/

21

String name();

22

23

/**

24

* Type of the connector (SOURCE or SINK).

25

*

26

* @return connector type

27

*/

28

IOType type();

29

30

/**

31

* Description of what the connector does.

32

*

33

* @return connector help text

34

*/

35

String help();

36

37

/**

38

* Configuration class that defines the connector's configuration schema.

39

*

40

* @return configuration class

41

*/

42

Class configClass();

43

}

44

```

45

46

### Usage Example

47

48

```java

49

@Connector(

50

name = "file-source",

51

type = IOType.SOURCE,

52

help = "Reads data from files and publishes to Pulsar topics",

53

configClass = FileSourceConfig.class

54

)

55

public class FileSource implements Source<String> {

56

@Override

57

public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {

58

// Implementation

59

}

60

61

@Override

62

public Record<String> read() throws Exception {

63

// Implementation

64

return null;

65

}

66

67

@Override

68

public void close() throws Exception {

69

// Implementation

70

}

71

}

72

73

// Configuration class referenced by @Connector

74

public class FileSourceConfig {

75

@FieldDoc(

76

required = true,

77

help = "Path to the input file"

78

)

79

private String filePath;

80

81

@FieldDoc(

82

required = false,

83

defaultValue = "1000",

84

help = "Polling interval in milliseconds"

85

)

86

private int pollingIntervalMs = 1000;

87

88

// Getters and setters...

89

}

90

```

91

92

## @FieldDoc

93

94

Annotation for documenting configuration fields, providing metadata about field requirements, defaults, and descriptions.

95

96

```java { .api }

97

package org.apache.pulsar.io.core.annotations;

98

99

@Target(ElementType.FIELD)

100

@Retention(RetentionPolicy.RUNTIME)

101

@InterfaceAudience.Public

102

@InterfaceStability.Stable

103

public @interface FieldDoc {

104

/**

105

* Whether this field is required.

106

*

107

* @return true if field is required, false otherwise (default: false)

108

*/

109

boolean required() default false;

110

111

/**

112

* Default value description for the field.

113

*

114

* @return default value description

115

*/

116

String defaultValue();

117

118

/**

119

* Whether this field contains sensitive data.

120

* Sensitive fields may be handled differently for security purposes.

121

*

122

* @return true if field is sensitive, false otherwise (default: false)

123

*/

124

boolean sensitive() default false;

125

126

/**

127

* Help text describing what this field does.

128

*

129

* @return field description

130

*/

131

String help();

132

}

133

```

134

135

### Usage Example

136

137

```java

138

public class DatabaseSinkConfig {

139

@FieldDoc(

140

required = true,

141

help = "JDBC URL for database connection"

142

)

143

private String jdbcUrl;

144

145

@FieldDoc(

146

required = true,

147

sensitive = true,

148

help = "Database username"

149

)

150

private String username;

151

152

@FieldDoc(

153

required = true,

154

sensitive = true,

155

help = "Database password"

156

)

157

private String password;

158

159

@FieldDoc(

160

required = false,

161

defaultValue = "data_table",

162

help = "Name of the table to insert data into"

163

)

164

private String tableName = "data_table";

165

166

@FieldDoc(

167

required = false,

168

defaultValue = "100",

169

help = "Batch size for bulk inserts"

170

)

171

private int batchSize = 100;

172

173

@FieldDoc(

174

required = false,

175

defaultValue = "30000",

176

help = "Connection timeout in milliseconds"

177

)

178

private int connectionTimeoutMs = 30000;

179

180

// Getters and setters...

181

public String getJdbcUrl() { return jdbcUrl; }

182

public void setJdbcUrl(String jdbcUrl) { this.jdbcUrl = jdbcUrl; }

183

184

public String getUsername() { return username; }

185

public void setUsername(String username) { this.username = username; }

186

187

public String getPassword() { return password; }

188

public void setPassword(String password) { this.password = password; }

189

190

public String getTableName() { return tableName; }

191

public void setTableName(String tableName) { this.tableName = tableName; }

192

193

public int getBatchSize() { return batchSize; }

194

public void setBatchSize(int batchSize) { this.batchSize = batchSize; }

195

196

public int getConnectionTimeoutMs() { return connectionTimeoutMs; }

197

public void setConnectionTimeoutMs(int connectionTimeoutMs) { this.connectionTimeoutMs = connectionTimeoutMs; }

198

}

199

```

200

201

## IOType

202

203

Enumeration of connector types used by the @Connector annotation.

204

205

```java { .api }

206

package org.apache.pulsar.io.core.annotations;

207

208

@InterfaceAudience.Public

209

@InterfaceStability.Stable

210

public enum IOType {

211

/**

212

* Source connector type - reads data from external systems into Pulsar.

213

*/

214

SOURCE,

215

216

/**

217

* Sink connector type - writes data from Pulsar to external systems.

218

*/

219

SINK

220

}

221

```

222

223

## Complete Sink Example with Annotations

224

225

```java

226

@Connector(

227

name = "elasticsearch-sink",

228

type = IOType.SINK,

229

help = "Writes data from Pulsar topics to Elasticsearch indices",

230

configClass = ElasticsearchSinkConfig.class

231

)

232

public class ElasticsearchSink implements Sink<Map<String, Object>> {

233

private ElasticsearchClient client;

234

private ElasticsearchSinkConfig config;

235

236

@Override

237

public void open(Map<String, Object> configMap, SinkContext sinkContext) throws Exception {

238

// Convert Map to strongly typed config object

239

this.config = ConfigurationUtils.create(configMap, ElasticsearchSinkConfig.class,

240

sinkContext.getSinkConfig().getConfigs());

241

242

// Initialize Elasticsearch client

243

this.client = ElasticsearchClient.builder()

244

.hosts(config.getElasticsearchUrl())

245

.username(config.getUsername())

246

.password(config.getPassword())

247

.connectTimeout(config.getConnectionTimeoutMs())

248

.build();

249

}

250

251

@Override

252

public void write(Record<Map<String, Object>> record) throws Exception {

253

Map<String, Object> document = record.getValue();

254

String indexName = config.getIndexName();

255

256

client.index(indexName, document);

257

}

258

259

@Override

260

public void close() throws Exception {

261

if (client != null) {

262

client.close();

263

}

264

}

265

}

266

267

public class ElasticsearchSinkConfig {

268

@FieldDoc(

269

required = true,

270

help = "Elasticsearch server URL (e.g., http://localhost:9200)"

271

)

272

private String elasticsearchUrl;

273

274

@FieldDoc(

275

required = false,

276

defaultValue = "pulsar-data",

277

help = "Name of the Elasticsearch index to write to"

278

)

279

private String indexName = "pulsar-data";

280

281

@FieldDoc(

282

required = false,

283

sensitive = true,

284

help = "Username for Elasticsearch authentication"

285

)

286

private String username;

287

288

@FieldDoc(

289

required = false,

290

sensitive = true,

291

help = "Password for Elasticsearch authentication"

292

)

293

private String password;

294

295

@FieldDoc(

296

required = false,

297

defaultValue = "30000",

298

help = "Connection timeout in milliseconds"

299

)

300

private int connectionTimeoutMs = 30000;

301

302

@FieldDoc(

303

required = false,

304

defaultValue = "100",

305

help = "Batch size for bulk operations"

306

)

307

private int batchSize = 100;

308

309

// Getters and setters...

310

}

311

```

312

313

## Connector Discovery Example

314

315

```java

316

// Utility class for discovering connectors using annotations

317

public class ConnectorDiscovery {

318

public static ConnectorMetadata getConnectorMetadata(Class<?> connectorClass) {

319

Connector connectorAnnotation = connectorClass.getAnnotation(Connector.class);

320

if (connectorAnnotation == null) {

321

throw new IllegalArgumentException("Class is not annotated with @Connector");

322

}

323

324

ConnectorMetadata metadata = new ConnectorMetadata();

325

metadata.setName(connectorAnnotation.name());

326

metadata.setType(connectorAnnotation.type());

327

metadata.setHelp(connectorAnnotation.help());

328

metadata.setConfigClass(connectorAnnotation.configClass());

329

330

// Discover configuration fields

331

Field[] fields = connectorAnnotation.configClass().getDeclaredFields();

332

for (Field field : fields) {

333

FieldDoc fieldDoc = field.getAnnotation(FieldDoc.class);

334

if (fieldDoc != null) {

335

ConfigFieldMetadata fieldMetadata = new ConfigFieldMetadata();

336

fieldMetadata.setName(field.getName());

337

fieldMetadata.setType(field.getType());

338

fieldMetadata.setRequired(fieldDoc.required());

339

fieldMetadata.setDefaultValue(fieldDoc.defaultValue());

340

fieldMetadata.setSensitive(fieldDoc.sensitive());

341

fieldMetadata.setHelp(fieldDoc.help());

342

343

metadata.addConfigField(fieldMetadata);

344

}

345

}

346

347

return metadata;

348

}

349

}

350

```

351

352

## Types

353

354

```java { .api }

355

// Required imports

356

import java.lang.annotation.ElementType;

357

import java.lang.annotation.Retention;

358

import java.lang.annotation.RetentionPolicy;

359

import java.lang.annotation.Target;

360

import org.apache.pulsar.common.classification.InterfaceAudience;

361

import org.apache.pulsar.common.classification.InterfaceStability;

362

```