or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
golangpkg:golang/cloud.google.com/go/bigquery@v1.72.0

docs

advanced-features.md · client-setup.md · data-export.md · data-loading.md · datasets.md · index.md · jobs.md · queries.md · storage-read.md · storage-write.md · tables.md
tile.json

tessl/golang-cloud-google-com--go--bigquery

tessl install tessl/golang-cloud-google-com--go--bigquery@1.72.0

Google Cloud BigQuery client library providing comprehensive Go APIs for querying, loading data, managing datasets and tables, streaming inserts, and accessing BigQuery's ecosystem of services including Storage, Analytics Hub, Data Transfer, and Migration APIs

jobs.mddocs/

Job Management

This document covers managing BigQuery jobs, including monitoring status, canceling jobs, and retrieving job information.

Overview

All BigQuery operations that take time to complete (queries, loads, extracts, copies) create jobs. Jobs can be monitored, canceled, and retrieved by ID.

Job Type

// Job represents a BigQuery job. It has no exported fields: a struct cannot
// declare a field and a method with the same name (e.g. Location), so every
// property is read through the accessor methods listed below.
type Job struct {
    // contains filtered or unexported fields
}

// ID returns the job's ID.
func (j *Job) ID() string

// Location returns the job's location.
func (j *Job) Location() string

// Email returns the email of the job's creator.
func (j *Job) Email() string

// Config returns the configuration the job was created with; type-switch on
// the result to get the concrete *QueryConfig, *LoadConfig, etc.
func (j *Job) Config() (JobConfig, error)
func (j *Job) SessionID() string

JobConfig Interface

type JobConfig interface {
    isJobConfig()
}

The JobConfig interface is implemented by configuration types for different job types:

  • *CopyConfig - Table copy operations
  • *ExtractConfig - Data extract operations
  • *LoadConfig - Data load operations
  • *QueryConfig - Query operations

Get the configuration for a job:

config, err := job.Config()
if err != nil {
    return err
}

// Type assert to the specific config type
switch c := config.(type) {
case *bigquery.QueryConfig:
    fmt.Printf("Query: %s\n", c.Q)
case *bigquery.LoadConfig:
    fmt.Printf("Loading to: %s\n", c.Dst.FullyQualifiedName())
case *bigquery.CopyConfig:
    fmt.Printf("Copying to: %s\n", c.Dst.FullyQualifiedName())
case *bigquery.ExtractConfig:
    fmt.Printf("Extracting from: %s\n", c.Src.FullyQualifiedName())
}

Creating Jobs

Jobs are created by running queries, loads, extracts, or copies:

// Each Run call starts a job and returns its own *Job handle. The handles use
// distinct names so the snippet stays valid when pasted as a whole: repeating
// `job, err :=` in one scope does not compile ("no new variables on left
// side of :="). Check err after every Run call in real code.

// Query job
q := client.Query("SELECT * FROM dataset.table")
queryJob, err := q.Run(ctx)

// Load job
loader := table.LoaderFrom(gcsRef)
loadJob, err := loader.Run(ctx)

// Extract job
extractor := table.ExtractorTo(gcsRef)
extractJob, err := extractor.Run(ctx)

// Copy job
copier := dstTable.CopierFrom(srcTable)
copyJob, err := copier.Run(ctx)

Retrieving Jobs

Get Job by ID

func (c *Client) JobFromID(ctx context.Context, id string) (*Job, error)

