# Reactive Programming

Micronaut supports reactive programming through APIs built on the Reactive Streams specification. These APIs enable non-blocking I/O, backpressure handling, and applications that scale without dedicating a thread to each request.

## Capabilities

### Reactive HTTP

HTTP controllers can return reactive types for non-blocking request processing.

```java { .api }
/**
 * Reactive HTTP controller
 */
@Controller("/stream")
public class StreamController {

    // dataService and processingService are assumed to be injected collaborators (not shown)

    @Get(value = "/events", produces = MediaType.TEXT_EVENT_STREAM)
    Publisher<String> events() {
        return Flowable.interval(1, TimeUnit.SECONDS)
            .map(i -> "Event " + i);
    }

    @Get("/data")
    Single<ResponseData> getData() {
        return dataService.loadAsync();
    }

    @Post("/process")
    Completable processData(@Body DataRequest request) {
        return processingService.processAsync(request);
    }
}
```

### Reactive Streams Types

Micronaut supports reactive types from RxJava and other Reactive Streams libraries. The types differ mainly in how many items they emit and whether they support backpressure.

```java { .api }
/**
 * Publisher - 0 to N items
 */
Publisher<T> publisher();

/**
 * Single - exactly one item
 */
Single<T> single();

/**
 * Maybe - 0 or 1 item
 */
Maybe<T> maybe();

/**
 * Completable - no items, completion signal only
 */
Completable completable();

/**
 * Flowable - backpressure-aware publisher
 */
Flowable<T> flowable();

/**
 * Observable - non-backpressure-aware
 */
Observable<T> observable();
```

### Reactive Client

HTTP clients provide reactive APIs for non-blocking external service calls.

```java { .api }
/**
 * Reactive HTTP client operations
 */
@Client("/api")
public interface ReactiveClient {

    @Get("/users/{id}")
    Single<User> getUser(Long id);

    @Get("/users")
    Flowable<User> getUsers();

    @Post("/users")
    Single<User> createUser(@Body User user);

    @Delete("/users/{id}")
    Completable deleteUser(Long id);
}
```

### Server-Sent Events

Controllers can stream responses as Server-Sent Events by producing `MediaType.TEXT_EVENT_STREAM` and returning a publisher of `Event` objects.

```java { .api }
/**
 * Server-Sent Events controller
 */
@Controller("/sse")
public class EventController {

    @Get(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM)
    Publisher<Event> streamEvents() {
        return Flowable.interval(1, TimeUnit.SECONDS)
            // Set the id inside map(): the counter i is not in scope in a later doOnNext()
            .map(i -> Event.of("Event " + i).id(String.valueOf(i)));
    }
}

/**
 * Event wrapper for SSE
 */
public final class Event {
    public static Event of(Object data);
    public Event id(String id);
    public Event name(String name);
    public Event retry(Duration retry);
    public Event comment(String comment);
}
```
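
To make the `Event` fields more concrete, here is a minimal sketch of how one event could look on the wire in the `text/event-stream` format. `SseFrameSketch` and `render` are hypothetical names used only for illustration; Micronaut performs this encoding itself, and the field order shown here is an assumption rather than a description of the framework's encoder.

```java
import java.time.Duration;

/** Hypothetical helper, not Micronaut API: renders one SSE frame as text. */
final class SseFrameSketch {

    /** Any argument except data may be null, in which case its field is omitted. */
    static String render(String comment, String id, String name, Duration retry, String data) {
        StringBuilder out = new StringBuilder();
        if (comment != null) {
            out.append(": ").append(comment).append('\n');   // lines starting with ':' are comments
        }
        if (id != null) {
            out.append("id: ").append(id).append('\n');
        }
        if (name != null) {
            out.append("event: ").append(name).append('\n');
        }
        if (retry != null) {
            // the reconnection delay is sent in milliseconds
            out.append("retry: ").append(retry.toMillis()).append('\n');
        }
        // multi-line payloads become one "data:" line per line of text
        for (String line : data.split("\n", -1)) {
            out.append("data: ").append(line).append('\n');
        }
        out.append('\n');                                     // a blank line ends the event
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.print(render(null, "0", "tick", Duration.ofSeconds(3), "Event 0"));
    }
}
```

In this sketch, `id`, `name`, `retry`, and `comment` map to the `id:`, `event:`, `retry:`, and `:` lines of the frame, while the object passed to `Event.of` ends up in the `data:` lines.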

### Reactive Database Operations

Database operations can return reactive types for non-blocking data access.

```java { .api }
/**
 * Reactive repository interface
 */
@Repository
public interface ReactiveUserRepository {

    Single<User> save(User user);

    Maybe<User> findById(Long id);

    Flowable<User> findAll();

    Completable deleteById(Long id);

    Single<Long> count();
}
```

## Types

```java { .api }
// Core reactive types
public interface Publisher<T> {
    void subscribe(Subscriber<? super T> s);
}

public interface Subscriber<T> {
    void onSubscribe(Subscription s);
    void onNext(T t);
    void onError(Throwable t);
    void onComplete();
}

public interface Subscription {
    void request(long n);
    void cancel();
}

// Event for Server-Sent Events
public final class Event {
    public static Event of(Object data);
    public Event id(String id);
    public Event name(String name);
    public Event retry(Duration retry);
    public Event comment(String comment);
}
```
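
The following sketch shows how `request(long n)` carries backpressure between the three core interfaces. So that it runs on a plain JDK, it uses `java.util.concurrent.Flow`, the JDK's mirror of the Reactive Streams interfaces, rather than `org.reactivestreams` or any Micronaut type. `BackpressureSketch`, `ListPublisher`, and `OneAtATimeSubscriber` are hypothetical names, and the publisher is deliberately simplified (synchronous, single subscriber, not thread-safe).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

/** Hypothetical demo, not Micronaut API: a pull-driven publisher and subscriber. */
final class BackpressureSketch {

    /** Emits items from a fixed list, but only as many as the subscriber has requested. */
    static final class ListPublisher<T> implements Flow.Publisher<T> {
        private final List<T> items;

        ListPublisher(List<T> items) {
            this.items = List.copyOf(items);
        }

        @Override
        public void subscribe(Flow.Subscriber<? super T> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 0;             // index of the next item to emit
                private long demand = 0;          // items requested but not yet delivered
                private boolean emitting = false; // guards against re-entrant request() calls
                private boolean done = false;

                @Override
                public void request(long n) {
                    if (done) return;
                    if (n <= 0) {
                        done = true;
                        subscriber.onError(new IllegalArgumentException("request must be positive"));
                        return;
                    }
                    demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // cap on overflow
                    if (emitting) return;         // the running loop below sees the new demand
                    emitting = true;
                    while (!done && demand > 0 && next < items.size()) {
                        demand--;
                        subscriber.onNext(items.get(next++));
                    }
                    emitting = false;
                    if (!done && next == items.size()) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Requests one item up front, then one more after each item it receives. */
    static final class OneAtATimeSubscriber<T> implements Flow.Subscriber<T> {
        final List<String> log = new ArrayList<>();
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            log.add("request(1)");
            s.request(1);
        }

        @Override
        public void onNext(T item) {
            log.add("onNext(" + item + ")");
            log.add("request(1)");
            subscription.request(1);
        }

        @Override
        public void onError(Throwable t) {
            log.add("onError");
        }

        @Override
        public void onComplete() {
            log.add("onComplete");
        }
    }

    /** Runs the demo and returns the ordered list of signals the subscriber saw or sent. */
    static List<String> run() {
        OneAtATimeSubscriber<String> subscriber = new OneAtATimeSubscriber<>();
        new ListPublisher<String>(List.of("a", "b")).subscribe(subscriber);
        return subscriber.log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The subscriber never receives more items than it has asked for: each `onNext` is preceded by a matching `request(1)`, and `onComplete` arrives only after the list is exhausted. Libraries such as RxJava implement the same contract with asynchronous, thread-safe machinery.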