# Subscription System

Event-driven communication system with multiple subscription types for real-time data monitoring in the Apache PLC4X Java API.

## Capabilities

### PlcSubscriptionRequest

Interface for building and executing subscription requests for real-time PLC data monitoring.

```java { .api }
11
/**
12
* Subscription request interface for setting up real-time data monitoring
13
*/
14
public interface PlcSubscriptionRequest extends PlcSubscriptionTagRequest {
15
/**
16
* Execute the subscription request asynchronously
17
* @return CompletableFuture containing the subscription response
18
*/
19
CompletableFuture<? extends PlcSubscriptionResponse> execute();
20
21
/**
22
* Builder interface for constructing subscription requests
23
*/
24
interface Builder extends PlcRequestBuilder {
25
/**
26
* Set global consumer for all subscription events
27
* @param consumer Consumer function to handle PlcSubscriptionEvent
28
* @return Builder instance for method chaining
29
*/
30
Builder setConsumer(Consumer<PlcSubscriptionEvent> consumer);
31
32
/**
33
* Add cyclic subscription by tag address
34
* @param name Logical name for the subscription
35
* @param tagAddress PLC-specific tag address string
36
* @param duration Polling interval
37
* @return Builder instance for method chaining
38
*/
39
Builder addCyclicTagAddress(String name, String tagAddress, Duration duration);
40
41
/**
42
* Add cyclic subscription by tag address with specific consumer
43
* @param name Logical name for the subscription
44
* @param tagAddress PLC-specific tag address string
45
* @param duration Polling interval
46
* @param consumer Consumer function for this specific subscription
47
* @return Builder instance for method chaining
48
*/
49
Builder addCyclicTagAddress(String name, String tagAddress, Duration duration, Consumer<PlcSubscriptionEvent> consumer);
50
51
/**
52
* Add cyclic subscription with pre-parsed tag
53
* @param name Logical name for the subscription
54
* @param tag PlcSubscriptionTag instance
55
* @param duration Polling interval
56
* @return Builder instance for method chaining
57
*/
58
Builder addCyclicTag(String name, PlcSubscriptionTag tag, Duration duration);
59
60
/**
61
* Add cyclic subscription with pre-parsed tag and specific consumer
62
* @param name Logical name for the subscription
63
* @param tag PlcSubscriptionTag instance
64
* @param duration Polling interval
65
* @param consumer Consumer function for this specific subscription
66
* @return Builder instance for method chaining
67
*/
68
Builder addCyclicTag(String name, PlcSubscriptionTag tag, Duration duration, Consumer<PlcSubscriptionEvent> consumer);
69
70
/**
71
* Add change-of-state subscription by tag address
72
* @param name Logical name for the subscription
73
* @param tagAddress PLC-specific tag address string
74
* @return Builder instance for method chaining
75
*/
76
Builder addChangeOfStateTagAddress(String name, String tagAddress);
77
78
/**
79
* Add change-of-state subscription by tag address with specific consumer
80
* @param name Logical name for the subscription
81
* @param tagAddress PLC-specific tag address string
82
* @param consumer Consumer function for this specific subscription
83
* @return Builder instance for method chaining
84
*/
85
Builder addChangeOfStateTagAddress(String name, String tagAddress, Consumer<PlcSubscriptionEvent> consumer);
86
87
/**
88
* Add change-of-state subscription with pre-parsed tag
89
* @param name Logical name for the subscription
90
* @param tag PlcSubscriptionTag instance
91
* @return Builder instance for method chaining
92
*/
93
Builder addChangeOfStateTag(String name, PlcSubscriptionTag tag);
94
95
/**
96
* Add change-of-state subscription with pre-parsed tag and specific consumer
97
* @param name Logical name for the subscription
98
* @param tag PlcSubscriptionTag instance
99
* @param consumer Consumer function for this specific subscription
100
* @return Builder instance for method chaining
101
*/
102
Builder addChangeOfStateTag(String name, PlcSubscriptionTag tag, Consumer<PlcSubscriptionEvent> consumer);
103
104
/**
105
* Add event subscription by tag address
106
* @param name Logical name for the subscription
107
* @param tagAddress PLC-specific tag address string
108
* @return Builder instance for method chaining
109
*/
110
Builder addEventTagAddress(String name, String tagAddress);
111
112
/**
113
* Add event subscription by tag address with specific consumer
114
* @param name Logical name for the subscription
115
* @param tagAddress PLC-specific tag address string
116
* @param consumer Consumer function for this specific subscription
117
* @return Builder instance for method chaining
118
*/
119
Builder addEventTagAddress(String name, String tagAddress, Consumer<PlcSubscriptionEvent> consumer);
120
121
/**
122
* Add event subscription with pre-parsed tag
123
* @param name Logical name for the subscription
124
* @param tag PlcSubscriptionTag instance
125
* @return Builder instance for method chaining
126
*/
127
Builder addEventTag(String name, PlcSubscriptionTag tag);
128
129
/**
130
* Add event subscription with pre-parsed tag and specific consumer
131
* @param name Logical name for the subscription
132
* @param tag PlcSubscriptionTag instance
133
* @param consumer Consumer function for this specific subscription
134
* @return Builder instance for method chaining
135
*/
136
Builder addEventTag(String name, PlcSubscriptionTag tag, Consumer<PlcSubscriptionEvent> consumer);
137
138
/**
139
* Build the subscription request
140
* @return PlcSubscriptionRequest instance ready for execution
141
*/
142
PlcSubscriptionRequest build();
143
}
144
}
145
```

