diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/enforcement/NoOpEnforcerActorPropsFactory.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/enforcement/NoOpEnforcerActorPropsFactory.java index 04d6f1065ba..2bd5e74a91a 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/enforcement/NoOpEnforcerActorPropsFactory.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/enforcement/NoOpEnforcerActorPropsFactory.java @@ -16,6 +16,8 @@ import com.typesafe.config.Config; +import org.eclipse.ditto.policies.enforcement.AbstractEnforcerActor; + import org.apache.pekko.actor.AbstractActor; import org.apache.pekko.actor.ActorRef; import org.apache.pekko.actor.ActorSystem; @@ -42,6 +44,9 @@ private static Props props() { @Override public Receive createReceive() { return receiveBuilder() + .matchEquals(AbstractEnforcerActor.Control.PA_RECOVERED, msg -> { + // PA_RECOVERED is a control signal from the supervisor — consume it, don't echo back. + }) .matchAny(any -> sender().tell(any, ActorRef.noSender())) .build(); } diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/persistence/ConnectionPersistenceActor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/persistence/ConnectionPersistenceActor.java index 01ccb06b43f..65a7e19f3f2 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/persistence/ConnectionPersistenceActor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/persistence/ConnectionPersistenceActor.java @@ -40,6 +40,7 @@ import org.apache.pekko.actor.ActorRef; import org.apache.pekko.actor.ActorSystem; import org.apache.pekko.actor.Props; +import org.apache.pekko.actor.ReceiveTimeout; import org.apache.pekko.actor.Status; import org.apache.pekko.actor.SupervisorStrategy; import org.apache.pekko.cluster.Cluster; @@ -135,6 +136,7 @@ import org.eclipse.ditto.internal.utils.persistence.mongo.config.SnapshotConfig; import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal; import org.eclipse.ditto.internal.utils.persistentactors.AbstractPersistenceActor; +import org.eclipse.ditto.policies.enforcement.AbstractEnforcerActor; import org.eclipse.ditto.internal.utils.persistentactors.EmptyEvent; import org.eclipse.ditto.internal.utils.persistentactors.commands.CommandStrategy; import org.eclipse.ditto.internal.utils.persistentactors.commands.DefaultContext; @@ -706,6 +708,13 @@ protected Receive matchAnyAfterInitialization() { .matchEquals(Control.TRIGGER_UPDATE_PRIORITY, this::triggerUpdatePriority) .match(UpdatePriority.class, this::updatePriority) .match(ConnectionSupervisorActor.RestartByConnectionType.class, this::initiateRestartByConnectionType) + // ReceiveTimeout may arrive here during startup due to the supervisor's + // stash-unstash-forward mechanism — ignore gracefully: + .match(ReceiveTimeout.class, receiveTimeout -> + log.debug("Ignoring ReceiveTimeout forwarded from supervisor during startup.")) + // PA_RECOVERED may be echoed back by NoOpEnforcerActor — ignore gracefully: + .matchEquals(AbstractEnforcerActor.Control.PA_RECOVERED, paRecovered -> + log.debug("Ignoring PA_RECOVERED echoed back by enforcer.")) .build() .orElse(super.matchAnyAfterInitialization()); } diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/persistence/ConnectionSupervisorActor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/persistence/ConnectionSupervisorActor.java index 37dacd87440..b9261307012 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/persistence/ConnectionSupervisorActor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/persistence/ConnectionSupervisorActor.java @@ -153,6 +153,12 @@ protected Receive activeBehaviour(final Runnable matchProcessNextTwinMessageBeha log.debug("Successfully registered for connectivity config changes."); isRegisteredForConnectivityConfigChanges = true; }) + .match(ReceiveTimeout.class, receiveTimeout -> { + // ReceiveTimeout may still be in the stash from the initial 2s timeout set in preStart(). + // Cancel and ignore it — it served its purpose during startup. + log.debug("Received lingering ReceiveTimeout in active state, cancelling."); + getContext().cancelReceiveTimeout(); + }) .build() .orElse(connectivityConfigModifiedBehavior(getSelf(), () -> persistenceActorChild)) .orElse(super.activeBehaviour(matchProcessNextTwinMessageBehavior, matchAnyBehavior));