Skip to content

Commit 8e42c93

Browse files
committed
docs and api
1 parent f4aca0f commit 8e42c93

File tree

10 files changed

+116
-17
lines changed

10 files changed

+116
-17
lines changed
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
/*
2+
* Copyright (C) 2021-2024 Lightbend Inc. <https://www.lightbend.com>
3+
*/
4+
5+
package akka.javasdk.annotations;
6+
7+
import java.lang.annotation.Documented;
8+
import java.lang.annotation.ElementType;
9+
import java.lang.annotation.Retention;
10+
import java.lang.annotation.RetentionPolicy;
11+
import java.lang.annotation.Target;
12+
13+
@Target(ElementType.TYPE)
14+
@Retention(RetentionPolicy.RUNTIME)
15+
@Documented
16+
public @interface EnableReplicationFilter {
17+
}

akka-javasdk/src/main/java/akka/javasdk/eventsourcedentity/EventSourcedEntity.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ interface Builder<S, E> {
273273
/**
274274
* Change the replication filter without persisting any events.
275275
*/
276-
OnSuccessBuilder<S> replicationFilter(ReplicationFilter filter);
276+
OnSuccessBuilder<S> updateReplicationFilter(ReplicationFilter filter);
277277

278278
/**
279279
* Create a message reply.
@@ -336,7 +336,7 @@ interface OnSuccessBuilder<S> {
336336
/**
337337
* Change the replication filter combined with for example {@code persist} event.
338338
*/
339-
OnSuccessBuilder<S> replicationFilter(ReplicationFilter filter);
339+
OnSuccessBuilder<S> updateReplicationFilter(ReplicationFilter filter);
340340

341341
}
342342

akka-javasdk/src/main/java/akka/javasdk/eventsourcedentity/ReplicationFilter.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66

77
import akka.javasdk.impl.effect.ReplicationFilterImpl;
88

9+
import java.util.Set;
10+
911
public interface ReplicationFilter {
1012
static ReplicationFilter empty() {
1113
return ReplicationFilterImpl.empty();
@@ -15,12 +17,24 @@ static ReplicationFilter includeRegion(String region) {
1517
return ReplicationFilterImpl.empty().addRegion(region);
1618
}
1719

20+
static ReplicationFilter includeRegions(Set<String> regions) {
21+
return ReplicationFilterImpl.empty().addRegions(regions);
22+
}
23+
1824
static ReplicationFilter excludeRegion(String region) {
1925
return ReplicationFilterImpl.empty().removeRegion(region);
2026
}
2127

28+
static ReplicationFilter excludeRegions(Set<String> regions) {
29+
return ReplicationFilterImpl.empty().removeRegions(regions);
30+
}
31+
2232
ReplicationFilter addRegion(String region);
2333

34+
ReplicationFilter addRegions(Set<String> regions);
35+
2436
ReplicationFilter removeRegion(String region);
2537

38+
ReplicationFilter removeRegions(Set<String> regions);
39+
2640
}

akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import java.lang.reflect.Method
1010
import java.util
1111
import java.util.Optional
1212
import java.util.concurrent.CompletionStage
13+
1314
import scala.concurrent.ExecutionContext
1415
import scala.concurrent.Future
1516
import scala.concurrent.Promise
@@ -34,6 +35,7 @@ import akka.javasdk.Principals
3435
import akka.javasdk.ServiceSetup
3536
import akka.javasdk.Tracing
3637
import akka.javasdk.annotations.ComponentId
38+
import akka.javasdk.annotations.EnableReplicationFilter
3739
import akka.javasdk.annotations.GrpcEndpoint
3840
import akka.javasdk.annotations.Setup
3941
import akka.javasdk.annotations.http.HttpEndpoint
@@ -504,8 +506,7 @@ private final class Sdk(
504506
readOnlyCommandNames,
505507
instanceFactory,
506508
keyValue = false,
507-
replicationFilterEnabled = true
508-
) // FIXME how shall we enable it? Can we solve it without enable flag?
509+
replicationFilterEnabled = clz.hasAnnotation[EnableReplicationFilter])
509510

510511
case clz if classOf[KeyValueEntity[_]].isAssignableFrom(clz) =>
511512
val componentId = clz.getAnnotation(classOf[ComponentId]).value
@@ -543,8 +544,7 @@ private final class Sdk(
543544
readOnlyCommandNames,
544545
instanceFactory,
545546
keyValue = true,
546-
replicationFilterEnabled = false
547-
) // FIXME KVE replication filter
547+
replicationFilterEnabled = clz.hasAnnotation[EnableReplicationFilter])
548548

