A Golang job scheduling library that lets you run Go functions at pre-determined intervals using cron expressions, fixed durations, daily, weekly, monthly, or one-time schedules with support for distributed deployments.
Interfaces for distributed job coordination.
type Locker interface {
Lock(ctx context.Context, key string) (Lock, error)
}
type Lock interface {
Unlock(ctx context.Context) error
}Distributed locking interface for multi-instance deployments.
func WithDistributedLocker(locker Locker) SchedulerOption — Applies to all jobs. Returns ErrWithDistributedLockerNil if nil.
func WithDistributedJobLocker(locker Locker) JobOption — Overrides the scheduler-level locker for a specific job. Returns ErrWithDistributedJobLockerNil if nil.
func WithDisabledDistributedJobLocker(disabled bool) JobOption — Disables the scheduler-level locker for a specific job.
// redisLocker implements gocron.Locker using a Redis SET NX key with a TTL.
// NOTE(review): the 30s TTL must exceed the longest expected job run, or the
// lock can expire mid-run and a second instance may start the same job.
type redisLocker struct {
	client *redis.Client
}

// Lock acquires the per-job lock. A per-acquisition token is stored as the
// value so that Unlock only releases a lock this instance still owns — a
// plain DEL could delete a lock that another instance re-acquired after our
// TTL expired.
func (l *redisLocker) Lock(ctx context.Context, key string) (gocron.Lock, error) {
	// Unique-enough token for an example; prefer crypto/rand in production.
	token := time.Now().Format(time.RFC3339Nano)
	ok, err := l.client.SetNX(ctx, "gocron:"+key, token, 30*time.Second).Result()
	if err != nil {
		return nil, err
	}
	if !ok {
		return nil, errors.New("lock not acquired")
	}
	return &redisLock{client: l.client, key: "gocron:" + key, token: token}, nil
}

type redisLock struct {
	client *redis.Client
	key    string
	token  string
}

// unlockScript deletes the lock key only if it still holds our token
// (atomic compare-and-delete, executed server-side).
var unlockScript = redis.NewScript(`
if redis.call("get", KEYS[1]) == ARGV[1] then
	return redis.call("del", KEYS[1])
end
return 0`)

// Unlock releases the lock if, and only if, this instance still owns it.
func (l *redisLock) Unlock(ctx context.Context) error {
	return unlockScript.Run(ctx, l.client, []string{l.key}, l.token).Err()
}
s, _ := gocron.NewScheduler(
	gocron.WithDistributedLocker(&redisLocker{client: redisClient}),
)

// etcdLocker implements gocron.Locker with an etcd lease-backed key.
type etcdLocker struct {
	client *clientv3.Client
	ttl    int // lease TTL in seconds; must exceed the longest job run
}

// Lock grants a short-lived lease and atomically claims the per-job key.
// The key is only created when it does not already exist (CreateRevision == 0).
func (l *etcdLocker) Lock(ctx context.Context, key string) (gocron.Lock, error) {
	lease, err := l.client.Grant(ctx, int64(l.ttl))
	if err != nil {
		return nil, err
	}
	lockKey := "/gocron/locks/" + key
	txn := l.client.Txn(ctx).
		If(clientv3.Compare(clientv3.CreateRevision(lockKey), "=", 0)).
		Then(clientv3.OpPut(lockKey, "locked", clientv3.WithLease(lease.ID))).
		Else(clientv3.OpGet(lockKey))
	resp, err := txn.Commit()
	if err != nil {
		// Best effort: don't leak the lease we just granted.
		l.client.Revoke(ctx, lease.ID)
		return nil, err
	}
	if !resp.Succeeded {
		// Another instance holds the lock; revoke the unused lease rather
		// than leaving it to linger in etcd until its TTL expires.
		l.client.Revoke(ctx, lease.ID)
		return nil, errors.New("lock not acquired")
	}
	return &etcdLock{
		client:  l.client,
		key:     lockKey,
		leaseID: lease.ID,
	}, nil
}

type etcdLock struct {
	client  *clientv3.Client
	key     string
	leaseID clientv3.LeaseID
}

// Unlock revokes the lease, which deletes the lock key attached to it.
func (l *etcdLock) Unlock(ctx context.Context) error {
	_, err := l.client.Revoke(ctx, l.leaseID)
	return err
}

type Elector interface {
IsLeader(context.Context) error
}Leader election interface for distributed deployments. Only the elected leader runs jobs.
func WithDistributedElector(elector Elector) SchedulerOptionReturns ErrWithDistributedElectorNil if nil.
gocron calls IsLeader before each job run. If it returns an error, the job is skipped.
// etcdElector implements gocron.Elector via etcd's concurrency election.
type etcdElector struct {
	session  *concurrency.Session
	election *concurrency.Election
	mu       sync.RWMutex
	isLeader bool
}

