CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-cocotb

Cocotb is a coroutine-based cosimulation library for writing VHDL and Verilog testbenches in Python.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/clock-generation.md

Clock Generation

Cocotb's clock generation system provides utilities for creating periodic signals with configurable periods, duty cycles, and phase relationships. The clock system integrates with the trigger system to enable precise timing control in testbenches.

Capabilities

Basic Clock Generation

Simple clock generation with 50% duty cycle and configurable period.

class Clock(signal, period, units="step"):
    """
    Simple 50:50 duty cycle clock generator.

    This is an API summary: the constructor arguments are listed in the
    class header and no implementation is shown.

    Parameters:
    - signal: HDL signal to drive as clock
    - period: Clock period value, expressed in `units`
    - units: Time units ("fs", "ps", "ns", "us", "ms", "sec", "step")

    Methods:
    - start(cycles=None, start_high=True): Start clock generation
    """
    
    async def start(self, cycles=None, start_high=True):
        """
        Clock generation coroutine.

        Parameters:
        - cycles: Number of cycles to generate (None for infinite)
        - start_high: True to start with high level, False for low

        Returns:
        No return value is shown in this summary.

        NOTE(review): this is declared ``async def``, so calling it
        produces a coroutine rather than a task. The usage examples either
        hand that coroutine to ``cocotb.start_soon()`` (which yields the
        task they later ``kill()``) or ``await`` it directly for a finite
        number of cycles. Confirm against the cocotb version in use.
        """

Usage Examples:

@cocotb.test()
async def basic_clock_test(dut):
    """Drive dut.clk with a free-running 10 ns clock for 1000 ns, then stop it."""

    # A 10 ns period corresponds to 100 MHz.
    clk_gen = Clock(dut.clk, 10, units="ns")

    # No cycle limit is passed, so the generator runs as a background task.
    runner = cocotb.start_soon(clk_gen.start())

    # Let 1000 ns of simulation time elapse (100 periods of 10 ns).
    sim_window = Timer(1000, units="ns")
    await sim_window

    # The background task never finishes on its own; end it explicitly.
    runner.kill()

@cocotb.test()
async def finite_clock_test(dut):
    """Generate a bounded burst of 50 clock cycles on dut.clk."""

    # A 20 ns period corresponds to 50 MHz.
    clk_gen = Clock(dut.clk, 20, units="ns")

    # Awaiting the generator directly blocks this test until the requested
    # number of cycles has been produced.
    burst = clk_gen.start(cycles=50, start_high=True)
    await burst

    cocotb.log.info("50 clock cycles completed")

@cocotb.test()
async def multiple_clocks_test(dut):
    """Run three independent clocks side by side for 2000 ns."""

    # One generator per clock pin: 100 MHz, 10 MHz and 500 MHz.
    generators = [
        Clock(dut.main_clk, 10, units="ns"),
        Clock(dut.slow_clk, 100, units="ns"),
        Clock(dut.fast_clk, 2, units="ns"),
    ]

    # Schedule every generator as its own background task.
    running = [cocotb.start_soon(gen.start()) for gen in generators]

    # Simulation window during which all three clocks toggle.
    await Timer(2000, units="ns")

    # Stop the clocks in the same order they were started.
    for task in running:
        task.kill()

Advanced Clock Generation

Base class for custom clock implementations with advanced features.

class BaseClock:
    """
    Base class for clock generators.
    
    Intended as the foundation for custom clock implementations
    with non-standard duty cycles, jitter, or phase relationships.

    NOTE(review): only the class header is shown here; no methods or
    attributes are declared, so nothing in this summary enforces that the
    class is abstract. Confirm the real interface against the cocotb
    version in use before relying on inherited behaviour.
    """

Usage Examples:

