or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

client-operations.mdiam-security.mdindex.mdqueue-configuration.mdqueue-management.mdtask-management.mdtask-targeting.md

queue-configuration.mddocs/

0

# Queue Configuration

1

2

Advanced queue configuration options including rate limiting, retry policies, App Engine routing, and logging settings for fine-tuned task processing behavior.

3

4

## Capabilities

5

6

### Queue Structure

7

8

Core queue configuration container with routing, rate limits, retry policies, and logging settings.

9

10

```python { .api }

11

class Queue:

12

name: str # Caller-specified queue name (required in create_queue)

13

app_engine_routing_override: AppEngineRouting # Overrides for task-level App Engine routing

14

rate_limits: RateLimits # Rate limits for task dispatches

15

retry_config: RetryConfig # Settings that determine retry behavior

16

state: Queue.State # Output only queue state

17

purge_time: timestamp_pb2.Timestamp # Output only last purge time

18

stackdriver_logging_config: StackdriverLoggingConfig # Logging configuration

19

20

class State:

21

STATE_UNSPECIFIED = 0 # Unspecified state

22

RUNNING = 1 # Queue is running, tasks can be dispatched

23

PAUSED = 2 # Tasks are paused by user

24

DISABLED = 3 # Queue is disabled

25

```

26

27

### Rate Limiting

28

29

Control task dispatch frequency and concurrency to manage system load and external service rate limits.

30

31

```python { .api }

32

class RateLimits:

33

max_dispatches_per_second: float # Maximum rate at which tasks are dispatched (max 500)

34

max_burst_size: int # Output only max burst size for token bucket algorithm

35

max_concurrent_dispatches: int # Maximum number of concurrent tasks (max 5000)

36

```

37

38

### Retry Configuration

39

40

Fine-tune automatic retry behavior for failed tasks with exponential backoff and limits.

41

42

```python { .api }

43

class RetryConfig:

44

max_attempts: int # Number of attempts per task (-1 for unlimited)

45

max_retry_duration: duration_pb2.Duration # Time limit for retrying failed tasks

46

min_backoff: duration_pb2.Duration # Minimum backoff between retries

47

max_backoff: duration_pb2.Duration # Maximum backoff between retries

48

max_doublings: int # Number of times retry interval doubles

49

```

50

51

### Logging Configuration

52

53

Control what task execution information is logged to Cloud Logging for monitoring and debugging.

54

55

```python { .api }

56

class StackdriverLoggingConfig:

57

sampling_ratio: float # Fraction of operations to log (0.0-1.0, default 0.0)

58

```

59

60

### App Engine Routing Override

61

62

Default routing settings applied to all App Engine tasks in the queue unless overridden at the task level.

63

64

```python { .api }

65

class AppEngineRouting:

66

service: str # App service (default service if not specified)

67

version: str # App version (default version if not specified)

68

instance: str # App instance (available instance if not specified)

69

host: str # Output only host that task is sent to

70

```

71

72

## Usage Examples

73

74

### Basic Queue Configuration

75

76

```python

77

from google.cloud import tasks

78

from google.protobuf import duration_pb2

79

80

client = tasks.CloudTasksClient()

81

82

# Create a queue with rate limits

83

queue_path = client.queue_path('my-project-id', 'us-central1', 'configured-queue')

84

parent = client.common_location_path('my-project-id', 'us-central1')

85

86

queue = tasks.Queue(

87

name=queue_path,

88

rate_limits=tasks.RateLimits(

89

max_dispatches_per_second=5.0, # 5 tasks per second max

90

max_concurrent_dispatches=10 # Max 10 concurrent tasks

91

)

92

)

93

94

created_queue = client.create_queue(parent=parent, queue=queue)

95

print(f'Created queue with rate limits: {created_queue.name}')

96

```

97

98

### Advanced Retry Configuration

99

100

```python

101

from google.cloud import tasks

102

from google.protobuf import duration_pb2

103

104

client = tasks.CloudTasksClient()

105

queue_path = client.queue_path('my-project-id', 'us-central1', 'retry-queue')

106

parent = client.common_location_path('my-project-id', 'us-central1')

107

108

# Configure detailed retry behavior

109

retry_config = tasks.RetryConfig(

110

max_attempts=5, # Try up to 5 times

111

max_retry_duration=duration_pb2.Duration(seconds=3600), # 1 hour max

112

min_backoff=duration_pb2.Duration(seconds=5), # Start with 5 second backoff

113

max_backoff=duration_pb2.Duration(seconds=300), # Cap at 5 minutes

114

max_doublings=3 # Double backoff 3 times: 5s, 10s, 20s, then cap at 300s

115

)

116

117

queue = tasks.Queue(

118

name=queue_path,

119

retry_config=retry_config

120

)

121

122

created_queue = client.create_queue(parent=parent, queue=queue)

123

print(f'Created queue with retry config: {created_queue.retry_config.max_attempts}')

124

```

125

126

### App Engine Routing Configuration

127

128

```python

129

from google.cloud import tasks

130

131

client = tasks.CloudTasksClient()

132

queue_path = client.queue_path('my-project-id', 'us-central1', 'appengine-queue')

133

parent = client.common_location_path('my-project-id', 'us-central1')

134

135

# Configure default App Engine routing for all tasks in queue

136

app_engine_routing = tasks.AppEngineRouting(

137

service='worker-service',

138

version='v2'

139

)

140

141

queue = tasks.Queue(

142

name=queue_path,

143

app_engine_routing_override=app_engine_routing

144

)

145

146

created_queue = client.create_queue(parent=parent, queue=queue)

147

print(f'Created App Engine queue: {created_queue.app_engine_routing_override.service}')

148

```

