Jackson-based JSON and CBOR serialization capabilities for the Akka toolkit, enabling efficient serialization and deserialization of Akka messages and persistent events.
—
The data migration system enables schema evolution by transforming serialized data from older versions to newer formats. This is essential for production systems that need to handle changes to data structures without breaking compatibility with existing persisted data.
The base class for all data migrations provides the framework for version-aware transformations.
/**
* Data migration of old formats to current format.
* Used when deserializing data of older version than the currentVersion.
* Implement transformation of JSON structure in the transform method.
*/
abstract class JacksonMigration {
/**
 * Defines the current version, which is used when serializing new data.
 * The first version, when no migration was used, is always 1.
 *
 * Abstract: every concrete migration has to provide it.
 *
 * @return current version number
 */
def currentVersion: Int
/**
 * Defines the supported forward version this migration can read.
 * Must be greater than or equal to `currentVersion`.
 *
 * Defaults to `currentVersion`; override it to declare a higher readable version.
 *
 * @return supported forward version number
 */
def supportedForwardVersion: Int = currentVersion
/**
 * Override if you have changed the class name. Return the current class name.
 *
 * The default implementation returns `className` unchanged, i.e. no rename.
 *
 * @param fromVersion the version of the old data
 * @param className the old class name
 * @return current class name
 */
def transformClassName(fromVersion: Int, className: String): String = className
/**
 * Implement the transformation of the old JSON structure to the new JSON structure.
 * The JsonNode is mutable, so you can add or remove fields, or change values.
 * Cast to specific sub-classes such as ObjectNode and ArrayNode to get access to mutators.
 *
 * Abstract: every concrete migration has to provide it.
 *
 * @param fromVersion the version of the old data
 * @param json the old JSON data
 * @return transformed JSON data
 */
def transform(fromVersion: Int, json: JsonNode): JsonNode
}

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.ObjectNode
import akka.serialization.jackson.JacksonMigration
class UserMigration extends JacksonMigration {
// Version used for newly serialized data; version 1 data is upgraded by transform.
override def currentVersion: Int = 2
/**
 * Upgrades version 1 JSON to the current structure by renaming `name` to `fullName`.
 *
 * Data of any other version, or version 1 data without a `name` field, is returned
 * as is. The tree is modified in place and the same node is returned.
 *
 * @param fromVersion the version of the old data
 * @param json the old JSON data; must be a JSON object (a ClassCastException is thrown otherwise)
 * @return the transformed JSON data
 */
override def transform(fromVersion: Int, json: JsonNode): JsonNode = {
  val obj = json.asInstanceOf[ObjectNode]
  if (fromVersion == 1 && obj.has("name")) {
    // Rename 'name' field to 'fullName' in version 2.
    // The type argument is required: ObjectNode.set is generic in its return type, and
    // without it Scala infers Nothing, which fails at runtime with a ClassCastException.
    obj.set[JsonNode]("fullName", obj.remove("name"))
  }
  obj
}
}

class OrderMigration extends JacksonMigration {
// Version used for newly serialized data; version 1 data is upgraded by the overrides below.
override def currentVersion: Int = 2
/**
 * Maps the class name stored with old data to the class that should be used today.
 *
 * Only version 1 data recorded as `com.example.PurchaseOrder` is redirected (to
 * `com.example.Order`); every other combination keeps the stored name.
 *
 * @param fromVersion the version of the old data
 * @param className the class name stored with the old data
 * @return the class name to deserialize into
 */
override def transformClassName(fromVersion: Int, className: String): String = {
  val isRenamedLegacyClass = fromVersion == 1 && className == "com.example.PurchaseOrder"
  if (isRenamedLegacyClass) "com.example.Order" else className
}
/**
 * Upgrades version 1 JSON to the current structure by moving `items` to `orderItems`.
 *
 * Data of any other version, or version 1 data without an `items` field, is returned
 * as is. The tree is modified in place and the same node is returned.
 *
 * @param fromVersion the version of the old data
 * @param json the old JSON data; must be a JSON object (a ClassCastException is thrown otherwise)
 * @return the transformed JSON data
 */
override def transform(fromVersion: Int, json: JsonNode): JsonNode = {
  val obj = json.asInstanceOf[ObjectNode]
  if (fromVersion == 1 && obj.has("items")) {
    // Convert old 'items' array to new 'orderItems' structure.
    // The type argument is required: ObjectNode.set is generic in its return type, and
    // without it Scala infers Nothing, which fails at runtime with a ClassCastException.
    obj.set[JsonNode]("orderItems", obj.remove("items"))
  }
  obj
}
}

class EventMigration extends JacksonMigration {
// Version used for newly serialized data; versions 1 and 2 are upgraded by transform.
override def currentVersion: Int = 3
// Higher than currentVersion: version 4 data is accepted and handled by the `case 4` branch of transform.
override def supportedForwardVersion: Int = 4 // Can read version 4 data
/**
 * Routes the payload to the conversion that matches the version it was written with.
 *
 * Versions 1 and 2 are upgraded, version 4 (newer than the current version 3) is
 * downcast, and any other version is returned untouched.
 *
 * @param fromVersion the version of the stored data
 * @param json the stored JSON data; must be a JSON object
 * @return the JSON data in the current format
 */
override def transform(fromVersion: Int, json: JsonNode): JsonNode = {
  val root = json.asInstanceOf[ObjectNode]
  if (fromVersion == 1) migrateFromV1(root) // version 1 -> current format
  else if (fromVersion == 2) migrateFromV2(root) // version 2 -> current format
  else if (fromVersion == 4) downcastFromV4(root) // future version 4 -> current version 3
  else root
}
/**
 * Upgrades a version 1 object by supplying the `timestamp` field when it is absent.
 * The current system time in milliseconds is used as the value.
 *
 * @param obj the version 1 object, modified in place
 * @return the same object
 */
private def migrateFromV1(obj: ObjectNode): ObjectNode = {
  val timestampMissing = !obj.has("timestamp")
  if (timestampMissing) {
    val now = System.currentTimeMillis()
    obj.put("timestamp", now)
  }
  obj
}
/**
 * Upgrades a version 2 object by converting a textual `timestamp` into a number.
 *
 * Objects without a `timestamp`, or whose `timestamp` is not a string, are left as is.
 *
 * @param obj the version 2 object, modified in place
 * @return the same object
 * @throws NumberFormatException if the textual timestamp is not a valid long; the
 *                               object is left unmodified in that case
 */
private def migrateFromV2(obj: ObjectNode): ObjectNode = {
  Option(obj.get("timestamp")) // get returns null when the field is absent
    .filter(_.isTextual)
    .foreach { textual =>
      // Parse before touching the tree, so a malformed value cannot leave the object
      // without its timestamp field.
      val millis = textual.asText().toLong
      // put replaces the existing textual value
      obj.put("timestamp", millis)
    }
  obj
}
/**
 * Adapts a version 4 object to the current version by dropping the
 * `futureFeatureFlag` field, which the current version does not have.
 *
 * @param obj the version 4 object, modified in place
 * @return the same object
 */
private def downcastFromV4(obj: ObjectNode): ObjectNode = {
  val unsupportedField = "futureFeatureFlag"
  obj.remove(unsupportedField)
  obj
}
}

import com.fasterxml.jackson.databind.node.{ArrayNode, ObjectNode}
class OrderEventMigration extends JacksonMigration {
// Version used for newly serialized data; versions 1 and 2 are upgraded by transform.
override def currentVersion: Int = 3
/**
 * Upgrades old order event JSON to the current (version 3) structure.
 *
 * The migration steps are cumulative: version 1 data needs both the address
 * restructuring (`migrateFromV1`) and the items conversion (`migrateFromV2`),
 * while version 2 data only needs the latter. Other versions are returned as is.
 *
 * @param fromVersion the version of the old data
 * @param json the old JSON data; must be a JSON object (a ClassCastException is thrown otherwise)
 * @return the transformed JSON data
 */
override def transform(fromVersion: Int, json: JsonNode): JsonNode = {
  val obj = json.asInstanceOf[ObjectNode]
  fromVersion match {
    // Version 1 predates both changes, so apply the steps in order; running only the
    // first one would leave the old 'items' array in data that is treated as version 3.
    case 1 => migrateFromV2(migrateFromV1(obj))
    case 2 => migrateFromV2(obj)
    case _ => obj
  }
}
/**
 * Replaces the textual `customerInfo.address` with a structured object.
 *
 * The address string is split on commas and the trimmed parts are assigned, in
 * order, to `street`, `city` and `state`; parts beyond the third are ignored and
 * missing parts are simply not set. Nothing happens when `customerInfo` is absent
 * or not a JSON object, or when `address` is absent or not a string.
 *
 * @param obj the order event object, modified in place
 * @return the same object
 */
private def migrateFromV1(obj: ObjectNode): ObjectNode = {
  // Type patterns instead of a blind cast: null (absent) and non-object values fall through.
  obj.get("customerInfo") match {
    case customerInfo: ObjectNode =>
      val address = customerInfo.get("address")
      if (address != null && address.isTextual) {
        // Split 'address' string into structured object
        val parts = address.asText().split(",").map(_.trim)
        val structured = obj.objectNode()
        // zip stops at the shorter side, so only the parts that exist are written
        Seq("street", "city", "state").zip(parts).foreach { case (field, value) =>
          structured.put(field, value)
        }
        // set replaces the old textual value. The type argument is required: without it
        // Scala infers Nothing for the generic return type and the call fails at runtime.
        customerInfo.set[JsonNode]("address", structured)
      }
    case _ => ()
  }
  obj
}
/**
 * Replaces the `items` array with an `itemsById` object keyed by each item's `id`.
 *
 * Only array elements that are JSON objects with both an `id` and a `quantity`
 * field are carried over; other elements are skipped. If several items share an
 * id, the last one wins. Nothing happens when `items` is absent or not an array.
 *
 * @param obj the order event object, modified in place
 * @return the same object
 */
private def migrateFromV2(obj: ObjectNode): ObjectNode = {
  obj.get("items") match {
    case items: ArrayNode =>
      val itemsById = obj.objectNode()
      items.forEach { element =>
        element match {
          // Type pattern instead of a blind cast: non-object elements are skipped
          // rather than aborting the migration with a ClassCastException.
          case item: ObjectNode if item.has("id") && item.has("quantity") =>
            // The type argument is required: without it Scala infers Nothing for the
            // generic return type of set and the call fails at runtime.
            itemsById.set[JsonNode](item.get("id").asText(), item)
          case _ => ()
        }
      }
      obj.remove("items")
      obj.set[JsonNode]("itemsById", itemsById)
    case _ => ()
  }
  obj
}
}

Configure migrations in your application.conf:
akka.serialization.jackson.migrations {
"com.example.User" = "com.example.UserMigration"
"com.example.Order" = "com.example.OrderMigration"
"com.example.OrderEvent" = "com.example.OrderEventMigration"
}

akka.serialization.jackson {
# Global migrations
migrations {
"com.example.User" = "com.example.UserMigration"
}
# Binding-specific migrations
jackson-json {
migrations {
"com.example.JsonOnlyEvent" = "com.example.JsonEventMigration"
}
}
jackson-cbor {
migrations {
"com.example.CborOnlyEvent" = "com.example.CborEventMigration"
}
}
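}

When serializing new data: the migration's `currentVersion` is the version used for the serialized data.
When deserializing data: data of an older version than `currentVersion` is passed through `transform`, which converts the JSON structure to the current format, for example: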
// Version 1 data (old format)
{
"name": "John Doe",
"age": 30
}
// After migration to version 2 (current format)
{
"fullName": "John Doe",
"age": 30,
"createdAt": "2023-01-01T00:00:00Z"
}

/**
 * Best-effort variant of a migration: if the transformation fails, the failure is
 * logged and the data is returned exactly as it was received.
 *
 * @param fromVersion the version of the old data
 * @param json the old JSON data; it is never modified by this method
 * @return the migrated copy, or `json` itself when the migration failed
 */
override def transform(fromVersion: Int, json: JsonNode): JsonNode = {
  import scala.util.control.NonFatal
  try {
    // JsonNode is mutable, so migrate a copy: a failure halfway through would otherwise
    // hand back a partially migrated tree instead of the original data.
    val obj = json.deepCopy[JsonNode]().asInstanceOf[ObjectNode]
    fromVersion match {
      case 1 => migrateFromV1(obj)
      case _ => obj
    }
  } catch {
    // NonFatal lets InterruptedException and fatal JVM errors propagate
    case NonFatal(e) =>
      // Log error but don't fail deserialization
      logger.warn(s"Migration failed for version $fromVersion", e)
      json // Return original data
  }
}

import com.fasterxml.jackson.databind.ObjectMapper
import org.scalatest.flatspec.AnyFlatSpec
/**
 * Unit test for [[UserMigration]]: feeds a version 1 JSON tree straight into
 * `transform` and inspects the resulting tree.
 */
class UserMigrationTest extends AnyFlatSpec {
  val migration = new UserMigration()
  val mapper = new ObjectMapper()

  "UserMigration" should "rename name field to fullName" in {
    // Version 1 payload: the name is still stored under 'name'
    val versionOneTree = mapper.readTree("""{"name": "John Doe", "age": 30}""")

    val result = migration.transform(1, versionOneTree)

    // The old field is gone ...
    assert(!result.has("name"))
    // ... and its value is available under the new field name
    assert(result.has("fullName"))
    assert(result.get("fullName").asText() == "John Doe")
  }
}

// Test end-to-end serialization with migration
// Illustrative outline only: `User`, `serialize` and `deserialize` are not defined in this snippet.
val user = User("John Doe", 30) // Old format
val serialized = serialize(user) // With version 1
// Simulate version upgrade: reading the old bytes back is expected to run the configured migration
val deserializedUser = deserialize[User](serialized) // Should apply migration
assert(deserializedUser.fullName == "John Doe")

Install with Tessl CLI
npx tessl i tessl/maven-com-typesafe-akka--akka-serialization-jackson