Retrieve a job using its ID (in client's location):

jobID := "my-job-id"
job, err := client.JobFromID(ctx, jobID)
if err != nil {
    return err
}

Get Job by ID and Location

func (c *Client) JobFromIDLocation(ctx context.Context, id, location string) (*Job, error)
jobID := "my-job-id"
location := "US"
job, err := client.JobFromIDLocation(ctx, jobID, location)

Get Job from Another Project

func (c *Client) JobFromProject(ctx context.Context, projectID, jobID, location string) (*Job, error)
projectID := "other-project"
jobID := "their-job-id"
location := "EU"
job, err := client.JobFromProject(ctx, projectID, jobID, location)

Job Status

JobStatus Type

// JobStatus contains the current State of a job and the errors encountered
// while processing it. Completion and the final error are exposed through the
// Done and Err methods, not through fields.
type JobStatus struct {
    State State

    // Errors holds the errors encountered during the running of the job.
    // Their presence does not necessarily mean that the job failed.
    Errors []*Error

    // Statistics about the job.
    Statistics *JobStatistics
}

// Done reports whether the job has completed.
func (s *JobStatus) Done() bool

// Err returns the error that caused the job to complete unsuccessfully, or
// nil if there was none.
func (s *JobStatus) Err() error

// Status retrieves the current status of the job from BigQuery.
func (j *Job) Status(ctx context.Context) (*JobStatus, error)

// LastStatus returns the most recently retrieved status without making a
// network call.
func (j *Job) LastStatus() *JobStatus

State Type

// State is one of a sequence of states that a Job progresses through as it is
// processed. It is an integer enum (not a string) and has no String method,
// so print it with %v or %d rather than %s.
type State int

const (
    // StateUnspecified is the zero value; on a JobIterator it means "do not
    // filter by state".
    StateUnspecified State = iota
    // Pending means the job has been accepted but has not started running.
    Pending
    // Running means the job is currently executing.
    Running
    // Done means the job has finished, successfully or not.
    Done
)

Check Job Status

// Status makes a call to BigQuery to fetch the job's current status.
status, err := job.Status(ctx)
if err != nil {
    return err
}

// State is an integer enum, so it is printed with %v.
fmt.Printf("State: %v\n", status.State)
// Done and Err are methods on JobStatus and must be called.
fmt.Printf("Done: %v\n", status.Done())

if err := status.Err(); err != nil {
    fmt.Printf("Error: %v\n", err)
}

Wait for Job Completion

func (j *Job) Wait(ctx context.Context) (*JobStatus, error)

Wait with exponential backoff:

status, err := job.Wait(ctx)
if err != nil {
    return err
}

if err := status.Err(); err != nil {
    return err
}

fmt.Println("Job completed successfully")

Manual Polling

import "time"

// Poll the job every five seconds until it reports completion.
for {
    status, err := job.Status(ctx)
    if err != nil {
        return err
    }

    // Done and Err are methods on JobStatus; Err is non-nil when the job
    // finished unsuccessfully.
    if status.Done() {
        if err := status.Err(); err != nil {
            return err
        }
        break
    }

    // Wait before the next poll, but return promptly if ctx is cancelled
    // instead of sleeping through the cancellation.
    select {
    case <-ctx.Done():
        return ctx.Err()
    case <-time.After(5 * time.Second):
    }
}

Job Statistics

JobStatistics Type

// JobStatistics contains statistics common to all job types, plus a Details
// value holding the statistics specific to the job's type.
type JobStatistics struct {
    CreationTime           time.Time
    StartTime              time.Time
    EndTime                time.Time
    TotalBytesProcessed    int64
    NumChildJobs           int64
    ParentJobID            string
    ScriptStatistics       *ScriptStatistics
    ReservationID          string
    TransactionInfo        *TransactionInfo
    SessionInfo            *SessionInfo
    FinalExecutionDuration time.Duration

    // TotalSlotDuration is the total slot usage of the job, expressed as a
    // time.Duration rather than a raw millisecond count.
    TotalSlotDuration time.Duration

    // Details holds the job-type-specific statistics. Type-assert it to
    // *QueryStatistics, *LoadStatistics or *ExtractStatistics.
    Details Statistics
}

// Statistics is the interface implemented by the job-type-specific statistics
// types stored in JobStatistics.Details.
type Statistics interface {
    implementsStatistics()
}

The Statistics interface is implemented by job-type-specific statistics types:

  • *QueryStatistics - Query job statistics
  • *LoadStatistics - Load job statistics
  • *ExtractStatistics - Extract job statistics
  • Copy jobs - no dedicated statistics type is defined (there is no *CopyStatistics); do not type-assert Details to *ExtractStatistics for a copy job

Get Job Statistics

status, err := job.Wait(ctx)
if err != nil {
    return err
}

stats := status.Statistics
fmt.Printf("Creation time: %s\n", stats.CreationTime)
fmt.Printf("Start time: %s\n", stats.StartTime)
fmt.Printf("End time: %s\n", stats.EndTime)
fmt.Printf("Total bytes processed: %d\n", stats.TotalBytesProcessed)

Query Statistics

// QueryStatistics contains statistics about a query job. It is the concrete
// type stored in JobStatistics.Details for query jobs.
type QueryStatistics struct {
    TotalBytesProcessed      int64
    TotalBytesBilled         int64
    BillingTier              int64
    CacheHit                 bool
    DDLTargetTable           *Table
    DDLOperationPerformed    string
    StatementType            string
    TotalPartitionsProcessed int64

    // SlotMillis is the slot usage of the query in milliseconds.
    SlotMillis int64

    ReferencedTables   []*Table
    Schema             Schema
    NumDMLAffectedRows int64

    // DMLStats is populated for DML statements (INSERT, UPDATE, DELETE).
    DMLStats *DMLStatistics

    QueryPlan []*ExplainQueryStage
    Timeline  []*QueryTimelineSample

    // UndeclaredQueryParameterNames lists the names of undeclared query
    // parameters; it holds names only, not QueryParameter values.
    UndeclaredQueryParameterNames []string

    BIEngineStatistics *BIEngineStatistics

    // ExportDataStatistics is populated for EXPORT DATA statements.
    ExportDataStatistics *ExportDataStatistics

    TransactionInfo *TransactionInfo
}
queryStats := status.Statistics.Details.(*bigquery.QueryStatistics)
fmt.Printf("Bytes billed: %d\n", queryStats.TotalBytesBilled)
fmt.Printf("Cache hit: %v\n", queryStats.CacheHit)
fmt.Printf("Statement type: %s\n", queryStats.StatementType)

Load Statistics

type LoadStatistics struct {
    InputFileBytes  int64
    InputFiles      int64
    OutputBytes     int64
    OutputRows      int64
    BadRecords      int64
}
loadStats := status.Statistics.Details.(*bigquery.LoadStatistics)
fmt.Printf("Input files: %d\n", loadStats.InputFiles)
fmt.Printf("Output rows: %d\n", loadStats.OutputRows)
fmt.Printf("Bad records: %d\n", loadStats.BadRecords)

ExtractStatistics

type ExtractStatistics struct {
    DestinationURIFileCounts []int64
}

Extract statistics for export jobs:

extractStats := status.Statistics.Details.(*bigquery.ExtractStatistics)
fmt.Printf("Files created: %d\n", len(extractStats.DestinationURIFileCounts))
for i, count := range extractStats.DestinationURIFileCounts {
    fmt.Printf("  URI %d: %d files\n", i, count)
}

DML Statistics

type DMLStatistics struct {
    InsertedRowCount int64
    DeletedRowCount  int64
    UpdatedRowCount  int64
}

DML statistics for INSERT, UPDATE, DELETE statements:

queryStats := status.Statistics.Details.(*bigquery.QueryStatistics)
if queryStats.DMLStats != nil {
    fmt.Printf("Inserted rows: %d\n", queryStats.DMLStats.InsertedRowCount)
    fmt.Printf("Deleted rows: %d\n", queryStats.DMLStats.DeletedRowCount)
    fmt.Printf("Updated rows: %d\n", queryStats.DMLStats.UpdatedRowCount)
}

Export Data Statistics

type ExportDataStatistics struct {
    FileCount int64
    RowCount  int64
}

Statistics for EXPORT DATA statements within query jobs:

queryStats := status.Statistics.Details.(*bigquery.QueryStatistics)
if queryStats.ExportDataStatistics != nil {
    fmt.Printf("Exported files: %d\n", queryStats.ExportDataStatistics.FileCount)
    fmt.Printf("Exported rows: %d\n", queryStats.ExportDataStatistics.RowCount)
}

Query Execution Plan

type ExplainQueryStage struct {
    CompletedParallelInputs int64
    ComputeAvg              time.Duration
    ComputeMax              time.Duration
    ComputeRatioAvg         float64
    ComputeRatioMax         float64
    EndTime                 time.Time
    ID                      int64
    InputStages             []int64
    Name                    string
    ParallelInputs          int64
    ReadAvg                 time.Duration
    ReadMax                 time.Duration
    ReadRatioAvg            float64
    ReadRatioMax            float64
    RecordsRead             int64
    RecordsWritten          int64
    ShuffleOutputBytes      int64
    ShuffleOutputBytesSpilled int64
    StartTime               time.Time
    Status                  string
    Steps                   []*ExplainQueryStep
    WaitAvg                 time.Duration
    WaitMax                 time.Duration
    WaitRatioAvg            float64
    WaitRatioMax            float64
    WriteAvg                time.Duration
    WriteMax                time.Duration
    WriteRatioAvg           float64
    WriteRatioMax           float64
}
type ExplainQueryStep struct {
    Kind     string
    Substeps []string
}

Query execution stages and steps from the query plan:

queryStats := status.Statistics.Details.(*bigquery.QueryStatistics)
for _, stage := range queryStats.QueryPlan {
    fmt.Printf("Stage %d: %s\n", stage.ID, stage.Name)
    fmt.Printf("  Records read: %d\n", stage.RecordsRead)
    fmt.Printf("  Records written: %d\n", stage.RecordsWritten)
    fmt.Printf("  Duration: %s to %s\n", stage.StartTime, stage.EndTime)

    for _, step := range stage.Steps {
        fmt.Printf("    Step: %s\n", step.Kind)
    }
}

Script Statistics

type ScriptStatistics struct {
    EvaluationKind string
    StackFrames    []*ScriptStackFrame
}
type ScriptStackFrame struct {
    StartLine   int64
    StartColumn int64
    EndLine     int64
    EndColumn   int64
    ProcedureID string
    Text        string
}

Statistics for script-based query jobs:

stats := status.Statistics
if stats.ScriptStatistics != nil {
    fmt.Printf("Evaluation kind: %s\n", stats.ScriptStatistics.EvaluationKind)
    for _, frame := range stats.ScriptStatistics.StackFrames {
        fmt.Printf("Frame: %s (lines %d-%d)\n",
            frame.ProcedureID, frame.StartLine, frame.EndLine)
    }
}

Session and Transaction Info

type SessionInfo struct {
    SessionID string
}
type TransactionInfo struct {
    TransactionID string
}

Information about multi-statement transactions and sessions:

stats := status.Statistics
if stats.SessionInfo != nil {
    fmt.Printf("Session ID: %s\n", stats.SessionInfo.SessionID)
}
if stats.TransactionInfo != nil {
    fmt.Printf("Transaction ID: %s\n", stats.TransactionInfo.TransactionID)
}

Canceling Jobs

func (j *Job) Cancel(ctx context.Context) error

Cancel a running job:

// Request cancellation of the job.
if err := job.Cancel(ctx); err != nil {
    return err
}

// Wait for cancellation to complete
status, err := job.Wait(ctx)
if err != nil {
    return err
}

// Err is a method on JobStatus; comparing the method value itself to nil
// would always be true, so it must be called.
if err := status.Err(); err != nil {
    fmt.Printf("Job was canceled: %v\n", err)
}

Listing Jobs

Job Iterator

func (c *Client) Jobs(ctx context.Context) *JobIterator
type JobIterator struct {
    // ProjectID is the project to list jobs from
    ProjectID string

    // AllUsers, when true, lists jobs from all users in the project
    AllUsers bool

    // State filters jobs by state
    State State

    // MinCreationTime filters jobs created after this time
    MinCreationTime time.Time

    // MaxCreationTime filters jobs created before this time
    MaxCreationTime time.Time

    // ParentJobID filters jobs by parent job ID
    ParentJobID string
}
func (it *JobIterator) Next() (*Job, error)
func (it *JobIterator) PageInfo() *iterator.PageInfo

List All Jobs

it := client.Jobs(ctx)
for {
    job, err := it.Next()
    if err == iterator.Done {
        break
    }
    if err != nil {
        return err
    }

    fmt.Printf("Job: %s\n", job.ID())
}

Filter Jobs

// Configure the iterator's filters before the first call to Next.
it := client.Jobs(ctx)
it.AllUsers = true
it.State = bigquery.Running
it.MinCreationTime = time.Now().Add(-24 * time.Hour)

for {
    job, err := it.Next()
    if err == iterator.Done {
        break
    }
    if err != nil {
        return err
    }

    // LastStatus returns the status obtained while listing; no extra call is made.
    status := job.LastStatus()
    // State is an integer enum, so it is printed with %v (using %s would
    // produce a "%!s(...)" formatting artefact).
    fmt.Printf("Job: %s, State: %v\n", job.ID(), status.State)
}

Job Configuration

JobIDConfig Type

type JobIDConfig struct {
    JobID          string
    AddJobIDSuffix bool
    Location       string
    ProjectID      string
}

Set custom job ID:

q := client.Query("SELECT 1")
q.JobID = "my-custom-job-id"
q.Location = "US"

job, err := q.Run(ctx)

Add random suffix:

q.JobID = "query-job"
q.AddJobIDSuffix = true
// Results in "query-job-<random-suffix>"

Reading Job Results

Query Job Results

func (j *Job) Read(ctx context.Context) (*RowIterator, error)
job, err := q.Run(ctx)
if err != nil {
    return err
}

// Read may be called without calling Wait first; NOTE(review): per the
// library documentation it fetches the results of a query job and returns an
// error for non-query jobs — confirm blocking behavior against the API docs.
it, err := job.Read(ctx)
if err != nil {
    return err
}

// Iterate until the iterator reports iterator.Done.
for {
    var values []bigquery.Value
    err := it.Next(&values)
    if err == iterator.Done {
        break
    }
    if err != nil {
        return err
    }
    fmt.Println(values)
}

Dry Run Jobs

Execute a dry run to get statistics without actually running:

q := client.Query("SELECT * FROM large_table")
q.DryRun = true

job, err := q.Run(ctx)
if err != nil {
    return err
}

stats := job.LastStatus().Statistics
queryStats := stats.Details.(*bigquery.QueryStatistics)
fmt.Printf("Would process: %d bytes\n", queryStats.TotalBytesProcessed)

Job Timeouts

Set a timeout for job execution:

q := client.Query("SELECT * FROM dataset.table")
q.JobTimeout = 10 * time.Minute

job, err := q.Run(ctx)

Note: The timeout cannot be adjusted after the job is created. Use Job.Cancel() for more dynamic control.

Complete Job Management Example

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "cloud.google.com/go/bigquery"
    "google.golang.org/api/iterator"
)

