CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/golang-github-com-go-co-op-gocron-v2

A Golang job scheduling library that lets you run Go functions at pre-determined intervals using cron expressions, fixed durations, daily, weekly, monthly, or one-time schedules with support for distributed deployments.

Overview
Eval results
Files

docs/examples/by-scenario/data-processing.md

Data Processing Task Examples

ETL jobs, batch processing, queue workers, and data pipeline tasks.

ETL (Extract, Transform, Load) Jobs

Simple ETL Pipeline

// Extract, transform, and load data every hour
// Hourly ETL job: extract from the source, transform in memory, and load
// into the destination. Singleton mode reschedules (rather than stacks)
// overlapping runs, and the error listener alerts ops on any failure.
j, _ := s.NewJob(
    gocron.DurationJob(time.Hour),
    gocron.NewTask(func() error {
        log.Println("Starting ETL pipeline")

        // Extract raw records from the upstream source.
        raw, err := extractFromSource()
        if err != nil {
            return fmt.Errorf("extract failed: %w", err)
        }

        // Transform, then load into the destination store.
        rows := transformData(raw)
        if err := loadToDestination(rows); err != nil {
            return fmt.Errorf("load failed: %w", err)
        }

        log.Printf("ETL pipeline complete: processed %d records", len(rows))
        return nil
    }),
    gocron.WithName("etl-pipeline"),
    gocron.WithSingletonMode(gocron.LimitModeReschedule),
    gocron.WithEventListeners(
        gocron.AfterJobRunsWithError(func(id uuid.UUID, name string, runErr error) {
            alertOps("etl-failure", runErr.Error())
        }),
    ),
)

Incremental Data Sync

// Sync only new/changed data every 5 minutes
// Every 5 minutes, sync only the user rows changed since the last successful
// run. WithIntervalFromCompletion times the next interval from completion,
// so a slow sync never overlaps the next one.
j, _ := s.NewJob(
    gocron.DurationJob(5*time.Minute),
    gocron.NewTask(func() error {
        since := getLastSyncTimestamp("users")

        // Pull only rows touched after the previous sync watermark.
        changed, err := db.Query(`
            SELECT * FROM users
            WHERE updated_at > ?
        `, since)
        if err != nil {
            return err
        }

        if len(changed) == 0 {
            log.Println("No changes to sync")
            return nil
        }

        // Transform each changed row and upsert it into the warehouse.
        for _, row := range changed {
            dataWarehouse.Upsert("users", transformUser(row))
        }

        // Advance the watermark only after a successful load.
        setLastSyncTimestamp("users", time.Now())

        log.Printf("Synced %d user changes", len(changed))
        return nil
    }),
    gocron.WithName("incremental-sync"),
    gocron.WithIntervalFromCompletion(),
)

Multi-Source Data Aggregation

// Aggregate data from multiple sources daily at 2 AM
// Daily at 02:00: pull yesterday's data from three sources, join it,
// and bulk-load the result into the analytics database.
j, _ := s.NewJob(
    gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(2, 0, 0))),
    gocron.NewTask(func() error {
        // Midnight boundary of the previous day.
        day := time.Now().AddDate(0, 0, -1).Truncate(24 * time.Hour)

        // Extract from each upstream source for that day.
        sales := extractSalesData(day)
        inventory := extractInventoryData(day)
        customers := extractCustomerData(day)

        // Join the three datasets into one daily summary.
        summary := aggregateData(sales, inventory, customers)

        // Load the summary into the analytics database.
        if err := analyticsDB.BulkInsert("daily_summary", summary); err != nil {
            return fmt.Errorf("failed to load aggregated data: %w", err)
        }

        log.Printf("Aggregated data for %v: %d records", day, len(summary))
        return nil
    }),
    gocron.WithName("multi-source-aggregation"),
)

Batch Processing

Large Dataset Processing

