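# PubSub Source

The PubSubSource consumes messages from a Google Cloud Pub/Sub subscription and manages message acknowledgment automatically through Flink's checkpointing mechanism. Messages are acknowledged only after the checkpoint that covers them completes, which gives at-least-once delivery: no message is lost, but a message can be delivered more than once.

## Capabilities

### Source Creation

Create a PubSubSource with the builder returned by `newBuilder()`. The deserialization schema, GCP project name, and subscription name are required, and the builder enforces that they are supplied in that order.
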
```java { .api }
/**
 * Creates a new builder for PubSubSource configuration
 * @return DeserializationSchemaBuilder instance to start configuration
 */
public static DeserializationSchemaBuilder newBuilder();

public static class DeserializationSchemaBuilder {
    /**
     * Set a standard Flink DeserializationSchema (receives only the message payload)
     * @param deserializationSchema Schema for deserializing the message payload
     * @return ProjectNameBuilder for next configuration step
     */
    public <OUT> ProjectNameBuilder<OUT> withDeserializationSchema(DeserializationSchema<OUT> deserializationSchema);

    /**
     * Set a PubSub-specific deserialization schema (receives the full PubsubMessage, including metadata)
     * @param deserializationSchema Schema with access to PubSub message metadata
     * @return ProjectNameBuilder for next configuration step
     */
    public <OUT> ProjectNameBuilder<OUT> withDeserializationSchema(PubSubDeserializationSchema<OUT> deserializationSchema);
}
```
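
The two overloads differ in what the schema receives: a `DeserializationSchema` sees only the payload bytes, while a `PubSubDeserializationSchema` sees the whole message. As a rough JDK-only illustration, not the connector's code, the sketch below uses hypothetical types (`ModelMessage`, `PayloadSchema`, `MessageSchema`, `PayloadOnlyAdapter`, `TaggedStringSchema`) to show how a payload-only schema can be wrapped into the message-level shape simply by ignoring the attributes.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical stand-in for a Pub/Sub message: payload bytes plus string attributes.
final class ModelMessage {
    final byte[] data;
    final Map<String, String> attributes;

    ModelMessage(byte[] data, Map<String, String> attributes) {
        this.data = data;
        this.attributes = attributes;
    }
}

// Hypothetical payload-only schema (plays the role of a Flink DeserializationSchema).
interface PayloadSchema<T> {
    T deserialize(byte[] data);
}

// Hypothetical message-level schema (plays the role of PubSubDeserializationSchema).
interface MessageSchema<T> {
    T deserialize(ModelMessage message);
}

// Wraps a payload-only schema: the attributes are never consulted.
final class PayloadOnlyAdapter<T> implements MessageSchema<T> {
    private final PayloadSchema<T> inner;

    PayloadOnlyAdapter(PayloadSchema<T> inner) {
        this.inner = inner;
    }

    @Override
    public T deserialize(ModelMessage message) {
        return inner.deserialize(message.data);
    }
}

// A message-level schema can combine attributes with the payload.
final class TaggedStringSchema implements MessageSchema<String> {
    @Override
    public String deserialize(ModelMessage message) {
        String body = new String(message.data, StandardCharsets.UTF_8);
        String source = message.attributes.getOrDefault("source", "unknown");
        return source + ":" + body;
    }
}
```

With a message whose payload is `hello` and whose `source` attribute is `web`, the adapted payload-only schema yields `hello`, while `TaggedStringSchema` yields `web:hello`.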

### Builder Configuration

Configure project name, subscription name, and optional parameters.

```java { .api }
public interface ProjectNameBuilder<OUT> {
    /**
     * Set the GCP project name containing the subscription
     * @param projectName Google Cloud project name
     * @return SubscriptionNameBuilder for next configuration step
     */
    SubscriptionNameBuilder<OUT> withProjectName(String projectName);
}

public interface SubscriptionNameBuilder<OUT> {
    /**
     * Set the Pub/Sub subscription name to consume from
     * @param subscriptionName Pub/Sub subscription name
     * @return PubSubSourceBuilder for optional configuration
     */
    PubSubSourceBuilder<OUT> withSubscriptionName(String subscriptionName);
}

public static class PubSubSourceBuilder<OUT> {
    /**
     * Set custom GCP credentials (optional, defaults to environment credentials)
     * @param credentials Google Cloud credentials
     * @return Current builder instance
     */
    public PubSubSourceBuilder<OUT> withCredentials(Credentials credentials);

    /**
     * Set custom subscriber factory for advanced configuration (optional)
     * @param pubSubSubscriberFactory Custom factory for creating subscribers
     * @return Current builder instance
     */
    public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(PubSubSubscriberFactory pubSubSubscriberFactory);

    /**
     * Configure the default subscriber factory with specific parameters (optional)
     * @param maxMessagesPerPull Number of messages pulled per request (default: 100)
     * @param perRequestTimeout Timeout per pull request (default: 15 seconds)
     * @param retries Number of retries for failed requests (default: 3)
     * @return Current builder instance
     */
    public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(int maxMessagesPerPull, Duration perRequestTimeout, int retries);

    /**
     * Set message rate limit per parallel source instance (optional)
     * @param messagePerSecondRateLimit Messages per second limit (default: 100000)
     * @return Current builder instance
     */
    public PubSubSourceBuilder<OUT> withMessageRateLimit(int messagePerSecondRateLimit);

    /**
     * Build the configured PubSubSource instance.
     * Checkpointing is not verified here: a source running without checkpointing
     * throws IllegalArgumentException when it is opened at job runtime.
     * @return Configured PubSubSource ready for use
     * @throws IOException If default credentials cannot be obtained
     */
    public PubSubSource<OUT> build() throws IOException;
}
```
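
The interfaces above form a staged ("step") builder: each required setting returns a narrower type, so `build()` only becomes reachable once the required values are set. The following JDK-only sketch shows the same pattern with hypothetical names (`SourceConfig`, `ConfigBuilder`, `ProjectStep`, `SubscriptionStep`, `OptionalStep`). For brevity it omits the deserialization-schema stage, and it contains none of the connector's real logic. Its optional-step defaults simply mirror the documented ones.

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical immutable configuration produced by the staged builder below.
final class SourceConfig {
    final String projectName;
    final String subscriptionName;
    final int messageRateLimit;
    final Duration perRequestTimeout;

    SourceConfig(String projectName, String subscriptionName,
                 int messageRateLimit, Duration perRequestTimeout) {
        this.projectName = projectName;
        this.subscriptionName = subscriptionName;
        this.messageRateLimit = messageRateLimit;
        this.perRequestTimeout = perRequestTimeout;
    }
}

// Hypothetical staged builder: each required step returns the next, narrower type.
final class ConfigBuilder {
    private ConfigBuilder() {}

    // Step 1: only the project name can be set here.
    interface ProjectStep {
        SubscriptionStep withProjectName(String projectName);
    }

    // Step 2: only the subscription name can be set here.
    interface SubscriptionStep {
        OptionalStep withSubscriptionName(String subscriptionName);
    }

    // Step 3: optional settings, then build(); defaults mirror the documented ones.
    static final class OptionalStep {
        private final String projectName;
        private final String subscriptionName;
        private int messageRateLimit = 100_000;
        private Duration perRequestTimeout = Duration.ofSeconds(15);

        private OptionalStep(String projectName, String subscriptionName) {
            this.projectName = Objects.requireNonNull(projectName, "projectName");
            this.subscriptionName = Objects.requireNonNull(subscriptionName, "subscriptionName");
        }

        OptionalStep withMessageRateLimit(int messagesPerSecond) {
            if (messagesPerSecond <= 0) {
                throw new IllegalArgumentException("rate limit must be positive");
            }
            this.messageRateLimit = messagesPerSecond;
            return this;
        }

        OptionalStep withPerRequestTimeout(Duration timeout) {
            this.perRequestTimeout = Objects.requireNonNull(timeout, "timeout");
            return this;
        }

        SourceConfig build() {
            return new SourceConfig(projectName, subscriptionName, messageRateLimit, perRequestTimeout);
        }
    }

    // Entry point: build() cannot be called until both required names are supplied.
    static ProjectStep newBuilder() {
        return projectName -> subscriptionName -> new OptionalStep(projectName, subscriptionName);
    }
}
```

A call such as `ConfigBuilder.newBuilder().withSubscriptionName("s")` does not compile, because `ProjectStep` has no such method. Misordered or missing required settings are therefore caught by the compiler rather than at runtime.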

## Usage Examples

### Basic String Message Consumer

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;

public class BasicPubSubConsumer {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Required: messages are acknowledged only when a checkpoint completes.
        // Keep this interval well below the subscription's acknowledgment deadline.
        env.enableCheckpointing(30000);

        PubSubSource<String> source = PubSubSource.newBuilder()
                .withDeserializationSchema(new SimpleStringSchema())
                .withProjectName("my-gcp-project")
                .withSubscriptionName("my-subscription")
                .build(); // throws IOException if default credentials are unavailable

        DataStream<String> stream = env.addSource(source);
        stream.print();

        env.execute("Basic PubSub Consumer");
    }
}
```

### JSON Message Consumer with Custom Deserialization

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.pubsub.v1.PubsubMessage;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;
import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;

// User is an application-defined POJO (not shown) that Jackson can map the JSON payload onto.
public class JsonUserDeserializer implements PubSubDeserializationSchema<User> {
    // Created in open() so each parallel instance builds its own mapper.
    private transient ObjectMapper objectMapper;

    @Override
    public void open(DeserializationSchema.InitializationContext context) {
        objectMapper = new ObjectMapper();
    }

    @Override
    public User deserialize(PubsubMessage message) throws Exception {
        String json = message.getData().toStringUtf8();
        return objectMapper.readValue(json, User.class);
    }

    @Override
    public boolean isEndOfStream(User nextElement) {
        return false; // a Pub/Sub subscription is unbounded
    }

    @Override
    public TypeInformation<User> getProducedType() {
        return TypeInformation.of(User.class);
    }
}

// Usage (inside a method that declares IOException or Exception)
PubSubSource<User> userSource = PubSubSource.newBuilder()
        .withDeserializationSchema(new JsonUserDeserializer())
        .withProjectName("my-project")
        .withSubscriptionName("user-events")
        .build();
```

### Advanced Configuration with Rate Limiting

```java
import java.time.Duration;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;

PubSubSource<String> source = PubSubSource.newBuilder()
        .withDeserializationSchema(new SimpleStringSchema())
        .withProjectName("my-project")
        .withSubscriptionName("high-volume-subscription")
        .withPubSubSubscriberFactory(
                500,                    // maxMessagesPerPull
                Duration.ofSeconds(30), // perRequestTimeout
                5                       // retries
        )
        .withMessageRateLimit(10000)    // 10K messages per second per parallel subtask
        .build();
```

### Using Custom Credentials

```java
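import com.google.auth.Credentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
import java.io.FileInputStream;
import java.io.InputStream;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;

// Load a service account key; try-with-resources closes the file afterwards.
Credentials credentials;
try (InputStream keyStream = new FileInputStream("path/to/service-account-key.json")) {
    credentials = ServiceAccountCredentials.fromStream(keyStream);
}

PubSubSource<String> source = PubSubSource.newBuilder()
        .withDeserializationSchema(new SimpleStringSchema())
        .withProjectName("my-project")
        .withSubscriptionName("my-subscription")
        .withCredentials(credentials)
        .build();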
```

## Message Acknowledgment

The PubSubSource automatically manages message acknowledgment through Flink's checkpointing mechanism:

1. **Message Reception**: Messages are pulled from the subscription, and their acknowledgment IDs are added to the pending acknowledgments.
2. **Processing**: Messages are deserialized and emitted to the Flink stream.
3. **Checkpointing**: When a checkpoint completes successfully, all messages received before that checkpoint are acknowledged.
4. **Failure Recovery**: If a checkpoint fails, or the job fails before it completes, those messages stay unacknowledged until a later checkpoint completes. If their acknowledgment deadline expires first, Pub/Sub redelivers them.

This provides at-least-once semantics: no message is lost, but a message can be processed more than once. This happens, for example, after a failure, or when its acknowledgment deadline expires before the next checkpoint completes.
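
The acknowledgment steps above can be modeled with a small JDK-only class. This is a hypothetical sketch (`AckOnCheckpointTracker` and its methods are illustrative names, not the connector's classes). It assumes that ack IDs are frozen per checkpoint when a snapshot is taken. It also assumes that completing checkpoint N acknowledges every frozen batch up to and including N, so a batch from an earlier checkpoint that never reported completion is still acknowledged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of acknowledge-on-checkpoint bookkeeping; not the connector's class.
final class AckOnCheckpointTracker {
    // Ack IDs received since the last snapshot.
    private List<String> pending = new ArrayList<>();
    // Ack IDs frozen at each checkpoint, waiting for that checkpoint to complete.
    private final TreeMap<Long, List<String>> perCheckpoint = new TreeMap<>();
    // IDs acknowledged so far (stands in for acknowledge calls to Pub/Sub).
    private final List<String> acknowledged = new ArrayList<>();

    // Step 1: record the ack ID of each pulled message.
    void onMessageReceived(String ackId) {
        pending.add(ackId);
    }

    // Step 3a: when a checkpoint is taken, freeze the IDs received so far under its ID.
    void snapshot(long checkpointId) {
        perCheckpoint.put(checkpointId, pending);
        pending = new ArrayList<>();
    }

    // Step 3b: when a checkpoint completes, acknowledge every batch up to and including it.
    void notifyCheckpointComplete(long checkpointId) {
        Map<Long, List<String>> done = perCheckpoint.headMap(checkpointId, true);
        for (List<String> ids : done.values()) {
            acknowledged.addAll(ids);
        }
        done.clear(); // headMap is a live view, so this also removes the entries from perCheckpoint
    }

    List<String> acknowledged() {
        return new ArrayList<>(acknowledged);
    }

    // Step 4: IDs not yet acknowledged; Pub/Sub redelivers these once their deadline expires.
    List<String> unacknowledged() {
        List<String> out = new ArrayList<>();
        perCheckpoint.values().forEach(out::addAll);
        out.addAll(pending);
        return out;
    }
}
```

In this model, a job that fails after `snapshot(1)` but before `notifyCheckpointComplete(1)` has acknowledged nothing. Everything it received is then redelivered and processed again after recovery.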
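
## Important Requirements

- **Checkpointing**: Flink checkpointing MUST be enabled. If it is disabled, the source throws an IllegalArgumentException when it is opened at job runtime, not when `build()` is called.
- **Checkpoint Interval**: The checkpoint interval should be much shorter than the subscription's acknowledgment deadline (Pub/Sub's default is 10 seconds, configurable up to 600 seconds). Otherwise Pub/Sub redelivers messages before they are acknowledged, and they are processed multiple times.
- **Parallel Processing**: Each parallel subtask creates its own subscriber and manages its own acknowledgments.
- **Rate Limiting**: Rate limits are applied per parallel subtask, not globally, so the job-wide ceiling is the configured limit multiplied by the source parallelism.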
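
The per-subtask rate limit and the checkpoint-interval rule both reduce to simple arithmetic. The sketch below is hypothetical and JDK-only (`PubSubSizing` and its methods are illustrative, not part of the connector). The `safetyFactor` parameter is an assumed margin that is meant to absorb the time a checkpoint itself takes to complete.

```java
import java.time.Duration;

// Hypothetical sizing helpers for the requirements above; not part of the connector.
final class PubSubSizing {
    private PubSubSizing() {}

    // The message rate limit applies per parallel subtask, so the job-wide ceiling scales with parallelism.
    static long aggregateRateLimit(int perSubtaskLimit, int parallelism) {
        return (long) perSubtaskLimit * parallelism;
    }

    // True when safetyFactor checkpoint intervals still fit inside the ack deadline,
    // e.g. safetyFactor = 3 requires the interval to be at most a third of the deadline.
    static boolean checkpointIntervalFits(Duration checkpointInterval, Duration ackDeadline, int safetyFactor) {
        if (safetyFactor < 1) {
            throw new IllegalArgumentException("safetyFactor must be at least 1");
        }
        return checkpointInterval.multipliedBy(safetyFactor).compareTo(ackDeadline) <= 0;
    }
}
```

For example, a limit of 10,000 messages per second at parallelism 4 allows up to 40,000 messages per second across the job. A 30-second checkpoint interval fails the check against a 10-second acknowledgment deadline. The same interval passes against a 600-second deadline.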
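
## Error Handling

- **Connection Failures**: Failed pull requests are retried by the subscriber up to the configured `retries` count. If they still fail, the exception propagates and fails the task, which is then restarted according to the job's restart strategy.
- **Deserialization Errors**: Exceptions thrown during deserialization cause the job to fail.
- **Acknowledgment Failures**: Acknowledgments are sent only after a checkpoint has completed, so a failed acknowledgment cannot fail that checkpoint. Pub/Sub redelivers the affected messages once their acknowledgment deadline expires.
- **Timeout Handling**: Each pull request is bounded by `perRequestTimeout`. A pull that times out is treated as a failed request and retried within the `retries` budget.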
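
The retry behavior described for pull requests can be pictured as a bounded loop. The following JDK-only sketch is hypothetical (`BoundedRetry` and `callWithRetries` are illustrative names). It assumes one initial attempt plus up to `retries` further attempts, with no backoff, and treats any `RuntimeException` (such as a timed-out pull) as retryable. It does not claim to reproduce the connector's exact retry policy.

```java
import java.util.function.Supplier;

// Hypothetical bounded retry around a pull call; not the connector's implementation.
final class BoundedRetry {
    private BoundedRetry() {}

    // Makes one initial attempt plus up to `retries` retries, rethrowing the last failure.
    static <T> T callWithRetries(Supplier<T> pull, int retries) {
        if (retries < 0) {
            throw new IllegalArgumentException("retries must be >= 0");
        }
        RuntimeException last = null;
        for (int attempt = 0; attempt <= retries; attempt++) {
            try {
                return pull.get();
            } catch (RuntimeException e) {
                last = e; // e.g. a failed or timed-out pull request
            }
        }
        // Retries exhausted: propagate, which in a Flink source would fail the task.
        throw last;
    }
}
```

With `retries = 3`, a pull that fails twice and then succeeds returns normally after three attempts. A pull that always fails is attempted four times before the last exception is rethrown.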