Skip to content
Merged
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2024. Axon Framework
* Copyright (c) 2010-2025. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -49,8 +49,34 @@ import org.axonframework.messaging.responsetypes.OptionalResponseType
import org.axonframework.messaging.responsetypes.ResponseType
import kotlin.reflect.KClass

private val trackingTokenSerializer = PolymorphicSerializer(TrackingToken::class).nullable
/**
* Serializer for Axon's [TrackingToken] class.
* Provides serialization and deserialization support for nullable instances of TrackingToken.
* This serializer uses [replyTokenContextSerializer] to serialize the context field and now only [String] type or null value is supported!
*
* @see TrackingToken
*/
val trackingTokenSerializer = PolymorphicSerializer(TrackingToken::class).nullable

/**
* Serializer for the [ReplayToken.context], represented as a nullable String.
* This context is typically used to provide additional information during token replay operations.
*
* This serializer is used by [trackingTokenSerializer] to serialize the context field and now only [String] type or null value is supported!
* Sadly enough, there's no straightforward solution to support [Any]; not without adjusting the context field of the ReplayToken in Axon Framework itself.
* That is, however, a breaking change, and as such, cannot be done till version 5.0.0 of the Axon Framework.
* This also allow more complex objects as the context, although it requires the user to do the de-/serialization to/from String, instead of the Axon Framework itself.
* Look at AxonSerializersTest, case `replay token with complex object as String context` for an example how to handle that using Kotlin Serialization.
*
* @see ReplayToken.context
*/
val replyTokenContextSerializer = String.serializer().nullable
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't there a way in kotlin serialization, as we have full control over (de)serialization, to encode the type of the payload as the first field, and then serialize the object properties next?

Something like outlined here https://stackoverflow.com/questions/66148137/how-to-serialize-any-type-in-kotlinx-serialization

Giving up the flexibility of the type in here feels wrong. But if there's no other way, so be it.

Copy link
Contributor Author

@MateuszNaKodach MateuszNaKodach Feb 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you read the explanation from Steven?
#368

I'm wondering why he assumed that we need to introduce breaking change if we'd like to support.

Sadly enough, there's no straightforward solution to this; not without adjusting the context field of the ReplayToken in Axon Framework itself.
That is, however, a breaking change, and as such, cannot be done.

With your proposition, it's just an additional field. I may give it a try... whereas as far as I remember some deserializes breaks if you have additional field (for example: you read the context serialized in Kotlin, in the Java app) - so it might be the case - do not add more fields. The solution with first field, it's also not simple - there is no guarantee that the type field will be first and you still (look at StackOverflow) need to define all possibilites upfront:

    @Suppress("UNCHECKED_CAST")
    private val dataTypeSerializers: Map<String, KSerializer<Any>> =
        mapOf(
            "String" to serializer<String>(),
            "Int" to serializer<Int>(),
            //list them all
        ).mapValues { (_, v) -> v as KSerializer<Any> }

I just realized it may work in different way (what do you think about that):
I'm not sure if it will work. We may support String by default, but... configure the replayTokenContextSerializer as polymorphic and let user register all possible types of the context while creating KotlinSerializer.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my conversations with @lion7 it seemed impractical to do with the current set up.

We discussed that it would've been best if the ReplayToken didn't contain a serializable Object at all.
But for example instead the SerializedObject from AF.

Adjusting the type is the breaking change I was referring too. We can't do that right now at all.


/**
* Module defining serializers for Axon Framework's core event handling and messaging components.
* This module includes serializers for TrackingTokens, ScheduleTokens, and ResponseTypes, enabling
* seamless integration with Axon-based applications.
*/
val AxonSerializersModule = SerializersModule {
contextual(ConfigToken::class) { ConfigTokenSerializer }
contextual(GapAwareTrackingToken::class) { GapAwareTrackingTokenSerializer }
Expand Down Expand Up @@ -86,6 +112,11 @@ val AxonSerializersModule = SerializersModule {
}
}

/**
* Serializer for [ConfigToken].
*
* @see ConfigToken
*/
object ConfigTokenSerializer : KSerializer<ConfigToken> {

private val mapSerializer = MapSerializer(String.serializer(), String.serializer())
Expand All @@ -112,6 +143,11 @@ object ConfigTokenSerializer : KSerializer<ConfigToken> {
}
}

/**
* Serializer for [GapAwareTrackingToken].
*
* @see GapAwareTrackingToken
*/
object GapAwareTrackingTokenSerializer : KSerializer<GapAwareTrackingToken> {

private val setSerializer = SetSerializer(Long.serializer())
Expand Down Expand Up @@ -143,6 +179,11 @@ object GapAwareTrackingTokenSerializer : KSerializer<GapAwareTrackingToken> {
}
}

