Skip to content

Commit 1b30f61

Browse files
authored
ensure clientOnly mode without storage falls back to network messaging (#4825)
1 parent b93c54c commit 1b30f61

File tree

3 files changed

+16
-5
lines changed

3 files changed

+16
-5
lines changed

.changeset/poor-bananas-sip.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@effect/cluster": patch
3+
---
4+
5+
ensure clientOnly mode without storage falls back to network messaging

packages/cluster/src/Runners.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,7 @@ export const makeRpc: Effect.Effect<
430430
> = Effect.gen(function*() {
431431
const makeClientProtocol = yield* RpcClientProtocol
432432
const snowflakeGen = yield* Snowflake.Generator
433+
const storageEnabled = (yield* MessageStorage.MessageStorage) !== MessageStorage.noop
433434

434435
const clients = yield* RcMap.make({
435436
lookup: (address: RunnerAddress) =>
@@ -450,7 +451,7 @@ export const makeRpc: Effect.Effect<
450451
},
451452
send({ address, message }) {
452453
const rpc = message.rpc as any as Rpc.AnyWithProps
453-
const isPersisted = Context.get(rpc.annotations, Persisted)
454+
const isPersisted = storageEnabled && Context.get(rpc.annotations, Persisted)
454455
if (message._tag === "OutgoingEnvelope") {
455456
return RcMap.get(clients, address).pipe(
456457
Effect.flatMap((client) =>

packages/cluster/src/Sharding.ts

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@ import { type FromServer, RequestId } from "@effect/rpc/RpcMessage"
77
import * as Arr from "effect/Array"
88
import * as Cause from "effect/Cause"
99
import * as Context from "effect/Context"
10+
import * as Deferred from "effect/Deferred"
1011
import type { DurationInput } from "effect/Duration"
1112
import * as Effect from "effect/Effect"
1213
import * as Equal from "effect/Equal"
14+
import * as Exit from "effect/Exit"
1315
import * as Fiber from "effect/Fiber"
1416
import * as FiberHandle from "effect/FiberHandle"
1517
import * as FiberMap from "effect/FiberMap"
@@ -740,7 +742,7 @@ const make = Effect.gen(function*() {
740742

741743
yield* Effect.logDebug("Subscribing to sharding events")
742744
const mailbox = yield* shardManager.shardingEvents
743-
const startedLatch = yield* Effect.makeLatch(false)
745+
const startedLatch = yield* Deferred.make<void>()
744746

745747
const eventsFiber = yield* Effect.gen(function*() {
746748
while (true) {
@@ -750,7 +752,7 @@ const make = Effect.gen(function*() {
750752

751753
switch (event._tag) {
752754
case "StreamStarted": {
753-
yield* startedLatch.open
755+
yield* Deferred.done(startedLatch, Exit.void)
754756
break
755757
}
756758
case "ShardsAssigned": {
@@ -781,10 +783,13 @@ const make = Effect.gen(function*() {
781783
}
782784
}
783785
}
784-
}).pipe(Effect.forkScoped)
786+
}).pipe(
787+
Effect.intoDeferred(startedLatch),
788+
Effect.forkScoped
789+
)
785790

786791
// Wait for the stream to be established
787-
yield* startedLatch.await
792+
yield* Deferred.await(startedLatch)
788793

789794
// perform a full sync every config.refreshAssignmentsInterval
790795
const syncFiber = yield* syncAssignments.pipe(

0 commit comments

Comments
 (0)