The gbytes package provides a thread-safe buffer implementation designed for testing streaming I/O operations. It supports incremental pattern detection and works seamlessly with Gomega's asynchronous assertions to test output from processes, network connections, and other I/O sources.
go get github.com/onsi/gomega

import "github.com/onsi/gomega/gbytes"

The gbytes.Buffer maintains an internal read cursor that advances as patterns are matched using the Say matcher. This allows sequential assertions on streaming data without re-matching previous content. The buffer stores all writes in memory and is thread-safe, making it ideal for testing but inappropriate for production use.
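The cursor behavior described above can be illustrated with a small, standard-library-only sketch. The `cursorBuffer` type and its `say` method are hypothetical names invented for this illustration; this is a simplified model under the assumption that matching is done with a regular expression against the unread bytes, not the actual gbytes implementation.

```go
package main

import (
	"fmt"
	"regexp"
	"sync"
)

// cursorBuffer is a hypothetical, simplified stand-in for gbytes.Buffer.
type cursorBuffer struct {
	mu       sync.Mutex
	contents []byte
	cursor   int // offset of the first unread byte
}

// Write appends p to the buffer; the cursor is left untouched.
func (b *cursorBuffer) Write(p []byte) (int, error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.contents = append(b.contents, p...)
	return len(p), nil
}

// say reports whether pattern matches the unread bytes and, on success,
// moves the cursor to just past the end of the match.
func (b *cursorBuffer) say(pattern string) bool {
	re := regexp.MustCompile(pattern)
	b.mu.Lock()
	defer b.mu.Unlock()
	loc := re.FindIndex(b.contents[b.cursor:])
	if loc == nil {
		return false
	}
	b.cursor += loc[1]
	return true
}

func main() {
	b := &cursorBuffer{}
	b.Write([]byte("Hello World"))
	fmt.Println(b.say("Hello")) // true: found in the unread data
	fmt.Println(b.say("World")) // true: still ahead of the cursor
	fmt.Println(b.say("Hello")) // false: already behind the cursor
}
```

Because the cursor only moves forward, the order of assertions matters: a pattern that appears earlier in the stream than the last successful match can no longer be found.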
The Buffer type is a thread-safe in-memory buffer that implements the io.Writer and io.Reader interfaces. It tracks a read cursor internally to support incremental pattern matching.
// NewBuffer creates a new empty gbytes.Buffer
func NewBuffer() *Buffer
// BufferWithBytes creates a buffer pre-populated with the given bytes
func BufferWithBytes(bytes []byte) *Buffer
// BufferReader creates a buffer that wraps an io.Reader
// The reader's contents are copied to the buffer in a background goroutine
// The buffer is automatically closed when the reader reaches EOF
func BufferReader(reader io.Reader) *Buffer

// Write implements io.Writer, appending data to the buffer
// Returns an error if the buffer is closed
func (b *Buffer) Write(p []byte) (n int, err error)
// Read implements io.Reader, reading from the current read cursor position
// Advances the read cursor as data is read
// Returns io.EOF when no unread data remains
func (b *Buffer) Read(d []byte) (int, error)
// Clear removes all contents from the buffer and resets the read cursor to zero
// Returns an error if the buffer is closed
func (b *Buffer) Clear() error
// Close marks the buffer as closed, preventing further writes
// Does not affect reading from the buffer
func (b *Buffer) Close() error
// Closed returns true if the buffer has been closed
func (b *Buffer) Closed() bool
// Contents returns a copy of all data currently held in the buffer,
// i.e. everything written since the buffer was created or last cleared
// This includes data before the read cursor (unlike Read)
func (b *Buffer) Contents() []byte
// Detect returns a channel that receives true when the pattern is detected
// The regular expression is matched against unread buffer contents
// When matched, the read cursor advances to after the match
// The channel is closed after detecting or when CancelDetects is called
// Supports format string and arguments (uses fmt.Sprintf)
func (b *Buffer) Detect(desired string, args ...any) chan bool
// CancelDetects cancels all pending Detect operations and cleans up goroutines
// Always call this after using Detect to prevent goroutine leaks
func (b *Buffer) CancelDetects()

The Say matcher checks if the unread portion of a buffer matches a regular expression pattern. When successful, it advances the buffer's read cursor to just after the match, ensuring subsequent matches only operate on new data.
// Say creates a matcher that checks for a regular expression pattern in the buffer
// The pattern is matched against unread buffer contents only
// Supports format string and arguments (uses fmt.Sprintf)
// Works with *gbytes.Buffer or any type implementing BufferProvider
// Integrates with Eventually to wait for patterns to appear
// Tells Eventually to abort when the buffer is closed
func Say(expected string, args ...any) types.GomegaMatcher

The IO wrapper functions add timeout protection to standard I/O operations. If the underlying operation doesn't complete within the specified duration, ErrTimeout is returned.
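One common way to build such a wrapper is to run the blocking call in a goroutine and race it against a timer. The sketch below shows that pattern using only the standard library; `timeoutReader`, `errTimeout`, and `slowReader` are hypothetical names for this illustration, and the code is an assumption about how a wrapper of this kind can work rather than a copy of the gbytes source.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"time"
)

// errTimeout is a hypothetical sentinel playing the role of gbytes.ErrTimeout.
var errTimeout = errors.New("timeout occurred")

// timeoutReader is an illustrative wrapper, not the gbytes implementation.
type timeoutReader struct {
	r io.Reader
	d time.Duration
}

// Read runs the underlying Read in a goroutine and gives up after d.
func (t timeoutReader) Read(p []byte) (int, error) {
	type result struct {
		n   int
		err error
	}
	// Buffered so the goroutine can finish even if nobody is listening.
	done := make(chan result, 1)
	go func() {
		n, err := t.r.Read(p)
		done <- result{n, err}
	}()
	select {
	case res := <-done:
		return res.n, res.err
	case <-time.After(t.d):
		return 0, errTimeout
	}
}

// slowReader is a hypothetical reader that sleeps before returning data.
type slowReader struct{ delay time.Duration }

func (s slowReader) Read(p []byte) (int, error) {
	time.Sleep(s.delay)
	return copy(p, "late"), nil
}

func main() {
	r := timeoutReader{
		r: slowReader{delay: 200 * time.Millisecond},
		d: 50 * time.Millisecond,
	}
	n, err := r.Read(make([]byte, 8))
	fmt.Println(n, errors.Is(err, errTimeout))
}
```

With this pattern the underlying Read is not cancelled on timeout; it keeps running in the background and may still write into the slice it was given, so the caller should not reuse that slice immediately.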
// TimeoutCloser wraps an io.Closer with a timeout
// Returns ErrTimeout if Close doesn't complete within the duration
func TimeoutCloser(c io.Closer, timeout time.Duration) io.Closer
// TimeoutReader wraps an io.Reader with a timeout
// Returns ErrTimeout if Read doesn't complete within the duration
func TimeoutReader(r io.Reader, timeout time.Duration) io.Reader
// TimeoutWriter wraps an io.Writer with a timeout
// Returns ErrTimeout if Write doesn't complete within the duration
func TimeoutWriter(w io.Writer, timeout time.Duration) io.Writer

// ErrTimeout is returned by timeout wrappers when operations exceed the deadline
var ErrTimeout error

// BufferProvider interface allows types to provide access to their underlying Buffer
// The Say matcher accepts both *Buffer and BufferProvider types
// This is particularly useful for process execution types that wrap buffers
type BufferProvider interface {
Buffer() *Buffer
}

import (
. "github.com/onsi/gomega"
"github.com/onsi/gomega/gbytes"
)
// Create a buffer and write to it
buffer := gbytes.NewBuffer()
buffer.Write([]byte("Hello World"))
// Assert on buffer contents
Expect(buffer).Should(gbytes.Say("Hello"))
// Say advances the cursor, so this matches the rest
Expect(buffer).Should(gbytes.Say("World"))
// This won't match because cursor is at the end
Expect(buffer).ShouldNot(gbytes.Say("Hello"))
// Contents returns everything regardless of cursor
Expect(buffer.Contents()).To(Equal([]byte("Hello World")))

import (
"time"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/gbytes"
)
// Buffer receiving output from a background process
buffer := gbytes.NewBuffer()
// Start a goroutine that writes slowly
go func() {
time.Sleep(100 * time.Millisecond)
buffer.Write([]byte("Starting..."))
time.Sleep(200 * time.Millisecond)
buffer.Write([]byte("Done!"))
buffer.Close()
}()
// Wait up to 1 second for "Starting" to appear
Eventually(buffer, 1*time.Second).Should(gbytes.Say("Starting"))
// Wait for completion
Eventually(buffer).Should(gbytes.Say("Done"))

import (
. "github.com/onsi/gomega"
"github.com/onsi/gomega/gbytes"
)
// Create buffer with initial content
buffer := gbytes.BufferWithBytes([]byte("line1\nline2\nline3"))
// Read cursor starts at beginning
Expect(buffer).Should(gbytes.Say("line1"))
Expect(buffer).Should(gbytes.Say("line2"))
Expect(buffer).Should(gbytes.Say("line3"))

import (
"bytes"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/gbytes"
)
// Wrap a reader - contents are copied in background
reader := bytes.NewBufferString("data from reader")
buffer := gbytes.BufferReader(reader)
// Buffer is populated asynchronously
Eventually(buffer).Should(gbytes.Say("data from reader"))
// Buffer closes automatically when reader reaches EOF
Eventually(buffer.Closed).Should(BeTrue())

import (
"time"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/gbytes"
)
buffer := gbytes.NewBuffer()
// Start background writer
go func() {
time.Sleep(100 * time.Millisecond)
buffer.Write([]byte("You are not logged in"))
}()
// Branch based on what appears in the buffer
select {
case <-buffer.Detect("You are not logged in"):
// Handle authentication
performLogin()
case <-buffer.Detect("Success"):
// Already authenticated
continueOperation()
case <-time.After(1 * time.Second):
// Timeout - neither pattern appeared
handleTimeout()
}
// Always cancel detects to clean up goroutines
buffer.CancelDetects()

import (
. "github.com/onsi/gomega"
"github.com/onsi/gomega/gbytes"
)
buffer := gbytes.NewBuffer()
buffer.Write([]byte("Error: connection timeout on port 8080"))
// Regular expression patterns
Expect(buffer).Should(gbytes.Say("Error:"))
Expect(buffer).Should(gbytes.Say(`connection \w+`)) // Matches "connection timeout"
// Format string with arguments; this still matches because the cursor
// has not yet moved past "port 8080"
port := 8080
Eventually(buffer).Should(gbytes.Say("port %d", port))

import (
"time"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/gbytes"
)
buffer := gbytes.NewBuffer()
// Start writer that produces safe output
go func() {
for i := 0; i < 10; i++ {
time.Sleep(50 * time.Millisecond)
buffer.Write([]byte("safe output\n"))
}
}()
// Assert that "ERROR" never appears for 1 second
Consistently(buffer, 1*time.Second).ShouldNot(gbytes.Say("ERROR"))

import (
"time"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/gbytes"
)
// Wrap a slow reader with a 100ms timeout
// (SlowReader is a placeholder for any io.Reader whose Read blocks for delay)
slowReader := &SlowReader{delay: 200 * time.Millisecond}
timedReader := gbytes.TimeoutReader(slowReader, 100*time.Millisecond)
// The read times out before the slow reader returns
buffer := make([]byte, 10)
n, err := timedReader.Read(buffer)
Expect(err).To(Equal(gbytes.ErrTimeout))
Expect(n).To(Equal(0))
// Writers and closers are wrapped the same way
// (writer and closer stand for an existing io.Writer and io.Closer)
timedWriter := gbytes.TimeoutWriter(writer, 100*time.Millisecond)
timedCloser := gbytes.TimeoutCloser(closer, 100*time.Millisecond)

import (
. "github.com/onsi/gomega"
"github.com/onsi/gomega/gbytes"
)
buffer := gbytes.NewBuffer()
buffer.Write([]byte("first test"))
Expect(buffer).Should(gbytes.Say("first"))
// Clear buffer and read cursor for reuse
err := buffer.Clear()
Expect(err).NotTo(HaveOccurred())
Expect(buffer.Contents()).To(BeEmpty())
buffer.Write([]byte("second test"))
Expect(buffer).Should(gbytes.Say("second"))

import (
. "github.com/onsi/gomega"
"github.com/onsi/gomega/gbytes"
"github.com/onsi/gomega/gexec"
)
// gexec.Session implements BufferProvider: its Buffer() method returns the Out buffer
// (command is an *exec.Cmd prepared elsewhere)
session, err := gexec.Start(command, nil, nil)
Expect(err).NotTo(HaveOccurred())
// Say works directly with Session because it's a BufferProvider
Eventually(session).Should(gbytes.Say("Server started"))
Eventually(session).Should(gbytes.Say("Listening on :8080"))

import (
"time"
. "github.com/onsi/ginkgo/v2" // provides Fail
. "github.com/onsi/gomega"
"github.com/onsi/gomega/gbytes"
)
buffer := gbytes.NewBuffer()
// Watch for multiple patterns simultaneously
errorChan := buffer.Detect("ERROR:")
warnChan := buffer.Detect("WARN:")
successChan := buffer.Detect("SUCCESS")
go func() {
time.Sleep(100 * time.Millisecond)
buffer.Write([]byte("WARN: deprecated API used"))
}()
select {
case <-errorChan:
Fail("Unexpected error")
case <-warnChan:
// Expected warning received
Expect(buffer.Contents()).To(ContainSubstring("deprecated"))
case <-successChan:
// Success appeared
case <-time.After(1 * time.Second):
Fail("Timeout waiting for output")
}
buffer.CancelDetects()

import (
"io"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/gbytes"
)
buffer := gbytes.BufferWithBytes([]byte("abcdefghij"))
// Read advances the cursor like Say does
dest := make([]byte, 3)
n, err := buffer.Read(dest)
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(3))
Expect(string(dest)).To(Equal("abc"))
// Next read continues from cursor position
n, err = buffer.Read(dest)
Expect(err).NotTo(HaveOccurred())
Expect(string(dest)).To(Equal("def"))
// Say operates on remaining unread data
Expect(buffer).Should(gbytes.Say("ghij"))
// Reading at end returns EOF
n, err = buffer.Read(dest)
Expect(err).To(Equal(io.EOF))
Expect(n).To(Equal(0))

import (
"os/exec"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/gbytes"
)
buffer := gbytes.NewBuffer()
cmd := exec.Command("ls", "-la")
cmd.Stdout = buffer
err := cmd.Start()
Expect(err).NotTo(HaveOccurred())
Eventually(buffer).Should(gbytes.Say("total"))
Eventually(buffer).Should(gbytes.Say("drwx"))
err = cmd.Wait()
Expect(err).NotTo(HaveOccurred())

import (
"log"
"time"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/gbytes"
)
buffer := gbytes.NewBuffer()
logger := log.New(buffer, "", 0)
// Start server with logger
go startServer(logger)
// Verify startup sequence
Eventually(buffer, 5*time.Second).Should(gbytes.Say("Initializing"))
Eventually(buffer).Should(gbytes.Say("Database connected"))
Eventually(buffer).Should(gbytes.Say("Server listening on :8080"))
// Ensure no errors during startup
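Consistently(buffer, 2*time.Second).ShouldNot(gbytes.Say("ERROR"))

import (
"io"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/gbytes"
)
// Program that prompts for input (runInteractiveProgram is a placeholder)
stdout := gbytes.NewBuffer()
// Feed stdin through a pipe: gbytes.Buffer.Read returns io.EOF as soon as
// no unread data remains, so it cannot serve as a blocking input stream
stdinReader, stdinWriter := io.Pipe()
go runInteractiveProgram(stdinReader, stdout)
// Wait for prompt
Eventually(stdout).Should(gbytes.Say("Enter username:"))
// Provide input
stdinWriter.Write([]byte("testuser\n"))
// Check response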
Eventually(stdout).Should(gbytes.Say("Hello, testuser"))

Prefer Eventually/Consistently with Say for streaming output: The Say matcher is designed to work with asynchronous assertions; a plain Expect only checks data that has already been written to the buffer.
Call CancelDetects after Detect: When using Detect, always call CancelDetects() afterward to clean up background goroutines and prevent leaks.
Don't use in production: The Buffer type stores all writes in memory indefinitely, making it suitable only for testing scenarios.
Use Contents() for full inspection: When you need to see all buffer data regardless of the read cursor position, use Contents() rather than Read().
Close buffers when done: Call Close() on buffers when no more data will be written. This allows Eventually to stop polling early when patterns aren't found.
Regular expressions: Remember that Say uses regular expression matching. Escape special regex characters when matching literal strings.
Format strings: Both Say and Detect support format strings, making it easy to match dynamic patterns: Say("port %d", expectedPort).
Buffer with readers: When using BufferReader, the buffer is automatically closed when the reader reaches EOF, which signals Eventually to stop polling.
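The notes on regular expressions and format strings can be combined when the expected text is dynamic but should be matched literally. The sketch below uses only the standard library's `fmt.Sprintf` and `regexp.QuoteMeta`; `literalPattern` is a hypothetical helper name introduced for this illustration and is not part of gbytes.

```go
package main

import (
	"fmt"
	"regexp"
)

// literalPattern is a hypothetical helper: it formats the message and then
// escapes regex metacharacters so the result matches only that exact text.
func literalPattern(format string, args ...any) string {
	return regexp.QuoteMeta(fmt.Sprintf(format, args...))
}

func main() {
	unescaped := regexp.MustCompile("v1.5")
	escaped := regexp.MustCompile(literalPattern("v%s", "1.5"))

	fmt.Println(unescaped.MatchString("v105")) // true: "." matches any character
	fmt.Println(escaped.MatchString("v105"))   // false: "." is now literal
	fmt.Println(escaped.MatchString("v1.5"))   // true
}
```

A string escaped this way could then be passed to Say as its only argument, so that characters such as `.`, `(`, `[`, or `$` in the expected output are not interpreted as regex syntax.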
gexec.Session type implements BufferProvider and provides Out and Err buffers for testing process execution
Eventually and Consistently work seamlessly with gbytes buffers
GomegaMatcher interface that Say implements