Composable, asynchronous database actions with effect tracking and transaction support in Slick.
The foundation of Slick's asynchronous database operations with composability and effect tracking.
/**
* Composable database action with result type R, streaming type S, and effect E.
*
* @tparam R Result type of the action
* @tparam S Streaming capability (NoStream or Streaming[T])
* @tparam E Effect type (Read, Write, Transactional, etc.)
*/
sealed trait DBIOAction[+R, +S <: NoStream, -E <: Effect] {
/** Transform the result of this action with `f` */
def map[R2](f: R => R2): DBIOAction[R2, S, E]
/** Use the result of this action to compute the next action to run (monadic bind) */
def flatMap[R2, S2 <: NoStream, E2 <: Effect](f: R => DBIOAction[R2, S2, E2]): DBIOAction[R2, S2, E with E2]
/** Run another action after this one, ignoring this action's result */
def andThen[R2, S2 <: NoStream, E2 <: Effect](a: DBIOAction[R2, S2, E2]): DBIOAction[R2, S2, E with E2]
/**
* Run another action after this one and return both results as a pair.
* The two actions are sequenced, not executed in parallel.
*/
def zip[R2, S2 <: NoStream, E2 <: Effect](a: DBIOAction[R2, S2, E2]): DBIOAction[(R, R2), S with S2, E with E2]
/** Run a cleanup action after this action, regardless of success or failure; this action's result type R is kept */
def andFinally[S2 <: NoStream, E2 <: Effect](a: DBIOAction[_, S2, E2]): DBIOAction[R, S with S2, E with E2]
/** Expose the outcome of this action as a Try[R] result */
def asTry: DBIOAction[Try[R], S, E]
/** Recover from failures matched by `pf` with an alternative result */
def recover[R2 >: R](pf: PartialFunction[Throwable, R2]): DBIOAction[R2, S, E]
/** Recover from failures matched by `pf` with an alternative action */
def recoverWith[R2 >: R, S2 <: NoStream, E2 <: Effect](pf: PartialFunction[Throwable, DBIOAction[R2, S2, E2]]): DBIOAction[R2, S with S2, E with E2]
/** Run this action in a transaction */
def transactionally: DBIOAction[R, S, E with Transactional]
/**
* Run a cleanup action after this action, whether it succeeds or fails.
* `f` receives Some(throwable) if this action failed and None if it succeeded.
*/
def cleanUp[S2 <: NoStream, E2 <: Effect](f: Option[Throwable] => DBIOAction[_, S2, E2]): DBIOAction[R, S with S2, E with E2]
/** Convert failed action to successful action with exception as result */
def failed: DBIOAction[Throwable, S, E]
/** Filter the result based on a predicate */
def filter(p: R => Boolean)(implicit executor: ExecutionContext): DBIOAction[R, NoStream, E]
/** Transform with partial function, fail if not defined */
def collect[R2](pf: PartialFunction[R, R2])(implicit executor: ExecutionContext): DBIOAction[R2, NoStream, E]
/** Replace result with unit value */
def void: DBIOAction[Unit, NoStream, E]
/** Replace result with given value (`a` is passed by name) */
def as[A](a: => A): DBIOAction[A, NoStream, E]
/** Use a pinned database session for this action */
def withPinnedSession: DBIOAction[R, S, E]
/** Add a name for logging purposes */
def named(name: String): DBIOAction[R, S, E]
}
/**
* Type aliases for common action types
*/
type DBIO[+R] = DBIOAction[R, NoStream, Effect.All]
type StreamingDBIO[+R, +T] = DBIOAction[R, Streaming[T], Effect.All]
type ReadAction[+R, +S <: NoStream, -E <: Effect] = DBIOAction[R, S, E with Read]
type WriteAction[+R, +S <: NoStream, -E <: Effect] = DBIOAction[R, S, E with Write]

