or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

deserialization.mdindex.mdsink.mdsource.md

source.mddocs/

0

# Message Consumption (Source)

1

2

The PubSubSource provides high-performance message consumption from Google Cloud Pub/Sub subscriptions with at-least-once delivery guarantees through Flink's checkpointing mechanism: a message is acknowledged to Pub/Sub only after the checkpoint that covers it has completed.

3

4

## Capabilities

5

6

### PubSubSource Builder

7

8

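Creates a new PubSubSource using a staged builder. The required parameters (deserialization schema, project name, subscription name) are each set on their own builder interface, in that order, so the optional settings and `build()` only become available once all required parameters have been provided.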

9

10

```java { .api }

11

/**

12

* Creates a new builder for PubSubSource

13

* @return DeserializationSchemaBuilder for setting deserialization schema

14

*/

15

public static DeserializationSchemaBuilder newBuilder();

16

```

17

18

### Deserialization Schema Configuration

19

20

Configure how messages are deserialized from Pub/Sub.

21

22

```java { .api }

23

public static class DeserializationSchemaBuilder {

24

/**

25

* Set standard Flink DeserializationSchema for message data only

26

* @param deserializationSchema Schema for deserializing message data

27

* @return ProjectNameBuilder for next configuration step

28

*/

29

public <OUT> ProjectNameBuilder<OUT> withDeserializationSchema(

30

DeserializationSchema<OUT> deserializationSchema);

31

32

/**

33

* Set PubSub-specific deserialization schema with metadata access

34

* @param deserializationSchema Schema with access to full PubsubMessage

35

* @return ProjectNameBuilder for next configuration step

36

*/

37

public <OUT> ProjectNameBuilder<OUT> withDeserializationSchema(

38

PubSubDeserializationSchema<OUT> deserializationSchema);

39

}

40

```

41

42

### Project and Subscription Configuration

43

44

Configure the GCP project and Pub/Sub subscription.

45

46

```java { .api }

47

public interface ProjectNameBuilder<OUT> {

48

/**

49

* Set the GCP project name containing the subscription

50

* @param projectName GCP project name

51

* @return SubscriptionNameBuilder for next configuration step

52

*/

53

SubscriptionNameBuilder<OUT> withProjectName(String projectName);

54

}

55

56

public interface SubscriptionNameBuilder<OUT> {

57

/**

58

* Set the Pub/Sub subscription name to consume from

59

* @param subscriptionName Subscription name

60

* @return PubSubSourceBuilder for additional configuration

61

*/

62

PubSubSourceBuilder<OUT> withSubscriptionName(String subscriptionName);

63

}

64

```

65

66

### Source Builder Configuration

67

68

Final builder stage, returned by `withSubscriptionName`, used to configure optional parameters and build the source.

69

70

```java { .api }

71

public static class PubSubSourceBuilder<OUT> {

72

/**

73

* Set authentication credentials (optional - uses default credentials if not set)

74

* @param credentials Google Cloud credentials

75

* @return Current builder instance

76

*/

77

public PubSubSourceBuilder<OUT> withCredentials(Credentials credentials);

78

79

/**

80

* Set custom subscriber factory for advanced configuration

81

* @param pubSubSubscriberFactory Custom factory implementation

82

* @return Current builder instance

83

*/

84

public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(

85

PubSubSubscriberFactory pubSubSubscriberFactory);

86

87

/**

88

* Configure default subscriber factory with custom parameters

89

* @param maxMessagesPerPull Maximum messages per pull request (default: 100)

90

* @param perRequestTimeout Timeout per request (default: 15 seconds)

91

* @param retries Number of retries on failure (default: 3)

92

* @return Current builder instance

93

*/

94

public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(

95

int maxMessagesPerPull, Duration perRequestTimeout, int retries);

96

97

/**

98

* Set message rate limit per parallel instance (default: 100000 messages/second)

99

* @param messagePerSecondRateLimit Rate limit per parallel subtask

100

* @return Current builder instance

101

*/

102

public PubSubSourceBuilder<OUT> withMessageRateLimit(int messagePerSecondRateLimit);

103

104

/**

105

* Build the configured PubSubSource

106

* @return Configured PubSubSource instance

107

* @throws IOException If credentials cannot be obtained

108

* @throws IllegalArgumentException If required fields are missing

109

*/

110

public PubSubSource<OUT> build() throws IOException;

111

}

112

```

113

114

### Core Source Methods

115

116

Key methods of the PubSubSource class.

117

118

```java { .api }

119

public class PubSubSource<OUT> extends RichSourceFunction<OUT>

120

implements ResultTypeQueryable<OUT>, ParallelSourceFunction<OUT>,

121

CheckpointListener, ListCheckpointed<AcknowledgeIdsForCheckpoint<String>> {

122

123

/**

124

* Get type information for elements produced by this source

125

* @return TypeInformation for output elements

126

*/

127

public TypeInformation<OUT> getProducedType();

128

129

/**

130

* Called when checkpoint completes - acknowledges messages

131

* @param checkpointId Completed checkpoint ID

132

*/

133

public void notifyCheckpointComplete(long checkpointId) throws Exception;

134

135

/**

136

* Cancel the source function

137

*/

138

public void cancel();

139

}

140

```

141

142

## Usage Examples

143

144

### Basic String Consumption

145

146

```java

147

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

148

import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;

149

import org.apache.flink.api.common.serialization.SimpleStringSchema;

150

151

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

152

env.enableCheckpointing(30000); // Required for exactly-once processing

153

154

PubSubSource<String> source = PubSubSource.newBuilder()

155

.withDeserializationSchema(new SimpleStringSchema())

156

.withProjectName("my-gcp-project")

157

.withSubscriptionName("my-subscription")

158

.build();

159

160

env.addSource(source)

161

.print();

162

163

env.execute("Basic Pub/Sub Consumer");

164

```

165

166

### JSON Message Consumption with Custom Schema

167

168

```java

169

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

170

import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

171

172

public class JsonDeserializationSchema extends AbstractDeserializationSchema<MyObject> {

173

private ObjectMapper mapper = new ObjectMapper();

174

175

@Override

176

public MyObject deserialize(byte[] message) throws IOException {

177

return mapper.readValue(message, MyObject.class);

178

}

179

}

180

181

PubSubSource<MyObject> source = PubSubSource.newBuilder()

182

.withDeserializationSchema(new JsonDeserializationSchema())

183

.withProjectName("my-gcp-project")

184

.withSubscriptionName("json-messages")

185

.build();

186

```

187

188

### Advanced Configuration with Custom Credentials

189

190

```java

191

import com.google.auth.oauth2.ServiceAccountCredentials;

192

import java.io.FileInputStream;

193

import java.time.Duration;

194

195

// Load service account credentials

196

Credentials credentials = ServiceAccountCredentials

197

.fromStream(new FileInputStream("path/to/service-account.json"));

198

199

PubSubSource<String> source = PubSubSource.newBuilder()

200

.withDeserializationSchema(new SimpleStringSchema())

201

.withProjectName("my-gcp-project")

202

.withSubscriptionName("my-subscription")

203

.withCredentials(credentials)

204

.withPubSubSubscriberFactory(200, Duration.ofSeconds(30), 5) // Custom timeouts

205

.withMessageRateLimit(50000) // Limit to 50k messages/second per subtask

206

.build();

207

```

208

209

### Message Consumption with Metadata Access

210

211

```java

212

import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;

213

import org.apache.flink.api.common.typeinfo.TypeInformation;

214

import org.apache.flink.api.common.typeinfo.Types;

215

import com.google.pubsub.v1.PubsubMessage;

216

217

public class MessageWithMetadata {

218

public String data;

219

public String messageId;

220

public long publishTime;

221

public Map<String, String> attributes;

222

}

223

224

public class MetadataDeserializationSchema implements PubSubDeserializationSchema<MessageWithMetadata> {

225

@Override

226

public MessageWithMetadata deserialize(PubsubMessage message) throws Exception {

227

MessageWithMetadata result = new MessageWithMetadata();

228

result.data = message.getData().toStringUtf8();

229

result.messageId = message.getMessageId();

230

result.publishTime = message.getPublishTime().getSeconds();

231

result.attributes = message.getAttributesMap();

232

return result;

233

}

234

235

@Override

236

public boolean isEndOfStream(MessageWithMetadata nextElement) {

237

return false;

238

}

239

240

@Override

241

public TypeInformation<MessageWithMetadata> getProducedType() {

242

return TypeInformation.of(MessageWithMetadata.class);

243

}

244

}

245

246

PubSubSource<MessageWithMetadata> source = PubSubSource.newBuilder()

247

.withDeserializationSchema(new MetadataDeserializationSchema())

248

.withProjectName("my-gcp-project")

249

.withSubscriptionName("my-subscription")

250

.build();

251

```

252

253

## Important Notes

254

255

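- **Checkpointing Required**: PubSubSource requires checkpointing to be enabled, because messages are acknowledged only when a checkpoint completes. The source throws an IllegalArgumentException at startup if checkpointing is disabled. Choose a checkpoint interval well below the subscription's acknowledgement deadline; otherwise messages that are not yet acknowledged will be redelivered by Pub/Sub.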

256

257

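- **Acknowledgment Strategy**: Messages are acknowledged only after the checkpoint that covers them has completed. This provides at-least-once guarantees: after a failure, or when the acknowledgement deadline expires before the next checkpoint completes, Pub/Sub redelivers messages, so downstream processing must tolerate duplicates.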

258

259

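- **Rate Limiting**: The rate limit applies per parallel subtask, so the maximum total throughput of the source is the rate limit × parallelism (e.g. 50,000 messages/second × 4 subtasks = 200,000 messages/second).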

260

261

- **Retries**: With the default subscriber factory, a failed pull request is retried up to `retries` times. The default is 3, and it can be changed via `withPubSubSubscriberFactory(int, Duration, int)`.

262

263

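- **Thread Safety / Parallel Execution**: The source implements `ParallelSourceFunction`, so it can run with parallelism greater than 1. Every parallel subtask pulls from the same subscription, and Pub/Sub distributes the messages among them.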