Semaphore - Weighted Resource Limiting

The semaphore package provides a weighted semaphore implementation for controlling concurrent access to resources. Unlike traditional counting semaphores, weighted semaphores allow each acquire operation to request a custom weight, enabling flexible resource management.

Import

import "golang.org/x/sync/semaphore"

Overview

A weighted semaphore is useful for:

  • Limiting concurrent access to resources (connection pools, file handles, etc.)
  • Implementing rate limiting with different costs per operation
  • Managing resources with varying consumption rates
  • Coordinating access to multiple units of the same resource

The semaphore integrates with Go's context package for cancellation and timeouts.

API Reference

Creating a Semaphore

NewWeighted

func NewWeighted(n int64) *Weighted

Creates a new weighted semaphore with the given maximum combined weight for concurrent access.

Parameters:

  • n: Maximum combined weight that can be held concurrently

Returns:

  • *Weighted: New weighted semaphore instance

Example:

// Create semaphore allowing total weight of 10
sem := semaphore.NewWeighted(10)

// The total weight of 10 can be held at once by, for example:
// - 10 goroutines each with weight 1, or
// - 2 goroutines each with weight 5, or
// - 1 goroutine with weight 7 and 3 goroutines with weight 1

Weighted Type

type Weighted struct {
    // Has unexported fields
}

Weighted provides a way to bound concurrent access to a resource where callers can request access with a given weight.

Concurrency:

  • Safe for concurrent use by multiple goroutines
  • All methods can be called concurrently

Acquiring Resources

Acquire

func (s *Weighted) Acquire(ctx context.Context, n int64) error

Acquires the semaphore with a weight of n, blocking until resources are available or ctx is done.

Parameters:

  • ctx: Context for cancellation and timeout control
  • n: Weight to acquire (must not exceed the semaphore's maximum size)

Returns:

  • nil: Successfully acquired the semaphore
  • ctx.Err(): Context was canceled or timed out before acquisition

Behavior:

  • Blocks if insufficient weight is available
  • Maintains FIFO ordering for waiters to prevent starvation
  • If n exceeds the semaphore's total size, waits for ctx cancellation and returns ctx.Err()
  • Leaves the semaphore unchanged on failure

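Important: If the requested weight n is greater than the semaphore's maximum size, Acquire can never succeed. It waits until the context is done and then returns ctx.Err(), so with a context that is never canceled (such as context.Background()) the call blocks forever.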

Example:

sem := semaphore.NewWeighted(10)
ctx := context.Background()

// Acquire weight of 3
if err := sem.Acquire(ctx, 3); err != nil {
    log.Printf("Failed to acquire: %v", err)
    return
}
defer sem.Release(3)

// Use the resource...

TryAcquire

func (s *Weighted) TryAcquire(n int64) bool

Acquires the semaphore with a weight of n without blocking.

Parameters:

  • n: Weight to acquire

Returns:

  • true: Successfully acquired the semaphore
  • false: Insufficient weight available (semaphore unchanged)

Behavior:

  • Returns immediately without blocking
  • Only succeeds if the full weight n is immediately available
  • Leaves the semaphore unchanged on failure

Use cases:

  • Optimistic concurrency patterns
  • Avoiding blocking in critical sections
  • Implementing fallback strategies when resources are unavailable

Example:

sem := semaphore.NewWeighted(10)

if sem.TryAcquire(3) {
    defer sem.Release(3)
    // Use the resource...
} else {
    // Resource unavailable, use fallback
    useCachedData()
}

Releasing Resources

Release

func (s *Weighted) Release(n int64)

Releases the semaphore with a weight of n.

Parameters:

  • n: Weight to release (must match a previous Acquire or TryAcquire)

Behavior:

  • Releases the specified weight back to the semaphore
  • May wake up waiting goroutines if sufficient weight becomes available
  • Panics if releasing more than currently held

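Important: Release exactly the weight you acquired. The panic fires only when the total released exceeds the total currently held across all callers, so a mismatched release that stays within that total goes undetected and silently skews the accounting.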

Example:

sem := semaphore.NewWeighted(10)
ctx := context.Background()

if err := sem.Acquire(ctx, 5); err != nil {
    return // nothing was acquired, so there is nothing to release
}
defer sem.Release(5)  // Must release the same weight

// Use the resource...

Usage Examples

Basic Resource Limiting

package main

import (
    "context"
    "fmt"
    "sync"
    "time"

    "golang.org/x/sync/semaphore"
)

func main() {
    // Limit to 3 concurrent operations
    sem := semaphore.NewWeighted(3)
    ctx := context.Background()

    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        i := i // per-iteration copy; needed before Go 1.22
        wg.Add(1)
        go func() {
            defer wg.Done()
            if err := sem.Acquire(ctx, 1); err != nil {
                fmt.Printf("Failed to acquire: %v\n", err)
                return
            }
            defer sem.Release(1)

            // Simulate work
            fmt.Printf("Worker %d starting\n", i)
            time.Sleep(time.Second)
            fmt.Printf("Worker %d done\n", i)
        }()
    }

    // Wait for every worker instead of sleeping for a guessed duration
    wg.Wait()
}

Weighted Resource Management

package main

import (
    "context"
    "log"
    "sync"

    "golang.org/x/sync/semaphore"
)

type Task struct {
    ID     int
    Weight int64  // Resource cost of this task
}

// processTask is a placeholder for the real work.
func processTask(task Task) {}

func main() {
    // Total resource capacity
    sem := semaphore.NewWeighted(100)
    ctx := context.Background()

    tasks := []Task{
        {ID: 1, Weight: 10},
        {ID: 2, Weight: 50},
        {ID: 3, Weight: 25},
        {ID: 4, Weight: 15},
    }

    var wg sync.WaitGroup
    for _, task := range tasks {
        task := task
        wg.Add(1)
        go func() {
            defer wg.Done()
            // Acquire resources proportional to task cost
            if err := sem.Acquire(ctx, task.Weight); err != nil {
                log.Printf("Task %d failed: %v", task.ID, err)
                return
            }
            defer sem.Release(task.Weight)

            log.Printf("Task %d running (weight: %d)", task.ID, task.Weight)
            processTask(task)
            log.Printf("Task %d complete", task.ID)
        }()
    }

    // Wait for all tasks. A bare select {} here would end in
    // "fatal error: all goroutines are asleep - deadlock!" once they finish.
    wg.Wait()
}

Connection Pool Management

package main

import (
    "context"
    "database/sql"
    "log"

    "golang.org/x/sync/semaphore"
)

type ConnectionPool struct {
    sem *semaphore.Weighted
    db  *sql.DB
}

func NewConnectionPool(maxConnections int64, db *sql.DB) *ConnectionPool {
    return &ConnectionPool{
        sem: semaphore.NewWeighted(maxConnections),
        db:  db,
    }
}

func (p *ConnectionPool) Query(ctx context.Context, query string) error {
    // Acquire a connection slot
    if err := p.sem.Acquire(ctx, 1); err != nil {
        return err
    }
    defer p.sem.Release(1)

    // Use the database connection
    rows, err := p.db.QueryContext(ctx, query)
    if err != nil {
        return err
    }
    defer rows.Close()

    // Process results...
    return nil
}

func main() {
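    // "postgres" and "connstr" are placeholders; a driver must be registered
    // (typically via a blank import) for sql.Open to succeed.
    db, err := sql.Open("postgres", "connstr")
    if err != nil {
        log.Fatal(err)
    }
    pool := NewConnectionPool(10, db)

    ctx := context.Background()
    if err := pool.Query(ctx, "SELECT * FROM users"); err != nil {
        log.Fatal(err)
    }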
}

With Timeout

package main

import (
    "context"
    "fmt"
    "golang.org/x/sync/semaphore"
    "time"
)

func main() {
    sem := semaphore.NewWeighted(2)

    // Fill the semaphore
    sem.Acquire(context.Background(), 2)

    // Try to acquire with timeout
    ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
    defer cancel()

    if err := sem.Acquire(ctx, 1); err != nil {
        fmt.Printf("Timeout: %v\n", err)  // context deadline exceeded
    }
}

Try Pattern with Fallback

package main

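import (
    "context"
    "fmt"
    "log"

    "golang.org/x/sync/semaphore"
)

// expensiveOperation and cheaperOperation are placeholders for real work.
func expensiveOperation() {}
func cheaperOperation()   {}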

func main() {
    sem := semaphore.NewWeighted(5)
    ctx := context.Background()

    // Primary operation with TryAcquire
    if sem.TryAcquire(3) {
        defer sem.Release(3)
        fmt.Println("Using primary (expensive) operation")
        expensiveOperation()
    } else {
        // Fallback to lighter operation
        if err := sem.Acquire(ctx, 1); err != nil {
            log.Fatal(err)
        }
        defer sem.Release(1)
        fmt.Println("Using fallback (cheaper) operation")
        cheaperOperation()
    }
}

Rate Limiting

package main

import (
    "context"
    "fmt"
    "time"

    "golang.org/x/sync/semaphore"
)

type RateLimiter struct {
    sem *semaphore.Weighted
}

func NewRateLimiter(requestsPerSecond int64) *RateLimiter {
    rl := &RateLimiter{
        sem: semaphore.NewWeighted(requestsPerSecond),
    }

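    // Refill tokens every second. Wait never releases what it acquires, so
    // this goroutine is the only place weight is returned. This is a coarse
    // approximation; golang.org/x/time/rate provides a purpose-built limiter.
    go func() {
        ticker := time.NewTicker(time.Second)  // never stopped: runs for the life of the program
        for range ticker.C {
            // Drain any tokens left over from the previous window so that
            // all requestsPerSecond tokens are held...
            for i := int64(0); i < requestsPerSecond; i++ {
                rl.sem.TryAcquire(1)
            }
            // ...then release them all, starting the new window at full capacity
            for i := int64(0); i < requestsPerSecond; i++ {
                rl.sem.Release(1)
            }
        }
    }()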

    return rl
}

func (rl *RateLimiter) Wait(ctx context.Context) error {
    return rl.sem.Acquire(ctx, 1)
}

// makeRequest is a placeholder for the rate-limited call.
func makeRequest() {}

func main() {
    limiter := NewRateLimiter(10)  // 10 requests per second

    for i := 0; i < 100; i++ {
        i := i
        go func() {
            if err := limiter.Wait(context.Background()); err != nil {
                return
            }
            fmt.Printf("Request %d\n", i)
            makeRequest()
        }()
    }

    select {}
}

Multi-Resource Coordination

package main

import (
    "context"
    "golang.org/x/sync/semaphore"
)

type ResourceManager struct {
    cpu    *semaphore.Weighted
    memory *semaphore.Weighted
    disk   *semaphore.Weighted
}

func NewResourceManager() *ResourceManager {
    return &ResourceManager{
        cpu:    semaphore.NewWeighted(100),  // 100% CPU
        memory: semaphore.NewWeighted(16),   // 16 GB
        disk:   semaphore.NewWeighted(1000), // 1000 IOPS
    }
}

func (rm *ResourceManager) RunTask(ctx context.Context, cpuPct, memoryGB, diskIOPS int64) error {
    // Acquire all resources
    if err := rm.cpu.Acquire(ctx, cpuPct); err != nil {
        return err
    }
    defer rm.cpu.Release(cpuPct)

    if err := rm.memory.Acquire(ctx, memoryGB); err != nil {
        return err
    }
    defer rm.memory.Release(memoryGB)

    if err := rm.disk.Acquire(ctx, diskIOPS); err != nil {
        return err
    }
    defer rm.disk.Release(diskIOPS)

    // Run task with all resources acquired
    return runTask()
}

Best Practices

  1. Always use defer for Release: Ensure resources are released even if the function panics or returns early:

    if err := sem.Acquire(ctx, n); err != nil {
        return err
    }
    defer sem.Release(n)
  2. Match Acquire and Release weights: Always release the exact weight you acquired:

    sem.Acquire(ctx, 5)
    defer sem.Release(5)  // Not 3, not 7, exactly 5
  3. Use context for timeouts: Pass a context with a deadline or cancel function so a blocked Acquire cannot wait forever:

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    err := sem.Acquire(ctx, 1)
  4. Check weight against capacity: Ensure requested weight doesn't exceed semaphore capacity:

    capacity := int64(100)
    sem := semaphore.NewWeighted(capacity)
    
    weight := calculateWeight()
    if weight > capacity {
        return fmt.Errorf("weight %d exceeds capacity %d", weight, capacity)
    }
    if err := sem.Acquire(ctx, weight); err != nil {
        return err
    }
    defer sem.Release(weight)
  5. Use TryAcquire when blocking is unacceptable: Fall back to an alternative instead of waiting:

    if !sem.TryAcquire(1) {
        return useCachedValue()
    }
    defer sem.Release(1)
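  6. Consider fairness: The semaphore serves waiters in FIFO order to prevent starvation of large weight requests. As a consequence, a small request queues behind a blocked large request even when enough weight for the small one is free. Don't mix very large and very small weights if latency is critical for small requests.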

Error Handling

Context Cancellation

When the context is canceled while Acquire is blocked (here sem is assumed to have no free weight), Acquire returns ctx.Err():

ctx, cancel := context.WithCancel(context.Background())
go func() {
    time.Sleep(100 * time.Millisecond)
    cancel()
}()

err := sem.Acquire(ctx, 1)
if err != nil {
    fmt.Println(err)  // "context canceled"
}

Panic on Over-Release

Releasing more than held causes a panic:

sem := semaphore.NewWeighted(10)
sem.Acquire(context.Background(), 3)
sem.Release(5)  // Panic: "semaphore: released more than held"

Weight Exceeds Capacity

If the requested weight exceeds the semaphore's capacity, Acquire blocks until the context is done and then returns ctx.Err():

sem := semaphore.NewWeighted(10)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

err := sem.Acquire(ctx, 20)  // Will always fail
fmt.Println(err)  // "context deadline exceeded"