OpenTelemetry Context propagation mechanism for carrying scoped values across API boundaries and between threads in Java applications
—
Context executor integration provides automatic context propagation across asynchronous operations by wrapping Java executor services. This ensures that context values remain available in background threads and scheduled tasks.
Static methods wrap executors to automatically propagate the current context at the time of task submission.
Wraps an Executor to propagate the current context.
static Executor taskWrapping(Executor executor);Parameters:
executor - The executor to wrapReturns: An Executor that propagates current context to submitted tasks
Usage Example:
// Create context-aware executor
Executor executor = Executors.newFixedThreadPool(4);
Executor contextExecutor = Context.taskWrapping(executor);
// Context at submission time is propagated
Context.current().with(USER_ID_KEY, "user123").makeCurrent();
contextExecutor.execute(() -> {
// This runs with the context from submission time
String userId = Context.current().get(USER_ID_KEY); // "user123"
processUser(userId);
});Wraps an ExecutorService to propagate current context to all submitted tasks.
static ExecutorService taskWrapping(ExecutorService executorService);Parameters:
executorService - The executor service to wrapReturns: An ExecutorService that propagates current context
Usage Example:
ExecutorService executorService = Executors.newCachedThreadPool();
ExecutorService contextExecutorService = Context.taskWrapping(executorService);
Context.current().with(REQUEST_ID_KEY, "req-456").makeCurrent();
// Submit callable with context propagation
Future<String> future = contextExecutorService.submit(() -> {
String requestId = Context.current().get(REQUEST_ID_KEY); // "req-456"
return processRequest(requestId);
});
String result = future.get();Wraps a ScheduledExecutorService to propagate context to scheduled tasks.
static ScheduledExecutorService taskWrapping(ScheduledExecutorService executorService);Parameters:
executorService - The scheduled executor service to wrapReturns: A ScheduledExecutorService that propagates current context
Usage Example:
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
ScheduledExecutorService contextScheduler = Context.taskWrapping(scheduler);
Context.current().with(JOB_ID_KEY, "job-789").makeCurrent();
// Schedule task with context propagation
ScheduledFuture<?> future = contextScheduler.schedule(() -> {
String jobId = Context.current().get(JOB_ID_KEY); // "job-789"
executeScheduledJob(jobId);
}, 5, TimeUnit.SECONDS);Note: Context is NOT propagated for scheduleAtFixedRate() and scheduleWithFixedDelay() calls due to their repeated execution nature.
Instance methods wrap executors to propagate a specific context rather than the current context.
Wraps an Executor to propagate this specific context.
Executor wrap(Executor executor);Usage Example:
// Create specific context
Context userContext = Context.current().with(USER_ID_KEY, "admin");
// Wrap executor with specific context
Executor executor = Executors.newSingleThreadExecutor();
Executor contextExecutor = userContext.wrap(executor);
// Tasks always run with the admin context, regardless of current context
Context.current().with(USER_ID_KEY, "guest").makeCurrent();
contextExecutor.execute(() -> {
String userId = Context.current().get(USER_ID_KEY); // "admin" (not "guest")
performAdminTask(userId);
});Wraps an ExecutorService to propagate this specific context.
ExecutorService wrap(ExecutorService executorService);Usage Example:
Context backgroundContext = Context.current()
.with(USER_ID_KEY, "system")
.with(OPERATION_KEY, "background");
ExecutorService executorService = Executors.newFixedThreadPool(4);
ExecutorService contextExecutorService = backgroundContext.wrap(executorService);
// All submissions use background context
List<Future<String>> futures = new ArrayList<>();
for (String task : tasks) {
Future<String> future = contextExecutorService.submit(() -> {
// Always runs with system user context
String userId = Context.current().get(USER_ID_KEY); // "system"
return processBackgroundTask(task);
});
futures.add(future);
}Wraps a ScheduledExecutorService to propagate this specific context.
ScheduledExecutorService wrap(ScheduledExecutorService executorService);Usage Example:
Context maintenanceContext = Context.current()
.with(USER_ID_KEY, "maintenance")
.with(OPERATION_KEY, "cleanup");
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
ScheduledExecutorService contextScheduler = maintenanceContext.wrap(scheduler);
// Schedule maintenance tasks with specific context
contextScheduler.scheduleAtFixedRate(() -> {
String userId = Context.current().get(USER_ID_KEY); // "maintenance"
performMaintenance();
}, 0, 1, TimeUnit.HOURS);public class TaskProcessor {
private final ExecutorService backgroundExecutor;
private final ExecutorService userExecutor;
public TaskProcessor() {
// Different executors for different contexts
ExecutorService baseExecutor = Executors.newFixedThreadPool(8);
// Background tasks run with system context
Context systemContext = Context.current().with(USER_ID_KEY, "system");
this.backgroundExecutor = systemContext.wrap(baseExecutor);
// User tasks propagate current context
this.userExecutor = Context.taskWrapping(baseExecutor);
}
public void processUserTask(Runnable task) {
// Uses current context
userExecutor.execute(task);
}
public void processBackgroundTask(Runnable task) {
// Always uses system context
backgroundExecutor.execute(task);
}
}public class ContextAwareService {
private final ExecutorService executor;
public ContextAwareService() {
ExecutorService baseExecutor = Executors.newWorkStealingPool();
this.executor = Context.taskWrapping(baseExecutor);
}
public CompletableFuture<String> processAsync(String input) {
// Context from calling thread is automatically propagated
return CompletableFuture.supplyAsync(() -> {
// Process with propagated context
String userId = Context.current().get(USER_ID_KEY);
return processWithUser(input, userId);
}, executor);
}
public void shutdown() {
executor.shutdown();
}
}public void handleAsyncErrors() {
ExecutorService executor = Context.taskWrapping(Executors.newSingleThreadExecutor());
Context errorContext = Context.current().with(OPERATION_KEY, "error-prone");
try (Scope scope = errorContext.makeCurrent()) {
Future<String> future = executor.submit(() -> {
try {
// Context available in error handling
String operation = Context.current().get(OPERATION_KEY);
return performRiskyOperation();
} catch (Exception e) {
// Log with context information
String operation = Context.current().get(OPERATION_KEY);
logger.error("Operation {} failed", operation, e);
throw e;
}
});
try {
String result = future.get();
} catch (ExecutionException e) {
// Handle wrapped exception
handleError(e.getCause());
}
}
}// Good: Create wrapper once, reuse
private static final ExecutorService CONTEXT_EXECUTOR =
Context.taskWrapping(Executors.newFixedThreadPool(10));
// Avoid: Creating wrapper for each use
public void badPattern() {
ExecutorService wrapped = Context.taskWrapping(someExecutor); // Don't do this repeatedly
wrapped.execute(task);
}Install with Tessl CLI
npx tessl i tessl/maven-io-opentelemetry--opentelemetry-context