class CustomClock(BaseClock):
    """Clock generator with a configurable duty cycle.

    Parameters:
    - signal: HDL signal to drive as clock
    - period: Clock period value, expressed in `units`
    - duty_cycle: Fraction of the period spent high; must satisfy
      0 < duty_cycle < 1
    - units: Time units passed through to Timer

    Raises:
    - ValueError: if duty_cycle is not strictly between 0 and 1
    """

    def __init__(self, signal, period, duty_cycle=0.5, units="step"):
        # A duty cycle of 0 or 1 (or anything outside that range) would make
        # one of the two phase durations zero or negative, which is not a
        # usable Timer delay. Fail here rather than deep inside start().
        if not 0 < duty_cycle < 1:
            raise ValueError(
                f"duty_cycle must be between 0 and 1 (exclusive), got {duty_cycle!r}"
            )
        self.signal = signal
        self.period = period
        self.duty_cycle = duty_cycle
        self.units = units
        self.high_time = period * duty_cycle
        self.low_time = period * (1 - duty_cycle)

    async def start(self, cycles=None, start_high=True):
        """Drive the signal for `cycles` full periods (forever when None).

        Parameters:
        - cycles: Number of complete cycles to generate (None for infinite)
        - start_high: True to begin each cycle with the high phase,
          False to begin with the low phase

        A cycle is always one high phase plus one low phase. Counting whole
        cycles (instead of counting low phases) ensures that a clock started
        low still emits its final high phase.
        """
        # Level order within one cycle.
        phase_order = (True, False) if start_high else (False, True)

        completed = 0
        while cycles is None or completed < cycles:
            for is_high in phase_order:
                self.signal.value = 1 if is_high else 0
                duration = self.high_time if is_high else self.low_time
                await Timer(duration, units=self.units)
            completed += 1

@cocotb.test()
async def custom_clock_test(dut):
    """Run a 25% and a 75% duty-cycle clock for 20 cycles each."""

    # Both generators share a 100 ns period and differ only in duty cycle.
    narrow = CustomClock(dut.clk_25, 100, duty_cycle=0.25, units="ns")
    wide = CustomClock(dut.clk_75, 100, duty_cycle=0.75, units="ns")

    # Schedule both bounded runs concurrently.
    narrow_run = cocotb.start_soon(narrow.start(cycles=20))
    wide_run = cocotb.start_soon(wide.start(cycles=20))

    # Block until each run has produced all of its cycles.
    for run in (narrow_run, wide_run):
        await run.join()

    cocotb.log.info("Custom clocks completed")

Clock Control Patterns

Conditional Clock Generation

class ConditionalClock:
    """Gated clock generator that toggles only while it is enabled."""

    def __init__(self, signal, period, units="step"):
        # The clock starts out gated off; call enable() to let it toggle.
        self.enabled = False
        self.signal, self.period, self.units = signal, period, units

    def enable(self):
        """Allow the generator to toggle the signal."""
        self.enabled = True

    def disable(self):
        """Gate the generator off; a cycle already in progress still completes."""
        self.enabled = False

    async def start(self):
        """Run the generator loop forever.

        While enabled, each iteration drives one full high/low cycle. While
        disabled, the enable flag is polled every quarter period and the
        signal is left untouched.
        """
        while True:
            if not self.enabled:
                # Poll at a quarter period so re-enabling is noticed quickly.
                await Timer(self.period / 4, units=self.units)
                continue

            # One full cycle: high half-period followed by low half-period.
            for level in (1, 0):
                self.signal.value = level
                await Timer(self.period / 2, units=self.units)

@cocotb.test()
async def conditional_clock_test(dut):
    """Exercise a gated clock through off, on and off again."""

    # The generator loop runs in the background for the whole test; only its
    # enable flag is changed from here.
    gated = ConditionalClock(dut.gated_clk, 20, units="ns")
    runner = cocotb.start_soon(gated.start())

    # Phase 1: the clock is created disabled.
    await Timer(100, units="ns")
    cocotb.log.info("Clock should be stopped")

    # Phase 2: enabled for 200 ns.
    gated.enable()
    await Timer(200, units="ns")
    cocotb.log.info("Clock should be running")

    # Phase 3: gated off again.
    gated.disable()
    await Timer(100, units="ns")
    cocotb.log.info("Clock should be stopped again")

    # The generator loop never returns, so end it explicitly.
    runner.kill()

