or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.mdexceptions.mdindex.mdmessage-handling.mdpublisher.mdschedulers.mdschema-service.mdsubscriber.mdtypes.md

index.mddocs/

0

# Google Cloud Pub/Sub

1

2

Google Cloud Pub/Sub is a fully-managed real-time messaging service that enables reliable, many-to-many, asynchronous messaging between applications. The Python client library provides comprehensive publisher and subscriber functionality with automatic retry logic, flow control, message ordering capabilities, and authentication integration with Google Cloud identity services.

3

4

## Package Information

5

6

- **Package Name**: google-cloud-pubsub

7

- **Language**: Python

8

- **Installation**: `pip install google-cloud-pubsub`

9

10

## Core Imports

11

12

```python

13

from google.cloud import pubsub_v1

14

```

15

16

Import specific components:

17

18

```python

19

from google.cloud.pubsub_v1 import PublisherClient, SubscriberClient, types

20

```

21

22

## Basic Usage

23

24

```python

25

from google.cloud import pubsub_v1

26

27

# Create clients

28

publisher = pubsub_v1.PublisherClient()

29

subscriber = pubsub_v1.SubscriberClient()

30

31

# Publishing messages

32

topic_path = publisher.topic_path("my-project", "my-topic")

33

message_data = b"Hello, World!"

34

future = publisher.publish(topic_path, message_data)

35

message_id = future.result()

36

37

# Subscribing to messages

38

subscription_path = subscriber.subscription_path("my-project", "my-subscription")

39

40

def callback(message):

41

print(f"Received: {message.data.decode('utf-8')}")

42

message.ack()

43

44

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)

45

```

46

47

## Architecture

48

49

Google Cloud Pub/Sub follows a publisher-subscriber messaging pattern with the following key components:

50

51

- **Publisher**: Sends messages to topics with batching, flow control, and ordering support

52

- **Subscriber**: Receives messages from subscriptions with automatic acknowledgment and flow control

53

- **Topics**: Named resources to which publishers send messages

54

- **Subscriptions**: Named resources representing a feed of messages from a specific topic

55

- **Messages**: Data payloads with attributes and metadata

56

- **Schema Service**: Manages message schemas for validation and evolution

57

58

The library provides both high-level clients with advanced features (batching, flow control, futures) and low-level GAPIC clients for direct API access.

59

60

## Capabilities

61

62

### Publisher Client

63

64

High-level client for publishing messages to Pub/Sub topics with automatic batching, flow control, message ordering, and OpenTelemetry integration.

65

66

```python { .api }

67

class PublisherClient:

68

def __init__(self, batch_settings=None, publisher_options=None, **kwargs): ...

69

def publish(self, topic, data, ordering_key=None, **attrs): ...

70

def resume_publish(self, topic, ordering_key): ...

71

def stop(self): ...

72

```

73

74

[Publisher Client](./publisher.md)

75

76

### Subscriber Client

77

78

High-level client for subscribing to Pub/Sub subscriptions with automatic message handling, flow control, and OpenTelemetry integration.

79

80

```python { .api }

81

class SubscriberClient:

82

def __init__(self, flow_control=None, subscriber_options=None, **kwargs): ...

83

def subscribe(self, subscription, callback, flow_control=None, scheduler=None): ...

84

def close(self): ...

85

```

86

87

[Subscriber Client](./subscriber.md)

88

89

### Schema Service Client

90

91

Client for managing Pub/Sub schemas including creation, validation, and evolution of message schemas.

92

93

```python { .api }

94

class SchemaServiceClient:

95

def create_schema(self, request=None, **kwargs): ...

96

def get_schema(self, request=None, **kwargs): ...

97

def list_schemas(self, request=None, **kwargs): ...

98

def validate_schema(self, request=None, **kwargs): ...

99

def validate_message(self, request=None, **kwargs): ...

100

```

101

102

[Schema Service](./schema-service.md)