549549
case clz if Reflect.isWorkflow(clz) =>
550550
val componentId = clz.getAnnotation(classOf[ComponentId]).value

akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/EventSourcedEntityEffectImpl.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ private[javasdk] class EventSourcedEntityEffectImpl[S, E]
110110
this.asInstanceOf[EventSourcedEntityEffectImpl[T, E]]
111111
}
112112

113-
override def replicationFilter(filter: ReplicationFilter): OnSuccessBuilder[S] = {
113+
override def updateReplicationFilter(filter: ReplicationFilter): OnSuccessBuilder[S] = {
114114
_replicationFilter = filter.asInstanceOf[ReplicationFilterImpl]
115115
this
116116
}

akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/EventSourcedEntityImpl.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import scala.util.control.NonFatal
1212
import akka.annotation.InternalApi
1313
import akka.javasdk.Metadata
1414
import akka.javasdk.Tracing
15+
import akka.javasdk.annotations.EnableReplicationFilter
1516
import akka.javasdk.eventsourcedentity.CommandContext
1617
import akka.javasdk.eventsourcedentity.EventContext
1718
import akka.javasdk.eventsourcedentity.EventSourcedEntity
@@ -26,6 +27,7 @@ import akka.javasdk.impl.MetadataImpl
2627
import akka.javasdk.impl.effect.ErrorReplyImpl
2728
import akka.javasdk.impl.effect.MessageReplyImpl
2829
import akka.javasdk.impl.effect.NoSecondaryEffectImpl
30+
import akka.javasdk.impl.effect.ReplicationFilterImpl
2931
import akka.javasdk.impl.eventsourcedentity.EventSourcedEntityEffectImpl.EmitEvents
3032
import akka.javasdk.impl.eventsourcedentity.EventSourcedEntityEffectImpl.NoPrimaryEffect
3133
import akka.javasdk.impl.serialization.JsonSerializer
@@ -150,6 +152,11 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[
150152
}
151153
}
152154

