or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
golangpkg:golang/cloud.google.com/go/spanner@v1.87.0

docs

client.md dml.md index.md keys.md low-level.md protobuf-types.md reads.md testing.md transactions.md types.md writes.md
tile.json

tessl/golang-cloud-google-com--go--spanner

tessl install tessl/golang-cloud-google-com--go--spanner@1.87.2

Official Google Cloud Spanner client library for Go providing comprehensive database operations, transactions, and admin functionality

dml.mddocs/

DML Operations

Data Manipulation Language (DML) operations for INSERT, UPDATE, DELETE statements and partitioned DML.

Overview

DML statements in Spanner allow you to manipulate data using SQL. DML includes:

  • UPDATE: Modify existing rows
  • INSERT: Add new rows
  • DELETE: Remove rows
  • Batch DML: Execute multiple DML statements in a single request; atomicity comes from the enclosing read-write transaction
  • Partitioned DML: Large-scale updates across entire table

DML in Read-Write Transactions

Update - Execute Single DML Statement

func (t *ReadWriteTransaction) Update(ctx context.Context, stmt Statement) (rowCount int64, err error)
func (t *ReadWriteTransaction) UpdateWithOptions(ctx context.Context, stmt Statement, opts QueryOptions) (rowCount int64, err error)

Example - UPDATE:

_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
    stmt := spanner.NewStatement("UPDATE Users SET active = true WHERE region = @region")
    stmt.Params["region"] = "us-west"
    
    rowCount, err := txn.Update(ctx, stmt)
    if err != nil {
        return err
    }
    
    fmt.Printf("Updated %d rows\n", rowCount)
    return nil
})

Example - INSERT:

// Insert one row with a parameterized INSERT statement inside a read-write
// transaction. The transaction function's error is returned in err.
_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
    stmt := spanner.NewStatement(`
        INSERT INTO Users (id, name, email)
        VALUES (@id, @name, @email)
    `)
    stmt.Params["id"] = 123
    stmt.Params["name"] = "alice"
    stmt.Params["email"] = "alice@example.com"

    // Update returns the number of rows the statement inserted; report it so
    // the count is actually used (Go rejects declared-but-unused variables).
    rowCount, err := txn.Update(ctx, stmt)
    if err != nil {
        return err
    }

    fmt.Printf("Inserted %d rows\n", rowCount)
    return nil
})

Example - DELETE:

_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
    stmt := spanner.NewStatement("DELETE FROM Users WHERE last_login < @cutoff")
    stmt.Params["cutoff"] = time.Now().AddDate(0, -6, 0)
    
    rowCount, err := txn.Update(ctx, stmt)
    if err != nil {
        return err
    }
    
    fmt.Printf("Deleted %d inactive users\n", rowCount)
    return nil
})

BatchUpdate - Execute Multiple DML Statements

func (t *ReadWriteTransaction) BatchUpdate(ctx context.Context, stmts []Statement) ([]int64, error)
func (t *ReadWriteTransaction) BatchUpdateWithOptions(ctx context.Context, stmts []Statement, opts QueryOptions) ([]int64, error)

Example:

_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
    stmts := []spanner.Statement{
        {
            SQL: "UPDATE Accounts SET balance = balance - @amount WHERE user_id = @from_user",
            Params: map[string]interface{}{
                "amount":    100,
                "from_user": "alice",
            },
        },
        {
            SQL: "UPDATE Accounts SET balance = balance + @amount WHERE user_id = @to_user",
            Params: map[string]interface{}{
                "amount":  100,
                "to_user": "bob",
            },
        },
        {
            SQL: "INSERT INTO Transactions (from_user, to_user, amount, timestamp) VALUES (@from, @to, @amt, CURRENT_TIMESTAMP())",
            Params: map[string]interface{}{
                "from": "alice",
                "to":   "bob",
                "amt":  100,
            },
        },
    }
    
    rowCounts, err := txn.BatchUpdate(ctx, stmts)
    if err != nil {
        return err
    }
    
    for i, count := range rowCounts {
        fmt.Printf("Statement %d affected %d rows\n", i, count)
    }
    
    return nil
})

Partitioned DML

For large-scale updates that don't require atomicity:

func (c *Client) PartitionedUpdate(ctx context.Context, statement Statement) (count int64, err error)
func (c *Client) PartitionedUpdateWithOptions(ctx context.Context, statement Statement, opts QueryOptions) (count int64, err error)

Characteristics:

  • Executed in parallel across multiple partitions
  • NOT atomic - partial success possible
  • Lower bound on row count returned
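  • Runs as separate, independently committed transactions per partition, so locks are held only within one partition at a time (lock contention is reduced, not eliminated)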
  • Suitable for bulk operations

Example:

stmt := spanner.NewStatement("UPDATE Users SET verified = false WHERE verified IS NULL")

rowCount, err := client.PartitionedUpdate(ctx, stmt)
if err != nil {
    return err
}

fmt.Printf("Updated at least %d rows\n", rowCount)

With Options:

stmt := spanner.NewStatement("DELETE FROM Logs WHERE timestamp < @cutoff")
stmt.Params["cutoff"] = time.Now().AddDate(0, -1, 0)

opts := spanner.QueryOptions{
    Priority:   sppb.RequestOptions_PRIORITY_LOW,
    RequestTag: "cleanup-old-logs",
}

rowCount, err := client.PartitionedUpdateWithOptions(ctx, stmt, opts)

QueryOptions for DML

type QueryOptions struct {
    Mode                        *sppb.ExecuteSqlRequest_QueryMode
    Options                     *sppb.ExecuteSqlRequest_QueryOptions
    Priority                    sppb.RequestOptions_Priority
    RequestTag                  string
    DataBoostEnabled            bool
    DirectedReadOptions         *sppb.DirectedReadOptions
    ExcludeTxnFromChangeStreams bool
    LastStatement               bool
}

Example:

opts := spanner.QueryOptions{
    Priority:                    sppb.RequestOptions_PRIORITY_HIGH,
    RequestTag:                  "user-activation",
    ExcludeTxnFromChangeStreams: true,
}

_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
    stmt := spanner.NewStatement("UPDATE Users SET active = true WHERE id = @id")
    stmt.Params["id"] = userID
    
    _, err := txn.UpdateWithOptions(ctx, stmt, opts)
    return err
})

DML with Query Results

Add a THEN RETURN clause to the DML statement and execute it with Query to read the affected rows in the same statement:

// Run an UPDATE ... THEN RETURN statement through Query inside a read-write
// transaction and print the first returned column of the first returned row.
_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
    // DML that returns data
    stmt := spanner.NewStatement(`
        UPDATE Users
        SET balance = balance + @amount
        WHERE id = @user_id
        THEN RETURN balance
    `)
    stmt.Params["amount"] = 100
    stmt.Params["user_id"] = "alice"
    
    // The statement is executed via Query (not Update) so the returned rows
    // can be iterated; the iterator is stopped when the function returns.
    iter := txn.Query(ctx, stmt)
    defer iter.Stop()
    
    // Only the first row is read.
    // NOTE(review): if no row matches the WHERE clause, Next presumably
    // returns the iterator's end-of-rows error, which is returned unchanged
    // here — confirm that is the desired handling.
    row, err := iter.Next()
    if err != nil {
        return err
    }
    
    // Column 0 is the single THEN RETURN column (balance).
    var newBalance int64
    if err := row.Column(0, &newBalance); err != nil {
        return err
    }
    
    fmt.Printf("New balance: %d\n", newBalance)
    return nil
})

Last Statement Option

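Set LastStatement on the final statement of a read-write transaction. Spanner rejects any further requests on that transaction, and some error reporting (for example, unique-constraint validation) may be deferred until commit, so do not assume the statement succeeded until the commit completes: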

opts := spanner.QueryOptions{
    LastStatement: true,
}

// Run a single UPDATE in a read-write transaction, passing opts (declared
// just before this call with LastStatement: true) to UpdateWithOptions.
_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
    stmt := spanner.NewStatement("UPDATE Accounts SET balance = @balance WHERE id = @id")
    stmt.Params["balance"] = 1000
    stmt.Params["id"] = "A123"
    
    // This is the last statement of the transaction; no further statements
    // are issued after it.
    // NOTE(review): per the Spanner API description of last_statement, some
    // errors may only surface at commit time rather than here — confirm.
    _, err := txn.UpdateWithOptions(ctx, stmt, opts)
    if err != nil {
        return err
    }
    
    // Transaction will be committed
    return nil
})

Error Handling

Common DML errors:

_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
    stmt := spanner.NewStatement("UPDATE Users SET email = @email WHERE id = @id")
    stmt.Params["email"] = "newemail@example.com"
    stmt.Params["id"] = 123
    
    rowCount, err := txn.Update(ctx, stmt)
    if err != nil {
        // Check specific errors
        switch spanner.ErrCode(err) {
        case codes.InvalidArgument:
            return fmt.Errorf("invalid DML syntax: %w", err)
        case codes.FailedPrecondition:
            return fmt.Errorf("constraint violation: %w", err)
        case codes.Aborted:
            // Will be automatically retried
            return err
        default:
            return err
        }
    }
    
    if rowCount == 0 {
        return fmt.Errorf("no rows updated")
    }
    
    return nil
})

Best Practices

  1. Use transactions for atomicity: Regular DML in read-write transactions
  2. Use partitioned DML for bulk: When atomicity not required
  3. Batch related DML: Use BatchUpdate for multiple related statements
  4. Set appropriate priority: LOW for background operations
  5. Add request tags: For tracking and debugging
  6. Handle zero rowCount: Check if DML affected expected rows
  7. Consider mutations: Often simpler than INSERT/UPDATE DML
  8. Exclude from change streams: When appropriate for performance
  9. Use THEN RETURN: To read modified data in same statement
  10. Treat PDML row counts as estimates: The returned count is a lower bound on the rows actually modified

Complete Examples

Transfer Funds with DML

// TransferFunds debits amount from the from account, credits it to the to
// account, and records the transfer in TransferLog. The three DML statements
// are sent as one BatchUpdate inside a single read-write transaction.
//
// It returns any error from the transaction. The transaction function fails
// with "account not found" when either balance update affects zero rows.
func TransferFunds(ctx context.Context, client *spanner.Client, from, to string, amount int64) error {
    transfer := func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
        // Take the amount out of the source account.
        debit := spanner.Statement{
            SQL: "UPDATE Accounts SET balance = balance - @amount WHERE id = @account",
            Params: map[string]interface{}{
                "account": from,
                "amount":  amount,
            },
        }
        // Put the amount into the destination account.
        credit := spanner.Statement{
            SQL: "UPDATE Accounts SET balance = balance + @amount WHERE id = @account",
            Params: map[string]interface{}{
                "account": to,
                "amount":  amount,
            },
        }
        // Record the transfer.
        audit := spanner.Statement{
            SQL: `INSERT INTO TransferLog (from_account, to_account, amount, timestamp)
                      VALUES (@from, @to, @amount, CURRENT_TIMESTAMP())`,
            Params: map[string]interface{}{
                "amount": amount,
                "to":     to,
                "from":   from,
            },
        }

        counts, err := txn.BatchUpdate(ctx, []spanner.Statement{debit, credit, audit})
        if err != nil {
            return err
        }

        // Both balance updates must have touched a row; otherwise fail the
        // transaction function.
        if counts[0] != 0 && counts[1] != 0 {
            return nil
        }
        return fmt.Errorf("account not found")
    }

    _, err := client.ReadWriteTransaction(ctx, transfer)
    return err
}

Bulk Data Cleanup with PDML

// CleanupOldData deletes rows from Logs whose created_at is older than
// retentionDays days before now, using a low-priority partitioned DML
// statement tagged "data-retention-cleanup".
//
// retentionDays must be non-negative. A negative value would move the cutoff
// into the future and make the DELETE match every existing row, so it is
// rejected before any request is sent.
//
// It returns a wrapped error if the partitioned update fails. On success it
// logs the row count returned by the partitioned update.
func CleanupOldData(ctx context.Context, client *spanner.Client, retentionDays int) error {
    if retentionDays < 0 {
        return fmt.Errorf("retentionDays must be non-negative, got %d", retentionDays)
    }

    cutoff := time.Now().AddDate(0, 0, -retentionDays)
    
    stmt := spanner.NewStatement("DELETE FROM Logs WHERE created_at < @cutoff")
    stmt.Params["cutoff"] = cutoff
    
    // Background cleanup: run at low priority and tag the request so it can
    // be identified later.
    opts := spanner.QueryOptions{
        Priority:   sppb.RequestOptions_PRIORITY_LOW,
        RequestTag: "data-retention-cleanup",
    }
    
    rowCount, err := client.PartitionedUpdateWithOptions(ctx, stmt, opts)
    if err != nil {
        return fmt.Errorf("cleanup failed: %w", err)
    }
    
    log.Printf("Deleted at least %d old log entries\n", rowCount)
    return nil
}

Conditional Update with DML

// ActivateUserIfEligible marks the user identified by userID as active,
// provided the user's email_verified column is true. The check and the
// update run inside one read-write transaction.
//
// It returns "user email not verified" when email_verified is false, nil
// without issuing an update when the user is already active, and otherwise
// any error from the read, the update, or the transaction itself.
func ActivateUserIfEligible(ctx context.Context, client *spanner.Client, userID int64) error {
    activate := func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
        // Fetch the two flags that decide whether an update is needed.
        columns := []string{"email_verified", "active"}
        row, err := txn.ReadRow(ctx, "Users", spanner.Key{userID}, columns)
        if err != nil {
            return err
        }

        var (
            verified bool
            active   bool
        )
        if err := row.Columns(&verified, &active); err != nil {
            return err
        }

        switch {
        case !verified:
            return fmt.Errorf("user email not verified")
        case active:
            // Nothing to do: the user is already active.
            return nil
        }

        // Flip the flag and stamp the activation time.
        stmt := spanner.NewStatement(`
            UPDATE Users
            SET active = true, activated_at = CURRENT_TIMESTAMP()
            WHERE id = @user_id
        `)
        stmt.Params["user_id"] = userID

        _, err = txn.Update(ctx, stmt)
        return err
    }

    _, err := client.ReadWriteTransaction(ctx, activate)
    return err
}