tessl/maven-io-opentelemetry--opentelemetry-context

OpenTelemetry Context propagation mechanism for carrying scoped values across API boundaries and between threads in Java applications

Executor Integration

Context executor integration provides automatic context propagation across asynchronous operations by wrapping Java executor services. This ensures that context values remain available in background threads and scheduled tasks.

Static Executor Wrapping

Static methods wrap executors to automatically propagate the current context at the time of task submission.
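The capture-at-submission behaviour described above can be sketched with JDK classes alone. This is an illustrative stand-in, not the OpenTelemetry implementation: a plain ThreadLocal named CURRENT plays the role of the current context, and the class SubmissionCaptureSketch with its taskWrapping and observedOnWorker helpers are hypothetical names chosen for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class SubmissionCaptureSketch {
    // Hypothetical stand-in for the "current context" slot of the submitting thread.
    static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    /** Wraps a delegate so each task runs with the value that was current at submission. */
    static Executor taskWrapping(Executor delegate) {
        return task -> {
            String captured = CURRENT.get();      // read on the submitting thread
            delegate.execute(() -> {
                String previous = CURRENT.get();  // whatever the worker thread had
                CURRENT.set(captured);            // install the captured value
                try {
                    task.run();
                } finally {
                    CURRENT.set(previous);        // restore, similar to closing a Scope
                }
            });
        };
    }

    /** Submits one task through the wrapper and returns the value the worker observed. */
    static String observedOnWorker(String submittedValue) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Executor wrapped = taskWrapping(pool);
            CompletableFuture<String> seen = new CompletableFuture<>();
            CURRENT.set(submittedValue);
            try {
                wrapped.execute(() -> seen.complete(CURRENT.get()));
            } finally {
                CURRENT.remove();                 // clear the submitting thread's value
            }
            return seen.join();
        } finally {
            pool.shutdown();
        }
    }
}
```

Calling SubmissionCaptureSketch.observedOnWorker("user123") returns the value that was set on the submitting thread, even though the task ran on a pool thread that never set it.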

Executor Wrapping

Wraps an Executor to propagate the current context.

static Executor taskWrapping(Executor executor);

Parameters:

  • executor - The executor to wrap

Returns: An Executor that propagates current context to submitted tasks

Usage Example:

// Create context-aware executor
Executor executor = Executors.newFixedThreadPool(4);
Executor contextExecutor = Context.taskWrapping(executor);

// The context that is current at submission time is propagated.
// makeCurrent() returns a Scope that must be closed, so use try-with-resources.
try (Scope scope = Context.current().with(USER_ID_KEY, "user123").makeCurrent()) {
    contextExecutor.execute(() -> {
        // This runs with the context from submission time
        String userId = Context.current().get(USER_ID_KEY); // "user123"
        processUser(userId);
    });
}

ExecutorService Wrapping

Wraps an ExecutorService to propagate current context to all submitted tasks.

static ExecutorService taskWrapping(ExecutorService executorService);

Parameters:

  • executorService - The executor service to wrap

Returns: An ExecutorService that propagates current context

Usage Example:

ExecutorService executorService = Executors.newCachedThreadPool();
ExecutorService contextExecutorService = Context.taskWrapping(executorService);

// Close the Scope returned by makeCurrent() once the submission is done
try (Scope scope = Context.current().with(REQUEST_ID_KEY, "req-456").makeCurrent()) {
    // Submit callable with context propagation
    Future<String> future = contextExecutorService.submit(() -> {
        String requestId = Context.current().get(REQUEST_ID_KEY); // "req-456"
        return processRequest(requestId);
    });

    String result = future.get();
}

ScheduledExecutorService Wrapping

Wraps a ScheduledExecutorService to propagate context to scheduled tasks.

static ScheduledExecutorService taskWrapping(ScheduledExecutorService executorService);

Parameters:

  • executorService - The scheduled executor service to wrap

Returns: A ScheduledExecutorService that propagates current context

Usage Example:

ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
ScheduledExecutorService contextScheduler = Context.taskWrapping(scheduler);

// Close the Scope returned by makeCurrent(); the scheduled task keeps the captured context
try (Scope scope = Context.current().with(JOB_ID_KEY, "job-789").makeCurrent()) {
    // Schedule task with context propagation
    ScheduledFuture<?> future = contextScheduler.schedule(() -> {
        String jobId = Context.current().get(JOB_ID_KEY); // "job-789"
        executeScheduledJob(jobId);
    }, 5, TimeUnit.SECONDS);
}

Note: Context is NOT propagated for tasks scheduled with scheduleAtFixedRate() or scheduleWithFixedDelay(), because these methods schedule repeated executions. Tasks scheduled this way do not see the context from submission time.
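One way to see the effect of this limitation, and a possible workaround, is to wrap the periodic Runnable explicitly before scheduling it. The sketch below uses only the JDK, with a ThreadLocal named CURRENT standing in for the current context; PeriodicTaskSketch, withValue and firstRunSees are hypothetical names. With the OpenTelemetry API, the analogous explicit step would presumably be wrapping the task with Context.current().wrap(runnable) before scheduling.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class PeriodicTaskSketch {
    // Hypothetical stand-in for the current-context slot.
    static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    /** Returns a Runnable that installs the given value around every run of the task. */
    static Runnable withValue(String captured, Runnable task) {
        return () -> {
            String previous = CURRENT.get();
            CURRENT.set(captured);
            try {
                task.run();
            } finally {
                CURRENT.set(previous);
            }
        };
    }

    /** Schedules a periodic task and reports what its first run observed ("none" if unset). */
    static String firstRunSees(boolean wrapExplicitly) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicReference<String> seen = new AtomicReference<>("unset");
        CountDownLatch firstRun = new CountDownLatch(1);
        Runnable task = () -> {
            if (firstRun.getCount() > 0) {        // record only the first execution
                String value = CURRENT.get();
                seen.set(value == null ? "none" : value);
                firstRun.countDown();
            }
        };
        CURRENT.set("job-789");
        try {
            Runnable toSchedule = wrapExplicitly ? withValue(CURRENT.get(), task) : task;
            scheduler.scheduleAtFixedRate(toSchedule, 0, 10, TimeUnit.MILLISECONDS);
            firstRun.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            CURRENT.remove();
            scheduler.shutdownNow();
        }
        return seen.get();
    }
}
```

In this sketch the unwrapped periodic task observes no value on the scheduler thread, while the explicitly wrapped one observes "job-789" on every run.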

Instance Executor Wrapping

Instance methods wrap executors to propagate a specific context rather than the current context.
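The difference from static wrapping is that the value is fixed when the wrapper is created, not read at submission. A minimal JDK-only sketch of that idea follows; the ThreadLocal CURRENT is a stand-in for the current context, and FixedContextSketch, wrap and observedOnWorker are hypothetical names used only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class FixedContextSketch {
    // Hypothetical stand-in for the current-context slot.
    static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    /** Wraps a delegate so every task runs with the same fixed value. */
    static Executor wrap(String fixed, Executor delegate) {
        return task -> delegate.execute(() -> {
            String previous = CURRENT.get();
            CURRENT.set(fixed);                   // ignore whatever the submitter had
            try {
                task.run();
            } finally {
                CURRENT.set(previous);
            }
        });
    }

    /** Submits a task while a different value is current and returns what the task saw. */
    static String observedOnWorker(String fixed, String currentAtSubmission) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Executor wrapped = wrap(fixed, pool);
            CompletableFuture<String> seen = new CompletableFuture<>();
            CURRENT.set(currentAtSubmission);
            try {
                wrapped.execute(() -> seen.complete(CURRENT.get()));
            } finally {
                CURRENT.remove();
            }
            return seen.join();
        } finally {
            pool.shutdown();
        }
    }
}
```

Here FixedContextSketch.observedOnWorker("admin", "guest") yields "admin": the value current on the submitting thread plays no part.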

Instance Executor Wrapping

Wraps an Executor to propagate this specific context.

Executor wrap(Executor executor);

Usage Example:

// Create specific context
Context userContext = Context.current().with(USER_ID_KEY, "admin");

// Wrap executor with specific context
Executor executor = Executors.newSingleThreadExecutor();
Executor contextExecutor = userContext.wrap(executor);

// Tasks always run with the admin context, regardless of the current context.
// makeCurrent() returns a Scope that must be closed, so use try-with-resources.
try (Scope scope = Context.current().with(USER_ID_KEY, "guest").makeCurrent()) {
    contextExecutor.execute(() -> {
        String userId = Context.current().get(USER_ID_KEY); // "admin" (not "guest")
        performAdminTask(userId);
    });
}

Instance ExecutorService Wrapping

Wraps an ExecutorService to propagate this specific context.

ExecutorService wrap(ExecutorService executorService);

Usage Example:

Context backgroundContext = Context.current()
    .with(USER_ID_KEY, "system")
    .with(OPERATION_KEY, "background");

ExecutorService executorService = Executors.newFixedThreadPool(4);
ExecutorService contextExecutorService = backgroundContext.wrap(executorService);

// All submissions use background context
List<Future<String>> futures = new ArrayList<>();
for (String task : tasks) {
    Future<String> future = contextExecutorService.submit(() -> {
        // Always runs with system user context
        String userId = Context.current().get(USER_ID_KEY); // "system"
        return processBackgroundTask(task);
    });
    futures.add(future);
}

Instance ScheduledExecutorService Wrapping

Wraps a ScheduledExecutorService to propagate this specific context.

ScheduledExecutorService wrap(ScheduledExecutorService executorService);

Usage Example:

Context maintenanceContext = Context.current()
    .with(USER_ID_KEY, "maintenance")
    .with(OPERATION_KEY, "cleanup");

ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
ScheduledExecutorService contextScheduler = maintenanceContext.wrap(scheduler);

// Schedule maintenance tasks with specific context
contextScheduler.scheduleAtFixedRate(() -> {
    String userId = Context.current().get(USER_ID_KEY); // "maintenance"
    performMaintenance();
}, 0, 1, TimeUnit.HOURS);

Executor Patterns

Mixed Context Usage

public class TaskProcessor {
    private final ExecutorService backgroundExecutor;
    private final ExecutorService userExecutor;
    
    public TaskProcessor() {
        // Different executors for different contexts
        ExecutorService baseExecutor = Executors.newFixedThreadPool(8);
        
        // Background tasks run with system context
        Context systemContext = Context.current().with(USER_ID_KEY, "system");
        this.backgroundExecutor = systemContext.wrap(baseExecutor);
        
        // User tasks propagate current context
        this.userExecutor = Context.taskWrapping(baseExecutor);
    }
    
    public void processUserTask(Runnable task) {
        // Uses current context
        userExecutor.execute(task);
    }
    
    public void processBackgroundTask(Runnable task) {
        // Always uses system context
        backgroundExecutor.execute(task);
    }
}

Context-Aware Thread Pool

public class ContextAwareService {
    private final ExecutorService executor;
    
    public ContextAwareService() {
        ExecutorService baseExecutor = Executors.newWorkStealingPool();
        this.executor = Context.taskWrapping(baseExecutor);
    }
    
    public CompletableFuture<String> processAsync(String input) {
        // Context from calling thread is automatically propagated
        return CompletableFuture.supplyAsync(() -> {
            // Process with propagated context
            String userId = Context.current().get(USER_ID_KEY);
            return processWithUser(input, userId);
        }, executor);
    }
    
    public void shutdown() {
        executor.shutdown();
    }
}

Error Handling with Context Executors

public void handleAsyncErrors() {
    ExecutorService executor = Context.taskWrapping(Executors.newSingleThreadExecutor());
    
    Context errorContext = Context.current().with(OPERATION_KEY, "error-prone");
    
    try (Scope scope = errorContext.makeCurrent()) {
        Future<String> future = executor.submit(() -> {
            try {
                // Context available in error handling
                String operation = Context.current().get(OPERATION_KEY);
                return performRiskyOperation();
            } catch (Exception e) {
                // Log with context information
                String operation = Context.current().get(OPERATION_KEY);
                logger.error("Operation {} failed", operation, e);
                throw e;
            }
        });
        
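        try {
            String result = future.get();
        } catch (ExecutionException e) {
            // Handle wrapped exception
            handleError(e.getCause());
        } catch (InterruptedException e) {
            // future.get() also throws this checked exception; restore the interrupt flag
            Thread.currentThread().interrupt();
        }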
    }
}

Performance Considerations

  • Wrapped executors add minimal overhead: they simply wrap each submitted task
  • The context is captured at task submission time and made current when the task executes
  • Avoid repeatedly creating new wrappers around the same executor
  • Reuse wrapped executors when possible

// Good: Create wrapper once, reuse
private static final ExecutorService CONTEXT_EXECUTOR = 
    Context.taskWrapping(Executors.newFixedThreadPool(10));

// Avoid: Creating wrapper for each use
public void badPattern() {
    ExecutorService wrapped = Context.taskWrapping(someExecutor); // Don't do this repeatedly
    wrapped.execute(task);
}
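To illustrate the point that the context is captured when a task is submitted rather than when it runs, the following JDK-only sketch queues two tasks under different values and executes them later. It is an assumption-laden stand-in: CURRENT is a ThreadLocal playing the role of the current context, and DeferredRunSketch, taskWrapping and valuesSeenByQueuedTasks are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Executor;

final class DeferredRunSketch {
    // Hypothetical stand-in for the current-context slot.
    static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    /** Captures the current value on submission and installs it around the task at run time. */
    static Executor taskWrapping(Executor delegate) {
        return task -> {
            String captured = CURRENT.get();
            delegate.execute(() -> {
                String previous = CURRENT.get();
                CURRENT.set(captured);
                try {
                    task.run();
                } finally {
                    CURRENT.set(previous);
                }
            });
        };
    }

    /** Queues two tasks under different values, runs them later, and returns what each saw. */
    static List<String> valuesSeenByQueuedTasks() {
        Queue<Runnable> queue = new ArrayDeque<>();
        Executor wrapped = taskWrapping(queue::add);   // delegate only queues; nothing runs yet
        List<String> seen = new ArrayList<>();

        CURRENT.set("first");
        wrapped.execute(() -> seen.add(CURRENT.get()));
        CURRENT.set("second");
        wrapped.execute(() -> seen.add(CURRENT.get()));

        CURRENT.set("later");                          // value at execution time
        Runnable next;
        while ((next = queue.poll()) != null) {
            next.run();                                // each task sees its captured value
        }
        seen.add(CURRENT.get());                       // value restored after the tasks ran
        CURRENT.remove();
        return seen;
    }
}
```

The two queued tasks record "first" and "second", and the final entry is "later", showing that the value current at execution time is restored once each task finishes.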

Install with Tessl CLI

npx tessl i tessl/maven-io-opentelemetry--opentelemetry-context