Phase-Shifted Clocks

class PhaseShiftedClock:
    """Clock generator whose first edge is delayed by a fixed phase offset."""

    def __init__(self, signal, period, phase_shift=0, units="step"):
        self.signal, self.period = signal, period
        self.phase_shift, self.units = phase_shift, units

    async def start(self, cycles=None):
        """Wait out the phase offset, then drive `cycles` periods (forever if None)."""

        # Only a positive offset introduces a delay; zero or negative values
        # start toggling immediately.
        if self.phase_shift > 0:
            await Timer(self.phase_shift, units=self.units)

        generated = 0
        while cycles is None or generated < cycles:
            # One period: high for half, then low for half.
            for level in (1, 0):
                self.signal.value = level
                await Timer(self.period / 2, units=self.units)
            generated += 1

@cocotb.test()
async def phase_shifted_test(dut):
    """Run four 20 ns clocks offset by 0, 90, 180 and 270 degrees."""

    base_period = 20  # 50MHz base frequency

    # Each entry pairs a clock pin with its delay before the first edge.
    plan = (
        (dut.clk_0, 0),
        (dut.clk_90, base_period / 4),
        (dut.clk_180, base_period / 2),
        (dut.clk_270, 3 * base_period / 4),
    )
    generators = [
        PhaseShiftedClock(pin, base_period, phase_shift=offset, units="ns")
        for pin, offset in plan
    ]

    # Schedule all four bounded runs concurrently.
    running = [cocotb.start_soon(gen.start(cycles=10)) for gen in generators]

    # Block until every run has produced its 10 cycles.
    for run in running:
        await run.join()

    cocotb.log.info("Phase-shifted clocks completed")

Clock Domain Crossing

@cocotb.test()
async def clock_domain_crossing_test(dut):
    """Check data transfers in both directions between two clock domains."""

    # Fast domain: 5 ns period (200 MHz). Slow domain: 50 ns period (20 MHz).
    fast_gen = Clock(dut.fast_clk, 5, units="ns")
    slow_gen = Clock(dut.slow_clk, 50, units="ns")

    # Both domain clocks run in the background during the transfers.
    domain_tasks = (
        cocotb.start_soon(fast_gen.start()),
        cocotb.start_soon(slow_gen.start()),
    )

    # Fast -> slow first, then slow -> fast.
    await transfer_fast_to_slow(dut)
    await transfer_slow_to_fast(dut)

    # Stop the clocks once both directions have been exercised.
    for task in domain_tasks:
        task.kill()

async def transfer_fast_to_slow(dut):
    """Send one word from the fast domain and check that the slow domain captures it.

    Raises AssertionError if the slow domain does not flag valid data within
    10 slow-clock rising edges, or if the captured word differs.
    """

    # Present the word with a one-cycle valid pulse in the fast domain.
    await RisingEdge(dut.fast_clk)
    dut.fast_data.value = 0xABCD
    dut.fast_valid.value = 1

    await RisingEdge(dut.fast_clk)
    dut.fast_valid.value = 0

    # Poll the slow-domain valid flag on successive slow edges until it is
    # seen or the edge budget runs out.
    captured = False
    edges_left = 10
    while edges_left and not captured:
        await RisingEdge(dut.slow_clk)
        captured = bool(dut.slow_data_valid.value)
        edges_left -= 1

    if not captured:
        raise AssertionError("Data not transferred to slow domain")

    # The captured word must match what was sent.
    assert dut.slow_data.value == 0xABCD
    cocotb.log.info("Fast-to-slow transfer successful")