### PlcSubscriptionResponse

Interface for accessing subscription response and managing active subscriptions.

```java { .api }
152
/**
153
* Subscription response interface providing handles for managing active subscriptions
154
*/
155
public interface PlcSubscriptionResponse extends PlcSubscriptionTagResponse {
156
/**
157
* Get the originating subscription request
158
* @return PlcSubscriptionRequest that generated this response
159
*/
160
PlcSubscriptionRequest getRequest();
161
162
/**
163
* Get subscription handle by name for managing the subscription
164
* @param name Subscription name from request
165
* @return PlcSubscriptionHandle for managing the subscription
166
*/
167
PlcSubscriptionHandle getSubscriptionHandle(String name);
168
169
/**
170
* Get all subscription handles
171
* @return Collection of all PlcSubscriptionHandle instances
172
*/
173
Collection<PlcSubscriptionHandle> getSubscriptionHandles();
174
}
175
```

### PlcSubscriptionEvent

Interface representing subscription events with timestamp information.

```java { .api }
182
/**
183
* Subscription event interface extending PlcReadResponse with timestamp
184
*/
185
public interface PlcSubscriptionEvent extends PlcReadResponse {
186
/**
187
* Get the timestamp when the event occurred
188
* @return Instant representing the event timestamp
189
*/
190
Instant getTimestamp();
191
}
192
```

### PlcSubscriptionHandle

Interface for managing active subscriptions.

```java { .api }
199
/**
200
* Handle for managing active subscriptions
201
*/
202
public interface PlcSubscriptionHandle {
203
/**
204
* Register an additional consumer for this subscription
205
* @param consumer Consumer function to handle subscription events
206
* @return PlcConsumerRegistration for managing the consumer
207
*/
208
PlcConsumerRegistration register(Consumer<PlcSubscriptionEvent> consumer);
209
}
210
```

### PlcUnsubscriptionRequest

Interface for canceling active subscriptions.

```java { .api }
217
/**
218
* Unsubscription request interface for canceling active subscriptions
219
*/
220
public interface PlcUnsubscriptionRequest extends PlcRequest {
221
/**
222
* Execute the unsubscription request asynchronously
223
* @return CompletableFuture containing the unsubscription response
224
*/
225
CompletableFuture<? extends PlcUnsubscriptionResponse> execute();
226
227
/**
228
* Builder interface for constructing unsubscription requests
229
*/
230
interface Builder extends PlcRequestBuilder {
231
/**
232
* Add subscription handle to unsubscribe
233
* @param subscriptionHandle Handle from PlcSubscriptionResponse
234
* @return Builder instance for method chaining
235
*/
236
Builder addHandles(PlcSubscriptionHandle... subscriptionHandle);
237
238
/**
239
* Build the unsubscription request
240
* @return PlcUnsubscriptionRequest instance ready for execution
241
*/
242
PlcUnsubscriptionRequest build();
243
}
244
}
245
```

### PlcUnsubscriptionResponse

Interface for unsubscription response.

```java { .api }
252
/**
253
* Unsubscription response interface
254
*/
255
public interface PlcUnsubscriptionResponse extends PlcResponse {
256
PlcUnsubscriptionRequest getRequest();
257
}
258
```

**Usage Examples:**