Usage Examples:
// Basic action composition
val action1: DBIO[Int] = coffees.length.result
val action2: DBIO[Seq[Coffee]] = coffees.result
val combined = for {
count <- action1
allCoffees <- action2
} yield (count, allCoffees)
// Sequential pairing: zip runs action2 after action1 and returns both results as a tuple
// (despite the value's name, the two actions are not executed in parallel)
val parallelAction = action1.zip(action2)
// Error handling
val safeAction = coffees.result.asTry.map {
case Success(coffees) => s"Found ${coffees.length} coffees"
case Failure(ex) => s"Error: ${ex.getMessage}"
}
// Transaction
val transactionalAction = (for {
_ <- coffees += Coffee("New Coffee", 3.50)
_ <- coffees.filter(_.price > 5.0).delete
} yield ()).transactionally

Track the types of operations performed by database actions.
/**
* Base trait for database effects
*/
sealed trait Effect
/**
* Standard database effects
*/
object Effect {
/** Read operations that don't modify data */
sealed trait Read extends Effect
/** Write operations that modify data */
sealed trait Write extends Effect
/** Operations that require transaction support */
sealed trait Transactional extends Effect
/** Schema operations (DDL) */
sealed trait Schema extends Effect
/** All possible effects */
type All = Read with Write with Transactional with Schema
}

Handle large result sets with streaming capabilities.
/**
* Database action that produces a result of type R and can stream elements of type T
* (a DBIOAction whose streaming parameter is Streaming[T])
*/
sealed trait StreamingAction[+R, +T] extends DBIOAction[R, Streaming[T], Effect.All]
/**
* No streaming capability
*/
sealed trait NoStream
/**
* Streaming capability with element type T.
*
* Extends NoStream so that Streaming[T] satisfies the `S <: NoStream` bound on
* DBIOAction; a streaming action can therefore be used wherever a non-streaming
* one is accepted.
*
* @tparam T Type of the streamed elements
*/
final class Streaming[+T] private[slick] () extends NoStream

Usage Examples:
// Stream large result sets
val largeResultStream: StreamingDBIO[_, Coffee] = coffees.result
// Process streaming results with Akka Streams (example integration)
import akka.stream.scaladsl.Source
import akka.NotUsed
val source: Source[Coffee, NotUsed] =
Source.fromPublisher(db.stream(coffees.result))
// Streaming with per-element processing.
// The publisher returned by db.stream transforms elements with mapResult (it has no
// map or take), so the row limit is applied to the query before it is streamed.
val processedStream = db.stream(coffees.take(1000).result)
  .mapResult(coffee => coffee.copy(price = coffee.price * 1.1))

Combine and sequence database actions in various ways.
object DBIO {
/** Create an action that returns the given value */
def successful[R](v: R): DBIO[R]
/** Create a failed action with the given exception */
def failed[R](t: Throwable): DBIO[R]
/** Run a sequence of actions sequentially, returning all results */
def sequence[R](actions: Seq[DBIO[R]]): DBIO[Seq[R]]
/** Run a sequence of actions sequentially, ignoring results */
def seq(actions: DBIO[_]*): DBIO[Unit]
/** Convert a Scala Future to a DBIO action */
def from[R](f: Future[R]): DBIO[R]
/** Fold `values` into a single action, starting from `zero` and combining the accumulated result with each element via `f` */
def fold[T, R](values: Seq[T], zero: R)(f: (R, T) => DBIO[R]): DBIO[R]
/** Traverse a sequence, applying an action to each element */
def traverse[T, R](values: Seq[T])(f: T => DBIO[R]): DBIO[Seq[R]]
}

