A Golang job scheduling library that lets you run Go functions at pre-determined intervals using cron expressions, fixed durations, daily, weekly, monthly, or one-time schedules with support for distributed deployments.
Run gocron v2 across multiple instances using Elector and Locker implementations.
Complete Redis-based distributed locking:
package main
import (
"context"
"errors"
"fmt"
"os"
"os/signal"
"syscall"
"time"
"github.com/go-co-op/gocron/v2"
"github.com/redis/go-redis/v9"
)
type redisLocker struct {
client *redis.Client
}
func NewRedisLocker(addr string) (*redisLocker, error) {
client := redis.NewClient(&redis.Options{
Addr: addr,
})
// Test connection
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := client.Ping(ctx).Err(); err != nil {
return nil, fmt.Errorf("redis connection failed: %w", err)
}
return &redisLocker{client: client}, nil
}
func (l *redisLocker) Lock(ctx context.Context, key string) (gocron.Lock, error) {
lockKey := "gocron:lock:" + key
// TTL should be 2-3x max expected job duration
ttl := 5 * time.Minute
ok, err := l.client.SetNX(ctx, lockKey, "1", ttl).Result()
if err != nil {
return nil, fmt.Errorf("lock acquisition failed: %w", err)
}
if !ok {
return nil, errors.New("lock already held")
}
return &redisLock{client: l.client, key: lockKey}, nil
}
func (l *redisLocker) Close() error {
return l.client.Close()
}
type redisLock struct {
client *redis.Client
key string
}
func (l *redisLock) Unlock(ctx context.Context) error {
return l.client.Del(ctx, l.key).Err()
}
func main() {
// Create Redis locker
locker, err := NewRedisLocker("localhost:6379")
if err != nil {
panic(err)
}
defer locker.Close()
// Create scheduler with distributed locker
s, err := gocron.NewScheduler(
gocron.WithDistributedLocker(locker),
gocron.WithLogger(gocron.NewLogger(gocron.LogLevelInfo)),
)
if err != nil {
panic(err)
}
defer s.Shutdown()
// Add jobs with stable names
s.NewJob(
gocron.DurationJob(time.Minute),
gocron.NewTask(func() {
fmt.Println("Job 1 running on instance", os.Getenv("INSTANCE_ID"))
}),
gocron.WithName("job-1"), // Stable name for lock key
gocron.WithEventListeners(
gocron.AfterLockError(func(jobID uuid.UUID, jobName string, err error) {
fmt.Printf("Lock failed for %s: %v\n", jobName, err)
}),
),
)
s.NewJob(
gocron.DurationJob(2*time.Minute),
gocron.NewTask(func() {
fmt.Println("Job 2 running on instance", os.Getenv("INSTANCE_ID"))
}),
gocron.WithName("job-2"),
)
s.Start()
fmt.Println("Scheduler started on instance", os.Getenv("INSTANCE_ID"))
// Wait for interrupt
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
<-sigCh
fmt.Println("Shutting down...")
}For long-running jobs, renew the lock periodically:
type renewableRedisLock struct {
client *redis.Client
key string
ttl time.Duration
cancel context.CancelFunc
}
func (l *redisLocker) Lock(ctx context.Context, key string) (gocron.Lock, error) {
lockKey := "gocron:lock:" + key
ttl := 5 * time.Minute
ok, err := l.client.SetNX(ctx, lockKey, "1", ttl).Result()
if err != nil {
return nil, err
}
if !ok {
return nil, errors.New("lock already held")
}
renewCtx, cancel := context.WithCancel(context.Background())
lock := &renewableRedisLock{
client: l.client,
key: lockKey,
ttl: ttl,
cancel: cancel,
}
// Start renewal goroutine
go lock.renewLoop(renewCtx)
return lock, nil
}
func (l *renewableRedisLock) renewLoop(ctx context.Context) {
ticker := time.NewTicker(l.ttl / 2) // Renew at half TTL
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// Renew lock
l.client.Expire(ctx, l.key, l.ttl)
}
}
}
func (l *renewableRedisLock) Unlock(ctx context.Context) error {
l.cancel() // Stop renewal
return l.client.Del(ctx, l.key).Err()
}Leader election using etcd:
package main
import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
"time"
"github.com/go-co-op/gocron/v2"
"go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
)
type etcdElector struct {
client *clientv3.Client
session *concurrency.Session
election *concurrency.Election
cancel context.CancelFunc
}
func NewEtcdElector(endpoints []string, electionPrefix string) (*etcdElector, error) {
client, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
})
if err != nil {
return nil, err
}
session, err := concurrency.NewSession(client, concurrency.WithTTL(10))
if err != nil {
client.Close()
return nil, err
}
election := concurrency.NewElection(session, electionPrefix)
ctx, cancel := context.WithCancel(context.Background())
e := &etcdElector{
client: client,
session: session,
election: election,
cancel: cancel,
}
// Campaign for leadership
go func() {
instanceID := os.Getenv("INSTANCE_ID")
if instanceID == "" {
instanceID = fmt.Sprintf("instance-%d", time.Now().Unix())
}
if err := e.election.Campaign(ctx, instanceID); err != nil && ctx.Err() == nil {
fmt.Printf("Campaign failed: %v\n", err)
}
}()
return e, nil
}
func (e *etcdElector) IsLeader(ctx context.Context) error {
resp, err := e.election.Leader(ctx)
if err != nil {
return err
}
instanceID := os.Getenv("INSTANCE_ID")
if instanceID == "" {
instanceID = fmt.Sprintf("instance-%d", time.Now().Unix())
}
if string(resp.Kvs[0].Value) == instanceID {
return nil // This instance is leader
}
return fmt.Errorf("not the leader")
}
func (e *etcdElector) Close() error {
e.cancel()
if err := e.election.Resign(context.Background()); err != nil {
return err
}
if err := e.session.Close(); err != nil {
return err
}
return e.client.Close()
}
func main() {
// Create elector
elector, err := NewEtcdElector([]string{"localhost:2379"}, "/gocron/election")
if err != nil {
panic(err)
}
defer elector.Close()
// Create scheduler with elector
s, err := gocron.NewScheduler(
gocron.WithDistributedElector(elector),
gocron.WithLogger(gocron.NewLogger(gocron.LogLevelInfo)),
)
if err != nil {
panic(err)
}
defer s.Shutdown()
// Add jobs
s.NewJob(
gocron.DurationJob(time.Minute),
gocron.NewTask(func() {
fmt.Println("Job running on leader:", os.Getenv("INSTANCE_ID"))
}),
gocron.WithName("leader-job"),
)
s.Start()
fmt.Println("Scheduler started on instance", os.Getenv("INSTANCE_ID"))
// Wait for interrupt
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
<-sigCh
fmt.Println("Shutting down...")
}Using PostgreSQL for distributed locking:
package main
import (
"context"
"database/sql"
"errors"
"fmt"
"hash/fnv"
"time"
"github.com/go-co-op/gocron/v2"
_ "github.com/lib/pq"
)
type postgresLocker struct {
db *sql.DB
}
func NewPostgresLocker(connStr string) (*postgresLocker, error) {
db, err := sql.Open("postgres", connStr)
if err != nil {
return nil, err
}
// Test connection
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := db.PingContext(ctx); err != nil {
db.Close()
return nil, fmt.Errorf("postgres connection failed: %w", err)
}
return &postgresLocker{db: db}, nil
}
func (l *postgresLocker) Lock(ctx context.Context, key string) (gocron.Lock, error) {
// Hash key to 64-bit integer for PostgreSQL advisory lock
lockID := hashKeyToInt64(key)
var acquired bool
err := l.db.QueryRowContext(ctx, "SELECT pg_try_advisory_lock($1)", lockID).Scan(&acquired)
if err != nil {
return nil, err
}
if !acquired {
return nil, errors.New("lock already held")
}
return &postgresLock{db: l.db, lockID: lockID}, nil
}
func (l *postgresLocker) Close() error {
return l.db.Close()
}
type postgresLock struct {
db *sql.DB
lockID int64
}
func (l *postgresLock) Unlock(ctx context.Context) error {
var released bool
err := l.db.QueryRowContext(ctx, "SELECT pg_advisory_unlock($1)", l.lockID).Scan(&released)
if err != nil {
return err
}
if !released {
return errors.New("lock was not held")
}
return nil
}
func hashKeyToInt64(key string) int64 {
h := fnv.New64a()
h.Write([]byte(key))
return int64(h.Sum64())
}
func main() {
// Create locker
locker, err := NewPostgresLocker("postgres://user:password@localhost/dbname?sslmode=disable")
if err != nil {
panic(err)
}
defer locker.Close()
// Create scheduler
s, err := gocron.NewScheduler(
gocron.WithDistributedLocker(locker),
)
if err != nil {
panic(err)
}
defer s.Shutdown()
// Add jobs
s.NewJob(
gocron.DurationJob(time.Minute),
gocron.NewTask(func() {
fmt.Println("Job running")
}),
gocron.WithName("postgres-job"),
)
s.Start()
select {}
}Ensure all instances schedule jobs at the same time:
func main() {
locker, _ := NewRedisLocker("localhost:6379")
defer locker.Close()
s, _ := gocron.NewScheduler(
gocron.WithDistributedLocker(locker),
)
defer s.Shutdown()
// Align to 5-minute boundaries
now := time.Now()
next5Min := now.Truncate(5 * time.Minute).Add(5 * time.Minute)
s.NewJob(
gocron.DurationJob(5*time.Minute),
gocron.NewTask(doWork),
gocron.WithName("aligned-job"),
gocron.WithStartAt(gocron.WithStartDateTime(next5Min)),
)
s.Start()
select {}
}Use different lockers for different jobs:
func main() {
// Redis for most jobs
redisLocker, _ := NewRedisLocker("localhost:6379")
defer redisLocker.Close()
// PostgreSQL for critical jobs
pgLocker, _ := NewPostgresLocker("postgres://...")
defer pgLocker.Close()
s, _ := gocron.NewScheduler(
gocron.WithDistributedLocker(redisLocker), // Default
)
defer s.Shutdown()
// Regular job uses Redis
s.NewJob(
gocron.DurationJob(time.Minute),
gocron.NewTask(doRegularWork),
gocron.WithName("regular-job"),
)
// Critical job uses PostgreSQL
s.NewJob(
gocron.DurationJob(time.Minute),
gocron.NewTask(doCriticalWork),
gocron.WithName("critical-job"),
gocron.WithDistributedJobLocker(pgLocker), // Override
)
// Local-only job (no locking)
s.NewJob(
gocron.DurationJob(time.Minute),
gocron.NewTask(doLocalWork),
gocron.WithName("local-job"),
gocron.WithDisabledDistributedJobLocker(true),
)
s.Start()
select {}
}Use both strategies (uncommon):
func main() {
elector, _ := NewEtcdElector([]string{"localhost:2379"}, "/gocron/election")
defer elector.Close()
locker, _ := NewRedisLocker("localhost:6379")
defer locker.Close()
s, _ := gocron.NewScheduler(
gocron.WithDistributedElector(elector), // Only leader runs jobs
gocron.WithDistributedLocker(locker), // Leader uses locks for per-job mutex
)
defer s.Shutdown()
s.NewJob(
gocron.DurationJob(time.Minute),
gocron.NewTask(doWork),
gocron.WithName("combined-job"),
)
s.Start()
select {}
}Track lock failures and distribution:
package main
import (
"fmt"
"sync/atomic"
"time"
"github.com/go-co-op/gocron/v2"
)
type distributedMetrics struct {
lockFailures atomic.Int64
lockSuccesses atomic.Int64
}
func (m *distributedMetrics) recordLockFailure() {
m.lockFailures.Add(1)
}
func (m *distributedMetrics) recordLockSuccess() {
m.lockSuccesses.Add(1)
}
func (m *distributedMetrics) printStats() {
failures := m.lockFailures.Load()
successes := m.lockSuccesses.Load()
total := failures + successes
if total == 0 {
fmt.Println("No lock attempts yet")
return
}
successRate := float64(successes) / float64(total) * 100
fmt.Printf("Lock stats: %d/%d successful (%.1f%%)\n", successes, total, successRate)
}
func main() {
metrics := &distributedMetrics{}
locker, _ := NewRedisLocker("localhost:6379")
defer locker.Close()
s, _ := gocron.NewScheduler(
gocron.WithDistributedLocker(locker),
)
defer s.Shutdown()
// Add jobs with lock monitoring
s.NewJob(
gocron.DurationJob(time.Minute),
gocron.NewTask(func() {
metrics.recordLockSuccess()
fmt.Println("Job running")
}),
gocron.WithName("monitored-job"),
gocron.WithEventListeners(
gocron.AfterLockError(func(jobID uuid.UUID, jobName string, err error) {
metrics.recordLockFailure()
fmt.Printf("Lock failed for %s: %v\n", jobName, err)
}),
),
)
// Print stats periodically
go func() {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for range ticker.C {
metrics.printStats()
}
}()
s.Start()
select {}
}Mock implementations for testing:
package main
import (
"context"
"errors"
"sync"
"testing"
"time"
"github.com/go-co-op/gocron/v2"
)
// Mock locker for testing
type mockLocker struct {
locks map[string]bool
mu sync.Mutex
}
func newMockLocker() *mockLocker {
return &mockLocker{locks: make(map[string]bool)}
}
func (l *mockLocker) Lock(ctx context.Context, key string) (gocron.Lock, error) {
l.mu.Lock()
defer l.mu.Unlock()
if l.locks[key] {
return nil, errors.New("already locked")
}
l.locks[key] = true
return &mockLock{locker: l, key: key}, nil
}
type mockLock struct {
locker *mockLocker
key string
}
func (l *mockLock) Unlock(ctx context.Context) error {
l.locker.mu.Lock()
defer l.locker.mu.Unlock()
delete(l.locker.locks, l.key)
return nil
}
// Mock elector for testing
type mockElector struct {
isLeader bool
mu sync.RWMutex
}
func (e *mockElector) IsLeader(ctx context.Context) error {
e.mu.RLock()
defer e.mu.RUnlock()
if !e.isLeader {
return errors.New("not leader")
}
return nil
}
func (e *mockElector) SetLeader(isLeader bool) {
e.mu.Lock()
defer e.mu.Unlock()
e.isLeader = isLeader
}
// Test distributed locking
func TestDistributedLocking(t *testing.T) {
locker := newMockLocker()
// Create two schedulers
s1, _ := gocron.NewScheduler(gocron.WithDistributedLocker(locker))
s2, _ := gocron.NewScheduler(gocron.WithDistributedLocker(locker))
defer s1.Shutdown()
defer s2.Shutdown()
var runs1, runs2 int
var mu sync.Mutex
// Same job on both schedulers
s1.NewJob(
gocron.DurationJob(time.Second),
gocron.NewTask(func() {
mu.Lock()
runs1++
mu.Unlock()
}),
gocron.WithName("shared-job"),
)
s2.NewJob(
gocron.DurationJob(time.Second),
gocron.NewTask(func() {
mu.Lock()
runs2++
mu.Unlock()
}),
gocron.WithName("shared-job"),
)
s1.Start()
s2.Start()
time.Sleep(5 * time.Second)
mu.Lock()
total := runs1 + runs2
mu.Unlock()
t.Logf("Scheduler 1: %d runs, Scheduler 2: %d runs, Total: %d", runs1, runs2, total)
// Both schedulers should have attempted runs, but locks prevent simultaneous execution
if total == 0 {
t.Fatal("No runs executed")
}
}
// Test leader election
func TestLeaderElection(t *testing.T) {
elector := &mockElector{isLeader: true}
s1, _ := gocron.NewScheduler(gocron.WithDistributedElector(elector))
defer s1.Shutdown()
var runs int
var mu sync.Mutex
s1.NewJob(
gocron.DurationJob(time.Second),
gocron.NewTask(func() {
mu.Lock()
runs++
mu.Unlock()
}),
)
s1.Start()
time.Sleep(3 * time.Second)
// Simulate leadership loss
elector.SetLeader(false)
time.Sleep(3 * time.Second)
mu.Lock()
finalRuns := runs
mu.Unlock()
t.Logf("Total runs: %d", finalRuns)
// Should run while leader, stop after losing leadership
if finalRuns == 0 {
t.Fatal("No runs while leader")
}
}Multi-instance deployment:
version: '3.8'
services:
redis:
image: redis:7-alpine
ports:
- "6379:6379"
scheduler-1:
build: .
environment:
- INSTANCE_ID=scheduler-1
- REDIS_ADDR=redis:6379
depends_on:
- redis
scheduler-2:
build: .
environment:
- INSTANCE_ID=scheduler-2
- REDIS_ADDR=redis:6379
depends_on:
- redis
scheduler-3:
build: .
environment:
- INSTANCE_ID=scheduler-3
- REDIS_ADDR=redis:6379
depends_on:
- redisWithName()Elector, Locker, LockWithDistributedElector, WithDistributedLockerWithDistributedJobLocker, WithDisabledDistributedJobLockerInstall with Tessl CLI
npx tessl i tessl/golang-github-com-go-co-op-gocron-v2docs
api
examples
guides