Skip to content

Commit 5a65e24

Browse files
KhraksMamtsovtim-smart
authored andcommitted
Add Schedule.CurrentIterationMetadata Reference (#4828)
Co-authored-by: Tim <hello@timsmart.co>
1 parent 0a8c0e7 commit 5a65e24

File tree

9 files changed

+485
-21
lines changed

9 files changed

+485
-21
lines changed

.changeset/calm-geese-carry.md

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
---
2+
"effect": minor
3+
---
4+
5+
`Schedule.CurrentIterationMetadata` has been added
6+
7+
```ts
8+
import { Effect, Schedule } from "effect"
9+
10+
Effect.gen(function* () {
11+
const currentIterationMetadata = yield* Schedule.CurrentIterationMetadata
12+
// ^? Schedule.IterationMetadata
13+
14+
console.log(currentIterationMetadata)
15+
}).pipe(Effect.repeat(Schedule.recurs(2)))
16+
// {
17+
// elapsed: Duration.zero,
18+
// elapsedSincePrevious: Duration.zero,
19+
// input: undefined,
20+
// now: 0,
21+
// recurrence: 0,
22+
// start: 0
23+
// }
24+
// {
25+
// elapsed: Duration.zero,
26+
// elapsedSincePrevious: Duration.zero,
27+
// input: undefined,
28+
// now: 0,
29+
// recurrence: 1,
30+
// start: 0
31+
// }
32+
// {
33+
// elapsed: Duration.zero,
34+
// elapsedSincePrevious: Duration.zero,
35+
// input: undefined,
36+
// now: 0,
37+
// recurrence: 2,
38+
// start: 0
39+
// }
40+
41+
Effect.gen(function* () {
42+
const currentIterationMetadata = yield* Schedule.CurrentIterationMetadata
43+
44+
console.log(currentIterationMetadata)
45+
}).pipe(
46+
Effect.schedule(
47+
Schedule.intersect(Schedule.fibonacci("1 second"), Schedule.recurs(3))
48+
)
49+
)
50+
// {
51+
// elapsed: Duration.zero,
52+
// elapsedSincePrevious: Duration.zero,
53+
// recurrence: 1,
54+
// input: undefined,
55+
// now: 0,
56+
// start: 0
57+
// },
58+
// {
59+
// elapsed: Duration.seconds(1),
60+
// elapsedSincePrevious: Duration.seconds(1),
61+
// recurrence: 2,
62+
// input: undefined,
63+
// now: 1000,
64+
// start: 0
65+
// },
66+
// {
67+
// elapsed: Duration.seconds(2),
68+
// elapsedSincePrevious: Duration.seconds(1),
69+
// recurrence: 3,
70+
// input: undefined,
71+
// now: 2000,
72+
// start: 0
73+
// }
74+
```

packages/effect/src/Schedule.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import * as internal from "./internal/schedule.js"
1414
import type * as Option from "./Option.js"
1515
import type { Pipeable } from "./Pipeable.js"
1616
import type { Predicate } from "./Predicate.js"
17+
import type * as Ref from "./Ref.js"
1718
import type * as ScheduleDecision from "./ScheduleDecision.js"
1819
import type * as Intervals from "./ScheduleIntervals.js"
1920
import type * as Types from "./Types.js"
@@ -140,6 +141,7 @@ export declare namespace Schedule {
140141
*/
141142
export interface ScheduleDriver<out Out, in In = unknown, out R = never> extends Schedule.DriverVariance<Out, In, R> {
142143
readonly state: Effect.Effect<unknown>
144+
readonly iterationMeta: Ref.Ref<IterationMetadata>
143145
readonly last: Effect.Effect<Out, Cause.NoSuchElementException>
144146
readonly reset: Effect.Effect<void>
145147
next(input: In): Effect.Effect<Out, Option.Option<never>, R>
@@ -2184,3 +2186,33 @@ export const zipWith: {
21842186
f: (out: Out, out2: Out2) => Out3
21852187
): Schedule<Out3, In & In2, R | R2>
21862188
} = internal.zipWith
2189+
2190+
/**
2191+
* @since 3.15.0
2192+
* @category models
2193+
*/
2194+
export interface CurrentIterationMetadata {
2195+
readonly _: unique symbol
2196+
}
2197+
2198+
/**
2199+
* @since 3.15.0
2200+
* @category models
2201+
*/
2202+
export interface IterationMetadata {
2203+
readonly input: unknown
2204+
readonly recurrence: number
2205+
readonly start: number
2206+
readonly now: number
2207+
readonly elapsed: Duration.Duration
2208+
readonly elapsedSincePrevious: Duration.Duration
2209+
}
2210+
2211+
/**
2212+
* @since 3.15.0
2213+
* @category models
2214+
*/
2215+
export const CurrentIterationMetadata: Context.Reference<
2216+
CurrentIterationMetadata,
2217+
IterationMetadata
2218+
> = internal.CurrentIterationMetadata

packages/effect/src/internal/schedule.ts

Lines changed: 104 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,22 @@ export const ScheduleDriverTypeId: Schedule.ScheduleDriverTypeId = Symbol.for(
4545
ScheduleDriverSymbolKey
4646
) as Schedule.ScheduleDriverTypeId
4747

48+
/** @internal */
49+
const defaultIterationMetadata: Schedule.IterationMetadata = {
50+
start: 0,
51+
now: 0,
52+
input: undefined,
53+
elapsed: Duration.zero,
54+
elapsedSincePrevious: Duration.zero,
55+
recurrence: 0
56+
}
57+
58+
/** @internal */
59+
export const CurrentIterationMetadata = Context.Reference<Schedule.CurrentIterationMetadata>()(
60+
"effect/Schedule/CurrentIterationMetadata",
61+
{ defaultValue: () => defaultIterationMetadata }
62+
)
63+
4864
const scheduleVariance = {
4965
/* c8 ignore next */
5066
_Out: (_: never) => _,
@@ -80,6 +96,31 @@ class ScheduleImpl<S, Out, In, R> implements Schedule.Schedule<Out, In, R> {
8096
}
8197
}
8298

99+
/** @internal */
100+
const updateInfo = (
101+
iterationMetaRef: Ref.Ref<Schedule.IterationMetadata>,
102+
now: number,
103+
input: unknown
104+
) =>
105+
ref.update(iterationMetaRef, (prev) =>
106+
(prev.recurrence === 0) ?
107+
{
108+
now,
109+
input,
110+
recurrence: prev.recurrence + 1,
111+
elapsed: Duration.zero,
112+
elapsedSincePrevious: Duration.zero,
113+
start: now
114+
} :
115+
{
116+
now,
117+
input,
118+
recurrence: prev.recurrence + 1,
119+
elapsed: Duration.millis(now - prev.start),
120+
elapsedSincePrevious: Duration.millis(now - prev.now),
121+
start: prev.start
122+
})
123+
83124
/** @internal */
84125
class ScheduleDriverImpl<Out, In, R> implements Schedule.ScheduleDriver<Out, In, R> {
85126
[ScheduleDriverTypeId] = scheduleDriverVariance
@@ -106,8 +147,12 @@ class ScheduleDriverImpl<Out, In, R> implements Schedule.ScheduleDriver<Out, In,
106147
})
107148
}
108149