103

104

### Message Handling

105

106

Message objects and utilities for handling received messages, including acknowledgment, negative acknowledgment, and deadline modification.

107

108

```python { .api }

109

class Message:

110

def ack(self): ...

111

def nack(self): ...

112

def modify_ack_deadline(self, seconds): ...

113

def ack_with_response(self): ...

114

def nack_with_response(self): ...

115

```

116

117

[Message Handling](./message-handling.md)

118

119

### Configuration Types

120

121

Comprehensive configuration options for client behavior including batching, flow control, publisher options, and subscriber options.

122

123

```python { .api }

124

class BatchSettings(NamedTuple):

125

max_bytes: int = 1000000

126

max_latency: float = 0.01

127

max_messages: int = 100

128

129

class PublisherOptions(NamedTuple):

130

enable_message_ordering: bool = False

131

flow_control: PublishFlowControl = PublishFlowControl()

132

retry: OptionalRetry = ...

133

timeout: OptionalTimeout = ...

134

```

135

136

[Configuration](./configuration.md)

137

138

### Protobuf Types

139

140

Complete set of protobuf message types for Pub/Sub operations including topics, subscriptions, messages, and requests/responses.

141

142

```python { .api }

143

class PubsubMessage: ...

144

class Topic: ...

145

class Subscription: ...

146

class PublishRequest: ...

147

class PublishResponse: ...

148

```

149

150

[Types](./types.md)

151

152

### Exception Handling

153

154

Comprehensive exception types for handling errors in publishing and subscribing operations, including flow control errors and acknowledgment failures.

155

156

```python { .api }

157

# Publisher exceptions

158

class PublishError(Exception): ...

159

class MessageTooLargeError(PublishError): ...

160

class PublishToPausedOrderingKeyException(PublishError): ...

161

class FlowControlLimitError(PublishError): ...

162

163

# Subscriber exceptions

164

class AcknowledgeError(Exception): ...

165

class AcknowledgeStatus(Enum): ...

166

class TimeoutError(Exception): ...

167

```

168

169

[Exception Handling](./exceptions.md)

170

171

### Schedulers and Utilities

172

173

Scheduler classes and utility functions for controlling message processing behavior and resource management in subscriber operations.

174

175

```python { .api }

176

class ThreadScheduler:

177

def __init__(self, executor: Optional[ThreadPoolExecutor] = None): ...

178

def schedule(self, callback: Callable, *args, **kwargs) -> Future: ...

179

def shutdown(self, wait: bool = True) -> None: ...

180

181

class StreamingPullFuture:

182

def cancel(self) -> bool: ...

183

def result(self, timeout: Optional[float] = None) -> None: ...

184

```

185

186

[Schedulers and Utilities](./schedulers.md)

187

188

## Types

189

190

```python { .api }

191

from typing import Callable, Optional, Union, Any, Sequence

192

from concurrent.futures import Future as ConcurrentFuture

193

from google.api_core import retry, gapic_v1

194

195

# Core type aliases

196

MessageCallback = Callable[[Message], Any]

197

OptionalRetry = Union[retry.Retry, object]

198

OptionalTimeout = Union[float, object]

199

DEFAULT = gapic_v1.method.DEFAULT

200

201

# Future type for publisher operations

202

class Future(ConcurrentFuture):

203

def result(self, timeout: Optional[float] = None) -> str: ...

204

def add_done_callback(self, callback: Callable[["Future"], None]) -> None: ...

205

206

# Scheduler type for subscriber

207

class ThreadScheduler:

208

"""Custom scheduler for message processing in subscriber."""

209

pass

210

211

# Streaming pull future for subscriber operations

212

class StreamingPullFuture:

213

def cancel(self) -> bool: ...

214

def cancelled(self) -> bool: ...

215

def running(self) -> bool: ...

216

def result(self, timeout: Optional[float] = None) -> None: ...

217

```