0
# Queue Configuration
1
2
Advanced queue configuration options including rate limiting, retry policies, App Engine routing, and logging settings for fine-tuned task processing behavior.
3
4
## Capabilities
5
6
### Queue Structure
7
8
Core queue configuration container with routing, rate limits, retry policies, and logging settings.
9
10
```python { .api }
11
class Queue:
12
name: str # Caller-specified queue name (required in create_queue)
13
app_engine_routing_override: AppEngineRouting # Overrides for task-level App Engine routing
14
rate_limits: RateLimits # Rate limits for task dispatches
15
retry_config: RetryConfig # Settings that determine retry behavior
16
state: Queue.State # Output only queue state
17
purge_time: timestamp_pb2.Timestamp # Output only last purge time
18
stackdriver_logging_config: StackdriverLoggingConfig # Logging configuration
19
20
class State:
21
STATE_UNSPECIFIED = 0 # Unspecified state
22
RUNNING = 1 # Queue is running, tasks can be dispatched
23
PAUSED = 2 # Tasks are paused by user
24
DISABLED = 3 # Queue is disabled
25
```
26
27
### Rate Limiting
28
29
Control task dispatch frequency and concurrency to manage system load and external service rate limits.
30
31
```python { .api }
32
class RateLimits:
33
max_dispatches_per_second: float # Maximum rate at which tasks are dispatched (max 500)
34
max_burst_size: int # Output only max burst size for token bucket algorithm
35
max_concurrent_dispatches: int # Maximum number of concurrent tasks (max 5000)
36
```
37
38
### Retry Configuration
39
40
Fine-tune automatic retry behavior for failed tasks with exponential backoff and limits.
41
42
```python { .api }
43
class RetryConfig:
44
max_attempts: int # Total attempts per task, including the first attempt (-1 for unlimited)
45
max_retry_duration: duration_pb2.Duration # Time limit for retrying failed tasks
46
min_backoff: duration_pb2.Duration # Minimum backoff between retries
47
max_backoff: duration_pb2.Duration # Maximum backoff between retries
48
max_doublings: int # Number of times retry interval doubles
49
```
50
51
### Logging Configuration
52
53
Control what task execution information is logged to Cloud Logging for monitoring and debugging.
54
55
```python { .api }
56
class StackdriverLoggingConfig:
57
sampling_ratio: float # Fraction of operations to log (0.0-1.0, default 0.0)
58
```
59
60
### App Engine Routing Override
61
62
Queue-level routing settings applied to all App Engine tasks in the queue. When set, this override takes precedence over any App Engine routing specified at the task level.
63
64
```python { .api }
65
class AppEngineRouting:
66
service: str # App service (default service if not specified)
67
version: str # App version (default version if not specified)
68
instance: str # App instance (available instance if not specified)
69
host: str # Output only host that task is sent to
70
```
71
72
## Usage Examples
73
74
### Basic Queue Configuration
75
76
```python
77
from google.cloud import tasks
78
from google.protobuf import duration_pb2
79
80
client = tasks.CloudTasksClient()
81
82
# Create a queue with rate limits
83
queue_path = client.queue_path('my-project-id', 'us-central1', 'configured-queue')
84
parent = client.common_location_path('my-project-id', 'us-central1')
85
86
queue = tasks.Queue(
87
name=queue_path,
88
rate_limits=tasks.RateLimits(
89
max_dispatches_per_second=5.0, # 5 tasks per second max
90
max_concurrent_dispatches=10 # Max 10 concurrent tasks
91
)
92
)
93
94
created_queue = client.create_queue(parent=parent, queue=queue)
95
print(f'Created queue with rate limits: {created_queue.name}')
96
```
97
98
### Advanced Retry Configuration
99
100
```python
101
from google.cloud import tasks
102
from google.protobuf import duration_pb2
103
104
client = tasks.CloudTasksClient()
105
queue_path = client.queue_path('my-project-id', 'us-central1', 'retry-queue')
106
parent = client.common_location_path('my-project-id', 'us-central1')
107
108
# Configure detailed retry behavior
109
retry_config = tasks.RetryConfig(
110
max_attempts=5, # Try up to 5 times
111
max_retry_duration=duration_pb2.Duration(seconds=3600), # 1 hour max
112
min_backoff=duration_pb2.Duration(seconds=5), # Start with 5 second backoff
113
max_backoff=duration_pb2.Duration(seconds=300), # Cap at 5 minutes
114
max_doublings=3 # Interval doubles 3 times (5s, 10s, 20s, 40s), then grows linearly up to the 300s cap
115
)
116
117
queue = tasks.Queue(
118
name=queue_path,
119
retry_config=retry_config
120
)
121
122
created_queue = client.create_queue(parent=parent, queue=queue)
123
print(f'Created queue with retry config: {created_queue.retry_config.max_attempts}')
124
```
125
126
### App Engine Routing Configuration
127
128
```python
129
from google.cloud import tasks
130
131
client = tasks.CloudTasksClient()
132
queue_path = client.queue_path('my-project-id', 'us-central1', 'appengine-queue')
133
parent = client.common_location_path('my-project-id', 'us-central1')
134
135
# Configure queue-level App Engine routing (takes precedence over task-level routing for all App Engine tasks in the queue)
136
app_engine_routing = tasks.AppEngineRouting(
137
service='worker-service',
138
version='v2'
139
)
140
141
queue = tasks.Queue(
142
name=queue_path,
143
app_engine_routing_override=app_engine_routing
144
)
145
146
created_queue = client.create_queue(parent=parent, queue=queue)
147
print(f'Created App Engine queue: {created_queue.app_engine_routing_override.service}')
148
```
149
150
### Logging Configuration
151
152
```python
153
from google.cloud import tasks
154
155
client = tasks.CloudTasksClient()
156
queue_path = client.queue_path('my-project-id', 'us-central1', 'logged-queue')
157
parent = client.common_location_path('my-project-id', 'us-central1')
158
159
# Configure logging for debugging
160
logging_config = tasks.StackdriverLoggingConfig(
161
sampling_ratio=1.0 # Log all operations (for debugging)
162
)
163
164
queue = tasks.Queue(
165
name=queue_path,
166
stackdriver_logging_config=logging_config
167
)
168
169
created_queue = client.create_queue(parent=parent, queue=queue)
170
print(f'Created queue with full logging: {created_queue.stackdriver_logging_config.sampling_ratio}')
171
```
172
173
### Complete Queue Configuration
174
175
```python
176
from google.cloud import tasks
177
from google.protobuf import duration_pb2
178
179
client = tasks.CloudTasksClient()
180
queue_path = client.queue_path('my-project-id', 'us-central1', 'production-queue')
181
parent = client.common_location_path('my-project-id', 'us-central1')
182
183
# Production-ready queue with all configurations
184
queue = tasks.Queue(
185
name=queue_path,
186
187
# Rate limiting for external service protection
188
rate_limits=tasks.RateLimits(
189
max_dispatches_per_second=10.0,
190
max_concurrent_dispatches=20
191
),
192
193
# Robust retry configuration
194
retry_config=tasks.RetryConfig(
195
max_attempts=3,
196
max_retry_duration=duration_pb2.Duration(seconds=1800), # 30 minutes
197
min_backoff=duration_pb2.Duration(seconds=10),
198
max_backoff=duration_pb2.Duration(seconds=300),
199
max_doublings=2
200
),
201
202
# Queue-level App Engine routing (takes precedence over task-level routing)
203
app_engine_routing_override=tasks.AppEngineRouting(
204
service='task-processor',
205
version='stable'
206
),
207
208
# Moderate logging for monitoring
209
stackdriver_logging_config=tasks.StackdriverLoggingConfig(
210
sampling_ratio=0.1 # Log 10% of operations
211
)
212
)
213
214
created_queue = client.create_queue(parent=parent, queue=queue)
215
print(f'Created production queue: {created_queue.name}')
216
print(f'- Rate limit: {created_queue.rate_limits.max_dispatches_per_second}/sec')
217
print(f'- Max attempts: {created_queue.retry_config.max_attempts}')
218
print(f'- Default service: {created_queue.app_engine_routing_override.service}')
219
```
220
221
### Dynamic Queue Reconfiguration
222
223
```python
224
from google.cloud import tasks
225
from google.protobuf import field_mask_pb2, duration_pb2
226
227
client = tasks.CloudTasksClient()
228
queue_path = client.queue_path('my-project-id', 'us-central1', 'dynamic-queue')
229
230
# Get current queue configuration
231
current_queue = client.get_queue(name=queue_path)
232
233
# Modify rate limits for high traffic period
234
current_queue.rate_limits.max_dispatches_per_second = 50.0
235
current_queue.rate_limits.max_concurrent_dispatches = 100
236
237
# Update only the rate limits
238
update_mask = field_mask_pb2.FieldMask(
239
paths=[
240
'rate_limits.max_dispatches_per_second',
241
'rate_limits.max_concurrent_dispatches'
242
]
243
)
244
245
updated_queue = client.update_queue(
246
queue=current_queue,
247
update_mask=update_mask
248
)
249
250
print(f'Updated rate limits to {updated_queue.rate_limits.max_dispatches_per_second}/sec')
251
252
# Later, modify retry configuration for better reliability
253
current_queue.retry_config.max_attempts = 5
254
current_queue.retry_config.min_backoff = duration_pb2.Duration(seconds=30)
255
256
retry_update_mask = field_mask_pb2.FieldMask(
257
paths=[
258
'retry_config.max_attempts',
259
'retry_config.min_backoff'
260
]
261
)
262
263
updated_queue = client.update_queue(
264
queue=current_queue,
265
update_mask=retry_update_mask
266
)
267
268
print(f'Updated retry config: {updated_queue.retry_config.max_attempts} attempts')
269
```
270
271
### Queue State Management
272
273
```python
274
from google.cloud import tasks
275
276
client = tasks.CloudTasksClient()
277
queue_path = client.queue_path('my-project-id', 'us-central1', 'state-queue')
278
279
# Check queue state
280
queue = client.get_queue(name=queue_path)
281
print(f'Queue state: {queue.state}')
282
283
if queue.state == tasks.Queue.State.RUNNING:
284
print('Queue is actively processing tasks')
285
elif queue.state == tasks.Queue.State.PAUSED:
286
print('Queue is paused - tasks are not being dispatched')
287
elif queue.state == tasks.Queue.State.DISABLED:
288
print('Queue is disabled')
289
290
# Pause queue for maintenance
291
paused_queue = client.pause_queue(name=queue_path)
292
print(f'Queue paused: {paused_queue.state == tasks.Queue.State.PAUSED}')
293
294
# Resume normal operations
295
resumed_queue = client.resume_queue(name=queue_path)
296
print(f'Queue resumed: {resumed_queue.state == tasks.Queue.State.RUNNING}')
297
```