// NewEtcdElector starts campaigning for leadership in the background. The
// instance reports leadership only after Campaign succeeds, and drops it
// again if the underlying session is lost.
func NewEtcdElector(client *clientv3.Client, electionKey string) (*etcdElector, error) {
	session, err := concurrency.NewSession(client)
	if err != nil {
		return nil, err
	}
	election := concurrency.NewElection(session, electionKey)
	elector := &etcdElector{
		session:  session,
		election: election,
	}
	// Campaign for leadership in background.
	go func() {
		if err := election.Campaign(context.Background(), "instance"); err != nil {
			log.Printf("Campaign error: %v", err)
			return
		}
		elector.mu.Lock()
		elector.isLeader = true
		elector.mu.Unlock()
		// If the session expires (etcd outage, network partition), the lease
		// backing our leadership is gone. Stop claiming leadership, otherwise
		// two instances could run jobs at once (split brain).
		<-session.Done()
		elector.mu.Lock()
		elector.isLeader = false
		elector.mu.Unlock()
	}()
	return elector, nil
}

// IsLeader returns nil only while this instance holds leadership; gocron
// skips the job run when a non-nil error is returned.
func (e *etcdElector) IsLeader(ctx context.Context) error {
	e.mu.RLock()
	defer e.mu.RUnlock()
	if !e.isLeader {
		return errors.New("not leader")
	}
	return nil
}
s, _ := gocron.NewScheduler(
	gocron.WithDistributedElector(elector),
)

// k8sElector implements gocron.Elector on top of Kubernetes Lease-based
// leader election.
type k8sElector struct {
	leaderElector *leaderelection.LeaderElector
	mu            sync.RWMutex
	isLeader      bool
}

// NewK8sElector starts Lease-based leader election in the background using
// the pod's HOSTNAME as its identity.
func NewK8sElector(clientset *kubernetes.Clientset, namespace, name string) (*k8sElector, error) {
	elector := &k8sElector{}
	lock := &resourcelock.LeaseLock{
		LeaseMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: namespace,
		},
		Client: clientset.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{
			// Pod name; must be unique per instance.
			Identity: os.Getenv("HOSTNAME"),
		},
	}
	leaderElector, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
		Lock:          lock,
		LeaseDuration: 15 * time.Second,
		RenewDeadline: 10 * time.Second,
		RetryPeriod:   2 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) {
				elector.mu.Lock()
				elector.isLeader = true
				elector.mu.Unlock()
			},
			OnStoppedLeading: func() {
				elector.mu.Lock()
				elector.isLeader = false
				elector.mu.Unlock()
			},
		},
	})
	if err != nil {
		return nil, err
	}
	elector.leaderElector = leaderElector
	// Run returns once leadership is lost; loop so this instance keeps
	// re-campaigning instead of silently never becoming leader again.
	go func() {
		for {
			leaderElector.Run(context.Background())
		}
	}()
	return elector, nil
}

// IsLeader returns nil only while this instance holds the Lease.
func (e *k8sElector) IsLeader(ctx context.Context) error {
	e.mu.RLock()
	defer e.mu.RUnlock()
	if !e.isLeader {
		return errors.New("not leader")
	}
	return nil
}

// Only leader runs jobs
s, _ := gocron.NewScheduler(
	gocron.WithDistributedElector(elector),
)
j, _ := s.NewJob(
	gocron.DurationJob(time.Minute),
	gocron.NewTask(myFunc),
)
// Runs on leader only

// All instances try to run, but only one acquires lock
s, _ := gocron.NewScheduler(
	gocron.WithDistributedLocker(locker),
)
j, _ := s.NewJob(
	gocron.DurationJob(time.Minute),
	gocron.NewTask(myFunc),
	gocron.WithName("my-job"), // Lock key
)
// First to acquire lock runs

// Leader attempts lock (belt and suspenders)
s, _ := gocron.NewScheduler(
	gocron.WithDistributedElector(elector),
	gocron.WithDistributedLocker(locker),
)
j, _ := s.NewJob(
	gocron.DurationJob(time.Minute),
	gocron.NewTask(myFunc),
	gocron.WithName("my-job"),
)
// Leader tries to acquire lock before running

s, _ := gocron.NewScheduler(
	gocron.WithDistributedElector(elector),
	gocron.WithDistributedLocker(locker),
)
// Coordinated job
j1, _ := s.NewJob(
	gocron.DurationJob(time.Minute),
	gocron.NewTask(coordinatedTask),
	gocron.WithName("coordinated"),
)
// Node-local job (no coordination)
j2, _ := s.NewJob(
	gocron.DurationJob(30*time.Second),
	gocron.NewTask(localTask),
	gocron.WithName("local"),
	gocron.WithDisabledDistributedJobLocker(true),
)

// Give each coordinated job a stable lock key across instances:
// gocron.WithName("my-stable-name")

Use AfterLockError listener to monitor lock failures:
j, _ := s.NewJob(
	gocron.DurationJob(time.Minute),
	gocron.NewTask(myFunc),
	gocron.WithEventListeners(
		// Fires when the distributed lock could not be acquired for a run.
		gocron.AfterLockError(func(jobID uuid.UUID, jobName string, err error) {
			log.Printf("Lock failed for %s: %v", jobName, err)
			metricsLockFailures.WithLabelValues(jobName).Inc()
		}),
	),
)

| Aspect | Leader Election | Distributed Locking |
|---|---|---|
| Coordination | Instance-level | Job-level |
| Scope | All jobs | Per job |
| Overhead | One election per cluster | Lock per job run |
| Failover | Automatic on leader failure | Automatic on lock timeout |
| Use case | Centralized control | Decentralized safety |
Install with Tessl CLI
npx tessl i tessl/golang-github-com-go-co-op-gocron-v2

docs
api
examples
guides