// main demonstrates the full lifecycle of a query job: it starts the job,
// polls its status until it finishes, reports the job statistics, prints the
// first result rows, and finally lists the jobs created during the last hour.
func main() {
    ctx := context.Background()
    client, err := bigquery.NewClient(ctx, "my-project")
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // Start a query job
    q := client.Query(`
        SELECT name, COUNT(*) as count
        FROM ` + "`bigquery-public-data.usa_names.usa_1910_2013`" + `
        GROUP BY name
        ORDER BY count DESC
        LIMIT 100
    `)
    q.JobID = "analysis-job"
    q.AddJobIDSuffix = true

    job, err := q.Run(ctx)
    if err != nil {
        log.Fatal(err)
    }

    jobID := job.ID()
    fmt.Printf("Started job: %s\n", jobID)

    // Monitor job progress. Polling happens inline: a goroutine that main
    // immediately blocks on adds nothing, and polling here keeps the final
    // status in hand for the statistics below.
    ticker := time.NewTicker(2 * time.Second)
    defer ticker.Stop()

    var status *bigquery.JobStatus
    for range ticker.C {
        s, err := job.Status(ctx)
        if err != nil {
            // Status errors are logged and retried on the next tick.
            log.Printf("Error checking status: %v\n", err)
            continue
        }

        // State is an integer enum, so it is printed with %v.
        fmt.Printf("Job state: %v\n", s.State)

        // Done is a method on JobStatus and must be called.
        if s.Done() {
            status = s
            break
        }
    }

    // Stop here if the job failed: a failed job has no results to read.
    if err := status.Err(); err != nil {
        log.Fatalf("Job failed: %v", err)
    }
    fmt.Println("Job completed successfully")

    // Get final statistics
    stats := status.Statistics
    if stats == nil {
        log.Fatal("job statistics are not available")
    }
    // Details is an interface; check the assertion instead of panicking.
    queryStats, ok := stats.Details.(*bigquery.QueryStatistics)
    if !ok {
        log.Fatalf("unexpected statistics type %T", stats.Details)
    }

    fmt.Printf("\nJob Statistics:\n")
    fmt.Printf("  Creation time: %s\n", stats.CreationTime)
    fmt.Printf("  Start time: %s\n", stats.StartTime)
    fmt.Printf("  End time: %s\n", stats.EndTime)
    fmt.Printf("  Duration: %s\n", stats.EndTime.Sub(stats.StartTime))
    fmt.Printf("  Bytes processed: %d\n", queryStats.TotalBytesProcessed)
    fmt.Printf("  Bytes billed: %d\n", queryStats.TotalBytesBilled)
    fmt.Printf("  Cache hit: %v\n", queryStats.CacheHit)
    fmt.Printf("  Slot ms: %d\n", queryStats.SlotMillis)

    // Read results
    it, err := job.Read(ctx)
    if err != nil {
        log.Fatal(err)
    }

    // NOTE(review): the RowIterator documentation says TotalRows may not be
    // available until after the first call to Next — confirm it is populated
    // here before relying on the printed value.
    fmt.Printf("\nResults (%d total rows):\n", it.TotalRows)
    count := 0
    for {
        var values []bigquery.Value
        err := it.Next(&values)
        if err == iterator.Done {
            break
        }
        if err != nil {
            log.Fatal(err)
        }

        // Print only the first ten rows, but drain the iterator fully.
        if count < 10 {
            fmt.Printf("  %v\n", values)
        }
        count++
    }

    // List recent jobs
    fmt.Println("\nRecent jobs:")
    listRecentJobs(ctx, client)
}

// listRecentJobs prints the ID and state of at most ten jobs created within
// the last hour. AllUsers is left false, so the listing is not widened to
// jobs from all users in the project. Any listing error terminates the
// program via log.Fatal.
func listRecentJobs(ctx context.Context, client *bigquery.Client) {
    it := client.Jobs(ctx)
    it.AllUsers = false
    it.MinCreationTime = time.Now().Add(-1 * time.Hour)

    const maxJobs = 10
    for shown := 0; shown < maxJobs; shown++ {
        job, err := it.Next()
        if err == iterator.Done {
            break
        }
        if err != nil {
            log.Fatal(err)
        }

        // LastStatus makes no network call; guard against a job for which no
        // status has been recorded.
        status := job.LastStatus()
        if status == nil {
            fmt.Printf("  %s: (status unavailable)\n", job.ID())
            continue
        }

        // State is an integer enum, so it is printed with %v (using %s would
        // produce a "%!s(...)" formatting artefact).
        fmt.Printf("  %s: %v\n", job.ID(), status.State)
    }
}