150+
iterationMeta = ref.unsafeMake(defaultIterationMetadata)
151+
109152
get reset(): Effect.Effect<void> {
110-
return ref.set(this.ref, [Option.none(), this.schedule.initial])
153+
return ref.set(this.ref, [Option.none(), this.schedule.initial]).pipe(
154+
core.zipLeft(ref.set(this.iterationMeta, defaultIterationMetadata))
155+
)
111156
}
112157

113158
next(input: In): Effect.Effect<Out, Option.Option<never>, R> {
@@ -122,15 +167,22 @@ class ScheduleDriverImpl<Out, In, R> implements Schedule.ScheduleDriver<Out, In,
122167
core.flatMap(([state, out, decision]) => {
123168
const setState = ref.set(this.ref, [Option.some(out), state] as const)
124169
if (ScheduleDecision.isDone(decision)) {
125-
return core.zipRight(setState, core.fail(Option.none()))
170+
return setState.pipe(
171+
core.zipRight(core.fail(Option.none()))
172+
)
126173
}
127174
const millis = Intervals.start(decision.intervals) - now
128175
if (millis <= 0) {
129-
return core.as(setState, out)
176+
return setState.pipe(
177+
core.zipRight(updateInfo(this.iterationMeta, now, input)),
178+
core.as(out)
179+
)
130180
}
181+
const duration = Duration.millis(millis)
131182
return pipe(
132183
setState,
133-
core.zipRight(effect.sleep(Duration.millis(millis))),
184+
core.zipRight(updateInfo(this.iterationMeta, now, input)),
185+
core.zipRight(effect.sleep(duration)),
134186
core.as(out)
135187
)
136188
})
@@ -1874,7 +1926,22 @@ export const repeatOrElse_Effect = dual<
18741926
core.flatMap(driver(schedule), (driver) =>
18751927
core.matchEffect(self, {
18761928
onFailure: (error) => orElse(error, Option.none()),
1877-
onSuccess: (value) => repeatOrElseEffectLoop(self, driver, orElse, value)
1929+
onSuccess: (value) =>
1930+
repeatOrElseEffectLoop(
1931+
effect.provideServiceEffect(
1932+
self,
1933+
CurrentIterationMetadata,
1934+
ref.get(driver.iterationMeta)
1935+
),
1936+
driver,
1937+
(error, option) =>
1938+
effect.provideServiceEffect(
1939+
orElse(error, option),
1940+
CurrentIterationMetadata,
1941+
ref.get(driver.iterationMeta)
1942+
),
1943+
value
1944+
)
18781945
})))
18791946