```java
import org.apache.plc4x.java.DefaultPlcDriverManager;
import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.messages.*;
import org.apache.plc4x.java.api.model.PlcConsumerRegistration;
import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
import org.apache.plc4x.java.api.types.PlcResponseCode;
import java.time.Duration;
import java.util.function.Consumer;

271
// Cyclic subscription with global consumer
272
PlcDriverManager driverManager = new DefaultPlcDriverManager();
273
try (PlcConnection connection = driverManager.getConnection("modbus-tcp://192.168.1.100:502")) {
274
connection.connect();
275
276
// Global event consumer
277
Consumer<PlcSubscriptionEvent> globalConsumer = event -> {
278
System.out.println("Event at " + event.getTimestamp());
279
for (String tagName : event.getTagNames()) {
280
if (event.getResponseCode(tagName) == PlcResponseCode.OK) {
281
System.out.println(tagName + ": " + event.getInteger(tagName));
282
}
283
}
284
};
285
286
// Create cyclic subscription (poll every 1 second)
287
PlcSubscriptionRequest subscriptionRequest = connection.subscriptionRequestBuilder()
288
.setConsumer(globalConsumer)
289
.addCyclicTagAddress("temperature", "holding-register:1", Duration.ofSeconds(1))
290
.addCyclicTagAddress("pressure", "holding-register:2", Duration.ofSeconds(1))
291
.build();
292
293
PlcSubscriptionResponse response = subscriptionRequest.execute().get();
294
295
// Keep subscription active
296
Thread.sleep(30000); // Monitor for 30 seconds
297
298
// Clean up subscriptions
299
PlcUnsubscriptionRequest unsubRequest = connection.unsubscriptionRequestBuilder()
300
.addHandles(response.getSubscriptionHandles().toArray(new PlcSubscriptionHandle[0]))
301
.build();
302
303
unsubRequest.execute().get();
304
}
305
306
// Individual consumer per subscription
307
try (PlcConnection connection = driverManager.getConnection("s7://192.168.1.200/0/1")) {
308
connection.connect();
309
310
// Individual consumers for different data types
311
Consumer<PlcSubscriptionEvent> temperatureConsumer = event -> {
312
if (event.getResponseCode("temperature") == PlcResponseCode.OK) {
313
float temp = event.getFloat("temperature");
314
System.out.println("Temperature: " + temp + "°C");
315
316
// Temperature-specific logic
317
if (temp > 80.0f) {
318
System.out.println("HIGH TEMPERATURE ALERT!");
319
}
320
}
321
};
322
323
Consumer<PlcSubscriptionEvent> alarmConsumer = event -> {
324
if (event.getResponseCode("alarm") == PlcResponseCode.OK) {
325
boolean alarm = event.getBoolean("alarm");
326
if (alarm) {
327
System.err.println("ALARM TRIGGERED at " + event.getTimestamp());
328
}
329
}
330
};
331
332
PlcSubscriptionRequest subscriptionRequest = connection.subscriptionRequestBuilder()
333
.addCyclicTagAddress("temperature", "DB1.DBD0:REAL", Duration.ofSeconds(2), temperatureConsumer)
334
.addChangeOfStateTagAddress("alarm", "DB1.DBX10.0:BOOL", alarmConsumer)
335
.build();
336
337
PlcSubscriptionResponse response = subscriptionRequest.execute().get();
338
339
// Monitor subscriptions
340
Thread.sleep(60000); // Monitor for 1 minute
341
}
342
343
// Change-of-state subscription
344
try (PlcConnection connection = driverManager.getConnection("modbus-tcp://192.168.1.100:502")) {
345
connection.connect();
346
347
Consumer<PlcSubscriptionEvent> statusConsumer = event -> {
348
System.out.println("Status changed at " + event.getTimestamp());
349
for (String tagName : event.getTagNames()) {
350
if (event.getResponseCode(tagName) == PlcResponseCode.OK) {
351
boolean status = event.getBoolean(tagName);
352
System.out.println(tagName + " is now " + (status ? "ON" : "OFF"));
353
}
354
}
355
};
356
357
PlcSubscriptionRequest subscriptionRequest = connection.subscriptionRequestBuilder()
358
.addChangeOfStateTagAddress("motor1", "coil:1", statusConsumer)
359
.addChangeOfStateTagAddress("motor2", "coil:2", statusConsumer)
360
.addChangeOfStateTagAddress("valve1", "coil:10", statusConsumer)
361
.build();
362
363
PlcSubscriptionResponse response = subscriptionRequest.execute().get();
364
365
// Keep monitoring
366
Thread.sleep(120000); // Monitor for 2 minutes
367
}
368
369
// Event subscription for alarm conditions
370
try (PlcConnection connection = driverManager.getConnection("s7://192.168.1.200/0/1")) {
371
connection.connect();
372
373
Consumer<PlcSubscriptionEvent> eventConsumer = event -> {
374
System.out.println("PLC Event occurred at " + event.getTimestamp());
375
376
// Handle event data
377
for (String tagName : event.getTagNames()) {
378
PlcResponseCode code = event.getResponseCode(tagName);
379
if (code == PlcResponseCode.OK) {
380
// Process event based on tag name
381
switch (tagName) {
382
case "emergency_stop":
383
boolean emergencyStop = event.getBoolean(tagName);
384
if (emergencyStop) {
385
System.err.println("EMERGENCY STOP ACTIVATED!");
386
}
387
break;
388
case "high_temperature":
389
boolean highTemp = event.getBoolean(tagName);
390
if (highTemp) {
391
System.err.println("HIGH TEMPERATURE EVENT!");
392
}
393
break;
394
case "low_pressure":
395
boolean lowPressure = event.getBoolean(tagName);
396
if (lowPressure) {
397
System.err.println("LOW PRESSURE EVENT!");
398
}
399
break;
400
}
401
}
402
}
403
};
404
405
PlcSubscriptionRequest subscriptionRequest = connection.subscriptionRequestBuilder()
406
.addEventTagAddress("emergency_stop", "DB100.DBX0.0:BOOL", eventConsumer)
407
.addEventTagAddress("high_temperature", "DB100.DBX0.1:BOOL", eventConsumer)
408
.addEventTagAddress("low_pressure", "DB100.DBX0.2:BOOL", eventConsumer)
409
.build();
410
411
PlcSubscriptionResponse response = subscriptionRequest.execute().get();
412
413
// Keep event monitoring active
414
Thread.sleep(300000); // Monitor for 5 minutes
415
}
416
417
// Dynamic subscription management
418
try (PlcConnection connection = driverManager.getConnection("modbus-tcp://192.168.1.100:502")) {
419
connection.connect();
420
421
Consumer<PlcSubscriptionEvent> dynamicConsumer = event -> {
422
System.out.println("Dynamic event: " + event.getTimestamp());
423
// Process event...
424
};
425
426
// Initial subscription
427
PlcSubscriptionRequest initialRequest = connection.subscriptionRequestBuilder()
428
.addCyclicTagAddress("sensor1", "holding-register:1", Duration.ofSeconds(1), dynamicConsumer)
429
.build();
430
431
PlcSubscriptionResponse initialResponse = initialRequest.execute().get();
432
433
// Add more consumers to existing subscription
434
PlcSubscriptionHandle sensor1Handle = initialResponse.getSubscriptionHandle("sensor1");
435
PlcConsumerRegistration additionalConsumer = sensor1Handle.register(event -> {
436
System.out.println("Additional processing for sensor1");
437
});
438
439
Thread.sleep(10000); // Monitor with additional consumer
440
441
// Add new subscription dynamically
442
PlcSubscriptionRequest additionalRequest = connection.subscriptionRequestBuilder()
443
.addCyclicTagAddress("sensor2", "holding-register:2", Duration.ofSeconds(2), dynamicConsumer)
444
.build();
445
446
PlcSubscriptionResponse additionalResponse = additionalRequest.execute().get();
447
448
Thread.sleep(20000); // Monitor both subscriptions
449
450
// Clean up all subscriptions
451
PlcUnsubscriptionRequest cleanup = connection.unsubscriptionRequestBuilder()
452
.addHandles(initialResponse.getSubscriptionHandles().toArray(new PlcSubscriptionHandle[0]))
453
.addHandles(additionalResponse.getSubscriptionHandles().toArray(new PlcSubscriptionHandle[0]))
454
.build();
455
456
cleanup.execute().get();
457
System.out.println("All subscriptions cleaned up");
458
}
459
```

## Types

### Subscription Types

```java { .api }
/**
 * Kinds of subscription that can be requested.
 */
public enum PlcSubscriptionType {

    /**
     * Cyclic subscription - polls at regular intervals
     */
    CYCLIC,

    /**
     * Change-of-state subscription - triggers when value changes
     */
    CHANGE_OF_STATE,

    /**
     * Event subscription - triggers on PLC events
     */
    EVENT
}
```

### Model Types

```java { .api }
487
public interface PlcSubscriptionTag extends PlcTag {
488
// Extends PlcTag with subscription-specific behavior
489
}
490
/**
 * Handle for managing consumer registration lifecycle.
 */
public interface PlcConsumerRegistration {
    // No members are listed in this reference.
}
```

### Base Subscription Types

```java { .api }
499
public interface PlcSubscriptionTagRequest extends PlcRequest {
500
CompletableFuture<? extends PlcSubscriptionTagResponse> execute();
501
}
502
503
public interface PlcSubscriptionTagResponse extends PlcResponse {
504
PlcSubscriptionTagRequest getRequest();
505
}
506
```