Usage Examples:
// Sequential actions
val setupActions = DBIO.seq(
coffees.schema.create,
coffees += Coffee("Americano", 2.50),
coffees += Coffee("Latte", 3.00),
coffees += Coffee("Espresso", 2.00)
)
// Batch operations
val insertActions = Seq(
Coffee("Mocha", 3.50),
Coffee("Cappuccino", 3.00),
Coffee("Macchiato", 3.25)
).map(coffee => coffees += coffee)
val batchInsert = DBIO.sequence(insertActions)
// Conditional actions
def getCoffeeOrCreate(name: String): DBIO[Coffee] = {
coffees.filter(_.name === name).result.headOption.flatMap {
case Some(coffee) => DBIO.successful(coffee)
case None =>
val newCoffee = Coffee(name, 2.50)
(coffees += newCoffee).map(_ => newCoffee)
}
}
// Traverse pattern
val coffeeNames = Seq("Americano", "Latte", "Cappuccino")
val findOrCreateAll = DBIO.traverse(coffeeNames)(getCoffeeOrCreate)

Execute queries and return results in various forms.
/**
* Query result execution methods
*/
trait QueryExecutionMethods[E, U, C[_]] {
/** Execute query and return all results */
def result: StreamingDBIO[C[U], U]
/** Execute query and return first result */
def head: DBIO[U]
/** Execute query and return optional first result */
def headOption: DBIO[Option[U]]
/** Execute query and return streaming results */
def stream: StreamingDBIO[C[U], U]
}
/**
* Update/Insert/Delete execution methods
*/
trait ModifyingExecutionMethods[E] {
/** Insert a single row */
def += (value: E): DBIO[Int]
/** Insert multiple rows */
def ++= (values: Iterable[E]): DBIO[Int]
/** Insert or update a row (upsert) */
def insertOrUpdate(value: E): DBIO[Int]
/**
* Update matching rows.
* NOTE(review): the usage examples in this file call `update("Updated Coffee")` with the
* new value as an argument; this parameterless signature looks incomplete — confirm
*/
def update: DBIO[Int]
/** Delete matching rows */
def delete: DBIO[Int]
}

Usage Examples:
// Query execution
val allCoffees: DBIO[Seq[Coffee]] = coffees.result
val firstCoffee: DBIO[Coffee] = coffees.head
val maybeCoffee: DBIO[Option[Coffee]] = coffees.headOption
// Modifications
val insertAction: DBIO[Int] = coffees += Coffee("New Coffee", 3.00)
val batchInsertAction: DBIO[Int] = coffees ++= Seq(
Coffee("Coffee 1", 2.50),
Coffee("Coffee 2", 2.75)
)
val updateAction: DBIO[Int] = coffees
.filter(_.name === "Old Coffee")
.map(_.name)
.update("Updated Coffee")
val deleteAction: DBIO[Int] = coffees
.filter(_.price > 5.0)
.delete
// Upsert
val upsertAction: DBIO[Int] = coffees.insertOrUpdate(Coffee("Specialty", 4.50))

Manage database transactions for consistency and atomicity.
/**
* Transaction isolation levels
*/
object TransactionIsolation {
val ReadUncommitted: Int
val ReadCommitted: Int
val RepeatableRead: Int
val Serializable: Int
}
/**
* Transaction methods available on a database action.
*
* The type parameters mirror those of the action being wrapped; they are declared
* here because the method signatures below refer to them.
*
* @tparam R Result type of the action
* @tparam S Streaming capability of the action
* @tparam E Effect type of the action
*/
trait DatabaseTransaction[R, S <: NoStream, E <: Effect] {
/** Run action in a transaction */
def transactionally: DBIOAction[R, S, E with Transactional]
/** Run action in a transaction with specific isolation level (one of the TransactionIsolation values) */
def withTransactionIsolation(level: Int): DBIOAction[R, S, E with Transactional]
}

