JVM-specific implementation of kotlinx.coroutines core library providing coroutine primitives, builders, dispatchers, and synchronization primitives for asynchronous programming in Kotlin.
—
Platform-specific features for JVM including executor integration, CompletableFuture interoperability, thread-local context elements, and Java ecosystem integration.
Converting Java executors to coroutine dispatchers and vice versa.
/** Converts Executor to CoroutineDispatcher */
fun Executor.asCoroutineDispatcher(): ExecutorCoroutineDispatcher
/** Converts CoroutineDispatcher to Executor */
fun CoroutineDispatcher.asExecutor(): Executor
/** ExecutorCoroutineDispatcher with close capability */
abstract class ExecutorCoroutineDispatcher : CoroutineDispatcher(), Closeable {
abstract val executor: Executor
abstract override fun close()
}
/** Creates single-threaded dispatcher */
fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher
/** Creates fixed thread pool dispatcher */
fun newFixedThreadPoolContext(nThreads: Int, name: String): ExecutorCoroutineDispatcherUsage Examples:
import kotlinx.coroutines.*
import java.util.concurrent.*
fun main() = runBlocking {
// Convert existing Executor to CoroutineDispatcher
val executor = Executors.newFixedThreadPool(4)
val dispatcher = executor.asCoroutineDispatcher()
val job = launch(dispatcher) {
println("Running on executor thread: ${Thread.currentThread().name}")
delay(100)
println("Coroutine completed on: ${Thread.currentThread().name}")
}
job.join()
dispatcher.close() // Important: close custom dispatchers
// Create coroutine dispatchers with specific thread pools
val singleThreadDispatcher = newSingleThreadContext("SingleThread")
val fixedPoolDispatcher = newFixedThreadPoolContext(8, "FixedPool")
launch(singleThreadDispatcher) {
repeat(3) { i ->
println("Single thread task $i: ${Thread.currentThread().name}")
delay(100)
}
}
List(10) { i ->
launch(fixedPoolDispatcher) {
println("Pool task $i: ${Thread.currentThread().name}")
delay(50)
}
}.forEach { it.join() }
// Cleanup
singleThreadDispatcher.close()
fixedPoolDispatcher.close()
}
// Custom executor example
fun customExecutorExample() = runBlocking {
val customExecutor = ThreadPoolExecutor(
2, 4, 60L, TimeUnit.SECONDS,
LinkedBlockingQueue(),
ThreadFactory { r -> Thread(r, "CustomThread-${System.currentTimeMillis()}") }
)
val customDispatcher = customExecutor.asCoroutineDispatcher()
repeat(6) { i ->
launch(customDispatcher) {
println("Custom executor task $i: ${Thread.currentThread().name}")
delay(200)
}
}
delay(1000)
customDispatcher.close()
}Seamless interoperability with Java's CompletableFuture API.
/** Converts CompletableFuture to Deferred */
fun <T> CompletableFuture<T>.asDeferred(): Deferred<T>
/** Suspends until CompletableFuture completes */
suspend fun <T> CompletableFuture<T>.await(): T
/** Converts Deferred to CompletableFuture */
fun <T> Deferred<T>.asCompletableFuture(): CompletableFuture<T>
/** Converts Job to CompletableFuture<Unit> */
fun Job.asCompletableFuture(): CompletableFuture<Unit>
/** Creates CompletableDeferred that can be converted to CompletableFuture */
fun <T> CompletableDeferred<T>.asCompletableFuture(): CompletableFuture<T>Usage Examples:
import kotlinx.coroutines.*
import kotlinx.coroutines.future.*
import java.util.concurrent.CompletableFuture
fun main() = runBlocking {
// CompletableFuture to Coroutines
val future = CompletableFuture.supplyAsync {
Thread.sleep(500)
"Future result"
}
// Convert to Deferred
val deferred = future.asDeferred()
println("Deferred result: ${deferred.await()}")
// Direct await
val anotherFuture = CompletableFuture.supplyAsync {
Thread.sleep(200)
42
}
val result = anotherFuture.await()
println("Awaited result: $result")
// Coroutines to CompletableFuture
val coroutineDeferred = async {
delay(300)
"Coroutine result"
}
val convertedFuture = coroutineDeferred.asCompletableFuture()
println("Converted future result: ${convertedFuture.get()}")
// Job to CompletableFuture
val job = launch {
delay(100)
println("Job completed")
}
val jobFuture = job.asCompletableFuture()
jobFuture.get() // Wait for job completion
}
// Integration with existing Java APIs
class LegacyService {
fun fetchDataAsync(): CompletableFuture<String> {
return CompletableFuture.supplyAsync {
Thread.sleep(1000)
"Legacy data"
}
}
fun processDataAsync(data: String): CompletableFuture<String> {
return CompletableFuture.supplyAsync {
Thread.sleep(500)
"Processed: $data"
}
}
}
suspend fun modernServiceIntegration() {
val legacyService = LegacyService()
// Use legacy services with coroutines
val data = legacyService.fetchDataAsync().await()
val processedData = legacyService.processDataAsync(data).await()
println("Modern integration result: $processedData")
}
// Exposing coroutines to Java clients
class ModernService {
// Expose coroutine as CompletableFuture for Java clients
fun fetchUserDataAsync(userId: String): CompletableFuture<String> {
return GlobalScope.async {
delay(300) // Simulate async operation
"User data for $userId"
}.asCompletableFuture()
}
fun processMultipleAsync(items: List<String>): CompletableFuture<List<String>> {
return GlobalScope.async {
items.map { item ->
async {
delay(100)
"Processed $item"
}
}.awaitAll()
}.asCompletableFuture()
}
}Integration with Java ThreadLocal variables in coroutine contexts.
/** Converts ThreadLocal to CoroutineContext.Element */
fun <T> ThreadLocal<T>.asContextElement(value: T? = null): ThreadContextElement<T>
/** ThreadLocal context element interface */
interface ThreadContextElement<T> : CoroutineContext.Element {
val threadLocal: ThreadLocal<T>
val value: T
}
/** Copies current ThreadLocal values to context */
fun copyableThreadLocalContext(): CoroutineContextUsage Examples:
import kotlinx.coroutines.*
import java.util.concurrent.ThreadLocalRandom
// ThreadLocal variables
val userContext = ThreadLocal<String>()
val requestId = ThreadLocal<String>()
fun main() = runBlocking {
// Set ThreadLocal values
userContext.set("Alice")
requestId.set("REQ-123")
println("Main thread context: ${userContext.get()}, ${requestId.get()}")
// Without context element - ThreadLocal is lost
launch {
println("Child coroutine (no context): ${userContext.get()}, ${requestId.get()}")
}.join()
// With context element - ThreadLocal is preserved
launch(userContext.asContextElement() + requestId.asContextElement()) {
println("Child coroutine (with context): ${userContext.get()}, ${requestId.get()}")
// Modify in child
userContext.set("Bob")
println("Modified in child: ${userContext.get()}")
launch {
println("Grandchild: ${userContext.get()}, ${requestId.get()}")
}.join()
}.join()
// Original thread still has original values
println("Back to main: ${userContext.get()}, ${requestId.get()}")
}
// Web request context example
class RequestContext {
companion object {
private val userId = ThreadLocal<String>()
private val sessionId = ThreadLocal<String>()
fun setUser(id: String) = userId.set(id)
fun getUser(): String? = userId.get()
fun setSession(id: String) = sessionId.set(id)
fun getSession(): String? = sessionId.get()
fun asCoroutineContext(): CoroutineContext {
return userId.asContextElement() + sessionId.asContextElement()
}
}
}
suspend fun handleRequest(userId: String, sessionId: String) {
// Set request context
RequestContext.setUser(userId)
RequestContext.setSession(sessionId)
// Process with context preservation
withContext(RequestContext.asCoroutineContext()) {
processUserData()
auditUserAction()
}
}
suspend fun processUserData() {
println("Processing data for user: ${RequestContext.getUser()}")
delay(100)
// Context is preserved across suspend points
println("Session: ${RequestContext.getSession()}")
}
suspend fun auditUserAction() {
println("Auditing action for user: ${RequestContext.getUser()}, session: ${RequestContext.getSession()}")
}Features for seamless Java-Kotlin coroutine interoperability.
/** Runs blocking coroutine for Java interop */
@JvmName("runBlocking")
fun <T> runBlocking(block: suspend CoroutineScope.() -> T): T
/** Global scope for fire-and-forget coroutines */
@DelicateCoroutinesApi
object GlobalScope : CoroutineScope
/** Annotation for JVM static members */
@JvmStatic
/** Annotation for JVM method name */
@JvmName("methodName")
/** Annotation for JVM overloads */
@JvmOverloadsUsage Examples:
import kotlinx.coroutines.*
import kotlinx.coroutines.future.*
import java.util.concurrent.CompletableFuture
// Java-friendly coroutine wrapper
class CoroutineService {
// Expose as CompletableFuture for Java clients
@JvmName("fetchDataAsync")
fun fetchDataForJava(id: String): CompletableFuture<String> {
return GlobalScope.async {
delay(500)
"Data for $id"
}.asCompletableFuture()
}
// Blocking wrapper for Java clients that prefer blocking APIs
@JvmName("fetchDataBlocking")
@JvmOverloads
fun fetchDataBlocking(id: String, timeoutMs: Long = 5000): String = runBlocking {
withTimeout(timeoutMs) {
delay(500)
"Blocking data for $id"
}
}
// Static methods for utility functions
companion object {
@JvmStatic
fun createService(): CoroutineService = CoroutineService()
@JvmStatic
@JvmName("delay")
fun delayBlocking(ms: Long) = runBlocking {
delay(ms)
}
}
}
// Bridge between Java callbacks and coroutines
suspend fun <T> CompletableFuture<T>.awaitWithTimeout(timeoutMs: Long): T {
return withTimeout(timeoutMs) {
this@awaitWithTimeout.await()
}
}
fun <T> suspendFunctionAsCompletableFuture(
block: suspend () -> T
): CompletableFuture<T> {
return GlobalScope.async { block() }.asCompletableFuture()
}
// Example Java integration patterns
fun main() = runBlocking {
val service = CoroutineService.createService()
// Use from Kotlin
val kotlinResult = service.fetchDataForJava("kotlin-client").await()
println("Kotlin result: $kotlinResult")
val blockingResult = service.fetchDataBlocking("blocking-client")
println("Blocking result: $blockingResult")
// Timeout handling
try {
val future = CompletableFuture.supplyAsync {
Thread.sleep(2000)
"Slow result"
}
val result = future.awaitWithTimeout(1000)
println("Fast result: $result")
} catch (e: TimeoutCancellationException) {
println("Operation timed out")
}
}Configuration properties for tuning JVM-specific behavior.
/** IO dispatcher parallelism property */
const val IO_PARALLELISM_PROPERTY_NAME = "kotlinx.coroutines.io.parallelism"
/** Debug mode property */
const val DEBUG_PROPERTY_NAME = "kotlinx.coroutines.debug"
/** Stacktrace recovery property */
const val STACKTRACE_RECOVERY_PROPERTY_NAME = "kotlinx.coroutines.stacktrace.recovery"Configuration Examples:
// JVM startup arguments
// -Dkotlinx.coroutines.io.parallelism=128
// -Dkotlinx.coroutines.debug=on
// -Dkotlinx.coroutines.stacktrace.recovery=on
// Programmatic configuration
fun configureCoroutines() {
// Set IO parallelism
System.setProperty("kotlinx.coroutines.io.parallelism", "64")
// Enable debug mode
System.setProperty("kotlinx.coroutines.debug", "on")
// Enable stacktrace recovery
System.setProperty("kotlinx.coroutines.stacktrace.recovery", "on")
}
// Runtime configuration check
fun checkConfiguration() {
val ioParallelism = System.getProperty("kotlinx.coroutines.io.parallelism")
val debugMode = System.getProperty("kotlinx.coroutines.debug")
println("IO Parallelism: ${ioParallelism ?: "default"}")
println("Debug Mode: ${debugMode ?: "off"}")
}Interfaces specific to JVM platform integration.
/** Closeable coroutine dispatcher */
interface Closeable {
fun close()
}
/** Executor-based coroutine dispatcher */
abstract class ExecutorCoroutineDispatcher : CoroutineDispatcher(), Closeable {
abstract val executor: Executor
}
/** Thread context element for ThreadLocal integration */
interface ThreadContextElement<T> : CoroutineContext.Element {
val threadLocal: ThreadLocal<T>
val value: T
fun updateThreadContext(context: CoroutineContext): T
fun restoreThreadContext(context: CoroutineContext, oldState: T)
}Types for seamless Java interoperability.
/** Java-compatible future conversion */
typealias JavaCompletableFuture<T> = java.util.concurrent.CompletableFuture<T>
/** Java executor types */
typealias JavaExecutor = java.util.concurrent.Executor
typealias JavaExecutorService = java.util.concurrent.ExecutorServiceInstall with Tessl CLI
npx tessl i tessl/maven-org-jetbrains-kotlinx--kotlinx-coroutines-core-jvm