async def transfer_slow_to_fast(dut):
    """Send one word from the slow domain and check the fast domain's copy.

    A missing valid flag is only logged as a warning; a wrong data word
    raises AssertionError.
    """

    # Present the word with a one-cycle valid pulse in the slow domain.
    await RisingEdge(dut.slow_clk)
    dut.slow_data.value = 0x1234
    dut.slow_valid.value = 1

    await RisingEdge(dut.slow_clk)
    dut.slow_valid.value = 0

    # Give the fast domain two of its own edges (capture + synchronizer delay).
    for _ in range(2):
        await RisingEdge(dut.fast_clk)

    if not dut.fast_data_valid.value:
        cocotb.log.warning("Slow-to-fast transfer not detected")
        return

    assert dut.fast_data.value == 0x1234
    cocotb.log.info("Slow-to-fast transfer successful")

Clock Utilities

Clock Measurement

class ClockMeasurer:
    """Utility for measuring the period of a clock signal from its rising edges."""

    def __init__(self, signal):
        self.signal = signal
        # Every result returned by measure_period(), in call order.
        self.measurements = []

    async def measure_period(self, num_cycles=10):
        """Measure the clock period over `num_cycles` consecutive cycles.

        Parameters:
        - num_cycles: Number of periods to measure; must be at least 1.
          num_cycles + 1 rising edges are awaited.

        Returns:
        A dict with keys 'average_period_ps', 'min_period_ps',
        'max_period_ps', 'jitter_ps' (max minus min period) and
        'frequency_mhz'. The same dict is appended to self.measurements.

        Raises:
        - ValueError: if num_cycles is less than 1
        """
        # Without at least one full period there is nothing to average.
        # Reject the request before waiting on any edge.
        if num_cycles < 1:
            raise ValueError(f"num_cycles must be at least 1, got {num_cycles!r}")

        # Timestamp num_cycles + 1 rising edges, in picoseconds.
        edge_times = []
        for _ in range(num_cycles + 1):
            await RisingEdge(self.signal)
            edge_times.append(get_sim_time("ps"))  # High precision

        # Each period is the gap between two consecutive edges.
        periods = [
            later - earlier
            for earlier, later in zip(edge_times, edge_times[1:])
        ]

        avg_period = sum(periods) / len(periods)
        min_period = min(periods)
        max_period = max(periods)

        result = {
            'average_period_ps': avg_period,
            'min_period_ps': min_period,
            'max_period_ps': max_period,
            'jitter_ps': max_period - min_period,
            # A period in ps converts to MHz as 1e6 / period.
            'frequency_mhz': 1e6 / avg_period
        }

        self.measurements.append(result)
        return result

@cocotb.test()
async def clock_measurement_test(dut):
    """Measure a nominal 100 MHz clock and check it is within 0.1 MHz."""

    # Clock under test: 10 ns period.
    clk_gen = Clock(dut.clk, 10, units="ns")
    runner = cocotb.start_soon(clk_gen.start())

    # Measure 20 consecutive periods on the driven pin.
    result = await ClockMeasurer(dut.clk).measure_period(num_cycles=20)

    cocotb.log.info(f"Measured frequency: {result['frequency_mhz']:.2f} MHz")
    cocotb.log.info(f"Average period: {result['average_period_ps']:.1f} ps")
    cocotb.log.info(f"Jitter: {result['jitter_ps']:.1f} ps")

    # Compare the measured frequency against the nominal value (both in MHz).
    expected_freq = 100.0
    tolerance = 0.1
    freq_error = abs(result['frequency_mhz'] - expected_freq)

    assert freq_error < tolerance, \
           f"Frequency error too large: {result['frequency_mhz']} MHz"

    runner.kill()

Clock Synchronization

async def wait_for_clock_edges(clock_signal, num_edges):
    """Block until `num_edges` rising edges of `clock_signal` have occurred."""

    for _edge in range(num_edges):
        # A fresh trigger is awaited for every edge.
        next_rise = RisingEdge(clock_signal)
        await next_rise