Usage Examples:
// Simple transaction
val transferMoney = (for {
_ <- accounts.filter(_.id === fromAccountId).map(_.balance).update(fromBalance - amount)
_ <- accounts.filter(_.id === toAccountId).map(_.balance).update(toBalance + amount)
_ <- transactions += Transaction(fromAccountId, toAccountId, amount)
} yield ()).transactionally
// Transaction with error handling
val safeTransfer = transferMoney.asTry.map {
case Success(_) => "Transfer completed successfully"
case Failure(ex) => s"Transfer failed: ${ex.getMessage}"
}
// Nested transactionally: the inner action runs inside the enclosing transaction
// (Slick does not create savepoints for nested transactionally calls)
val complexTransaction = (for {
// NOTE(review): `+=` is declared in this file as returning DBIO[Int], so `user` would be
// a row count and `user.id` would not compile; an insert that returns the stored row or
// its key is presumably intended — confirm
user <- users += User("Alice")
profile <- profiles += Profile(user.id, "Alice's Profile")
// Inner action wrapped in asTry so that its outcome is exposed as a Try value
_ <- (orders += Order(user.id, "Failed Item")).transactionally.asTry
_ <- orders += Order(user.id, "Success Item")
} yield user).transactionally
// Custom isolation level
val highConsistencyAction = coffees.result
.withTransactionIsolation(TransactionIsolation.Serializable)

Handle database errors and exceptions in actions.
/**
* Error handling methods
*/
trait ErrorHandling[R, S <: NoStream, E <: Effect] {
/** Convert to Try[R] to handle exceptions */
def asTry: DBIOAction[Try[R], S, E]
/** Recover from specific exceptions */
def recover[R2 >: R](pf: PartialFunction[Throwable, R2]): DBIOAction[R2, S, E]
/** Recover with alternative action */
def recoverWith[R2 >: R, S2 <: NoStream, E2 <: Effect](
pf: PartialFunction[Throwable, DBIOAction[R2, S2, E2]]
): DBIOAction[R2, S with S2, E with E2]
/**
* Handle both success and failure cases.
* NOTE(review): DBIOAction.andThen as declared earlier in this file takes another action,
* not a PartialFunction; this signature resembles Future.andThen — confirm it exists
*/
def andThen[U](pf: PartialFunction[Try[R], U]): DBIOAction[R, S, E]
}

Usage Examples:
// Try-based error handling
val safeQuery = coffees.result.asTry.map {
case Success(coffees) => Right(coffees)
case Failure(ex) => Left(s"Database error: ${ex.getMessage}")
}
// Recover from specific errors
val queryWithFallback = coffees.filter(_.name === "Nonexistent").result.recover {
case _: NoSuchElementException => Seq.empty[Coffee]
case ex: SQLException => throw new RuntimeException(s"Database error: ${ex.getMessage}")
}
// Recover with alternative action
val queryWithAlternative = coffees.filter(_.category === "special").result.recoverWith {
case _: SQLException => coffees.take(10).result
}
// Cleanup on error
val actionWithCleanup = (for {
tempId <- tempTable += TempData("processing")
result <- complexProcessing(tempId)
_ <- tempTable.filter(_.id === tempId).delete
} yield result).cleanUp { errorOpt =>
errorOpt.fold(DBIO.successful(()))(ex =>
DBIO.seq(
tempTable.filter(_.status === "processing").delete,
logError(ex.getMessage)
)
)
}

sealed trait DBIOAction[+R, +S <: NoStream, -E <: Effect]
type DBIO[+R] = DBIOAction[R, NoStream, Effect.All]
type StreamingDBIO[+R, +T] = DBIOAction[R, Streaming[T], Effect.All]
sealed trait Effect
object Effect {
trait Read extends Effect
trait Write extends Effect
trait Transactional extends Effect
trait Schema extends Effect
type All = Read with Write with Transactional with Schema
}
sealed trait NoStream
// Streaming extends NoStream so that Streaming[T] satisfies the `S <: NoStream` bound on DBIOAction
final class Streaming[+T] private[slick] () extends NoStream
// Execution result types
type DatabasePublisher[T] // Reactive Streams Publisher
type StreamingInvoker[R, T] // For streaming execution