Skip to content

Commit f65e27d

Browse files
authored
Merge pull request #2390 from beyonnex-io/bugfix/fix-unknown-message-warnings-in-connection-persistence-actor
Fix spurious "Unknown message" warnings in ConnectionPersistenceActor
2 parents e830145 + 7031c04 commit f65e27d

3 files changed

Lines changed: 20 additions & 0 deletions

File tree

connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/enforcement/NoOpEnforcerActorPropsFactory.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
import com.typesafe.config.Config;
1818

19+
import org.eclipse.ditto.policies.enforcement.AbstractEnforcerActor;
20+
1921
import org.apache.pekko.actor.AbstractActor;
2022
import org.apache.pekko.actor.ActorRef;
2123
import org.apache.pekko.actor.ActorSystem;
@@ -42,6 +44,9 @@ private static Props props() {
4244
@Override
4345
public Receive createReceive() {
4446
return receiveBuilder()
47+
.matchEquals(AbstractEnforcerActor.Control.PA_RECOVERED, msg -> {
48+
// PA_RECOVERED is a control signal from the supervisor — consume it, don't echo back.
49+
})
4550
.matchAny(any -> sender().tell(any, ActorRef.noSender()))
4651
.build();
4752
}

connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/persistence/ConnectionPersistenceActor.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.apache.pekko.actor.ActorRef;
4141
import org.apache.pekko.actor.ActorSystem;
4242
import org.apache.pekko.actor.Props;
43+
import org.apache.pekko.actor.ReceiveTimeout;
4344
import org.apache.pekko.actor.Status;
4445
import org.apache.pekko.actor.SupervisorStrategy;
4546
import org.apache.pekko.cluster.Cluster;
@@ -135,6 +136,7 @@
135136
import org.eclipse.ditto.internal.utils.persistence.mongo.config.SnapshotConfig;
136137
import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal;
137138
import org.eclipse.ditto.internal.utils.persistentactors.AbstractPersistenceActor;
139+
import org.eclipse.ditto.policies.enforcement.AbstractEnforcerActor;
138140
import org.eclipse.ditto.internal.utils.persistentactors.EmptyEvent;
139141
import org.eclipse.ditto.internal.utils.persistentactors.commands.CommandStrategy;
140142
import org.eclipse.ditto.internal.utils.persistentactors.commands.DefaultContext;
@@ -706,6 +708,13 @@ protected Receive matchAnyAfterInitialization() {
706708
.matchEquals(Control.TRIGGER_UPDATE_PRIORITY, this::triggerUpdatePriority)
707709
.match(UpdatePriority.class, this::updatePriority)
708710
.match(ConnectionSupervisorActor.RestartByConnectionType.class, this::initiateRestartByConnectionType)
711+
// ReceiveTimeout may arrive here during startup due to the supervisor's
712+
// stash-unstash-forward mechanism — ignore gracefully:
713+
.match(ReceiveTimeout.class, receiveTimeout ->
714+
log.debug("Ignoring ReceiveTimeout forwarded from supervisor during startup."))
715+
// PA_RECOVERED may be echoed back by NoOpEnforcerActor — ignore gracefully:
716+
.matchEquals(AbstractEnforcerActor.Control.PA_RECOVERED, paRecovered ->
717+
log.debug("Ignoring PA_RECOVERED echoed back by enforcer."))
709718
.build()
710719
.orElse(super.matchAnyAfterInitialization());
711720
}

connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/persistence/ConnectionSupervisorActor.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,12 @@ protected Receive activeBehaviour(final Runnable matchProcessNextTwinMessageBeha
153153
log.debug("Successfully registered for connectivity config changes.");
154154
isRegisteredForConnectivityConfigChanges = true;
155155
})
156+
.match(ReceiveTimeout.class, receiveTimeout -> {
157+
// ReceiveTimeout may still be in the stash from the initial 2s timeout set in preStart().
158+
// Cancel and ignore it — it served its purpose during startup.
159+
log.debug("Received lingering ReceiveTimeout in active state, cancelling.");
160+
getContext().cancelReceiveTimeout();
161+
})
156162
.build()
157163
.orElse(connectivityConfigModifiedBehavior(getSelf(), () -> persistenceActorChild))
158164
.orElse(super.activeBehaviour(matchProcessNextTwinMessageBehavior, matchAnyBehavior));

0 commit comments

Comments
 (0)