The semaphore package provides a weighted semaphore implementation for controlling concurrent access to resources. Unlike traditional counting semaphores, weighted semaphores allow each acquire operation to request a custom weight, enabling flexible resource management.
import "golang.org/x/sync/semaphore"A weighted semaphore is useful for:
The semaphore integrates with Go's context package for cancellation and timeouts.
func NewWeighted(n int64) *WeightedCreates a new weighted semaphore with the given maximum combined weight for concurrent access.
Parameters:
n: Maximum combined weight that can be held concurrentlyReturns:
*Weighted: New weighted semaphore instanceExample:
// Create semaphore allowing total weight of 10
sem := semaphore.NewWeighted(10)
// Can acquire:
// - 10 goroutines each with weight 1, or
// - 2 goroutines each with weight 5, or
// - 1 goroutine with weight 7 and 3 goroutines with weight 1, etc.type Weighted struct {
// Has unexported fields
}Weighted provides a way to bound concurrent access to a resource where callers can request access with a given weight.
Concurrency:
func (s *Weighted) Acquire(ctx context.Context, n int64) errorAcquires the semaphore with a weight of n, blocking until resources are available or ctx is done.
Parameters:
ctx: Context for cancellation and timeout controln: Weight to acquire (must not exceed the semaphore's maximum size)Returns:
nil: Successfully acquired the semaphorectx.Err(): Context was canceled or timed out before acquisitionBehavior:
Important: If the requested weight n is greater than the semaphore's maximum size, Acquire will never succeed and will wait until the context is canceled.
Example:
sem := semaphore.NewWeighted(10)
ctx := context.Background()
// Acquire weight of 3
if err := sem.Acquire(ctx, 3); err != nil {
log.Printf("Failed to acquire: %v", err)
return
}
defer sem.Release(3)
// Use the resource...func (s *Weighted) TryAcquire(n int64) boolAcquires the semaphore with a weight of n without blocking.
Parameters:
n: Weight to acquireReturns:
true: Successfully acquired the semaphorefalse: Insufficient weight available (semaphore unchanged)Behavior:
Use cases:
Example:
sem := semaphore.NewWeighted(10)
if sem.TryAcquire(3) {
defer sem.Release(3)
// Use the resource...
} else {
// Resource unavailable, use fallback
useCachedData()
}func (s *Weighted) Release(n int64)Releases the semaphore with a weight of n.
Parameters:
n: Weight to release (must match a previous Acquire or TryAcquire)Behavior:
Important: You must release exactly the weight you acquired. Releasing more than held causes a panic.
Example:
sem := semaphore.NewWeighted(10)
ctx := context.Background()
sem.Acquire(ctx, 5)
defer sem.Release(5) // Must release the same weight
// Use the resource...package main
import (
"context"
"fmt"
"golang.org/x/sync/semaphore"
"time"
)
func main() {
// Limit to 3 concurrent operations
sem := semaphore.NewWeighted(3)
ctx := context.Background()
for i := 0; i < 10; i++ {
i := i
go func() {
if err := sem.Acquire(ctx, 1); err != nil {
fmt.Printf("Failed to acquire: %v\n", err)
return
}
defer sem.Release(1)
// Simulate work
fmt.Printf("Worker %d starting\n", i)
time.Sleep(time.Second)
fmt.Printf("Worker %d done\n", i)
}()
}
time.Sleep(5 * time.Second)
}package main
import (
"context"
"golang.org/x/sync/semaphore"
"log"
)
type Task struct {
ID int
Weight int64 // Resource cost of this task
}
func main() {
// Total resource capacity
sem := semaphore.NewWeighted(100)
ctx := context.Background()
tasks := []Task{
{ID: 1, Weight: 10},
{ID: 2, Weight: 50},
{ID: 3, Weight: 25},
{ID: 4, Weight: 15},
}
for _, task := range tasks {
task := task
go func() {
// Acquire resources proportional to task cost
if err := sem.Acquire(ctx, task.Weight); err != nil {
log.Printf("Task %d failed: %v", task.ID, err)
return
}
defer sem.Release(task.Weight)
log.Printf("Task %d running (weight: %d)", task.ID, task.Weight)
processTask(task)
log.Printf("Task %d complete", task.ID)
}()
}
select {}
}package main
import (
"context"
"database/sql"
"golang.org/x/sync/semaphore"
"time"
)
type ConnectionPool struct {
sem *semaphore.Weighted
db *sql.DB
}
func NewConnectionPool(maxConnections int64, db *sql.DB) *ConnectionPool {
return &ConnectionPool{
sem: semaphore.NewWeighted(maxConnections),
db: db,
}
}
func (p *ConnectionPool) Query(ctx context.Context, query string) error {
// Acquire a connection slot
if err := p.sem.Acquire(ctx, 1); err != nil {
return err
}
defer p.sem.Release(1)
// Use the database connection
rows, err := p.db.QueryContext(ctx, query)
if err != nil {
return err
}
defer rows.Close()
// Process results...
return nil
}
func main() {
db, _ := sql.Open("postgres", "connstr")
pool := NewConnectionPool(10, db)
ctx := context.Background()
err := pool.Query(ctx, "SELECT * FROM users")
if err != nil {
log.Fatal(err)
}
}package main
import (
"context"
"fmt"
"golang.org/x/sync/semaphore"
"time"
)
func main() {
sem := semaphore.NewWeighted(2)
// Fill the semaphore
sem.Acquire(context.Background(), 2)
// Try to acquire with timeout
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
if err := sem.Acquire(ctx, 1); err != nil {
fmt.Printf("Timeout: %v\n", err) // context deadline exceeded
}
}package main
import (
"context"
"fmt"
"golang.org/x/sync/semaphore"
"time"
)
func main() {
sem := semaphore.NewWeighted(5)
ctx := context.Background()
// Primary operation with TryAcquire
if sem.TryAcquire(3) {
defer sem.Release(3)
fmt.Println("Using primary (expensive) operation")
expensiveOperation()
} else {
// Fallback to lighter operation
if err := sem.Acquire(ctx, 1); err != nil {
log.Fatal(err)
}
defer sem.Release(1)
fmt.Println("Using fallback (cheaper) operation")
cheaperOperation()
}
}package main
import (
"context"
"golang.org/x/sync/semaphore"
"time"
)
type RateLimiter struct {
sem *semaphore.Weighted
}
func NewRateLimiter(requestsPerSecond int64) *RateLimiter {
rl := &RateLimiter{
sem: semaphore.NewWeighted(requestsPerSecond),
}
// Refill tokens every second
go func() {
ticker := time.NewTicker(time.Second)
for range ticker.C {
// Release all tokens (reset to full capacity)
for i := int64(0); i < requestsPerSecond; i++ {
rl.sem.TryAcquire(1) // Drain if any left
}
for i := int64(0); i < requestsPerSecond; i++ {
rl.sem.Release(1) // Refill to capacity
}
}
}()
return rl
}
func (rl *RateLimiter) Wait(ctx context.Context) error {
return rl.sem.Acquire(ctx, 1)
}
func main() {
limiter := NewRateLimiter(10) // 10 requests per second
for i := 0; i < 100; i++ {
i := i
go func() {
if err := limiter.Wait(context.Background()); err != nil {
return
}
fmt.Printf("Request %d\n", i)
makeRequest()
}()
}
select {}
}package main
import (
"context"
"golang.org/x/sync/semaphore"
)
type ResourceManager struct {
cpu *semaphore.Weighted
memory *semaphore.Weighted
disk *semaphore.Weighted
}
func NewResourceManager() *ResourceManager {
return &ResourceManager{
cpu: semaphore.NewWeighted(100), // 100% CPU
memory: semaphore.NewWeighted(16), // 16 GB
disk: semaphore.NewWeighted(1000), // 1000 IOPS
}
}
func (rm *ResourceManager) RunTask(ctx context.Context, cpuPct, memoryGB, diskIOPS int64) error {
// Acquire all resources
if err := rm.cpu.Acquire(ctx, cpuPct); err != nil {
return err
}
defer rm.cpu.Release(cpuPct)
if err := rm.memory.Acquire(ctx, memoryGB); err != nil {
return err
}
defer rm.memory.Release(memoryGB)
if err := rm.disk.Acquire(ctx, diskIOPS); err != nil {
return err
}
defer rm.disk.Release(diskIOPS)
// Run task with all resources acquired
return runTask()
}Always use defer for Release: Ensure resources are released even if the function panics or returns early:
if err := sem.Acquire(ctx, n); err != nil {
return err
}
defer sem.Release(n)Match Acquire and Release weights: Always release the exact weight you acquired:
sem.Acquire(ctx, 5)
defer sem.Release(5) // Not 3, not 7, exactly 5Use context for timeouts: Always pass a context to enable cancellation:
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err := sem.Acquire(ctx, 1)Check weight against capacity: Ensure requested weight doesn't exceed semaphore capacity:
capacity := int64(100)
sem := semaphore.NewWeighted(capacity)
weight := calculateWeight()
if weight > capacity {
return fmt.Errorf("weight %d exceeds capacity %d", weight, capacity)
}
sem.Acquire(ctx, weight)Use TryAcquire for non-critical sections: When blocking is unacceptable, use TryAcquire with a fallback:
if !sem.TryAcquire(1) {
return useCachedValue()
}
defer sem.Release(1)Consider fairness: The semaphore uses FIFO ordering to prevent starvation of large weight requests. Don't mix very large and very small weights if latency is critical for small requests.
When context is canceled, Acquire returns ctx.Err():
ctx, cancel := context.WithCancel(context.Background())
go func() {
time.Sleep(100 * time.Millisecond)
cancel()
}()
err := sem.Acquire(ctx, 1)
if err != nil {
fmt.Println(err) // "context canceled"
}Releasing more than held causes a panic:
sem := semaphore.NewWeighted(10)
sem.Acquire(context.Background(), 3)
sem.Release(5) // Panic: "semaphore: released more than held"If requested weight exceeds capacity, Acquire blocks until context is canceled:
sem := semaphore.NewWeighted(10)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
err := sem.Acquire(ctx, 20) // Will always fail
fmt.Println(err) // "context deadline exceeded"