A Golang job scheduling library that lets you run Go functions at predetermined intervals using cron expressions, fixed durations, or daily, weekly, monthly, and one-time schedules, with support for distributed deployments.
Elector interface, implementations, and use cases for distributed deployments.
Leader election ensures that only one instance of your application runs scheduled jobs, even when multiple instances are deployed. This is an all-or-nothing approach: the leader runs all jobs, followers run none.
type Elector interface {
    IsLeader(context.Context) error
}

IsLeader is called before each job execution:

nil → this instance is the leader → the job runs
non-nil error → this instance is a follower → the job is skipped

func WithDistributedElector(elector Elector) SchedulerOption

Set the elector for the scheduler:
s, _ := gocron.NewScheduler(
gocron.WithDistributedElector(elector),
)

Returns ErrWithDistributedElectorNil if elector is nil.
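A minimal elector is easy to sketch for local testing; in this sketch the answer is fixed at construction (staticElector is illustrative only, not part of gocron):

type staticElector struct {
    leader bool
}

// IsLeader satisfies gocron.Elector: nil means "run the job", an error means "skip it".
func (e *staticElector) IsLeader(_ context.Context) error {
    if e.leader {
        return nil
    }
    return errors.New("not the leader")
}

In production the answer must come from a shared coordination backend, as in the etcd, Redis, and Consul implementations below.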
Before each job run, the scheduler calls elector.IsLeader(ctx).

Instance A (Leader):
09:00 - IsLeader() → nil → Job runs
09:01 - IsLeader() → nil → Job runs
Instance B (Follower):
09:00 - IsLeader() → error → Job skipped
09:01 - IsLeader() → error → Job skipped

When leadership changes, the new leader starts running jobs:
09:00 - Instance A is leader
09:05 - Instance A crashes
09:05 - Instance B detects leader loss, becomes leader
09:06 - Instance B starts running jobs

Use Elector when you want a single leader instance to run every scheduled job.

Don't use Elector when you need per-job control over where jobs run; use Distributed Locking instead, as sketched below.
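A rough sketch of the per-job alternative, using gocron's WithDistributedLocker option; the locker value here is an assumption (any implementation of gocron's Locker interface, e.g. one backed by redsync):

// Per-job locking: every instance stays active, and each run of a job is
// guarded by a lock keyed to that job instead of a single cluster-wide leader.
s, _ := gocron.NewScheduler(
    gocron.WithDistributedLocker(locker), // locker implements gocron.Locker (assumed)
)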
An etcd-based elector using the concurrency package:

import (
"context"
"errors"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"

"github.com/go-co-op/gocron/v2"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
)
type etcdElector struct {
client *clientv3.Client
session *concurrency.Session
election *concurrency.Election
instanceID string
}
func NewEtcdElector(client *clientv3.Client, electionPrefix string) (*etcdElector, error) {
// Create session with TTL
session, err := concurrency.NewSession(client, concurrency.WithTTL(10))
if err != nil {
return nil, err
}
// Create election
election := concurrency.NewElection(session, electionPrefix)
hostname, _ := os.Hostname()
instanceID := fmt.Sprintf("%s-%d", hostname, os.Getpid())
e := &etcdElector{
client: client,
session: session,
election: election,
instanceID: instanceID,
}
// Campaign for leadership in background
go func() {
ctx := context.Background()
if err := e.election.Campaign(ctx, instanceID); err != nil {
log.Printf("Campaign failed: %v", err)
}
}()
return e, nil
}
func (e *etcdElector) IsLeader(ctx context.Context) error {
resp, err := e.election.Leader(ctx)
if err != nil {
return err
}
if len(resp.Kvs) == 0 {
return errors.New("no leader elected")
}
if string(resp.Kvs[0].Value) == e.instanceID {
return nil // This instance is leader
}
return errors.New("not the leader")
}
func (e *etcdElector) Close() error {
// Resign from election
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := e.election.Resign(ctx); err != nil {
log.Printf("Resign failed: %v", err)
}
return e.session.Close()
}

Wiring the etcd elector into a scheduler:

func main() {
// Connect to etcd
client, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatal(err)
}
defer client.Close()
// Create elector
elector, err := NewEtcdElector(client, "/gocron/election")
if err != nil {
log.Fatal(err)
}
defer elector.Close()
// Create scheduler with elector
s, err := gocron.NewScheduler(
gocron.WithDistributedElector(elector),
)
if err != nil {
log.Fatal(err)
}
defer s.Shutdown()
// Add jobs
s.NewJob(
gocron.DurationJob(time.Minute),
gocron.NewTask(func() {
log.Println("Job running on leader")
}),
gocron.WithName("my-job"),
)
// Start scheduler
s.Start()
// Wait for interrupt
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
<-sigCh
log.Println("Shutting down...")
}

A Redis-based elector using redsync:

import (
"context"
"errors"
"fmt"
"os"
"time"

"github.com/go-co-op/gocron/v2"
"github.com/go-redsync/redsync/v4"
"github.com/go-redsync/redsync/v4/redis/goredis/v9"
"github.com/redis/go-redis/v9"
)
type redisElector struct {
rs *redsync.Redsync
mutex *redsync.Mutex
instanceID string
cancel context.CancelFunc
}
func NewRedisElector(client *redis.Client) *redisElector {
pool := goredis.NewPool(client)
rs := redsync.New(pool)
hostname, _ := os.Hostname()
instanceID := fmt.Sprintf("%s-%d", hostname, os.Getpid())
e := &redisElector{
rs: rs,
instanceID: instanceID,
mutex: rs.NewMutex(
"gocron-leader",
redsync.WithExpiry(10*time.Second),
redsync.WithTries(1),
),
}
// Continuously try to acquire leadership
ctx, cancel := context.WithCancel(context.Background())
e.cancel = cancel
go e.campaignLoop(ctx)
return e
}
func (e *redisElector) campaignLoop(ctx context.Context) {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// Try to extend the lock; ExtendContext returns (bool, error)
if ok, err := e.mutex.ExtendContext(ctx); err != nil || !ok {
// Not holding the lock, try to acquire it
_ = e.mutex.LockContext(ctx)
}
}
}
}
func (e *redisElector) IsLeader(ctx context.Context) error {
// Check whether we still hold the lock by extending it
if ok, err := e.mutex.ExtendContext(ctx); err != nil || !ok {
return errors.New("not the leader")
}
return nil
}
func (e *redisElector) Close() error {
e.cancel()
_, err := e.mutex.Unlock()
return err
}

Wiring the Redis elector into a scheduler:

func main() {
client := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer client.Close()
elector := NewRedisElector(client)
defer elector.Close()
s, _ := gocron.NewScheduler(
gocron.WithDistributedElector(elector),
)
defer s.Shutdown()
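// doWork is a placeholder for your job function.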
s.NewJob(
gocron.DurationJob(time.Minute),
gocron.NewTask(doWork),
)
s.Start()
select {}
}

A Consul-based elector using sessions and lock acquisition:

import (
"context"
"errors"
"fmt"
"log"
"os"
"time"

"github.com/go-co-op/gocron/v2"
consulapi "github.com/hashicorp/consul/api"
)
type consulElector struct {
client *consulapi.Client
sessionID string
key string
instanceID string
cancel context.CancelFunc
}
func NewConsulElector(client *consulapi.Client, keyPrefix string) (*consulElector, error) {
hostname, _ := os.Hostname()
instanceID := fmt.Sprintf("%s-%d", hostname, os.Getpid())
// Create session
session := client.Session()
sessionID, _, err := session.Create(&consulapi.SessionEntry{
Name: "gocron-leader",
TTL: "10s",
Behavior: "release",
LockDelay: time.Second,
}, nil)
if err != nil {
return nil, err
}
e := &consulElector{
client: client,
sessionID: sessionID,
key: keyPrefix + "/leader",
instanceID: instanceID,
}
ctx, cancel := context.WithCancel(context.Background())
e.cancel = cancel
// Keep the session alive; without periodic renewal the 10s TTL session would expire
go func() { _ = session.RenewPeriodic("10s", sessionID, nil, ctx.Done()) }()
// Try to acquire the lock in the background
go e.campaignLoop(ctx)
return e, nil
}
func (e *consulElector) campaignLoop(ctx context.Context) {
kv := e.client.KV()
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// Try to acquire lock
acquired, _, err := kv.Acquire(&consulapi.KVPair{
Key: e.key,
Value: []byte(e.instanceID),
Session: e.sessionID,
}, nil)
if err != nil {
log.Printf("Lock acquire failed: %v", err)
} else if acquired {
log.Printf("Acquired leadership")
}
}
}
}
func (e *consulElector) IsLeader(ctx context.Context) error {
kv := e.client.KV()
pair, _, err := kv.Get(e.key, nil)
if err != nil {
return err
}
if pair == nil || pair.Session != e.sessionID {
return errors.New("not the leader")
}
return nil
}
func (e *consulElector) Close() error {
e.cancel()
// Release lock
kv := e.client.KV()
_, _, err := kv.Release(&consulapi.KVPair{
Key: e.key,
Session: e.sessionID,
}, nil)
// Destroy session
session := e.client.Session()
_, _ = session.Destroy(e.sessionID, nil)
return err
}

Cache leadership checks to reduce load on the coordination backend:

type resilientElector struct {
elector Elector
isLeader bool
mu sync.RWMutex
lastCheck time.Time
cacheTTL time.Duration
}
func (e *resilientElector) IsLeader(ctx context.Context) error {
e.mu.Lock()
defer e.mu.Unlock()
// Cache result for short time
if time.Since(e.lastCheck) < e.cacheTTL {
if e.isLeader {
return nil
}
return errors.New("not leader (cached)")
}
// Check actual leadership
err := e.elector.IsLeader(ctx)
e.isLeader = (err == nil)
e.lastCheck = time.Now()
if err != nil {
log.Printf("Leadership check failed: %v", err)
}
return err
}
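A usage sketch, assuming the etcd elector from above and an illustrative 2-second cache:

elector, _ := NewEtcdElector(client, "/gocron/election")
cached := &resilientElector{elector: elector, cacheTTL: 2 * time.Second}
s, _ := gocron.NewScheduler(
    gocron.WithDistributedElector(cached),
)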
Monitor leadership status periodically for logging and metrics:

func monitorLeadership(elector Elector) {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for range ticker.C {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
err := elector.IsLeader(ctx)
cancel()
if err == nil {
log.Println("Status: Leader")
metrics.RecordLeadershipStatus(1)
} else {
log.Println("Status: Follower")
metrics.RecordLeadershipStatus(0)
}
}
}

// TTL should balance:
// - Fast failover (shorter TTL)
// - Network stability (longer TTL)
// Fast failover (5-10s)
session, _ := concurrency.NewSession(client, concurrency.WithTTL(5))
// Stable networks (15-30s)
session, _ := concurrency.NewSession(client, concurrency.WithTTL(20))

Recommendation: Start with 10s, then adjust based on network stability and failover requirements.

Combine leadership with a health check so an unhealthy instance does not claim leadership:
type healthyElector struct {
elector Elector
healthChecker func() bool
}
func (e *healthyElector) IsLeader(ctx context.Context) error {
// Check our own health first
if !e.healthChecker() {
log.Println("Instance unhealthy, not claiming leadership")
return errors.New("instance unhealthy")
}
return e.elector.IsLeader(ctx)
}

Log leadership transitions so you can see when an instance gains or loses leadership:

type loggingElector struct {
elector Elector
wasLeader bool
mu sync.Mutex
}
func (e *loggingElector) IsLeader(ctx context.Context) error {
err := e.elector.IsLeader(ctx)
isLeader := (err == nil)
e.mu.Lock()
defer e.mu.Unlock()
if isLeader != e.wasLeader {
if isLeader {
log.Println("*** Became leader ***")
} else {
log.Println("*** Lost leadership ***")
}
e.wasLeader = isLeader
}
return err
}

Check the job's context so a running task can clean up if leadership is lost mid-execution:

j, _ := s.NewJob(
gocron.DurationJob(time.Minute),
gocron.NewTask(func(ctx context.Context) {
// Check context for leadership loss
select {
case <-ctx.Done():
log.Println("Leadership lost during execution, cleaning up...")
cleanup()
return
default:
doWork()
}
}),
)

Prevent split-brain with fencing tokens:
type fencedElector struct {
elector Elector
leaderEpoch int64
mu sync.RWMutex
}
func (e *fencedElector) IsLeader(ctx context.Context) error {
err := e.elector.IsLeader(ctx)
if err != nil {
return err
}
// Get current epoch from coordinator
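// (fetchEpoch is a placeholder for reading a monotonically increasing epoch/term from your backend)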
epoch := e.fetchEpoch(ctx)
e.mu.Lock()
defer e.mu.Unlock()
if epoch > e.leaderEpoch {
e.leaderEpoch = epoch
log.Printf("New leadership epoch: %d", epoch)
}
return nil
}

Voluntarily give up leadership on errors:
j, _ := s.NewJob(
gocron.DurationJob(time.Minute),
gocron.NewTask(doWork),
gocron.WithEventListeners(
gocron.AfterJobRunsWithError(func(jobID uuid.UUID, jobName string, err error) {
// Critical error → resign leadership
if isCriticalError(err) {
log.Printf("Critical error, resigning leadership: %v", err)
elector.Resign()
}
}),
),
)

Troubleshooting

Symptom: All instances skip jobs.
Causes: every instance returns an error from its IsLeader() check, so no instance believes it is the leader (for example, the coordination backend is unreachable or no campaign has succeeded yet).
Solutions: verify connectivity to the backend, check the campaign logs, and confirm that one instance actually holds the election key or lock.
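A quick way to see what each instance's elector is reporting (assumes you have access to the elector instance):

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := elector.IsLeader(ctx); err != nil {
    log.Printf("not the leader: %v", err)
} else {
    log.Println("this instance is the leader")
}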
Symptom: Jobs run on multiple instances simultaneously.
Causes: two instances both believe they hold leadership, typically after a network partition or because a cached leadership result outlives the actual session or lock.
Solutions: keep any leadership cache shorter than the session TTL, and use fencing tokens (see above) so work from a stale leader can be rejected.
Symptom: Leadership flips between instances often.
Causes: the session TTL is too short for the network, or lease renewals are not keeping up.
Solutions: increase the TTL (see the guidance above) and make sure the campaign/renewal loop runs well within the TTL.
Symptom: Leader crashes, but follower doesn't take over.
Causes: the session TTL is too long, or the followers have stopped campaigning for leadership.
Solutions: shorten the TTL for faster failover and verify that every instance keeps its campaign loop running.