155+
if ((commandEffect.replFilter ne ReplicationFilterImpl.empty) && !isReplicationFilterEnabled) {
156+
throw new IllegalStateException(
157+
"To use replication filters the EventSourcedEntity class must be annotated with @EnableReplicationFilter.")
158+
}
159+
153160
var currentSequence = command.sequenceNumber
154161
commandEffect.primaryEffect match {
155162
case EmitEvents(events, deleteEntity) =>
@@ -240,4 +247,9 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[
240247

241248
override def stateFromBytes(pb: BytesPayload): SpiEventSourcedEntity.State =
242249
serializer.fromBytes(entityStateType, pb).asInstanceOf[SpiEventSourcedEntity.State]
250+
251+
private def isReplicationFilterEnabled: Boolean = {
252+
import akka.javasdk.impl.reflection.Reflect.Syntax.AnnotatedElementOps
253+
entity.getClass.hasAnnotation[EnableReplicationFilter]
254+
}
243255
}

akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/ReplicationFilterImpl.scala

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@
22
* Copyright (C) 2021-2024 Lightbend Inc. <https://www.lightbend.com>
33
*/
44

5-
package akka.javasdk.impl.effect
5+
package akka.javasdk.impl.eventsourcedentity
6+
7+
import java.util
8+
9+
import scala.jdk.CollectionConverters._
610

711
import akka.annotation.InternalApi
812
import akka.javasdk.eventsourcedentity.ReplicationFilter
@@ -25,9 +29,15 @@ private[javasdk] final case class ReplicationFilterImpl(addRegions: Set[String],
2529
override def addRegion(region: String): ReplicationFilterImpl =
2630
copy(addRegions = addRegions + region)
2731

32+
override def addRegions(regions: util.Set[String]): ReplicationFilter =
33+
copy(addRegions = addRegions.union(regions.asScala))
34+
2835
override def removeRegion(region: String): ReplicationFilterImpl =
2936
copy(removeRegions = removeRegions + region)
3037

38+
override def removeRegions(regions: util.Set[String]): ReplicationFilter =
39+
copy(removeRegions = removeRegions.union(regions.asScala))
40+
3141
def toSpi: SpiEventSourcedEntity.ChangeReplicationFilter =
3242
new SpiEventSourcedEntity.ChangeReplicationFilter(addRegions, removeRegions)
3343

docs/src/modules/java/pages/event-sourced-entities.adoc

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,20 @@ When the Event Sourced Entity is loaded again, the snapshot will be loaded befor
179179
[#_replication]
180180
include::partial$mutli-region-replication.adoc[]
181181

182+
=== Replication filters
183+
184+
Events are by default replicated to all regions that have been enabled for the service. For regulatory reasons or as cost optimization it is possible to filter which regions that participate in the replication for a specific entity. This can be changed at runtime by the entity itself.
185+
186+
[source,java]
187+
.{sample-base-url}/doc-snippets/src/main/java/com/example/replicationfilter/ShoppingCartEntity.java[ShoppingCartEntity.java]
188+
----
189+
include::example$doc-snippets/src/main/java/com/example/replicationfilter/ShoppingCartEntity.java[tag=replication-filter]
190+
----
191+
<1> Enable the replication filter feature by adding the `@EnableReplicationFilter annotation.
192+
<2> Define the replication filter with the `updateReplicationFilter` effect.
193+
194+
After enabling replication filter the entity is still replicated to all regions until the regions are defined with the `updateReplicationFilter` effect. This effect can be combined with persisting events or used without additional events. The filter can only be updated from the primary region of the entity, or it will become the primary if using `request-region` primary selection strategy. The filter is durable for the specific entity instance and can be changed without deploying a new version.
195+
182196
== Side Effects
183197

184198
An entity doesn't perform any external side effects aside from persisting events and replying to the request. Side effects can be handled from the Workflow, Consumer, or Endpoint components that are calling the entity.
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright (C) 2024 Lightbend Inc. <https://www.lightbend.com>
3+
*/
4+
package com.example.replicationfilter;
5+
6+
// tag::replication-filter[]
7+
import akka.Done;
8+
import akka.javasdk.annotations.ComponentId;
9+
import akka.javasdk.annotations.EnableReplicationFilter;
10+
import akka.javasdk.eventsourcedentity.EventSourcedEntity;
11+
import akka.javasdk.eventsourcedentity.ReplicationFilter;
12+
13+
@ComponentId("shopping-cart")
14+
@EnableReplicationFilter // <1>
15+
public class ShoppingCartEntity extends EventSourcedEntity<ShoppingCart, ShoppingCartEvent> {
16+
17+
public Effect<Done> replicateTo(String region) {
18+
return effects()
19+
.updateReplicationFilter(ReplicationFilter.includeRegion(region)) // <2>
20+
.thenReply(__ -> Done.getInstance());
21+
}
22+
23+
// end::replication-filter[]
24+
@Override
25+
public ShoppingCart applyEvent(ShoppingCartEvent event) {
26+
return currentState();
27+
}
28+
// tag::replication-filter[]
29+
}
30+
// end::replication-filter[]
31+
32+
record ShoppingCart(String id) {}
33+
34+
interface ShoppingCartEvent {}

samples/shopping-cart-quickstart/src/main/java/shoppingcart/application/ShoppingCartEntity.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
import akka.Done;
55
import akka.javasdk.annotations.ComponentId;
6+
import akka.javasdk.annotations.EnableReplicationFilter;
67
import akka.javasdk.eventsourcedentity.EventSourcedEntity;
78
import akka.javasdk.eventsourcedentity.EventSourcedEntityContext;
89
import akka.javasdk.eventsourcedentity.ReplicationFilter;
@@ -19,6 +20,7 @@
1920
// tag::all[]
2021
// tag::class[]
2122
@ComponentId("shopping-cart") // <2>
23+
@EnableReplicationFilter
2224
public class ShoppingCartEntity extends EventSourcedEntity<ShoppingCart, ShoppingCartEvent> { // <1>
2325
// end::class[]
2426

@@ -58,22 +60,18 @@ public Effect<Done> addItem(LineItem item) {
5860
.thenReply(newState -> Done.getInstance()); // <4>
5961
} else {
6062
logger.info("Update replication filter {}", item);
61-
var filter = ReplicationFilter.empty();
62-
for (String region : item.addRegions()) {
63-
filter = filter.addRegion(region);
64-
}
65-
for (String region : item.removeRegions()) {
66-
filter = filter.removeRegion(region);
67-
}
63+
var filter = ReplicationFilter
64+
.includeRegions(item.addRegions())
65+
.removeRegions(item.removeRegions());
6866

6967
if (item.productId().isEmpty()) {
7068
return effects()
71-
.replicationFilter(filter)
69+
.updateReplicationFilter(filter)
7270
.thenReply(newState -> Done.getInstance());
7371
} else {
7472
return effects()
7573
.persist(event)
76-
.replicationFilter(filter)
74+
.updateReplicationFilter(filter)
7775
.thenReply(newState -> Done.getInstance());
7876

7977
}

0 commit comments

Comments
 (0)