// Process large dataset in batches
// Process large dataset in batches, nightly at 01:00.
// The task accepts a context so a scheduler shutdown can interrupt it
// between batches. Fix vs. original: the 100ms rate-limit sleep was an
// unconditional time.Sleep, which ignored cancellation; it is now a
// select on ctx.Done so shutdown is honored promptly.
j, _ := s.NewJob(
    gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(1, 0, 0))),
    gocron.NewTask(func(ctx context.Context) error {
        batchSize := 1000
        offset := 0
        totalProcessed := 0

        for {
            // Check for cancellation before starting another batch.
            select {
            case <-ctx.Done():
                return ctx.Err()
            default:
            }

            // Fetch the next page of pending orders.
            // NOTE(review): OFFSET pagination assumes processOrder leaves
            // rows matching status = 'pending'; if processing flips the
            // status, advancing offset skips records — confirm and switch
            // to keyset pagination if so.
            batch := db.Query(`
                SELECT * FROM orders
                WHERE status = 'pending'
                ORDER BY created_at
                LIMIT ? OFFSET ?
            `, batchSize, offset)

            if len(batch) == 0 {
                break
            }

            // Process batch
            for _, order := range batch {
                processOrder(order)
            }

            totalProcessed += len(batch)
            offset += batchSize

            log.Printf("Processed %d/%d orders", totalProcessed, offset)

            // Rate limit between batches, but abort promptly on
            // cancellation instead of sleeping through it.
            select {
            case <-ctx.Done():
                return ctx.Err()
            case <-time.After(100 * time.Millisecond):
            }
        }

        log.Printf("Batch processing complete: %d total orders", totalProcessed)
        return nil
    }),
    gocron.WithName("batch-processing"),
    gocron.WithSingletonMode(gocron.LimitModeReschedule),
)

Parallel Batch Processing

// Process batches in parallel with concurrency limit
// Process batches in parallel with a bounded worker pool.
// Fixes vs. original:
//   1. The producer returned on ctx.Done without closing workChan, leaving
//      every worker blocked in `range workChan` and wg.Wait() deadlocked.
//      close(workChan) is now deferred so workers always terminate.
//   2. When a worker hit an error and returned, the producer could block
//      forever on `workChan <- offset` (nothing cancelled it), leaking a
//      goroutine per failed run. A derived cancellable context now stops
//      the producer as soon as any worker fails.
j, _ := s.NewJob(
    gocron.DurationJob(30*time.Minute),
    gocron.NewTask(func(ctx context.Context) error {
        batchSize := 500
        maxWorkers := 5

        // Derived context: cancelled on the first worker error so the
        // producer and idle workers shut down instead of leaking.
        ctx, cancel := context.WithCancel(ctx)
        defer cancel()

        // Get total count of pending items to partition into offsets.
        total := db.QueryOne("SELECT COUNT(*) FROM items WHERE status = 'pending'")

        workChan := make(chan int, maxWorkers)
        errChan := make(chan error, maxWorkers) // one slot per worker: sends never block
        var wg sync.WaitGroup

        // Start the fixed pool of workers.
        for i := 0; i < maxWorkers; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                for offset := range workChan {
                    batch := fetchBatch(offset, batchSize)
                    if err := processBatch(batch); err != nil {
                        errChan <- err
                        cancel() // stop the producer; siblings drain and exit
                        return
                    }
                }
            }()
        }

        // Producer: feed offsets until done or cancelled. The deferred
        // close runs on every exit path so workers always see end-of-work.
        go func() {
            defer close(workChan)
            for offset := 0; offset < total; offset += batchSize {
                select {
                case <-ctx.Done():
                    return
                case workChan <- offset:
                }
            }
        }()

        // Wait for all workers, then report the first error, if any.
        wg.Wait()
        close(errChan)

        for err := range errChan {
            if err != nil {
                return err
            }
        }

        return nil
    }),
    gocron.WithName("parallel-batch-processing"),
)

Scheduled Report Generation

