# Message Publishing (Sink)

The PubSubSink publishes messages to Google Cloud Pub/Sub topics with at-least-once delivery semantics, relying on the underlying Pub/Sub client for automatic batching and retries.

## Capabilities

### PubSubSink Builder

Creates a new PubSubSink using a staged builder. Each required parameter (serialization schema, project name, topic name) is set on its own builder type, so `build()` becomes reachable only after all three have been supplied.

```java { .api }
/**
 * Creates a new builder for PubSubSink
 * @return SerializationSchemaBuilder for setting serialization schema
 */
public static SerializationSchemaBuilder newBuilder();
```

### Serialization Schema Configuration

Configure how messages are serialized for Pub/Sub.

```java { .api }
public static class SerializationSchemaBuilder {
    /**
     * Set serialization schema for converting objects to byte arrays
     * @param serializationSchema Schema for serializing message data
     * @return ProjectNameBuilder for next configuration step
     */
    public <IN> ProjectNameBuilder<IN> withSerializationSchema(
        SerializationSchema<IN> serializationSchema);
}
```

### Project and Topic Configuration

Configure the GCP project and Pub/Sub topic.

```java { .api }
public interface ProjectNameBuilder<IN> {
    /**
     * Set the GCP project name containing the topic
     * @param projectName GCP project name
     * @return TopicNameBuilder for next configuration step
     */
    TopicNameBuilder<IN> withProjectName(String projectName);
}

public interface TopicNameBuilder<IN> {
    /**
     * Set the Pub/Sub topic name to publish to
     * @param topicName Topic name
     * @return PubSubSinkBuilder for additional configuration
     */
    PubSubSinkBuilder<IN> withTopicName(String topicName);
}
```

### Sink Builder Configuration

Main builder class for configuring optional parameters.

```java { .api }
public static class PubSubSinkBuilder<IN> {
    /**
     * Set authentication credentials (optional - uses default credentials if not set)
     * @param credentials Google Cloud credentials
     * @return Current builder instance
     */
    public PubSubSinkBuilder<IN> withCredentials(Credentials credentials);

    /**
     * Set custom hostname/port for Pub/Sub emulator (testing only)
     * @param hostAndPort Host and port combination ("hostname:1234")
     * @return Current builder instance
     */
    public PubSubSinkBuilder<IN> withHostAndPortForEmulator(String hostAndPort);

    /**
     * Build the configured PubSubSink
     * @return Configured PubSubSink instance
     * @throws IOException If credentials cannot be obtained
     * @throws IllegalArgumentException If required fields are missing
     */
    public PubSubSink<IN> build() throws IOException;
}
```

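The staged chain described in the builder sections above (required parameters first, optional settings and `build()` last) can be illustrated without any Flink dependency. The following is a minimal sketch of the pattern only: `StagedBuilderSketch`, `ProjectStage`, `TopicStage`, `FinalStage`, and `Target` are hypothetical names, not connector classes, and the serialization-schema stage is omitted for brevity.

```java
import java.util.Objects;

// Hypothetical, dependency-free sketch of a staged builder.
// None of these types belong to the Flink Pub/Sub connector.
final class StagedBuilderSketch {

    /** Stage 1: only the project name can be set. */
    interface ProjectStage {
        TopicStage withProjectName(String projectName);
    }

    /** Stage 2: only the topic name can be set. */
    interface TopicStage {
        FinalStage withTopicName(String topicName);
    }

    /** Final stage: optional settings and build(). */
    interface FinalStage {
        FinalStage withHostAndPortForEmulator(String hostAndPort);
        Target build();
    }

    /** Immutable result of the builder chain. */
    static final class Target {
        final String project;
        final String topic;
        final String emulatorHostAndPort; // null when no emulator is configured

        Target(String project, String topic, String emulatorHostAndPort) {
            this.project = project;
            this.topic = topic;
            this.emulatorHostAndPort = emulatorHostAndPort;
        }
    }

    /** One class implements every stage; callers only see the current stage's interface. */
    private static final class Builder implements ProjectStage, TopicStage, FinalStage {
        private String project;
        private String topic;
        private String emulatorHostAndPort;

        @Override
        public TopicStage withProjectName(String projectName) {
            this.project = Objects.requireNonNull(projectName, "projectName");
            return this;
        }

        @Override
        public FinalStage withTopicName(String topicName) {
            this.topic = Objects.requireNonNull(topicName, "topicName");
            return this;
        }

        @Override
        public FinalStage withHostAndPortForEmulator(String hostAndPort) {
            this.emulatorHostAndPort = hostAndPort;
            return this;
        }

        @Override
        public Target build() {
            return new Target(project, topic, emulatorHostAndPort);
        }
    }

    /** Entry point: returns the first stage, which has no build() method. */
    static ProjectStage newBuilder() {
        return new Builder();
    }
}
```

With this shape, `StagedBuilderSketch.newBuilder().build()` does not compile, because `ProjectStage` declares no `build()` method. A missing required parameter is therefore caught by the compiler instead of at runtime.
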
### Core Sink Methods

Key methods of the PubSubSink class.

```java { .api }
public class PubSubSink<IN> extends RichSinkFunction<IN> implements CheckpointedFunction {

    /**
     * Process and publish a message to Pub/Sub
     * @param message Message to publish
     * @param context Sink context (provides processing time, etc.)
     */
    public void invoke(IN message, SinkFunction.Context context);

    /**
     * Called during checkpointing - ensures all pending messages are published
     * @param context Checkpoint context
     * @throws Exception If publishing fails
     */
    public void snapshotState(FunctionSnapshotContext context) throws Exception;

    /**
     * Initialize state - called once per subtask
     * @param context Initialization context
     */
    public void initializeState(FunctionInitializationContext context);
}
```

## Usage Examples

### Basic String Publishing

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;
import org.apache.flink.api.common.serialization.SimpleStringSchema;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

PubSubSink<String> sink = PubSubSink.newBuilder()
    .withSerializationSchema(new SimpleStringSchema())
    .withProjectName("my-gcp-project")
    .withTopicName("my-topic")
    .build();

env.fromElements("Hello", "World", "Pub/Sub")
    .addSink(sink);

env.execute("Basic Pub/Sub Producer");
```

### JSON Object Publishing

```java
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;

// Generic schema that serializes any Jackson-compatible object to JSON bytes
public class JsonSerializationSchema<T> implements SerializationSchema<T> {
    private static final long serialVersionUID = 1L;

    // One mapper per schema instance, reused for every element
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public byte[] serialize(T element) {
        try {
            return mapper.writeValueAsBytes(element);
        } catch (Exception e) {
            throw new RuntimeException("Failed to serialize object", e);
        }
    }
}

// Example event type; public fields are picked up by Jackson
public class MyEvent {
    public String eventType;
    public long timestamp;
    public String userId;
    // ... other fields
}

PubSubSink<MyEvent> sink = PubSubSink.newBuilder()
    .withSerializationSchema(new JsonSerializationSchema<MyEvent>())
    .withProjectName("my-gcp-project")
    .withTopicName("events")
    .build();

DataStream<MyEvent> events = env.addSource(/* some source */);
events.addSink(sink);
```

### Advanced Configuration with Custom Credentials

```java
import com.google.auth.Credentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
import java.io.FileInputStream;
import java.io.InputStream;

// Load service account credentials; try-with-resources closes the key file
Credentials credentials;
try (InputStream keyFile = new FileInputStream("path/to/service-account.json")) {
    credentials = ServiceAccountCredentials.fromStream(keyFile);
}

PubSubSink<String> sink = PubSubSink.newBuilder()
    .withSerializationSchema(new SimpleStringSchema())
    .withProjectName("my-gcp-project")
    .withTopicName("my-topic")
    .withCredentials(credentials)
    .build();

env.addSource(/* some source */)
    .addSink(sink);
```

### Using with Pub/Sub Emulator for Testing

```java
// Start Pub/Sub emulator first:
// gcloud beta emulators pubsub start --host-port=localhost:8085

PubSubSink<String> sink = PubSubSink.newBuilder()
    .withSerializationSchema(new SimpleStringSchema())
    .withProjectName("test-project")
    .withTopicName("test-topic")
    .withHostAndPortForEmulator("localhost:8085")
    .build();

env.fromElements("test-message-1", "test-message-2")
    .addSink(sink);
```

### Publishing with Custom Message Attributes

The builder options listed above do not include message attributes. For use cases that require them, implement a custom serialization approach or use the Pub/Sub client library directly within a custom sink function:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.util.concurrent.TimeUnit;

// MyEventWithAttributes is a user-defined type exposing getData(), getEventType(), getSource()
public class CustomPubSubSink extends RichSinkFunction<MyEventWithAttributes> {
    private transient Publisher publisher;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Initialize publisher once per subtask
        publisher = Publisher.newBuilder(TopicName.of("project", "topic")).build();
    }

    @Override
    public void invoke(MyEventWithAttributes event, Context context) throws Exception {
        PubsubMessage message = PubsubMessage.newBuilder()
            .setData(ByteString.copyFromUtf8(event.getData()))
            .putAttributes("eventType", event.getEventType())
            .putAttributes("source", event.getSource())
            .build();

        // publish() is asynchronous and returns a future; this sketch ignores it,
        // so failures go unnoticed and nothing is flushed at checkpoints
        publisher.publish(message);
    }

    @Override
    public void close() throws Exception {
        if (publisher != null) {
            // Stop accepting messages, then wait for in-flight publishes to finish
            publisher.shutdown();
            publisher.awaitTermination(1, TimeUnit.MINUTES);
        }
    }
}
```

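## Error Handling and Reliability

### At-Least-Once Delivery

The PubSubSink provides at-least-once delivery when Flink checkpointing is enabled: pending publishes are flushed at each checkpoint, and after a failure the job resumes from the last completed checkpoint. Messages may therefore be delivered more than once, but none are lost. Downstream consumers should tolerate duplicates.
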
### Automatic Retries

The sink relies on the Google Cloud Pub/Sub client's default retry policy for failed publish operations:
- Maximum attempts: determined by the gRPC client configuration
- Delays: exponential backoff with jitter
- Tuning: available through the client's publisher settings; the PubSubSink builder options listed above do not include retry settings

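To make "exponential backoff with jitter" concrete, the delay calculation can be sketched in plain Java. This is an illustration of the general technique, not the Pub/Sub client's implementation: `BackoffSketch`, its method names, and the parameter values used in the usage note are assumptions, and the "full jitter" variant shown here (a uniform random delay up to the capped value) is only one of several common schemes.

```java
import java.util.Random;

// Hypothetical sketch of exponential backoff with full jitter.
// Names and parameters are illustrative and do not mirror the Pub/Sub client's settings.
final class BackoffSketch {
    private BackoffSketch() {}

    /**
     * Upper bound of the delay before a retry.
     * attempt is 0-based: attempt 0 yields initialDelayMillis.
     */
    static long cappedDelayMillis(int attempt, long initialDelayMillis,
                                  double multiplier, long maxDelayMillis) {
        double raw = initialDelayMillis * Math.pow(multiplier, attempt);
        return (long) Math.min(raw, (double) maxDelayMillis);
    }

    /** Full jitter: a uniformly random delay in [0, cappedDelay]. */
    static long jitteredDelayMillis(int attempt, long initialDelayMillis,
                                    double multiplier, long maxDelayMillis,
                                    Random random) {
        long cap = cappedDelayMillis(attempt, initialDelayMillis, multiplier, maxDelayMillis);
        // nextDouble() is in [0, 1), so the result is in [0, cap]
        return (long) (random.nextDouble() * (cap + 1));
    }
}
```

With an initial delay of 100 ms, a multiplier of 2.0, and a 60 s cap, the upper bounds for attempts 0 through 3 are 100, 200, 400, and 800 ms. The jitter spreads retries from many subtasks over time so they do not hit the service in lockstep.
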
### Checkpoint Integration

The sink integrates with Flink's checkpointing mechanism:
- All outstanding publish requests are completed before checkpoint completion
- Failed publish operations will fail the checkpoint
- Provides durability guarantees in combination with Flink's state management

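The first two points above (wait for outstanding publishes, fail the checkpoint if any publish failed) can be sketched with standard `CompletableFuture`s. This is a simplified model under stated assumptions, not the sink's actual code: `PendingPublishTracker` is a hypothetical helper, the futures stand in for whatever the publisher returns, and the tracker is assumed to be used from a single task thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical sketch: remember in-flight publishes and drain them at checkpoint time.
// Assumes track() and flushOrFail() are called from the same thread.
final class PendingPublishTracker {
    private final List<CompletableFuture<String>> pending = new ArrayList<>();

    /** Called once per published message with the future of that publish. */
    void track(CompletableFuture<String> publishFuture) {
        pending.add(publishFuture);
    }

    /**
     * Called from the checkpoint hook: blocks until every tracked publish has
     * finished and throws if any of them failed, which would fail the checkpoint.
     */
    void flushOrFail() throws Exception {
        Throwable firstFailure = null;
        for (CompletableFuture<String> future : pending) {
            try {
                future.join(); // waits for completion
            } catch (CompletionException | CancellationException e) {
                if (firstFailure == null) {
                    firstFailure = (e.getCause() != null) ? e.getCause() : e;
                }
            }
        }
        pending.clear();
        if (firstFailure != null) {
            throw new Exception("Publish failed before checkpoint", firstFailure);
        }
    }

    /** Number of publishes tracked since the last flush. */
    int pendingCount() {
        return pending.size();
    }
}
```

In this model, a sink's `invoke` would call `track(...)` and its `snapshotState` would call `flushOrFail()`. Completed futures stay in the list until the next flush, which keeps the sketch simple at the cost of some memory between checkpoints.
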
### Back-pressure Handling

The sink handles back-pressure scenarios:
- Blocks on publish when Pub/Sub service is unavailable
- Respects Flink's back-pressure mechanisms
- Provides flow control to prevent memory issues

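One common way to obtain this kind of flow control is to bound the number of in-flight publishes, so the calling thread blocks once the bound is reached and back-pressure propagates upstream. The sketch below illustrates that general idea with a `Semaphore`; it is not a description of how PubSubSink is implemented, and `InFlightLimiter` is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical sketch: cap the number of unfinished publishes.
// When the cap is reached, submit() blocks the caller until a publish completes.
final class InFlightLimiter {
    private final Semaphore slots;

    InFlightLimiter(int maxInFlight) {
        this.slots = new Semaphore(maxInFlight);
    }

    /** Waits for a free slot, starts the publish, and frees the slot when it finishes. */
    <T> CompletableFuture<T> submit(Supplier<CompletableFuture<T>> publishCall)
            throws InterruptedException {
        slots.acquire();
        try {
            CompletableFuture<T> future = publishCall.get();
            // Release on success and on failure alike
            future.whenComplete((result, error) -> slots.release());
            return future;
        } catch (RuntimeException e) {
            // The publish never started, so give the slot back
            slots.release();
            throw e;
        }
    }

    /** Slots currently free; mainly useful for monitoring and tests. */
    int availableSlots() {
        return slots.availablePermits();
    }
}
```

Because the caller blocks inside `submit`, unsent messages cannot accumulate without limit in memory while the service is slow or unreachable.
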
## Performance Considerations

### Batching

The underlying Google Cloud Pub/Sub publisher batches messages automatically, sending a batch once a message-count, byte-size, or delay threshold is reached.

### Publisher Settings

For high-throughput scenarios, consider tuning the publisher settings by implementing a custom sink based on the PubSubSink pattern with explicit publisher configuration.

### Resource Management

- Each sink subtask creates its own Publisher instance
- Publishers are properly shut down during sink lifecycle management
- gRPC channels are managed automatically by the Google Cloud client library

## Important Notes

- **Message Ordering**: Pub/Sub does not guarantee message ordering by default. Use message ordering keys if ordering is required; the builder options listed above do not include an ordering key.

- **Message Size Limits**: Pub/Sub has a maximum message size limit (10 MB). Ensure your serialized messages are within this limit.

- **Topic Creation**: Topics must exist before publishing. The sink does not create topics automatically.

- **Authentication**: Uses Google Cloud default authentication if no explicit credentials are provided. Ensure proper service account configuration in production environments.
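
The message size note above can be enforced early, for example inside a serialization schema, so oversized payloads fail with a clear error instead of a rejected publish. The following is a minimal sketch: `MessageSizeGuard` is a hypothetical helper, and interpreting "10 MB" as 10,000,000 bytes is an assumption chosen as the stricter reading.

```java
// Hypothetical helper that rejects payloads above the Pub/Sub message size limit.
final class MessageSizeGuard {
    // Assumption: "10 MB" is read as 10,000,000 bytes, the more conservative interpretation
    static final int MAX_MESSAGE_BYTES = 10_000_000;

    private MessageSizeGuard() {}

    /** Returns the payload unchanged if it fits, otherwise throws. */
    static byte[] requireWithinLimit(byte[] payload) {
        if (payload.length > MAX_MESSAGE_BYTES) {
            throw new IllegalArgumentException(
                "Serialized message is " + payload.length
                    + " bytes, above the limit of " + MAX_MESSAGE_BYTES);
        }
        return payload;
    }
}
```

A custom `SerializationSchema` could wrap its result as `return MessageSizeGuard.requireWithinLimit(bytes);` so the check runs before the message reaches the publisher.
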