18801947
/** @internal */
@@ -1883,16 +1950,15 @@ const repeatOrElseEffectLoop = <A, E, R, R1, B, C, E2, R2>(
18831950
driver: Schedule.ScheduleDriver<B, A, R1>,
18841951
orElse: (error: E, option: Option.Option<B>) => Effect.Effect<C, E2, R2>,
18851952
value: A
1886-
): Effect.Effect<B | C, E2, R | R1 | R2> => {
1887-
return core.matchEffect(driver.next(value), {
1953+
): Effect.Effect<B | C, E2, R | R1 | R2> =>
1954+
core.matchEffect(driver.next(value), {
18881955
onFailure: () => core.orDie(driver.last),
18891956
onSuccess: (b) =>
18901957
core.matchEffect(self, {
18911958
onFailure: (error) => orElse(error, Option.some(b)),
18921959
onSuccess: (value) => repeatOrElseEffectLoop(self, driver, orElse, value)
18931960
})
18941961
})
1895-
}
18961962

18971963
/** @internal */
18981964
export const retry_Effect = dual<
@@ -1970,7 +2036,21 @@ export const retryOrElse_Effect = dual<
19702036
>(3, (self, policy, orElse) =>
19712037
core.flatMap(
19722038
driver(policy),
1973-
(driver) => retryOrElse_EffectLoop(self, driver, orElse)
2039+
(driver) =>
2040+
retryOrElse_EffectLoop(
2041+
effect.provideServiceEffect(
2042+
self,
2043+
CurrentIterationMetadata,
2044+
ref.get(driver.iterationMeta)
2045+
),
2046+
driver,
2047+
(e, out) =>
2048+
effect.provideServiceEffect(
2049+
orElse(e, out),
2050+
CurrentIterationMetadata,
2051+
ref.get(driver.iterationMeta)
2052+
)
2053+
)
19742054
))
19752055

19762056
/** @internal */
@@ -2022,7 +2102,16 @@ export const scheduleFrom_Effect = dual<
20222102
>(3, (self, initial, schedule) =>
20232103
core.flatMap(
20242104
driver(schedule),
2025-
(driver) => scheduleFrom_EffectLoop(self, initial, driver)
2105+
(driver) =>
2106+
scheduleFrom_EffectLoop(
2107+
effect.provideServiceEffect(
2108+
self,
2109+
CurrentIterationMetadata,
2110+
ref.get(driver.iterationMeta)
2111+
),
2112+
initial,
2113+
driver
2114+
)
20262115
))
20272116

20282117
/** @internal */
@@ -2033,7 +2122,11 @@ const scheduleFrom_EffectLoop = <In, E, R, R2, Out>(
20332122
): Effect.Effect<Out, E, R | R2> =>
20342123
core.matchEffect(driver.next(initial), {
20352124
onFailure: () => core.orDie(driver.last),
2036-
onSuccess: () => core.flatMap(self, (a) => scheduleFrom_EffectLoop(self, a, driver))
2125+
onSuccess: () =>
2126+
core.flatMap(
2127+
self,
2128+
(a) => scheduleFrom_EffectLoop(self, a, driver)
2129+
)
20372130
})
20382131

20392132
/** @internal */

packages/effect/src/internal/stream.ts

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5382,8 +5382,13 @@ export const repeatWith = dual<
53825382
return pipe(
53835383
Schedule.driver(schedule),
53845384
Effect.map((driver) => {
5385+
const provideLastIterationInfo = provideServiceEffect(
5386+
Schedule.CurrentIterationMetadata,
5387+
Ref.get(driver.iterationMeta)
5388+
)
5389+
53855390
const scheduleOutput = pipe(driver.last, Effect.orDie, Effect.map(options.onSchedule))
5386-
const process = pipe(self, map(options.onElement), toChannel)
5391+
const process = pipe(self, provideLastIterationInfo, map(options.onElement), toChannel)
53875392
const loop: Channel.Channel<Chunk.Chunk<C>, unknown, E, unknown, void, unknown, R | R2> = channel.unwrap(
53885393
Effect.match(driver.next(void 0), {
53895394
onFailure: () => core.void,
@@ -5419,15 +5424,21 @@ export const repeatEffectWithSchedule = <A, E, R, X, A0 extends A, R2>(
54195424
): Stream.Stream<A, E, R | R2> =>
54205425
flatMap(
54215426
fromEffect(Effect.zip(effect, Schedule.driver(schedule))),
5422-
([a, driver]) =>
5423-
concat(
5427+
([a, driver]) => {
5428+
const provideLastIterationInfo = Effect.provideServiceEffect(
5429+
Schedule.CurrentIterationMetadata,
5430+
Ref.get(driver.iterationMeta)
5431+
)
5432+
return concat(
54245433
succeed(a),
54255434
unfoldEffect(a, (s) =>
54265435
Effect.matchEffect(driver.next(s as A0), {
54275436
onFailure: Effect.succeed,
5428-
onSuccess: () => Effect.map(effect, (nextA) => Option.some([nextA, nextA] as const))
5437+
onSuccess: () =>
5438+
Effect.map(provideLastIterationInfo(effect), (nextA) => Option.some([nextA, nextA] as const))
54295439
}))
54305440
)
5441+
}
54315442
)
54325443

54335444
/** @internal */
@@ -5447,6 +5458,11 @@ export const retry = dual<
54475458
): Stream.Stream<A, E, R | R2> =>
54485459
Schedule.driver(schedule).pipe(
54495460
Effect.map((driver) => {
5461+
const provideLastIterationInfo = provideServiceEffect(
5462+
Schedule.CurrentIterationMetadata,
5463+
Ref.get(driver.iterationMeta)
5464+
)
5465+
54505466
const loop: Channel.Channel<
54515467
Chunk.Chunk<A>,
54525468
unknown,
@@ -5455,7 +5471,7 @@ export const retry = dual<
54555471
unknown,
54565472
unknown,
54575473
R | R2
5458-
> = toChannel(self).pipe(
5474+
> = toChannel(provideLastIterationInfo(self)).pipe(
54595475
channel.mapOutEffect((out) => Effect.as(driver.reset, out)),
54605476
channel.catchAll((error) =>
54615477
driver.next(error as E0).pipe(

0 commit comments

Comments
 (0)