OpenTelemetry Context propagation mechanism for carrying scoped values across API boundaries and between threads in Java applications
—
Function wrapping provides context propagation utilities for various Java functional interfaces, allowing context to be automatically available when lambda expressions and method references are executed.
Wraps a Runnable to execute with this context.
Runnable wrap(Runnable runnable);Parameters:
runnable - The Runnable to wrapReturns: A Runnable that executes with the wrapping context
Usage Example:
Context userContext = Context.current().with(USER_ID_KEY, "user123");
Runnable task = userContext.wrap(() -> {
// This code runs with userContext active
String userId = Context.current().get(USER_ID_KEY); // "user123"
performUserTask(userId);
});
// Execute later - context is automatically applied
task.run();
// Can also be used with thread creation
Thread thread = new Thread(task);
thread.start();Wraps a Callable to execute with this context.
<T> Callable<T> wrap(Callable<T> callable);Parameters:
callable - The Callable to wrapReturns: A Callable that executes with the wrapping context
Usage Example:
Context requestContext = Context.current()
.with(REQUEST_ID_KEY, "req-456")
.with(USER_ID_KEY, "user789");
Callable<String> operation = requestContext.wrap(() -> {
// Both request ID and user ID are available
String requestId = Context.current().get(REQUEST_ID_KEY);
String userId = Context.current().get(USER_ID_KEY);
return processRequest(requestId, userId);
});
// Execute with automatic context
String result = operation.call();
// Or submit to executor
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<String> future = executor.submit(operation);Wraps a Function to execute with this context.
<T, U> Function<T, U> wrapFunction(Function<T, U> function);Usage Example:
Context processingContext = Context.current().with(PROCESSOR_ID_KEY, "proc-123");
Function<String, String> processor = processingContext.wrapFunction(input -> {
String processorId = Context.current().get(PROCESSOR_ID_KEY);
return String.format("[%s] %s", processorId, input.toUpperCase());
});
// Use in stream operations
List<String> inputs = Arrays.asList("hello", "world");
List<String> results = inputs.stream()
.map(processor) // Context automatically applied
.collect(Collectors.toList());Wraps a BiFunction to execute with this context.
<T, U, V> BiFunction<T, U, V> wrapFunction(BiFunction<T, U, V> function);Usage Example:
Context calculationContext = Context.current().with(PRECISION_KEY, 2);
BiFunction<Double, Double, String> calculator = calculationContext.wrapFunction((a, b) -> {
Integer precision = Context.current().get(PRECISION_KEY);
double result = a + b;
return String.format("%." + precision + "f", result);
});
String sum = calculator.apply(3.14159, 2.71828); // Context provides precisionWraps a Consumer to execute with this context.
<T> Consumer<T> wrapConsumer(Consumer<T> consumer);Usage Example:
Context loggingContext = Context.current()
.with(LOGGER_NAME_KEY, "UserProcessor")
.with(LOG_LEVEL_KEY, "INFO");
Consumer<String> logger = loggingContext.wrapConsumer(message -> {
String loggerName = Context.current().get(LOGGER_NAME_KEY);
String logLevel = Context.current().get(LOG_LEVEL_KEY);
System.out.printf("[%s] %s: %s%n", logLevel, loggerName, message);
});
// Use with streams
List<String> messages = Arrays.asList("Starting process", "Process complete");
messages.forEach(logger); // Each call has contextWraps a BiConsumer to execute with this context.
<T, U> BiConsumer<T, U> wrapConsumer(BiConsumer<T, U> consumer);Usage Example:
Context auditContext = Context.current().with(AUDIT_USER_KEY, "admin");
BiConsumer<String, String> auditor = auditContext.wrapConsumer((action, resource) -> {
String auditUser = Context.current().get(AUDIT_USER_KEY);
logAuditEvent(auditUser, action, resource);
});
// Use in various contexts
auditor.accept("CREATE", "user-record");
auditor.accept("DELETE", "temp-file");Wraps a Supplier to execute with this context.
<T> Supplier<T> wrapSupplier(Supplier<T> supplier);Usage Example:
Context configContext = Context.current().with(CONFIG_SOURCE_KEY, "database");
Supplier<String> configSupplier = configContext.wrapSupplier(() -> {
String source = Context.current().get(CONFIG_SOURCE_KEY);
return loadConfiguration(source);
});
// Use with Optional or lazy evaluation
Optional<String> config = Optional.of(configSupplier.get());
// Use with CompletableFuture
CompletableFuture<String> futureConfig = CompletableFuture.supplyAsync(configSupplier);public class ContextualStreamProcessor {
private final Context processingContext;
public ContextualStreamProcessor(String processorId) {
this.processingContext = Context.current().with(PROCESSOR_ID_KEY, processorId);
}
public List<String> processItems(List<String> items) {
// All stream operations maintain context
Function<String, String> transformer = processingContext.wrapFunction(this::transform);
Consumer<String> logger = processingContext.wrapConsumer(this::logItem);
return items.stream()
.peek(logger) // Log with context
.map(transformer) // Transform with context
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
private String transform(String item) {
String processorId = Context.current().get(PROCESSOR_ID_KEY);
return String.format("[%s] %s", processorId, item.toUpperCase());
}
private void logItem(String item) {
String processorId = Context.current().get(PROCESSOR_ID_KEY);
logger.debug("Processing item {} with processor {}", item, processorId);
}
}public class AsyncOperationManager {
public CompletableFuture<String> processAsync(String input) {
Context operationContext = Context.current()
.with(OPERATION_ID_KEY, UUID.randomUUID().toString())
.with(START_TIME_KEY, System.currentTimeMillis());
// Wrap suppliers for async execution
Supplier<String> processor = operationContext.wrapSupplier(() -> {
String operationId = Context.current().get(OPERATION_ID_KEY);
Long startTime = Context.current().get(START_TIME_KEY);
try {
String result = performComplexOperation(input);
logSuccess(operationId, startTime);
return result;
} catch (Exception e) {
logError(operationId, startTime, e);
throw e;
}
});
return CompletableFuture.supplyAsync(processor);
}
}public class EventProcessor {
private final Map<String, Consumer<Event>> handlers = new HashMap<>();
public void registerHandler(String eventType, Consumer<Event> handler) {
// Capture current context when registering
Context registrationContext = Context.current();
Consumer<Event> contextualHandler = registrationContext.wrapConsumer(handler);
handlers.put(eventType, contextualHandler);
}
public void processEvent(Event event) {
Consumer<Event> handler = handlers.get(event.getType());
if (handler != null) {
// Handler executes with registration context
handler.accept(event);
}
}
}
// Usage
EventProcessor processor = new EventProcessor();
// Register handler with specific context
Context userContext = Context.current().with(USER_ID_KEY, "admin");
try (Scope scope = userContext.makeCurrent()) {
processor.registerHandler("USER_ACTION", event -> {
// This handler always runs with admin context
String userId = Context.current().get(USER_ID_KEY); // "admin"
handleUserAction(event, userId);
});
}
// Later execution maintains original context
processor.processEvent(new Event("USER_ACTION", data));public class ContextualPipeline {
public static <T, U, V> Function<T, V> compose(
Context context,
Function<T, U> first,
Function<U, V> second) {
Function<T, U> contextFirst = context.wrapFunction(first);
Function<U, V> contextSecond = context.wrapFunction(second);
return contextFirst.andThen(contextSecond);
}
// Usage
public String processData(String input) {
Context pipelineContext = Context.current().with(PIPELINE_ID_KEY, "pipe-123");
Function<String, String> pipeline = compose(
pipelineContext,
this::validateInput,
this::transformInput
);
return pipeline.apply(input);
}
}Install with Tessl CLI
npx tessl i tessl/maven-io-opentelemetry--opentelemetry-context