// Generate weekly reports every Monday at 8 AM
// Every Monday at 08:00: build the report covering the previous seven
// days, persist it as a date-stamped PDF, and email it to stakeholders.
j, _ := s.NewJob(
    gocron.WeeklyJob(
        1,
        gocron.NewWeekdays(time.Monday),
        gocron.NewAtTimes(gocron.NewAtTime(8, 0, 0)),
    ),
    gocron.NewTask(func() error {
        // Reporting window: the seven days ending at today's midnight.
        start := time.Now().AddDate(0, 0, -7).Truncate(24 * time.Hour)
        end := time.Now().Truncate(24 * time.Hour)

        log.Printf("Generating weekly report for %v to %v", start, end)

        // Collect the raw numbers, then render them into a report.
        report := generateReport(gatherReportData(start, end))

        // Persist under a date-stamped filename.
        path := fmt.Sprintf("reports/weekly-%s.pdf", start.Format("2006-01-02"))
        if err := saveReport(path, report); err != nil {
            return err
        }

        // Distribute to the stakeholder list.
        sendReportEmail(path, []string{"team@company.com"})

        log.Printf("Weekly report generated: %s", path)
        return nil
    }),
    gocron.WithName("weekly-report"),
)

Queue Workers

Message Queue Consumer

// Process message queue every 10 seconds
// Drain up to 20 messages from the "tasks" queue every 10 seconds.
// Completion-based intervals plus singleton mode guarantee polls never
// overlap, even when processing runs long.
j, _ := s.NewJob(
    gocron.DurationJob(10*time.Second),
    gocron.NewTask(func() {
        batch := queue.Poll("tasks", 20) // Poll up to 20 messages

        for _, m := range batch {
            if err := processMessage(m); err != nil {
                log.Printf("Message processing failed: %v", err)
                queue.Nack(m) // Re-queue for retry
            } else {
                queue.Ack(m) // Mark as processed
            }
        }

        if len(batch) > 0 {
            log.Printf("Processed %d messages", len(batch))
        }
    }),
    gocron.WithName("queue-worker"),
    gocron.WithIntervalFromCompletion(),
    gocron.WithSingletonMode(gocron.LimitModeReschedule),
)

Dead Letter Queue Processing

// Process DLQ every hour to retry failed messages
j, _ := s.NewJob(
    gocron.DurationJob(time.Hour),
    gocron.NewTask(func() {
        messages := queue.PollDLQ("tasks-dlq", 50)

        retried := 0
        abandoned := 0

        for _, msg := range messages {
            // Check retry count
            if msg.RetryCount >= 3 {
                // Move to permanent failure storage
                logPermanentFailure(msg)
                queue.Delete(msg)
                abandoned++
            } else {
                // Retry
                msg.RetryCount++
                queue.Push("tasks", msg)
                retried++
            }
        }

        log.Printf("DLQ: %d retried, %d abandoned", retried, abandoned)
    }),
    gocron.WithName("dlq-processor"),
)

Priority Queue Processing

// Process priority queue with high-priority items first
// Every 5 seconds, drain queues in strict priority order, with larger
// poll budgets for higher priorities (10 high, 5 medium, 2 low).
j, _ := s.NewJob(
    gocron.DurationJob(5*time.Second),
    gocron.NewTask(func() {
        // Poll up to limit messages from the named queue and process each.
        drain := func(name string, limit int) {
            for _, m := range queue.Poll(name, limit) {
                processMessage(m)
            }
        }

        drain("tasks-high", 10)  // high priority first
        drain("tasks-medium", 5) // then medium
        drain("tasks-low", 2)    // finally low
    }),
    gocron.WithName("priority-queue-worker"),
    gocron.WithIntervalFromCompletion(),
)

Data Import/Export

CSV Import

// Import CSV files dropped in import folder
// Import any CSV files dropped into ./imports, checking every 2 minutes.
// Fix vs. original: singleton mode added. Without it, an import running
// longer than the 2-minute interval is still mid-file when the next run
// starts, and the same CSV (not yet moved to the processed folder) gets
// imported twice. LimitModeReschedule matches the other jobs in this doc.
j, _ := s.NewJob(
    gocron.DurationJob(2*time.Minute),
    gocron.NewTask(func() error {
        files := listFilesInDirectory("./imports/*.csv")

        for _, file := range files {
            log.Printf("Importing %s", file)

            records, err := parseCSV(file)
            if err != nil {
                // Quarantine unparseable files so they don't block the rest.
                log.Printf("Failed to parse %s: %v", file, err)
                moveToErrorFolder(file)
                continue
            }

            // Import records
            for _, record := range records {
                db.Insert("imported_data", record)
            }

            // Move to the processed folder so it is not imported again.
            moveToProcessedFolder(file)

            log.Printf("Imported %d records from %s", len(records), file)
        }

        return nil
    }),
    gocron.WithName("csv-import"),
    gocron.WithSingletonMode(gocron.LimitModeReschedule),
)

