CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-tensorboardx

TensorBoardX lets you watch Tensors Flow without Tensorflow

Overview
Eval results
Files

docs/global-writer.md

Global Writer

Thread-safe writer with automatic step incrementing for concurrent logging across processes and threads. Simplifies multi-threaded experiment tracking by eliminating manual step management and providing process-safe singleton access.

Capabilities

Initialization

Creates a GlobalSummaryWriter instance with thread-safe configuration and automatic step management.

class GlobalSummaryWriter:
    """Thread-safe summary writer with automatic step incrementing.

    Intended for concurrent logging from multiple threads/processes;
    a process-wide instance is available via ``getSummaryWriter()``.
    """

    def __init__(
        self,
        logdir: Optional[str] = None,
        comment: str = '',
        purge_step: Optional[int] = None,
        max_queue: int = 10,
        flush_secs: int = 120,
        filename_suffix: str = '',
        write_to_disk: bool = True,
        log_dir: Optional[str] = None,
        coalesce_process: bool = True
    ):
        """
        Creates a GlobalSummaryWriter for thread-safe logging.

        Parameters:
        - logdir: Save directory location (default creates timestamped directory)
        - comment: Comment suffix for logdir
        - purge_step: Step to purge crashed events from
        - max_queue: Queue size for pending events (default: 10)
        - flush_secs: Seconds between flushes (default: 120)
        - filename_suffix: Suffix for event filenames
        - write_to_disk: Whether to write files to disk
        - log_dir: Deprecated alias for logdir
        - coalesce_process: Whether to coalesce events from same process
        """

Auto-Incrementing Logging

Log data with automatic step incrementing, eliminating the need for manual step management in concurrent environments.

def add_scalar(
    self,
    tag: str,
    scalar_value,
    walltime: Optional[float] = None
) -> None:
    """
    Add scalar data with automatic step incrementing.

    Note there is no ``global_step`` parameter: the step is managed
    internally, so concurrent callers need no manual bookkeeping.

    Parameters:
    - tag: Data identifier (e.g., 'Loss/Train')
    - scalar_value: Value to record (float, int, or 0-d tensor)
    - walltime: Timestamp (uses current time if None)
    """

def add_image(
    self,
    tag: str,
    img_tensor,
    walltime: Optional[float] = None,
    dataformats: str = 'CHW'
) -> None:
    """
    Add image data with automatic step incrementing.

    Parameters:
    - tag: Data identifier
    - img_tensor: Image tensor (torch.Tensor, numpy.ndarray, or PIL Image)
    - walltime: Timestamp (uses current time if None)
    - dataformats: Tensor format ('CHW', 'HWC', 'HW')
    """

def add_text(
    self,
    tag: str,
    text_string: str,
    walltime: Optional[float] = None
) -> None:
    """
    Add text data with automatic step incrementing.

    Parameters:
    - tag: Data identifier
    - text_string: Text content (supports markdown)
    - walltime: Timestamp (uses current time if None)
    """

Writer Management

Control writer lifecycle and access the singleton instance across processes.

def close(self) -> None:
    """
    Close the writer and flush all data to disk.
    """

@staticmethod
def getSummaryWriter() -> 'GlobalSummaryWriter':
    """
    Get the global writer singleton instance.
    Creates a new instance if none exists
    (presumably with default constructor arguments — verify if a
    custom logdir is needed before first use).

    Returns:
    GlobalSummaryWriter: The global writer instance
    """

Usage Examples

Multi-Threaded Logging

import threading
from tensorboardX import GlobalSummaryWriter
import time
import random

def worker_function(worker_id):
    """Log random metrics from one thread via the shared global writer."""
    writer = GlobalSummaryWriter.getSummaryWriter()

    for _ in range(10):
        # No explicit step argument: the writer increments steps itself,
        # so many threads can log the same tags without coordination.
        writer.add_scalar(f'Worker_{worker_id}/Loss', random.random())
        writer.add_scalar(f'Worker_{worker_id}/Accuracy', random.random())
        time.sleep(0.1)

# Spawn five concurrent workers
threads = []
for worker_id in range(5):
    worker = threading.Thread(target=worker_function, args=(worker_id,))
    worker.start()
    threads.append(worker)