149

150

### Logging Configuration

151

152

```python

153

from google.cloud import tasks

154

155

client = tasks.CloudTasksClient()

156

queue_path = client.queue_path('my-project-id', 'us-central1', 'logged-queue')

157

parent = client.common_location_path('my-project-id', 'us-central1')

158

159

# Configure logging for debugging

160

logging_config = tasks.StackdriverLoggingConfig(

161

sampling_ratio=1.0 # Log all operations (for debugging)

162

)

163

164

queue = tasks.Queue(

165

name=queue_path,

166

stackdriver_logging_config=logging_config

167

)

168

169

created_queue = client.create_queue(parent=parent, queue=queue)

170

print(f'Created queue with full logging: {created_queue.stackdriver_logging_config.sampling_ratio}')

171

```

172

173

### Complete Queue Configuration

174

175

```python

176

from google.cloud import tasks

177

from google.protobuf import duration_pb2

178

179

client = tasks.CloudTasksClient()

180

queue_path = client.queue_path('my-project-id', 'us-central1', 'production-queue')

181

parent = client.common_location_path('my-project-id', 'us-central1')

182

183

# Production-ready queue with all configurations

184

queue = tasks.Queue(

185

name=queue_path,

186

187

# Rate limiting for external service protection

188

rate_limits=tasks.RateLimits(

189

max_dispatches_per_second=10.0,

190

max_concurrent_dispatches=20

191

),

192

193

# Robust retry configuration

194

retry_config=tasks.RetryConfig(

195

max_attempts=3,

196

max_retry_duration=duration_pb2.Duration(seconds=1800), # 30 minutes

197

min_backoff=duration_pb2.Duration(seconds=10),

198

max_backoff=duration_pb2.Duration(seconds=300),

199

max_doublings=2

200

),

201

202

# Default App Engine routing

203

app_engine_routing_override=tasks.AppEngineRouting(

204

service='task-processor',

205

version='stable'

206

),

207

208

# Moderate logging for monitoring

209

stackdriver_logging_config=tasks.StackdriverLoggingConfig(

210

sampling_ratio=0.1 # Log 10% of operations

211

)

212

)

213

214

created_queue = client.create_queue(parent=parent, queue=queue)

215

print(f'Created production queue: {created_queue.name}')

216

print(f'- Rate limit: {created_queue.rate_limits.max_dispatches_per_second}/sec')

217

print(f'- Max attempts: {created_queue.retry_config.max_attempts}')

218

print(f'- Default service: {created_queue.app_engine_routing_override.service}')

219

```

220

221

### Dynamic Queue Reconfiguration

222

223

```python

224

from google.cloud import tasks

225

from google.protobuf import field_mask_pb2, duration_pb2

226

227

client = tasks.CloudTasksClient()

228

queue_path = client.queue_path('my-project-id', 'us-central1', 'dynamic-queue')

229

230

# Get current queue configuration

231

current_queue = client.get_queue(name=queue_path)

232

233

# Modify rate limits for high traffic period

234

current_queue.rate_limits.max_dispatches_per_second = 50.0

235

current_queue.rate_limits.max_concurrent_dispatches = 100

236

237

# Update only the rate limits

238

update_mask = field_mask_pb2.FieldMask(

239

paths=[

240

'rate_limits.max_dispatches_per_second',

241

'rate_limits.max_concurrent_dispatches'

242

]

243

)

244

245

updated_queue = client.update_queue(

246

queue=current_queue,

247

update_mask=update_mask

248

)

249

250

print(f'Updated rate limits to {updated_queue.rate_limits.max_dispatches_per_second}/sec')

251

252

# Later, modify retry configuration for better reliability

253

current_queue.retry_config.max_attempts = 5

254

current_queue.retry_config.min_backoff = duration_pb2.Duration(seconds=30)

255

256

retry_update_mask = field_mask_pb2.FieldMask(

257

paths=[

258

'retry_config.max_attempts',

259

'retry_config.min_backoff'

260

]

261

)

262

263

updated_queue = client.update_queue(

264

queue=current_queue,

265

update_mask=retry_update_mask

266

)

267

268

print(f'Updated retry config: {updated_queue.retry_config.max_attempts} attempts')

269

```

270

271

### Queue State Management

272

273

```python

274

from google.cloud import tasks

275

276

client = tasks.CloudTasksClient()

277

queue_path = client.queue_path('my-project-id', 'us-central1', 'state-queue')

278

279

# Check queue state

280

queue = client.get_queue(name=queue_path)

281

print(f'Queue state: {queue.state}')

282

283

if queue.state == tasks.Queue.State.RUNNING:

284

print('Queue is actively processing tasks')

285

elif queue.state == tasks.Queue.State.PAUSED:

286

print('Queue is paused - tasks are not being dispatched')

287

elif queue.state == tasks.Queue.State.DISABLED:

288

print('Queue is disabled')

289

290

# Pause queue for maintenance

291

paused_queue = client.pause_queue(name=queue_path)

292

print(f'Queue paused: {paused_queue.state == tasks.Queue.State.PAUSED}')

293

294

# Resume normal operations

295

resumed_queue = client.resume_queue(name=queue_path)

296

print(f'Queue resumed: {resumed_queue.state == tasks.Queue.State.RUNNING}')

297

```