The singleflight package provides a duplicate function call suppression mechanism. It ensures that only one execution is in-flight for a given key at a time, while duplicate callers wait for the original to complete and receive the same results.
import "golang.org/x/sync/singleflight"Singleflight is particularly useful for:
type Group struct {
// Has unexported fields
}

Group represents a class of work and forms a namespace in which units of work can be executed with duplicate suppression.
Zero value: The zero value of Group is ready to use; no initialization is required.
Concurrency: A Group is safe for concurrent use by multiple goroutines and must not be copied after first use.
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool)

Do executes and returns the results of the given function, making sure that only one execution is in-flight for a given key at a time.
Parameters:
- key: Unique identifier for this operation
- fn: Function to execute. Called at most once per key concurrently.

Returns:
- v: The result value from the function
- err: Any error returned by the function
- shared: true if the result was given to multiple callers (whether this call executed fn or waited for another caller's execution)

Behavior: If a duplicate call comes in while an execution for the same key is in flight, the duplicate caller waits for the original to complete and receives the same v and err.
Example:
var g singleflight.Group
v, err, shared := g.Do("cache-key", func() (interface{}, error) {
// This executes only once even if called by multiple goroutines
return fetchFromDatabase()
})
if shared {
fmt.Println("Result was shared with other callers")
}

func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result

DoChan is like Do but returns a channel that will receive the results when they are ready.
Parameters:
- key: Unique identifier for this operation
- fn: Function to execute. Called at most once per key concurrently.

Returns:
- <-chan Result: Channel that will receive exactly one Result value when the operation completes

Behavior: As with Do, only one execution is in-flight per key; duplicate callers each get their own channel and all receive the same Result. The channel is buffered, so the value is delivered even if the caller has already stopped waiting.
Use cases: Waiting for a result with a timeout, selecting across multiple concurrent operations, or integrating with context cancellation (see the examples below).
Important: The returned channel is never closed. Don't range over it or wait for closure.
Example:
var g singleflight.Group
ch := g.DoChan("cache-key", func() (interface{}, error) {
return fetchFromDatabase()
})
select {
case result := <-ch:
if result.Err != nil {
log.Printf("Error: %v", result.Err)
} else {
log.Printf("Value: %v (shared: %v)", result.Val, result.Shared)
}
case <-time.After(5 * time.Second):
log.Println("Timeout")
}

func (g *Group) Forget(key string)

Forget tells the singleflight to forget about a key.
Parameters:
- key: The key to forget

Behavior: Future calls to Do or DoChan for this key will execute the function rather than waiting for an earlier, still-running call to complete.
Use cases: Forcing the next call to execute fresh, for example to allow a retry after a failure or when the in-flight result is known to be stale.
Important: Forget only affects future calls. In-flight operations are not affected.
Example:
var g singleflight.Group
// Start an operation
go func() {
g.Do("key", func() (interface{}, error) {
time.Sleep(5 * time.Second)
return "result", nil
})
}()
time.Sleep(100 * time.Millisecond)
// Forget the key (in-flight operation continues)
g.Forget("key")
// This will start a NEW execution instead of waiting
g.Do("key", func() (interface{}, error) {
return "new result", nil
})

type Result struct {
Val interface{}
Err error
Shared bool
}

Result holds the results of Do, so they can be passed on a channel.
Fields:
- Val: The result value from the function
- Err: Any error returned by the function
- Shared: true if Val was given to multiple callers

Example: preventing a cache stampede. Concurrent cache misses for the same key are collapsed into a single (simulated) database fetch.

package main
import (
	"fmt"
	"sync"
	"time"

	"golang.org/x/sync/singleflight"
)
type Cache struct {
	g     singleflight.Group
	mu    sync.RWMutex
	store map[string]string
}

// fetchFromDB stands in for a real database lookup.
func fetchFromDB(key string) (string, error) {
	return "value-for-" + key, nil
}

func (c *Cache) Get(key string) (string, error) {
	// Check the cache first (maps are not safe for concurrent use, so guard with a mutex)
	c.mu.RLock()
	val, ok := c.store[key]
	c.mu.RUnlock()
	if ok {
		return val, nil
	}

	// Use singleflight to prevent duplicate fetches for the same key
	v, err, shared := c.g.Do(key, func() (interface{}, error) {
		fmt.Printf("Fetching %s from database\n", key)
		time.Sleep(100 * time.Millisecond) // Simulate DB latency
		return fetchFromDB(key)
	})
	if err != nil {
		return "", err
	}
	result := v.(string)

	// Update the cache
	c.mu.Lock()
	c.store[key] = result
	c.mu.Unlock()

	if shared {
		fmt.Printf("Result for %s was shared\n", key)
	}
	return result, nil
}
func main() {
cache := &Cache{store: make(map[string]string)}
// Multiple concurrent requests for same key
for i := 0; i < 10; i++ {
go func() {
val, err := cache.Get("user:123")
if err != nil {
fmt.Printf("Error: %v\n", err)
} else {
fmt.Printf("Got: %s\n", val)
}
}()
}
time.Sleep(time.Second)
// Output: "Fetching user:123 from database" appears only once
}

Example: deduplicating outbound API requests. Concurrent FetchUser calls for the same user ID share one HTTP request.

package main
import (
"encoding/json"
"fmt"
"golang.org/x/sync/singleflight"
"net/http"
)
// User describes the (hypothetical) shape of the API response.
type User struct {
	ID   string
	Name string
}

type APIClient struct {
	g singleflight.Group
}
func (c *APIClient) FetchUser(userID string) (*User, error) {
key := fmt.Sprintf("user:%s", userID)
v, err, shared := c.g.Do(key, func() (interface{}, error) {
// Only one request to the API even if multiple goroutines call this
resp, err := http.Get(fmt.Sprintf("https://api.example.com/users/%s", userID))
if err != nil {
return nil, err
}
defer resp.Body.Close()
var user User
if err := json.NewDecoder(resp.Body).Decode(&user); err != nil {
return nil, err
}
return &user, nil
})
if err != nil {
return nil, err
}
if shared {
fmt.Printf("User %s was fetched by another goroutine\n", userID)
}
return v.(*User), nil
}

func main() {
	client := &APIClient{}
	user, err := client.FetchUser("123")
	if err != nil {
		fmt.Println("fetch failed:", err)
		return
	}
	fmt.Println("fetched user:", user.ID)
}

Example: adding a timeout with DoChan. A caller can stop waiting while the operation keeps running, and a later caller can still receive its result.

package main
import (
	"fmt"
	"time"

	"golang.org/x/sync/singleflight"
)
func fetchWithTimeout(g *singleflight.Group, key string, timeout time.Duration) (interface{}, error) {
ch := g.DoChan(key, func() (interface{}, error) {
time.Sleep(2 * time.Second) // Simulate slow operation
return "result", nil
})
select {
case result := <-ch:
return result.Val, result.Err
case <-time.After(timeout):
// Note: The operation continues in the background
return nil, fmt.Errorf("timeout after %v", timeout)
}
}
func main() {
var g singleflight.Group
	// This call times out after 1 second; the 2-second operation keeps running
	val, err := fetchWithTimeout(&g, "key", 1*time.Second)
	if err != nil {
		fmt.Printf("Error: %v\n", err)
	}
	// Call again while the original operation is still in flight (it finishes at
	// the 2-second mark): this call joins it and receives its result in time
	time.Sleep(500 * time.Millisecond)
	val, err = fetchWithTimeout(&g, "key", 1*time.Second)
if err == nil {
fmt.Printf("Got result: %v\n", val)
}
}

Example: integrating with context. The caller stops waiting when its context is cancelled; the underlying operation is not cancelled.

package main
import (
"context"
"fmt"
"golang.org/x/sync/singleflight"
"time"
)
func doWithContext(ctx context.Context, g *singleflight.Group, key string) (interface{}, error) {
type result struct {
val interface{}
err error
}
ch := make(chan result, 1)
go func() {
v, err, _ := g.Do(key, func() (interface{}, error) {
time.Sleep(2 * time.Second)
return "result", nil
})
ch <- result{val: v, err: err}
}()
select {
case r := <-ch:
return r.val, r.err
case <-ctx.Done():
// Operation continues, but we stop waiting
return nil, ctx.Err()
}
}
func main() {
var g singleflight.Group
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
	val, err := doWithContext(ctx, &g, "key")
	if err != nil {
		fmt.Printf("Error: %v\n", err) // "context deadline exceeded"
		return
	}
	fmt.Printf("Got: %v\n", val)
}

Example: a small cache with a TTL. The cache's own store and expiry decide when to fetch again, and singleflight collapses concurrent refills of the same key into one fetch (singleflight itself never caches completed results).

package main
import (
	"sync"
	"time"

	"golang.org/x/sync/singleflight"
)

type CacheWithTTL struct {
	g     singleflight.Group
	mu    sync.Mutex
	store map[string]interface{}
	ttl   time.Duration
}

// fetchExpensiveData stands in for a real expensive lookup.
func fetchExpensiveData(key string) (interface{}, error) {
	return "value-for-" + key, nil
}

func (c *CacheWithTTL) Get(key string) (interface{}, error) {
	// Serve from the local store while the entry is still fresh.
	c.mu.Lock()
	if v, ok := c.store[key]; ok {
		c.mu.Unlock()
		return v, nil
	}
	c.mu.Unlock()

	// Collapse concurrent refills of the same key into a single fetch.
	v, err, _ := c.g.Do(key, func() (interface{}, error) {
		data, err := fetchExpensiveData(key)
		if err != nil {
			return nil, err
		}
		c.mu.Lock()
		c.store[key] = data
		c.mu.Unlock()
		// Evict after the TTL so a later Get fetches fresh data.
		time.AfterFunc(c.ttl, func() {
			c.mu.Lock()
			delete(c.store, key)
			c.mu.Unlock()
		})
		return data, nil
	})
	return v, err
}

func main() {
	cache := &CacheWithTTL{store: make(map[string]interface{}), ttl: 5 * time.Second}
	cache.Get("key") // First call: fetches data
	time.Sleep(2 * time.Second)
	cache.Get("key") // Within TTL: served from the local store, no new fetch
	time.Sleep(4 * time.Second)
	cache.Get("key") // After TTL: entry evicted, fetches again
}

Example: different keys run concurrently. Only calls that share a key are collapsed.

package main
import (
"fmt"
"golang.org/x/sync/singleflight"
"sync"
"time"
)
func main() {
var g singleflight.Group
var wg sync.WaitGroup
// Different keys execute concurrently
keys := []string{"key1", "key2", "key3"}
for _, key := range keys {
for i := 0; i < 5; i++ {
wg.Add(1)
go func(k string, id int) {
defer wg.Done()
v, err, shared := g.Do(k, func() (interface{}, error) {
fmt.Printf("Executing for %s\n", k)
time.Sleep(100 * time.Millisecond)
return k + "-result", nil
})
fmt.Printf("[%s-%d] Got: %v, Shared: %v, Err: %v\n", k, id, v, shared, err)
}(key, i)
}
}
wg.Wait()
// Each key executes once, but different keys execute concurrently
}

Errors returned by fn are propagated to all callers:
var g singleflight.Group
v, err, shared := g.Do("key", func() (interface{}, error) {
return nil, fmt.Errorf("something went wrong")
})
// err contains the error; shared indicates whether other goroutines also received it

If fn panics, the panic is re-raised in all goroutines waiting on Do for that key:
var g singleflight.Group
// This will panic in all goroutines calling Do with this key
g.Do("key", func() (interface{}, error) {
panic("something went wrong")
})

To handle panics, wrap your function:
g.Do("key", func() (err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panic: %v", r)
}
}()
	// Your code here; assign the result to v before returning
	return v, nil
})

If fn calls runtime.Goexit() (as t.Fatal does in tests), singleflight handles it specially: the Goexit is not treated as a panic. The goroutine that executed fn continues to exit, and goroutines blocked in Do on the same key also exit via runtime.Goexit rather than hanging.
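In practice this mostly matters in tests, since t.Fatal calls runtime.Goexit. A minimal sketch, assuming a deliberately failing test (the test name and message are invented):

package example

import (
	"testing"

	"golang.org/x/sync/singleflight"
)

func TestFatalInsideDo(t *testing.T) {
	var g singleflight.Group
	g.Do("fixture", func() (interface{}, error) {
		// t.Fatal calls runtime.Goexit: this test goroutine exits here and the
		// test is reported as failed; singleflight does not mistake it for a panic.
		t.Fatal("could not load fixture")
		return nil, nil
	})
	t.Log("never reached") // Do does not return in this goroutine after t.Fatal
}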
Use descriptive keys: Keys should uniquely identify the operation and its parameters:
key := fmt.Sprintf("user:%s:posts:%d:%d", userID, page, limit)
g.Do(key, fetchFunc)

Don't rely on result caching: Singleflight does not cache results. Once all waiting goroutines receive the result, the next call will execute fn again:
// First call: executes fn
g.Do("key", fn)
// All goroutines receive result, key is cleared
// Next call: executes fn again (not cached)
g.Do("key", fn)Type assert results carefully: The result is interface{}, always check the type:
v, err, _ := g.Do("key", fn)
if err != nil {
return err
}
result, ok := v.(*MyType)
if !ok {
return fmt.Errorf("unexpected type: %T", v)
}

Use Forget for error retry: If fn fails, other goroutines will receive the error. Use Forget to allow the next call to retry:
v, err, _ := g.Do("key", fn)
if err != nil {
g.Forget("key") // Allow next call to retry
return err
}Remember DoChan channels don't close: Don't range over or wait for channel closure:
// Wrong
for result := range g.DoChan("key", fn) { // Receives one value, then blocks forever
process(result)
}
// Correct
result := <-g.DoChan("key", fn)
process(result)

Consider using separate Groups for different operations: Groups with unrelated keys can be separated for clarity and to avoid accidental key collisions:
type Service struct {
userFlight singleflight.Group
postFlight singleflight.Group
}

Be aware of in-flight operations: The underlying operation continues to run even after Forget, a timeout, or a caller's context being cancelled. If that matters, design fn itself to be cancellation-aware, as sketched below.
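One way to do that, shown as a minimal sketch: give fn its own context that is not tied to any single caller (here a timeout owned by the wrapper), so the shared work is bounded even though individual callers may stop waiting. The Fetcher type and fetchWithContext helper below are hypothetical, not part of the singleflight API.

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/singleflight"
)

// Fetcher wraps a Group so each execution gets its own bounded context,
// independent of the callers that happen to be waiting on it.
type Fetcher struct {
	g       singleflight.Group
	timeout time.Duration
}

func (f *Fetcher) Get(ctx context.Context, key string) (interface{}, error) {
	ch := f.g.DoChan(key, func() (interface{}, error) {
		// The work is bounded by the Fetcher's own timeout, not by any one
		// caller's context, because several callers may share this execution.
		opCtx, cancel := context.WithTimeout(context.Background(), f.timeout)
		defer cancel()
		return fetchWithContext(opCtx, key)
	})
	select {
	case r := <-ch:
		return r.Val, r.Err
	case <-ctx.Done():
		// This caller stops waiting; the shared execution keeps running
		// until it finishes or its own timeout fires.
		return nil, ctx.Err()
	}
}

// fetchWithContext stands in for a real context-aware lookup.
func fetchWithContext(ctx context.Context, key string) (interface{}, error) {
	select {
	case <-time.After(500 * time.Millisecond): // simulate work
		return "value-for-" + key, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

func main() {
	f := &Fetcher{timeout: 2 * time.Second}
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	v, err := f.Get(ctx, "key")
	fmt.Println(v, err)
}

The design choice here is that cancellation belongs to the operation, not to whichever caller triggered it: individual callers can give up early via their own contexts, while the shared fetch is still cleaned up by a timeout the wrapper controls.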