# Block until every worker has finished logging
for worker in threads:
    worker.join()

# Flush pending events and close the shared writer
GlobalSummaryWriter.getSummaryWriter().close()

Multi-Process Logging

import multiprocessing
from tensorboardX import GlobalSummaryWriter
import time
import random

def process_function(process_id):
    """Log a short metric history from a single worker process."""
    # Every process builds its own writer pointed at a shared base directory.
    writer = GlobalSummaryWriter(
        logdir=f'logs/multiprocess',
        comment=f'_process_{process_id}',
        coalesce_process=True
    )

    for step in range(20):
        # Same metrics as before, logged one by one (loss, accuracy, lr).
        writer.add_scalar(f'Process_{process_id}/loss', random.random())
        writer.add_scalar(f'Process_{process_id}/accuracy', random.random())
        writer.add_scalar(f'Process_{process_id}/learning_rate', 0.01 * (0.9 ** step))
        time.sleep(0.05)

    writer.close()

if __name__ == '__main__':
    # Launch three worker processes
    workers = []
    for process_id in range(3):
        proc = multiprocessing.Process(target=process_function, args=(process_id,))
        proc.start()
        workers.append(proc)

    # Wait for every worker to finish
    for proc in workers:
        proc.join()

Singleton Pattern Usage

from tensorboardX import GlobalSummaryWriter

# Initialize global writer once
def initialize_logging():
    """Create the process-wide writer; later getSummaryWriter() calls reuse it."""
    return GlobalSummaryWriter(
        logdir='logs/singleton_experiment',
        comment='_global_logging'
    )

def train_model():
    """Training loop that fetches the shared writer instead of receiving it."""
    writer = GlobalSummaryWriter.getSummaryWriter()

    for epoch in range(100):
        # train_one_epoch() is a placeholder defined elsewhere in your code.
        writer.add_scalar('Training/Loss', train_one_epoch())

def validate_model():
    """Validation step using the same shared writer."""
    writer = GlobalSummaryWriter.getSummaryWriter()

    # run_validation() is a placeholder defined elsewhere in your code.
    writer.add_scalar('Validation/Accuracy', run_validation())

# Construct the singleton once at startup...
initialize_logging()

# ...then log from anywhere in the application.
train_model()
validate_model()

# Flush and close when the run is finished
GlobalSummaryWriter.getSummaryWriter().close()

Automatic Step Management

from tensorboardX import GlobalSummaryWriter
import time

# Create writer with automatic step management
# Writer whose steps are handled internally
writer = GlobalSummaryWriter('logs/auto_steps')

for step in range(50):
    # No explicit global_step argument: 0, 1, 2, 3, ... is supplied for you.
    writer.add_scalar('Metric_A', step * 0.1)
    writer.add_scalar('Metric_B', step * 0.2)

    # Metrics logged at a different cadence are also step-managed automatically.
    if step % 5 == 0:
        writer.add_scalar('Periodic_Metric', step)

    time.sleep(0.1)

writer.close()

Thread Safety Features

  • Automatic Step Management: Steps increment atomically across threads
  • Process Coalescing: Events from the same process can be coalesced for efficiency
  • Singleton Access: getSummaryWriter() provides thread-safe singleton access
  • Queue Management: Thread-safe event queuing and flushing

Configuration Options

Process Coalescing

Control how events from multiple processes are handled:

# Merge events from the same process into one event stream (default: True)
writer = GlobalSummaryWriter(coalesce_process=True)

# Keep a separate event stream per process instead
writer = GlobalSummaryWriter(coalesce_process=False)

Directory Organization

Organize logs across multiple processes and experiments:

import os  # needed for os.getpid() below; was missing from this snippet

# Base directory with process-specific comments
writer = GlobalSummaryWriter(
    logdir='logs/multi_process_experiment',
    comment=f'_process_{os.getpid()}'
)

# Automatic timestamped directories
writer = GlobalSummaryWriter()  # Creates runs/DATETIME_HOSTNAME

Install with Tessl CLI

npx tessl i tessl/pypi-tensorboardx

docs

file-writer.md

global-writer.md

index.md

record-writer.md

summary-writer.md

torchvis.md

utilities.md

tile.json