# Message Consumption (Source)

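The PubSubSource consumes messages from Google Cloud Pub/Sub subscriptions and ties acknowledgement to Flink's checkpointing mechanism: a message is acknowledged only after the checkpoint that covers it completes. Because Pub/Sub can redeliver a message that has not yet been acknowledged, the resulting guarantee is at-least-once delivery into the job.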

## Capabilities

### PubSubSource Builder

`PubSubSource.newBuilder()` starts a staged builder. Each required step (deserialization schema, project name, subscription name) returns the type for the next step, so `build()` only becomes reachable once all three have been set.

```java { .api }
/**
 * Creates a new builder for PubSubSource
 * @return DeserializationSchemaBuilder for setting deserialization schema
 */
public static DeserializationSchemaBuilder newBuilder();
```
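
To make the "type-safe interfaces" idea concrete, here is a minimal, dependency-free sketch of a staged builder. It is not the connector's code: `StagedBuilderSketch`, `Config`, and the `*Step` interfaces are hypothetical names that only mirror the shape of the chain, and the schema is reduced to a plain string for illustration.

```java
// Illustrative staged ("step") builder. All names are hypothetical and
// only mirror the shape of the PubSubSource builder chain.
final class StagedBuilderSketch {

    /** Immutable result; stands in for the configured source. */
    static final class Config {
        final String schema;
        final String project;
        final String subscription;
        final int rateLimit;

        Config(String schema, String project, String subscription, int rateLimit) {
            this.schema = schema;
            this.project = project;
            this.subscription = subscription;
            this.rateLimit = rateLimit;
        }
    }

    /** Second stage: the only thing a caller can do is set the project. */
    interface ProjectStep {
        SubscriptionStep withProjectName(String projectName);
    }

    /** Third stage: the only thing a caller can do is set the subscription. */
    interface SubscriptionStep {
        OptionalStep withSubscriptionName(String subscriptionName);
    }

    /** Final stage: optional settings plus build(). */
    interface OptionalStep {
        OptionalStep withMessageRateLimit(int messagesPerSecond);
        Config build();
    }

    /** First stage: entry point, analogous to choosing the schema. */
    static ProjectStep withSchema(String schema) {
        return new Steps(schema);
    }

    /** One object implements every stage; the return types hide the rest. */
    private static final class Steps implements ProjectStep, SubscriptionStep, OptionalStep {
        private final String schema;
        private String project;
        private String subscription;
        private int rateLimit = 100_000; // assumed default, for illustration

        Steps(String schema) {
            this.schema = schema;
        }

        @Override
        public SubscriptionStep withProjectName(String projectName) {
            this.project = projectName;
            return this;
        }

        @Override
        public OptionalStep withSubscriptionName(String subscriptionName) {
            this.subscription = subscriptionName;
            return this;
        }

        @Override
        public OptionalStep withMessageRateLimit(int messagesPerSecond) {
            this.rateLimit = messagesPerSecond;
            return this;
        }

        @Override
        public Config build() {
            return new Config(schema, project, subscription, rateLimit);
        }
    }
}
```

With this shape, `StagedBuilderSketch.withSchema("string").build()` does not compile, because `ProjectStep` has no `build()` method; a missing required value is caught by the compiler rather than at runtime.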
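
### Deserialization Schema Configuration

Choose how messages are deserialized from Pub/Sub. The first overload takes a standard Flink `DeserializationSchema` and sees only the message data; the second takes a `PubSubDeserializationSchema`, which receives the full `PubsubMessage` and can therefore read its metadata.
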
```java { .api }
public static class DeserializationSchemaBuilder {
    /**
     * Set standard Flink DeserializationSchema for message data only
     * @param deserializationSchema Schema for deserializing message data
     * @return ProjectNameBuilder for next configuration step
     */
    public <OUT> ProjectNameBuilder<OUT> withDeserializationSchema(
            DeserializationSchema<OUT> deserializationSchema);

    /**
     * Set PubSub-specific deserialization schema with metadata access
     * @param deserializationSchema Schema with access to full PubsubMessage
     * @return ProjectNameBuilder for next configuration step
     */
    public <OUT> ProjectNameBuilder<OUT> withDeserializationSchema(
            PubSubDeserializationSchema<OUT> deserializationSchema);
}
```

### Project and Subscription Configuration

Configure the GCP project and Pub/Sub subscription.

```java { .api }
public interface ProjectNameBuilder<OUT> {
    /**
     * Set the GCP project name containing the subscription
     * @param projectName GCP project name
     * @return SubscriptionNameBuilder for next configuration step
     */
    SubscriptionNameBuilder<OUT> withProjectName(String projectName);
}

public interface SubscriptionNameBuilder<OUT> {
    /**
     * Set the Pub/Sub subscription name to consume from
     * @param subscriptionName Subscription name
     * @return PubSubSourceBuilder for additional configuration
     */
    PubSubSourceBuilder<OUT> withSubscriptionName(String subscriptionName);
}
```

### Source Builder Configuration

`PubSubSourceBuilder` is the final builder stage. Every setting on it is optional (credentials, subscriber factory, rate limit); call `build()` to create the source.

```java { .api }
public static class PubSubSourceBuilder<OUT> {
    /**
     * Set authentication credentials (optional - uses default credentials if not set)
     * @param credentials Google Cloud credentials
     * @return Current builder instance
     */
    public PubSubSourceBuilder<OUT> withCredentials(Credentials credentials);

    /**
     * Set custom subscriber factory for advanced configuration
     * @param pubSubSubscriberFactory Custom factory implementation
     * @return Current builder instance
     */
    public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(
            PubSubSubscriberFactory pubSubSubscriberFactory);

    /**
     * Configure default subscriber factory with custom parameters
     * @param maxMessagesPerPull Maximum messages per pull request (default: 100)
     * @param perRequestTimeout Timeout per request (default: 15 seconds)
     * @param retries Number of retries on failure (default: 3)
     * @return Current builder instance
     */
    public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(
            int maxMessagesPerPull, Duration perRequestTimeout, int retries);

    /**
     * Set message rate limit per parallel instance (default: 100000 messages/second)
     * @param messagePerSecondRateLimit Rate limit per parallel subtask
     * @return Current builder instance
     */
    public PubSubSourceBuilder<OUT> withMessageRateLimit(int messagePerSecondRateLimit);

    /**
     * Build the configured PubSubSource
     * @return Configured PubSubSource instance
     * @throws IOException If credentials cannot be obtained
     * @throws IllegalArgumentException If required fields are missing
     */
    public PubSubSource<OUT> build() throws IOException;
}
```

### Core Source Methods

Key methods of the PubSubSource class.

```java { .api }
public class PubSubSource<OUT> extends RichSourceFunction<OUT>
        implements ResultTypeQueryable<OUT>, ParallelSourceFunction<OUT>,
                CheckpointListener, ListCheckpointed<AcknowledgeIdsForCheckpoint<String>> {

    /**
     * Get type information for elements produced by this source
     * @return TypeInformation for output elements
     */
    public TypeInformation<OUT> getProducedType();

    /**
     * Called when checkpoint completes - acknowledges messages
     * @param checkpointId Completed checkpoint ID
     */
    public void notifyCheckpointComplete(long checkpointId) throws Exception;

    /**
     * Cancel the source function
     */
    public void cancel();
}
```
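
The acknowledge-on-checkpoint behaviour behind `notifyCheckpointComplete` can be pictured with a small, dependency-free sketch. This is not the connector's implementation: `AckOnCheckpointSketch` and its methods are hypothetical names, and the sketch assumes that acknowledgement IDs are grouped by the checkpoint that covered them and released once that checkpoint, or a later one, is reported complete.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;

// Hypothetical sketch: buffers ack IDs and releases them per completed checkpoint.
final class AckOnCheckpointSketch {
    // Stand-in for the call that acknowledges a batch of IDs to Pub/Sub.
    private final Consumer<List<String>> acknowledger;
    // IDs of messages emitted since the last snapshot.
    private List<String> pendingIds = new ArrayList<>();
    // IDs captured by each snapshot, ordered by checkpoint ID.
    private final TreeMap<Long, List<String>> idsByCheckpoint = new TreeMap<>();

    AckOnCheckpointSketch(Consumer<List<String>> acknowledger) {
        this.acknowledger = acknowledger;
    }

    /** Record the ack ID of a message that was emitted downstream. */
    void messageEmitted(String ackId) {
        pendingIds.add(ackId);
    }

    /** A checkpoint is being taken: everything emitted so far belongs to it. */
    void snapshot(long checkpointId) {
        idsByCheckpoint.put(checkpointId, pendingIds);
        pendingIds = new ArrayList<>();
    }

    /** A checkpoint completed: acknowledge its IDs and those of earlier checkpoints. */
    void checkpointComplete(long checkpointId) {
        Map<Long, List<String>> done = idsByCheckpoint.headMap(checkpointId, true);
        for (List<String> ids : done.values()) {
            if (!ids.isEmpty()) {
                acknowledger.accept(ids);
            }
        }
        done.clear(); // headMap is a view, so this removes the entries from idsByCheckpoint
    }

    /** Number of IDs that have been emitted but not yet acknowledged. */
    int unacknowledged() {
        int count = pendingIds.size();
        for (List<String> ids : idsByCheckpoint.values()) {
            count += ids.size();
        }
        return count;
    }
}
```

In this model a message emitted after a snapshot stays unacknowledged until the next checkpoint completes, which is why the checkpoint interval matters relative to the subscription's acknowledgement deadline.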

## Usage Examples

### Basic String Consumption

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;

public class BasicPubSubConsumer {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Required: the source acknowledges messages only when a checkpoint completes
        env.enableCheckpointing(30000); // checkpoint every 30 seconds

        // build() throws IOException if credentials cannot be obtained
        PubSubSource<String> source = PubSubSource.newBuilder()
                .withDeserializationSchema(new SimpleStringSchema())
                .withProjectName("my-gcp-project")
                .withSubscriptionName("my-subscription")
                .build();

        env.addSource(source)
                .print();

        env.execute("Basic Pub/Sub Consumer");
    }
}
```

### JSON Message Consumption with Custom Schema

```java
import java.io.IOException;

import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;

// MyObject is the application's own POJO matching the JSON payload
public class JsonDeserializationSchema extends AbstractDeserializationSchema<MyObject> {
    // Reused across calls so the mapper is not rebuilt for every message
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public MyObject deserialize(byte[] message) throws IOException {
        return mapper.readValue(message, MyObject.class);
    }
}

// In the job's main method:
PubSubSource<MyObject> source = PubSubSource.newBuilder()
        .withDeserializationSchema(new JsonDeserializationSchema())
        .withProjectName("my-gcp-project")
        .withSubscriptionName("json-messages")
        .build();
```

### Advanced Configuration with Custom Credentials

```java
import com.google.auth.Credentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
import java.io.FileInputStream;
import java.io.InputStream;
import java.time.Duration;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;

// Load service account credentials; try-with-resources closes the key file
Credentials credentials;
try (InputStream keyFile = new FileInputStream("path/to/service-account.json")) {
    credentials = ServiceAccountCredentials.fromStream(keyFile);
}

PubSubSource<String> source = PubSubSource.newBuilder()
        .withDeserializationSchema(new SimpleStringSchema())
        .withProjectName("my-gcp-project")
        .withSubscriptionName("my-subscription")
        .withCredentials(credentials)
        // 200 messages per pull, 30-second request timeout, 5 retries
        .withPubSubSubscriberFactory(200, Duration.ofSeconds(30), 5)
        .withMessageRateLimit(50000) // Limit to 50k messages/second per subtask
        .build();
```

### Message Consumption with Metadata Access

```java
import com.google.pubsub.v1.PubsubMessage;
import java.util.Map;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;
import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;

public class MessageWithMetadata {
    public String data;
    public String messageId;
    public long publishTime; // seconds since the epoch
    public Map<String, String> attributes;
}

public class MetadataDeserializationSchema implements PubSubDeserializationSchema<MessageWithMetadata> {
    @Override
    public MessageWithMetadata deserialize(PubsubMessage message) throws Exception {
        MessageWithMetadata result = new MessageWithMetadata();
        result.data = message.getData().toStringUtf8();
        result.messageId = message.getMessageId();
        // getSeconds() drops the sub-second part of the publish timestamp
        result.publishTime = message.getPublishTime().getSeconds();
        result.attributes = message.getAttributesMap();
        return result;
    }

    @Override
    public boolean isEndOfStream(MessageWithMetadata nextElement) {
        return false; // the subscription is treated as unbounded
    }

    @Override
    public TypeInformation<MessageWithMetadata> getProducedType() {
        return TypeInformation.of(MessageWithMetadata.class);
    }
}

// In the job's main method:
PubSubSource<MessageWithMetadata> source = PubSubSource.newBuilder()
        .withDeserializationSchema(new MetadataDeserializationSchema())
        .withProjectName("my-gcp-project")
        .withSubscriptionName("my-subscription")
        .build();
```
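
## Important Notes

- **Checkpointing Required**: PubSubSource requires checkpointing to be enabled, because messages are acknowledged only when a checkpoint completes. The source throws an `IllegalArgumentException` if checkpointing is disabled.

- **Acknowledgment Strategy**: Messages are acknowledged only after a checkpoint completes successfully, so a message is never acknowledged before the state that reflects it has been checkpointed. Pub/Sub may still redeliver a message, for example when its acknowledgement deadline expires before the next checkpoint, so the guarantee is at-least-once rather than exactly-once. Keep the checkpoint interval well below the subscription's acknowledgement deadline.

- **Rate Limiting**: The rate limit applies per parallel subtask, so the maximum total throughput is rate limit × parallelism.

- **Retries**: Failed pull requests are retried automatically, up to the `retries` count configured on the subscriber factory (default: 3).

- **Thread Safety**: The source implements `ParallelSourceFunction`, so it can run with a parallelism greater than 1; each parallel subtask pulls from the same subscription independently.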
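
As a worked example of the rate-limit arithmetic, the sketch below computes the job-wide ceiling from a per-subtask limit and, in the other direction, the per-subtask value to pass to `withMessageRateLimit` for a job-wide target. `RateLimitMath` and its methods are hypothetical helpers, not part of the connector.

```java
// Hypothetical helper for the "rate limit × parallelism" relationship.
final class RateLimitMath {

    /** Upper bound on job-wide throughput, in messages per second. */
    static long maxTotalMessagesPerSecond(int perSubtaskLimit, int parallelism) {
        if (perSubtaskLimit <= 0 || parallelism <= 0) {
            throw new IllegalArgumentException("limit and parallelism must be positive");
        }
        // Widen to long before multiplying to avoid int overflow.
        return (long) perSubtaskLimit * parallelism;
    }

    /**
     * Per-subtask limit that keeps the job at or below a job-wide target.
     * Rounds down, but never below 1, so a target smaller than the
     * parallelism is exceeded slightly.
     */
    static int perSubtaskLimitFor(long targetTotalPerSecond, int parallelism) {
        if (targetTotalPerSecond <= 0 || parallelism <= 0) {
            throw new IllegalArgumentException("target and parallelism must be positive");
        }
        long perSubtask = targetTotalPerSecond / parallelism;
        return (int) Math.max(1, Math.min(Integer.MAX_VALUE, perSubtask));
    }
}
```

For instance, a limit of 50,000 messages/second with a source parallelism of 4 allows up to 200,000 messages/second across the job; to cap the whole job at 200,000 with 4 subtasks, each subtask would be limited to 50,000.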