Skip to content

Add NonEmptyHotswap #4267

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: series/3.6.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions docs/std/hotswap.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ def rotating(n: Int): Resource[IO, Logger[IO]] =
Hotswap.create[IO, File].flatMap { hs =>
def file(name: String): Resource[IO, File] = ???
def write(file: File, msg: String): IO[Unit] = ???

def logFileResource: Resource[IO, File] = hs.get.evalMap {
case Some(file) => file.pure[IO]
case None => IO.raiseError(new IllegalStateException("No file to write to"))
}
Resource.eval {
for {
index <- Ref[IO].of(0)
Expand All @@ -43,7 +46,7 @@ def rotating(n: Int): Resource[IO, Logger[IO]] =
def log(msg: String): IO[Unit] =
count.get.flatMap { currentCount =>
if (msg.length() < n - currentCount)
hs.get.use { currentFile =>
logFileResource.use { currentFile =>
write(currentFile, msg) *>
count.update(_ + msg.length())
}
Expand All @@ -55,7 +58,7 @@ def rotating(n: Int): Resource[IO, Logger[IO]] =
idx <- index.updateAndGet(_ + 1)
// Close the old log file and open the new one
_ <- hs.swap(file(s"$idx.log"))
_ <- hs.get.use(write(_, msg))
_ <- logFileResource.use(write(_, msg))
} yield ()
}
}
Expand Down
59 changes: 59 additions & 0 deletions docs/std/nonemptyhotswap.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
---
id: hotswap-nonempty
title: NonEmptyHotswap
---

## Motivation

In situations where the [`Resource`](./resource.md) managed by [`Hotswap`](./hotswap.md)
is statically known to always be present, the API can be a bit tedious in a similar
manner to working with a `List` that is statically known to never be empty.

## NonEmptyHotswap

`NonEmptyHotswap` addresses this by exposing a restricted set of capabilities similar
to `Hotswap` which remove the `Option` wrapper.

```scala
sealed trait NonEmptyHotswap[F[_], R] {
def swap(next: Resource[F, R]): F[Unit]
def get: Resource[F, R]
}
```

The rotating logger example would then look something like this:

```scala
def rotating(n: Int): Resource[IO, Logger[IO]] = {
def file(name: String): Resource[IO, File] = ???
def write(file: File, msg: String): IO[Unit] = ???

Hotswap[IO, File](file("0.log").flatMap { hs =>
Resource.eval {
for {
index <- Ref[IO].of(0)
count <- Ref[IO].of(0)
} yield new Logger[IO] {
def log(msg: String): IO[Unit] =
count.get.flatMap { currentCount =>
if (msg.length() < n - currentCount)
hs.get.use { currentFile =>
write(currentFile, msg) *>
count.update(_ + msg.length())
}
else
for {
// Reset the log length counter
_ <- count.set(msg.length())
// Increment the counter for the log file name
idx <- index.updateAndGet(_ + 1)
// Close the old log file and open the new one
_ <- hs.swap(file(s"$idx.log"))
_ <- hs.get.use(write(_, msg))
} yield ()
}
}
}
}
}
```
71 changes: 7 additions & 64 deletions std/shared/src/main/scala/cats/effect/std/Hotswap.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@

package cats.effect.std

import cats.effect.kernel.{Concurrent, Ref, Resource}
import cats.effect.kernel.syntax.all._
import cats.effect.kernel.{Concurrent, Resource}
import cats.syntax.all._

/**
Expand Down Expand Up @@ -102,70 +101,14 @@ object Hotswap {
* swapped during the lifetime of this [[Hotswap]].
*/
def create[F[_], R](implicit F: Concurrent[F]): Resource[F, Hotswap[F, R]] =
Resource.eval(Semaphore[F](Long.MaxValue)).flatMap { semaphore =>
sealed abstract class State
case object Cleared extends State
case class Acquired(r: R, fin: F[Unit]) extends State
case object Finalized extends State
NonEmptyHotswap.create[F, R].map { nes =>
new Hotswap[F, R] {
override def swap(next: Resource[F, R]): F[R] =
nes.swap(next.map(_.some)) *> get.use(_.get.pure[F])

def initialize: F[Ref[F, State]] =
F.ref(Cleared)
override def get: Resource[F, Option[R]] = nes.get

def finalize(state: Ref[F, State]): F[Unit] =
state.getAndSet(Finalized).flatMap {
case Acquired(_, finalizer) => exclusive.surround(finalizer)
case Cleared => F.unit
case Finalized => raise("Hotswap already finalized")
}

def raise(message: String): F[Unit] =
F.raiseError[Unit](new RuntimeException(message))

def exclusive: Resource[F, Unit] =
Resource.makeFull[F, Unit](poll => poll(semaphore.acquireN(Long.MaxValue)))(_ =>
semaphore.releaseN(Long.MaxValue))

Resource.make(initialize)(finalize).map { state =>
new Hotswap[F, R] {

override def swap(next: Resource[F, R]): F[R] =
F.uncancelable { poll =>
poll(next.allocated).flatMap {
case (r, fin) =>
exclusive.mapK(poll).onCancel(Resource.eval(fin)).surround {
swapFinalizer(Acquired(r, fin)).as(r)
}
}
}

override def get: Resource[F, Option[R]] =
Resource.makeFull[F, Option[R]] { poll =>
poll(semaphore.acquire) *> // acquire shared lock
state.get.flatMap {
case Acquired(r, _) => F.pure(Some(r))
case _ => semaphore.release.as(None)
}
} { r => if (r.isDefined) semaphore.release else F.unit }

override def clear: F[Unit] =
exclusive.surround(swapFinalizer(Cleared).uncancelable)

private def swapFinalizer(next: State): F[Unit] =
state.modify {
case Acquired(_, fin) =>
next -> fin
case Cleared =>
next -> F.unit
case Finalized =>
val fin = next match {
case Acquired(_, fin) => fin
case _ => F.unit
}
Finalized -> (fin *> raise("Cannot swap after finalization"))
}.flatten

}
override def clear: F[Unit] = nes.clear
}
}

}
167 changes: 167 additions & 0 deletions std/shared/src/main/scala/cats/effect/std/NonEmptyHotswap.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Copyright 2020-2025 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect.std

import cats.effect.kernel.{Concurrent, Ref, Resource}
import cats.syntax.all._

/**
* A concurrent data structure that exposes a linear sequence of `R` resources as a single
* [[cats.effect.kernel.Resource]] in `F` without accumulation.
*
* A [[NonEmptyHotswap]] is allocated within a [[cats.effect.kernel.Resource]] that dictates the
* scope of its lifetime. After creation, a `Resource[F, R]` can be swapped in by calling
* [[swap]]. The newly acquired resource is returned and is released either when the
* [[NonEmptyHotswap]] is finalized or upon the next call to [[swap]], whichever occurs first.
*
* The following diagram illustrates the linear allocation and release of three resources `r1`,
* `r2`, and `r3` cycled through [[NonEmptyHotswap]]:
*
* {{{
* create(r1) ----- swap(r2) ---- swap(r3) ---- X
* | | | |
* r1 acquired | | |
* r2 acquired | |
* r1 released r3 acquired |
* r2 released |
* r3 released
* }}}
*
* [[NonEmptyHotswap]] is particularly useful when working with effects that cycle through
* resources, like writing bytes to files or rotating files every N bytes or M seconds. Without
* [[NonEmptyHotswap]], such effects leak resources: on each file rotation, a file handle or
* some internal resource handle accumulates. With [[NonEmptyHotswap]], the only registered
* resource is the [[NonEmptyHotswap]] itself, and each file is swapped in only after swapping
* the previous one out.
*
* Born from [[Hotswap]].
*/
sealed trait NonEmptyHotswap[F[_], R] {

/**
* Allocates a new resource, closes the previous one, and returns the newly allocated `R`.
*
* When the lifetime of the [[NonEmptyHotswap]] is completed, the resource allocated by the
* most recent [[swap]] will be finalized.
*
* [[swap]] finalizes the previous resource immediately, so users must ensure that the old `R`
* is not used thereafter. Failure to do so may result in an error on the _consumer_ side. In
* any case, no resources will be leaked.
*
* For safer access to the current resource see [[get]], which guarantees that it will not be
* released while it is being used.
*
* If [[swap]] is called after the lifetime of the [[NonEmptyHotswap]] is over, it will raise
* an error, but will ensure that all resources are finalized before returning.
*/
def swap(next: Resource[F, R]): F[Unit]

/**
* Gets the current resource. The returned resource is guaranteed to be available for the
* duration of the returned resource.
*/
def get: Resource[F, R]
}

object NonEmptyHotswap {

/**
* Creates a new [[NonEmptyHotswap]] initialized with the specified resource, which represents
* a [[cats.effect.kernel.Resource]] that can be swapped during the lifetime of this
* [[NonEmptyHotswap]].
*/
def apply[F[_], R](initial: Resource[F, R])(
implicit F: Concurrent[F]): Resource[F, NonEmptyHotswap[F, R]] =
Resource.eval(Semaphore[F](Long.MaxValue)).flatMap { semaphore =>
sealed abstract class State
case class Acquired(r: R, fin: F[Unit]) extends State
case object Finalized extends State

def initialize: F[Ref[F, State]] =
F.uncancelable { poll =>
poll(initial.allocated).flatMap {
case (r, fin) =>
exclusive.mapK(poll).onCancel(Resource.eval(fin)).surround {
F.ref(Acquired(r, fin))
}
}
}

def finalize(state: Ref[F, State]): F[Unit] =
state.getAndSet(Finalized).flatMap {
case Acquired(_, finalizer) => exclusive.surround(finalizer)
case Finalized => raise("NonEmptyHotswap already finalized")
}

def raise[A](message: String): F[A] =
F.raiseError[A](new IllegalStateException(message))

def exclusive: Resource[F, Unit] =
Resource.makeFull[F, Unit](poll => poll(semaphore.acquireN(Long.MaxValue)))(_ =>
semaphore.releaseN(Long.MaxValue))

Resource.make(initialize)(finalize).map { state =>
new NonEmptyHotswap[F, R] {

override def swap(next: Resource[F, R]): F[Unit] =
F.uncancelable { poll =>
poll(next.allocated).flatMap {
case (r, fin) =>
exclusive.mapK(poll).onCancel(Resource.eval(fin)).surround {
swapFinalizer(Acquired(r, fin))
}
}
}

override def get: Resource[F, R] =
Resource.makeFull[F, R] { poll =>
poll(semaphore.acquire) *> // acquire shared lock
state.get.flatMap {
case Acquired(r, _) => F.pure(r)
case _ => raise("NonEmptyHotswap already finalized")
}
}(_ => semaphore.release)

private def swapFinalizer(next: State): F[Unit] =
state.flatModify {
case Acquired(_, fin) =>
next -> fin
case Finalized =>
val fin = next match {
case Acquired(_, fin) => fin
case _ => F.unit
}
Finalized -> (fin *> raise[Unit]("Cannot swap after finalization"))
}
}
}
}

/**
* Creates a `NonEmptyHotswap` that is largely functionally equivalent to `Hotswap`
*
* The primary difference is that [[NonEmptyHotswap.swap]] does not leak the resource.
*/
def create[F[_], R](implicit F: Concurrent[F]): Resource[F, NonEmptyHotswap[F, Option[R]]] =
apply[F, Option[R]](Resource.pure(none))

implicit final class NonEmptyHotswapThatIsActuallyJustHotswapOps[F[_], R](
private val hs: NonEmptyHotswap[F, Option[R]])
extends AnyVal {
def clear: F[Unit] = hs.swap(Resource.pure(none))
}
}
10 changes: 0 additions & 10 deletions tests/shared/src/test/scala/cats/effect/std/HotswapSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -165,15 +165,5 @@ class HotswapSpec extends BaseSpec { outer =>

TestControl.executeEmbed(go, IORuntimeConfig(1, 2)).replicateA_(1000) must completeAs(())
}

"get should not acquire a lock when there is no resource present" in ticked {
implicit ticker =>
val go = Hotswap.create[IO, Unit].use { hs =>
hs.get.useForever.start *>
IO.sleep(2.seconds) *>
hs.swap(Resource.unit)
}
go must completeAs(())
}
}
}
Loading
Loading