Database Export

// Export database to S3 daily at midnight
// Export the database daily at midnight and push the dump to S3.
// Fix vs. original: the local dump was only removed on the success path,
// so every failed S3 upload leaked a dump file on disk. Cleanup is now
// deferred right after a successful export, and the os.Remove error is
// logged instead of being silently discarded.
j, _ := s.NewJob(
    gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(0, 0, 0))),
    gocron.NewTask(func() error {
        timestamp := time.Now().Format("2006-01-02")
        filename := fmt.Sprintf("export-%s.sql.gz", timestamp)

        // Export database
        log.Println("Starting database export")
        err := exportDatabase(filename)
        if err != nil {
            return fmt.Errorf("export failed: %w", err)
        }

        // Clean up the local dump on every exit path from here on,
        // including a failed upload.
        defer func() {
            if rmErr := os.Remove(filename); rmErr != nil {
                log.Printf("failed to remove local export %s: %v", filename, rmErr)
            }
        }()

        // Upload to S3
        err = uploadToS3(filename, "backups/"+filename)
        if err != nil {
            return fmt.Errorf("S3 upload failed: %w", err)
        }

        log.Printf("Database exported to S3: %s", filename)
        return nil
    }),
    gocron.WithName("db-export"),
)

Complete Data Processing Example

package main

import (
    "context"
    "log"
    "time"

    "github.com/go-co-op/gocron/v2"
)

// main wires up the four data-processing jobs and runs the scheduler
// until the process is killed.
// Fix vs. original: errors from NewScheduler and every NewJob were
// discarded. A failed NewScheduler left s nil (so defer s.Shutdown()
// would panic), and a failed NewJob meant the job silently never ran.
// All are now checked and fail fast at startup.
func main() {
    s, err := gocron.NewScheduler()
    if err != nil {
        log.Fatalf("failed to create scheduler: %v", err)
    }
    defer s.Shutdown()

    // ETL pipeline every hour
    if _, err := s.NewJob(
        gocron.DurationJob(time.Hour),
        gocron.NewTask(func() error {
            return runETLPipeline()
        }),
        gocron.WithName("etl-hourly"),
        gocron.WithSingletonMode(gocron.LimitModeReschedule),
    ); err != nil {
        log.Fatalf("failed to schedule etl-hourly: %v", err)
    }

    // Queue worker every 10 seconds
    if _, err := s.NewJob(
        gocron.DurationJob(10*time.Second),
        gocron.NewTask(func() {
            processQueueMessages()
        }),
        gocron.WithName("queue-worker"),
        gocron.WithIntervalFromCompletion(),
    ); err != nil {
        log.Fatalf("failed to schedule queue-worker: %v", err)
    }

    // Batch processing nightly at 2 AM
    if _, err := s.NewJob(
        gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(2, 0, 0))),
        gocron.NewTask(func(ctx context.Context) error {
            return processBatchData(ctx)
        }),
        gocron.WithName("batch-nightly"),
    ); err != nil {
        log.Fatalf("failed to schedule batch-nightly: %v", err)
    }

    // Data export daily at midnight
    if _, err := s.NewJob(
        gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(0, 0, 0))),
        gocron.NewTask(func() error {
            return exportDataToS3()
        }),
        gocron.WithName("data-export"),
    ); err != nil {
        log.Fatalf("failed to schedule data-export: %v", err)
    }

    s.Start()
    log.Println("Data processing scheduler started")
    // Block forever; the scheduler runs jobs on its own goroutines.
    select {}
}

Related Documentation

  • Examples: Web App Tasks
  • Examples: Maintenance Jobs
  • Guide: Concurrency Control
  • API: Job Options

Install with Tessl CLI

npx tessl i tessl/golang-github-com-go-co-op-gocron-v2

docs

index.md

tile.json