0
# Connector Annotations
1
2
Annotation-based metadata system for connector discovery, configuration, and documentation.
3
4
## @Connector
5
6
Annotation for documenting connector metadata, providing essential information for Pulsar to discover and manage connectors.
7
8
```java { .api }
9
package org.apache.pulsar.io.core.annotations;
10
11
@Target(ElementType.TYPE)
12
@Retention(RetentionPolicy.RUNTIME)
13
@InterfaceAudience.Public
14
@InterfaceStability.Stable
15
public @interface Connector {
16
/**
17
* Name of the connector.
18
*
19
* @return connector name
20
*/
21
String name();
22
23
/**
24
* Type of the connector (SOURCE or SINK).
25
*
26
* @return connector type
27
*/
28
IOType type();
29
30
/**
31
* Description of what the connector does.
32
*
33
* @return connector help text
34
*/
35
String help();
36
37
/**
38
* Configuration class that defines the connector's configuration schema.
39
*
40
* @return configuration class
41
*/
42
Class configClass();
43
}
44
```
45
46
### Usage Example
47
48
```java
49
@Connector(
50
name = "file-source",
51
type = IOType.SOURCE,
52
help = "Reads data from files and publishes to Pulsar topics",
53
configClass = FileSourceConfig.class
54
)
55
public class FileSource implements Source<String> {
56
@Override
57
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
58
// Implementation
59
}
60
61
@Override
62
public Record<String> read() throws Exception {
63
// Implementation
64
return null;
65
}
66
67
@Override
68
public void close() throws Exception {
69
// Implementation
70
}
71
}
72
73
// Configuration class referenced by @Connector
74
public class FileSourceConfig {
75
@FieldDoc(
76
required = true,
77
help = "Path to the input file"
78
)
79
private String filePath;
80
81
@FieldDoc(
82
required = false,
83
defaultValue = "1000",
84
help = "Polling interval in milliseconds"
85
)
86
private int pollingIntervalMs = 1000;
87
88
// Getters and setters...
89
}
90
```
91
92
## @FieldDoc
93
94
Annotation for documenting configuration fields, providing metadata about field requirements, defaults, and descriptions.
95
96
```java { .api }
97
package org.apache.pulsar.io.core.annotations;
98
99
@Target(ElementType.FIELD)
100
@Retention(RetentionPolicy.RUNTIME)
101
@InterfaceAudience.Public
102
@InterfaceStability.Stable
103
public @interface FieldDoc {
104
/**
105
* Whether this field is required.
106
*
107
* @return true if field is required, false otherwise (default: false)
108
*/
109
boolean required() default false;
110
111
/**
112
* Default value description for the field.
113
*
114
* @return default value description
115
*/
116
String defaultValue();
117
118
/**
119
* Whether this field contains sensitive data.
120
* Sensitive fields may be handled differently for security purposes.
121
*
122
* @return true if field is sensitive, false otherwise (default: false)
123
*/
124
boolean sensitive() default false;
125
126
/**
127
* Help text describing what this field does.
128
*
129
* @return field description
130
*/
131
String help();
132
}
133
```
134
135
### Usage Example
136
137
```java
138
public class DatabaseSinkConfig {
139
@FieldDoc(
140
required = true,
141
help = "JDBC URL for database connection"
142
)
143
private String jdbcUrl;
144
145
@FieldDoc(
146
required = true,
147
sensitive = true,
148
help = "Database username"
149
)
150
private String username;
151
152
@FieldDoc(
153
required = true,
154
sensitive = true,
155
help = "Database password"
156
)
157
private String password;
158
159
@FieldDoc(
160
required = false,
161
defaultValue = "data_table",
162
help = "Name of the table to insert data into"
163
)
164
private String tableName = "data_table";
165
166
@FieldDoc(
167
required = false,
168
defaultValue = "100",
169
help = "Batch size for bulk inserts"
170
)
171
private int batchSize = 100;
172
173
@FieldDoc(
174
required = false,
175
defaultValue = "30000",
176
help = "Connection timeout in milliseconds"
177
)
178
private int connectionTimeoutMs = 30000;
179
180
// Getters and setters...
181
public String getJdbcUrl() { return jdbcUrl; }
182
public void setJdbcUrl(String jdbcUrl) { this.jdbcUrl = jdbcUrl; }
183
184
public String getUsername() { return username; }
185
public void setUsername(String username) { this.username = username; }
186
187
public String getPassword() { return password; }
188
public void setPassword(String password) { this.password = password; }
189
190
public String getTableName() { return tableName; }
191
public void setTableName(String tableName) { this.tableName = tableName; }
192
193
public int getBatchSize() { return batchSize; }
194
public void setBatchSize(int batchSize) { this.batchSize = batchSize; }
195
196
public int getConnectionTimeoutMs() { return connectionTimeoutMs; }
197
public void setConnectionTimeoutMs(int connectionTimeoutMs) { this.connectionTimeoutMs = connectionTimeoutMs; }
198
}
199
```
200
201
## IOType
202
203
Enumeration of connector types used by the @Connector annotation.
204
205
```java { .api }
206
package org.apache.pulsar.io.core.annotations;
207
208
@InterfaceAudience.Public
209
@InterfaceStability.Stable
210
public enum IOType {
211
/**
212
* Source connector type - reads data from external systems into Pulsar.
213
*/
214
SOURCE,
215
216
/**
217
* Sink connector type - writes data from Pulsar to external systems.
218
*/
219
SINK
220
}
221
```
222
223
## Complete Sink Example with Annotations
224
225
```java
226
@Connector(
227
name = "elasticsearch-sink",
228
type = IOType.SINK,
229
help = "Writes data from Pulsar topics to Elasticsearch indices",
230
configClass = ElasticsearchSinkConfig.class
231
)
232
public class ElasticsearchSink implements Sink<Map<String, Object>> {
233
private ElasticsearchClient client;
234
private ElasticsearchSinkConfig config;
235
236
@Override
237
public void open(Map<String, Object> configMap, SinkContext sinkContext) throws Exception {
238
// Convert Map to strongly typed config object
239
this.config = ConfigurationUtils.create(configMap, ElasticsearchSinkConfig.class,
240
sinkContext.getSinkConfig().getConfigs());
241
242
// Initialize Elasticsearch client
243
this.client = ElasticsearchClient.builder()
244
.hosts(config.getElasticsearchUrl())
245
.username(config.getUsername())
246
.password(config.getPassword())
247
.connectTimeout(config.getConnectionTimeoutMs())
248
.build();
249
}
250
251
@Override
252
public void write(Record<Map<String, Object>> record) throws Exception {
253
Map<String, Object> document = record.getValue();
254
String indexName = config.getIndexName();
255
256
client.index(indexName, document);
257
}
258
259
@Override
260
public void close() throws Exception {
261
if (client != null) {
262
client.close();
263
}
264
}
265
}
266
267
public class ElasticsearchSinkConfig {
268
@FieldDoc(
269
required = true,
270
help = "Elasticsearch server URL (e.g., http://localhost:9200)"
271
)
272
private String elasticsearchUrl;
273
274
@FieldDoc(
275
required = false,
276
defaultValue = "pulsar-data",
277
help = "Name of the Elasticsearch index to write to"
278
)
279
private String indexName = "pulsar-data";
280
281
@FieldDoc(
282
required = false,
283
sensitive = true,
284
help = "Username for Elasticsearch authentication"
285
)
286
private String username;
287
288
@FieldDoc(
289
required = false,
290
sensitive = true,
291
help = "Password for Elasticsearch authentication"
292
)
293
private String password;
294
295
@FieldDoc(
296
required = false,
297
defaultValue = "30000",
298
help = "Connection timeout in milliseconds"
299
)
300
private int connectionTimeoutMs = 30000;
301
302
@FieldDoc(
303
required = false,
304
defaultValue = "100",
305
help = "Batch size for bulk operations"
306
)
307
private int batchSize = 100;
308
309
// Getters and setters...
310
}
311
```
312
313
## Connector Discovery Example
314
315
```java
316
// Utility class for discovering connectors using annotations
317
public class ConnectorDiscovery {
318
public static ConnectorMetadata getConnectorMetadata(Class<?> connectorClass) {
319
Connector connectorAnnotation = connectorClass.getAnnotation(Connector.class);
320
if (connectorAnnotation == null) {
321
throw new IllegalArgumentException("Class is not annotated with @Connector");
322
}
323
324
ConnectorMetadata metadata = new ConnectorMetadata();
325
metadata.setName(connectorAnnotation.name());
326
metadata.setType(connectorAnnotation.type());
327
metadata.setHelp(connectorAnnotation.help());
328
metadata.setConfigClass(connectorAnnotation.configClass());
329
330
// Discover configuration fields
331
Field[] fields = connectorAnnotation.configClass().getDeclaredFields();
332
for (Field field : fields) {
333
FieldDoc fieldDoc = field.getAnnotation(FieldDoc.class);
334
if (fieldDoc != null) {
335
ConfigFieldMetadata fieldMetadata = new ConfigFieldMetadata();
336
fieldMetadata.setName(field.getName());
337
fieldMetadata.setType(field.getType());
338
fieldMetadata.setRequired(fieldDoc.required());
339
fieldMetadata.setDefaultValue(fieldDoc.defaultValue());
340
fieldMetadata.setSensitive(fieldDoc.sensitive());
341
fieldMetadata.setHelp(fieldDoc.help());
342
343
metadata.addConfigField(fieldMetadata);
344
}
345
}
346
347
return metadata;
348
}
349
}
350
```
351
352
## Types
353
354
```java { .api }
355
// Required imports
356
import java.lang.annotation.ElementType;
357
import java.lang.annotation.Retention;
358
import java.lang.annotation.RetentionPolicy;
359
import java.lang.annotation.Target;
360
import org.apache.pulsar.common.classification.InterfaceAudience;
361
import org.apache.pulsar.common.classification.InterfaceStability;
362
```