/**
* Serializer for [MultiSourceTrackingToken].
*
* @see MultiSourceTrackingToken
*/
object MultiSourceTrackingTokenSerializer : KSerializer<MultiSourceTrackingToken> {

private val mapSerializer = MapSerializer(String.serializer(), trackingTokenSerializer)
Expand All @@ -169,6 +210,11 @@ object MultiSourceTrackingTokenSerializer : KSerializer<MultiSourceTrackingToken
}
}

/**
* Serializer for [MergedTrackingToken].
*
* @see MergedTrackingToken
*/
object MergedTrackingTokenSerializer : KSerializer<MergedTrackingToken> {

override val descriptor = buildClassSerialDescriptor(MergedTrackingToken::class.java.name) {
Expand Down Expand Up @@ -199,36 +245,61 @@ object MergedTrackingTokenSerializer : KSerializer<MergedTrackingToken> {
}
}

/**
* Serializer for [ReplayToken].
* The [ReplayToken.context] value can be only a String or null.
* See [replyTokenContextSerializer] for more information how to handle the context field.
*
* @see ReplayToken
*/
object ReplayTokenSerializer : KSerializer<ReplayToken> {

override val descriptor = buildClassSerialDescriptor(ReplayToken::class.java.name) {
element<TrackingToken>("tokenAtReset")
element<TrackingToken>("currentToken")
element<String>("context")
}

override fun deserialize(decoder: Decoder) = decoder.decodeStructure(descriptor) {
var tokenAtReset: TrackingToken? = null
var currentToken: TrackingToken? = null
var context: String? = null
while (true) {
val index = decodeElementIndex(descriptor)
if (index == CompositeDecoder.DECODE_DONE) break
when (index) {
0 -> tokenAtReset = decodeSerializableElement(descriptor, index, trackingTokenSerializer)
1 -> currentToken = decodeSerializableElement(descriptor, index, trackingTokenSerializer)
2 -> context = decodeSerializableElement(descriptor, index, replyTokenContextSerializer)
}
}
ReplayToken(
ReplayToken.createReplayToken(
tokenAtReset ?: throw SerializationException("Element 'tokenAtReset' is missing"),
currentToken,
)
context
) as ReplayToken
}

override fun serialize(encoder: Encoder, value: ReplayToken) = encoder.encodeStructure(descriptor) {
encodeSerializableElement(descriptor, 0, trackingTokenSerializer, value.tokenAtReset)
encodeSerializableElement(descriptor, 1, trackingTokenSerializer, value.currentToken)
encodeSerializableElement(
descriptor,
2,
replyTokenContextSerializer,
stringOrNullFrom(value.context())
)
}

private fun stringOrNullFrom(obj: Any?): String? =
obj?.takeIf { it is String }?.let { it as String }
}

/**
* Serializer for [GlobalSequenceTrackingToken].
*
* @see GlobalSequenceTrackingToken
*/
object GlobalSequenceTrackingTokenSerializer : KSerializer<GlobalSequenceTrackingToken> {

override val descriptor = buildClassSerialDescriptor(GlobalSequenceTrackingToken::class.java.name) {
Expand All @@ -254,6 +325,11 @@ object GlobalSequenceTrackingTokenSerializer : KSerializer<GlobalSequenceTrackin
}
}

/**
* Serializer for [SimpleScheduleToken].
*
* @see SimpleScheduleToken
*/
object SimpleScheduleTokenSerializer : KSerializer<SimpleScheduleToken> {

override val descriptor = buildClassSerialDescriptor(SimpleScheduleToken::class.java.name) {
Expand All @@ -279,6 +355,11 @@ object SimpleScheduleTokenSerializer : KSerializer<SimpleScheduleToken> {
}
}

/**
* Serializer for [QuartzScheduleToken].
*
* @see QuartzScheduleToken
*/
object QuartzScheduleTokenSerializer : KSerializer<QuartzScheduleToken> {

override val descriptor = buildClassSerialDescriptor(QuartzScheduleToken::class.java.name) {
Expand Down Expand Up @@ -334,14 +415,34 @@ abstract class ResponseTypeSerializer<R : ResponseType<*>>(kClass: KClass<R>, pr
}
}

/**
* Serializer for [InstanceResponseType].
*
* @see InstanceResponseType
*/
object InstanceResponseTypeSerializer : KSerializer<InstanceResponseType<*>>,
ResponseTypeSerializer<InstanceResponseType<*>>(InstanceResponseType::class, { InstanceResponseType(it) })

/**
* Serializer for [OptionalResponseType].
*
* @see OptionalResponseType
*/
object OptionalResponseTypeSerializer : KSerializer<OptionalResponseType<*>>,
ResponseTypeSerializer<OptionalResponseType<*>>(OptionalResponseType::class, { OptionalResponseType(it) })

/**
* Serializer for [MultipleInstancesResponseType].
*
* @see MultipleInstancesResponseType
*/
object MultipleInstancesResponseTypeSerializer : KSerializer<MultipleInstancesResponseType<*>>,
ResponseTypeSerializer<MultipleInstancesResponseType<*>>(MultipleInstancesResponseType::class, { MultipleInstancesResponseType(it) })

/**
* Serializer for [ArrayResponseType].
*
* @see ArrayResponseType
*/
object ArrayResponseTypeSerializer : KSerializer<ArrayResponseType<*>>,
ResponseTypeSerializer<ArrayResponseType<*>>(ArrayResponseType::class, { ArrayResponseType(it) })
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
*/
package org.axonframework.extensions.kotlin.serializer

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.KotlinModule
import kotlinx.serialization.Serializable
import kotlinx.serialization.decodeFromString
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
import org.axonframework.eventhandling.GapAwareTrackingToken
import org.axonframework.eventhandling.GlobalSequenceTrackingToken
Expand All @@ -36,7 +41,9 @@ import org.axonframework.messaging.responsetypes.ResponseType
import org.axonframework.serialization.Serializer
import org.axonframework.serialization.SimpleSerializedObject
import org.axonframework.serialization.SimpleSerializedType
import org.axonframework.serialization.json.JacksonSerializer
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertInstanceOf
import org.junit.jupiter.api.Test

internal class AxonSerializersTest {
Expand Down Expand Up @@ -76,21 +83,49 @@ internal class AxonSerializersTest {
}

@Test
fun replayToken() {
val token = ReplayToken.createReplayToken(GlobalSequenceTrackingToken(15), GlobalSequenceTrackingToken(10))
val json = """{"tokenAtReset":{"type":"org.axonframework.eventhandling.GlobalSequenceTrackingToken","globalIndex":15},"currentToken":{"type":"org.axonframework.eventhandling.GlobalSequenceTrackingToken","globalIndex":10}}"""
fun `replay token with String context`() {
val token = ReplayToken.createReplayToken(
GlobalSequenceTrackingToken(15), GlobalSequenceTrackingToken(10), "someContext"
)
val json = """{"tokenAtReset":{"type":"org.axonframework.eventhandling.GlobalSequenceTrackingToken","globalIndex":15},"currentToken":{"type":"org.axonframework.eventhandling.GlobalSequenceTrackingToken","globalIndex":10},"context":"someContext"}""".trimIndent()
assertEquals(json, serializer.serialize(token, String::class.java).data)
assertEquals(token, serializer.deserializeTrackingToken(token.javaClass.name, json))
}

@Test
fun `replay token with currentToken with null value`() {
val token = ReplayToken.createReplayToken(GlobalSequenceTrackingToken(5), null)
val json = """{"tokenAtReset":{"type":"org.axonframework.eventhandling.GlobalSequenceTrackingToken","globalIndex":5},"currentToken":null}"""
fun `replay token with currentToken with null value and null context`() {
val token = ReplayToken.createReplayToken(GlobalSequenceTrackingToken(5), null, null)
val json = """{"tokenAtReset":{"type":"org.axonframework.eventhandling.GlobalSequenceTrackingToken","globalIndex":5},"currentToken":null,"context":null}"""
assertEquals(json, serializer.serialize(token, String::class.java).data)
assertEquals(token, serializer.deserializeTrackingToken(token.javaClass.name, json))
}

@Test
fun `replay token deserialize without context field`() {
val token = ReplayToken.createReplayToken(GlobalSequenceTrackingToken(5), null, null)
val json = """{"tokenAtReset":{"type":"org.axonframework.eventhandling.GlobalSequenceTrackingToken","globalIndex":5},"currentToken":null}"""
assertEquals(token, serializer.deserializeTrackingToken(token.javaClass.name, json))
}

@Test
fun `replay token with complex object as String context`() {
@Serializable
data class ComplexContext(val value1: String, val value2: Int, val value3: Boolean)
val complexContext = ComplexContext("value1", 2, false)

val token = ReplayToken.createReplayToken(
GlobalSequenceTrackingToken(15),
GlobalSequenceTrackingToken(10),
Json.encodeToString(complexContext)
)
val json = """{"tokenAtReset":{"type":"org.axonframework.eventhandling.GlobalSequenceTrackingToken","globalIndex":15},"currentToken":{"type":"org.axonframework.eventhandling.GlobalSequenceTrackingToken","globalIndex":10},"context":"{\"value1\":\"value1\",\"value2\":2,\"value3\":false}"}""".trimIndent()
assertEquals(json, serializer.serialize(token, String::class.java).data)
val deserializedToken = serializer.deserializeTrackingToken(token.javaClass.name, json) as ReplayToken
assertEquals(token, deserializedToken)
assertInstanceOf(String::class.java, deserializedToken.context())
assertEquals(complexContext, Json.decodeFromString<ComplexContext>(deserializedToken.context() as String))
}

@Test
fun globalSequenceTrackingToken() {
val token = GlobalSequenceTrackingToken(5)
Expand Down
Loading