async def synchronize_to_clock(clock_signal):
    """Suspend the calling coroutine until the next rising edge of `clock_signal`."""

    next_rise = RisingEdge(clock_signal)
    await next_rise

@cocotb.test()
async def clock_sync_test(dut):
    """Exercise the edge-synchronization helpers against an 8 ns system clock."""

    # An 8 ns period corresponds to 125 MHz.
    sys_gen = Clock(dut.sys_clk, 8, units="ns")
    runner = cocotb.start_soon(sys_gen.start())

    # Step 1: align with the next rising edge.
    await synchronize_to_clock(dut.sys_clk)
    cocotb.log.info("Synchronized to system clock")

    # Step 2: count 16 further rising edges with the helper.
    await wait_for_clock_edges(dut.sys_clk, 16)
    cocotb.log.info("16 clock cycles elapsed")

    # Step 3: wait 8 more cycles with the ClockCycles trigger.
    eight_cycles = ClockCycles(dut.sys_clk, 8)
    await eight_cycles
    cocotb.log.info("8 more clock cycles elapsed")

    runner.kill()

Best Practices

Clock Management in Large Tests

class ClockManager:
    """Centralized clock management for complex testbenches.

    Attributes:
    - clocks: maps clock name -> registered clock generator
    - tasks: maps clock name -> running task, for clocks currently started
    """

    def __init__(self):
        self.clocks = {}
        self.tasks = {}

    def add_clock(self, name, signal, period, units="ns"):
        """Register a clock under `name`.

        Raises:
        - ValueError: if a clock with this name is currently running.
          Replacing it would leave the old task driving its signal with no
          way to stop it by name.
        """
        if name in self.tasks:
            raise ValueError(
                f"Clock {name!r} is running; stop it before replacing it"
            )
        self.clocks[name] = Clock(signal, period, units)

    async def start_all_clocks(self):
        """Start every registered clock that is not already running."""
        for name, clock in self.clocks.items():
            # Skip clocks that already have a task. Starting a second one
            # would put two drivers on the same signal and lose the handle
            # of the first.
            if name in self.tasks:
                continue
            self.tasks[name] = cocotb.start_soon(clock.start())
            cocotb.log.info(f"Started clock: {name}")

    def stop_all_clocks(self):
        """Stop all running clocks."""
        # Iterate over a snapshot because stop_clock() removes entries.
        for name in list(self.tasks):
            self.stop_clock(name)

    def stop_clock(self, name):
        """Stop the named clock; do nothing if it is not running."""
        task = self.tasks.pop(name, None)
        if task is None:
            return
        task.kill()
        cocotb.log.info(f"Stopped clock: {name}")

@cocotb.test()
async def managed_clocks_test(dut):
    """Drive three clocks through a ClockManager and stop one of them early."""

    clock_mgr = ClockManager()

    # name, pin, period in ns: 100 MHz, 200 MHz and 50 MHz respectively.
    clock_specs = (
        ("system", dut.sys_clk, 10),
        ("memory", dut.mem_clk, 5),
        ("interface", dut.if_clk, 20),
    )
    for clock_name, pin, period_ns in clock_specs:
        clock_mgr.add_clock(clock_name, pin, period_ns, "ns")

    await clock_mgr.start_all_clocks()

    # All three clocks active.
    await Timer(1000, units="ns")

    # Stop only the interface clock (power-testing scenario) and keep going.
    clock_mgr.stop_clock("interface")
    await Timer(500, units="ns")

    # Stop whatever is still running.
    clock_mgr.stop_all_clocks()

Install with Tessl CLI

npx tessl i tessl/pypi-cocotb

docs

binary-types.md

clock-generation.md

index.md

logging-utilities.md

signal-handling.md

task-management.md

test-framework.md

triggers-timing.md

tile.json