A Golang job scheduling library that lets you run Go functions at pre-determined intervals using cron expressions, fixed durations, daily, weekly, monthly, or one-time schedules with support for distributed deployments.
ETL jobs, batch processing, queue workers, and data pipeline tasks.
// Extract, transform, and load data every hour
j, _ := s.NewJob(
gocron.DurationJob(time.Hour),
gocron.NewTask(func() error {
log.Println("Starting ETL pipeline")
// Extract
data, err := extractFromSource()
if err != nil {
return fmt.Errorf("extract failed: %w", err)
}
// Transform
transformed := transformData(data)
// Load
err = loadToDestination(transformed)
if err != nil {
return fmt.Errorf("load failed: %w", err)
}
log.Printf("ETL pipeline complete: processed %d records", len(transformed))
return nil
}),
gocron.WithName("etl-pipeline"),
gocron.WithSingletonMode(gocron.LimitModeReschedule),
gocron.WithEventListeners(
gocron.AfterJobRunsWithError(func(jobID uuid.UUID, jobName string, err error) {
alertOps("etl-failure", err.Error())
}),
),
)// Sync only new/changed data every 5 minutes
j, _ := s.NewJob(
gocron.DurationJob(5*time.Minute),
gocron.NewTask(func() error {
lastSync := getLastSyncTimestamp("users")
// Extract only changes since last sync
changes, err := db.Query(`
SELECT * FROM users
WHERE updated_at > ?
`, lastSync)
if err != nil {
return err
}
if len(changes) == 0 {
log.Println("No changes to sync")
return nil
}
// Transform and load
for _, record := range changes {
transformed := transformUser(record)
dataWarehouse.Upsert("users", transformed)
}
// Update sync timestamp
setLastSyncTimestamp("users", time.Now())
log.Printf("Synced %d user changes", len(changes))
return nil
}),
gocron.WithName("incremental-sync"),
gocron.WithIntervalFromCompletion(),
)// Aggregate data from multiple sources daily at 2 AM
j, _ := s.NewJob(
gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(2, 0, 0))),
gocron.NewTask(func() error {
yesterday := time.Now().AddDate(0, 0, -1).Truncate(24 * time.Hour)
// Extract from multiple sources
salesData := extractSalesData(yesterday)
inventoryData := extractInventoryData(yesterday)
customerData := extractCustomerData(yesterday)
// Transform and join
aggregated := aggregateData(salesData, inventoryData, customerData)
// Load to analytics database
err := analyticsDB.BulkInsert("daily_summary", aggregated)
if err != nil {
return fmt.Errorf("failed to load aggregated data: %w", err)
}
log.Printf("Aggregated data for %v: %d records", yesterday, len(aggregated))
return nil
}),
gocron.WithName("multi-source-aggregation"),
)// Process large dataset in batches
j, _ := s.NewJob(
gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(1, 0, 0))),
gocron.NewTask(func(ctx context.Context) error {
batchSize := 1000
offset := 0
totalProcessed := 0
for {
// Check for cancellation
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// Fetch batch
batch := db.Query(`
SELECT * FROM orders
WHERE status = 'pending'
ORDER BY created_at
LIMIT ? OFFSET ?
`, batchSize, offset)
if len(batch) == 0 {
break
}
// Process batch
for _, order := range batch {
processOrder(order)
}
totalProcessed += len(batch)
offset += batchSize
log.Printf("Processed %d/%d orders", totalProcessed, offset)
// Rate limit
time.Sleep(100 * time.Millisecond)
}
log.Printf("Batch processing complete: %d total orders", totalProcessed)
return nil
}),
gocron.WithName("batch-processing"),
gocron.WithSingletonMode(gocron.LimitModeReschedule),
)// Process batches in parallel with concurrency limit
j, _ := s.NewJob(
gocron.DurationJob(30*time.Minute),
gocron.NewTask(func(ctx context.Context) error {
batchSize := 500
maxWorkers := 5
// Get total count
total := db.QueryOne("SELECT COUNT(*) FROM items WHERE status = 'pending'")
// Create work channel
workChan := make(chan int, maxWorkers)
errChan := make(chan error, maxWorkers)
var wg sync.WaitGroup
// Start workers
for i := 0; i < maxWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for offset := range workChan {
batch := fetchBatch(offset, batchSize)
if err := processBatch(batch); err != nil {
errChan <- err
return
}
}
}()
}
// Send work
go func() {
for offset := 0; offset < total; offset += batchSize {
select {
case <-ctx.Done():
return
case workChan <- offset:
}
}
close(workChan)
}()
// Wait for completion
wg.Wait()
close(errChan)
// Check for errors
for err := range errChan {
if err != nil {
return err
}
}
return nil
}),
gocron.WithName("parallel-batch-processing"),
)// Generate weekly reports every Monday at 8 AM
j, _ := s.NewJob(
gocron.WeeklyJob(
1,
gocron.NewWeekdays(time.Monday),
gocron.NewAtTimes(gocron.NewAtTime(8, 0, 0)),
),
gocron.NewTask(func() error {
startDate := time.Now().AddDate(0, 0, -7).Truncate(24 * time.Hour)
endDate := time.Now().Truncate(24 * time.Hour)
log.Printf("Generating weekly report for %v to %v", startDate, endDate)
// Gather data
data := gatherReportData(startDate, endDate)
// Generate report
report := generateReport(data)
// Save report
reportPath := fmt.Sprintf("reports/weekly-%s.pdf", startDate.Format("2006-01-02"))
err := saveReport(reportPath, report)
if err != nil {
return err
}
// Send to stakeholders
sendReportEmail(reportPath, []string{"team@company.com"})
log.Printf("Weekly report generated: %s", reportPath)
return nil
}),
gocron.WithName("weekly-report"),
)// Process message queue every 10 seconds
j, _ := s.NewJob(
gocron.DurationJob(10*time.Second),
gocron.NewTask(func() {
messages := queue.Poll("tasks", 20) // Poll up to 20 messages
for _, msg := range messages {
err := processMessage(msg)
if err != nil {
log.Printf("Message processing failed: %v", err)
queue.Nack(msg) // Re-queue for retry
} else {
queue.Ack(msg) // Mark as processed
}
}
if len(messages) > 0 {
log.Printf("Processed %d messages", len(messages))
}
}),
gocron.WithName("queue-worker"),
gocron.WithIntervalFromCompletion(),
gocron.WithSingletonMode(gocron.LimitModeReschedule),
)// Process DLQ every hour to retry failed messages
j, _ := s.NewJob(
gocron.DurationJob(time.Hour),
gocron.NewTask(func() {
messages := queue.PollDLQ("tasks-dlq", 50)
retried := 0
abandoned := 0
for _, msg := range messages {
// Check retry count
if msg.RetryCount >= 3 {
// Move to permanent failure storage
logPermanentFailure(msg)
queue.Delete(msg)
abandoned++
} else {
// Retry
msg.RetryCount++
queue.Push("tasks", msg)
retried++
}
}
log.Printf("DLQ: %d retried, %d abandoned", retried, abandoned)
}),
gocron.WithName("dlq-processor"),
)// Process priority queue with high-priority items first
j, _ := s.NewJob(
gocron.DurationJob(5*time.Second),
gocron.NewTask(func() {
// Process high priority first
highPri := queue.Poll("tasks-high", 10)
for _, msg := range highPri {
processMessage(msg)
}
// Then medium priority
medPri := queue.Poll("tasks-medium", 5)
for _, msg := range medPri {
processMessage(msg)
}
// Finally low priority
lowPri := queue.Poll("tasks-low", 2)
for _, msg := range lowPri {
processMessage(msg)
}
}),
gocron.WithName("priority-queue-worker"),
gocron.WithIntervalFromCompletion(),
)// Import CSV files dropped in import folder
j, _ := s.NewJob(
gocron.DurationJob(2*time.Minute),
gocron.NewTask(func() error {
files := listFilesInDirectory("./imports/*.csv")
for _, file := range files {
log.Printf("Importing %s", file)
records, err := parseCSV(file)
if err != nil {
log.Printf("Failed to parse %s: %v", file, err)
moveToErrorFolder(file)
continue
}
// Import records
for _, record := range records {
db.Insert("imported_data", record)
}
// Move to processed folder
moveToProcessedFolder(file)
log.Printf("Imported %d records from %s", len(records), file)
}
return nil
}),
gocron.WithName("csv-import"),
)// Export database to S3 daily at midnight
j, _ := s.NewJob(
gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(0, 0, 0))),
gocron.NewTask(func() error {
timestamp := time.Now().Format("2006-01-02")
filename := fmt.Sprintf("export-%s.sql.gz", timestamp)
// Export database
log.Println("Starting database export")
err := exportDatabase(filename)
if err != nil {
return fmt.Errorf("export failed: %w", err)
}
// Upload to S3
err = uploadToS3(filename, "backups/"+filename)
if err != nil {
return fmt.Errorf("S3 upload failed: %w", err)
}
// Clean up local file
os.Remove(filename)
log.Printf("Database exported to S3: %s", filename)
return nil
}),
gocron.WithName("db-export"),
)package main
import (
"context"
"log"
"time"
"github.com/go-co-op/gocron/v2"
)
func main() {
s, _ := gocron.NewScheduler()
defer s.Shutdown()
// ETL pipeline every hour
s.NewJob(
gocron.DurationJob(time.Hour),
gocron.NewTask(func() error {
return runETLPipeline()
}),
gocron.WithName("etl-hourly"),
gocron.WithSingletonMode(gocron.LimitModeReschedule),
)
// Queue worker every 10 seconds
s.NewJob(
gocron.DurationJob(10*time.Second),
gocron.NewTask(func() {
processQueueMessages()
}),
gocron.WithName("queue-worker"),
gocron.WithIntervalFromCompletion(),
)
// Batch processing nightly at 2 AM
s.NewJob(
gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(2, 0, 0))),
gocron.NewTask(func(ctx context.Context) error {
return processBatchData(ctx)
}),
gocron.WithName("batch-nightly"),
)
// Data export daily at midnight
s.NewJob(
gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(0, 0, 0))),
gocron.NewTask(func() error {
return exportDataToS3()
}),
gocron.WithName("data-export"),
)
s.Start()
log.Println("Data processing scheduler started")
select {}
}Install with Tessl CLI
npx tessl i tessl/golang-github-com-go-co-op-gocron-v2docs
api
examples
guides