From 2a1022b29a1cc0ea9e93df1c5045040f075b2ecf Mon Sep 17 00:00:00 2001 From: Jeff Hwang Date: Thu, 1 May 2025 12:29:11 -0400 Subject: [PATCH 1/9] modify partiion - wup --- .../dashboard/BackfilaWebActionsModule.kt | 4 + .../service/persistence/RunPartitionQuery.kt | 3 + .../backfila/ui/pages/BackfillShowAction.kt | 16 ++ .../ui/pages/EditPartitionCursorAction.kt | 154 ++++++++++++++++++ .../pages/EditPartitionCursorHandlerAction.kt | 89 ++++++++++ 5 files changed, 266 insertions(+) create mode 100644 service/src/main/kotlin/app/cash/backfila/ui/pages/EditPartitionCursorAction.kt create mode 100644 service/src/main/kotlin/app/cash/backfila/ui/pages/EditPartitionCursorHandlerAction.kt diff --git a/service/src/main/kotlin/app/cash/backfila/dashboard/BackfilaWebActionsModule.kt b/service/src/main/kotlin/app/cash/backfila/dashboard/BackfilaWebActionsModule.kt index a028900b7..a5f30daac 100644 --- a/service/src/main/kotlin/app/cash/backfila/dashboard/BackfilaWebActionsModule.kt +++ b/service/src/main/kotlin/app/cash/backfila/dashboard/BackfilaWebActionsModule.kt @@ -1,5 +1,7 @@ package app.cash.backfila.dashboard +import app.cash.backfila.ui.pages.EditPartitionCursorAction +import app.cash.backfila.ui.pages.EditPartitionCursorHandlerAction import misk.inject.KAbstractModule import misk.web.WebActionModule @@ -17,5 +19,7 @@ class BackfilaWebActionsModule() : KAbstractModule() { install(WebActionModule.create()) install(WebActionModule.create()) install(WebActionModule.create()) + install(WebActionModule.create()) + install(WebActionModule.create()) } } diff --git a/service/src/main/kotlin/app/cash/backfila/service/persistence/RunPartitionQuery.kt b/service/src/main/kotlin/app/cash/backfila/service/persistence/RunPartitionQuery.kt index 29c14fb6a..4da677830 100644 --- a/service/src/main/kotlin/app/cash/backfila/service/persistence/RunPartitionQuery.kt +++ b/service/src/main/kotlin/app/cash/backfila/service/persistence/RunPartitionQuery.kt @@ -22,4 +22,7 @@ interface RunPartitionQuery : Query { @Order("partition_name") fun orderByName(): RunPartitionQuery + + @Constraint("id", Operator.EQ) + fun partitionId(partitionId: Long): RunPartitionQuery } diff --git a/service/src/main/kotlin/app/cash/backfila/ui/pages/BackfillShowAction.kt b/service/src/main/kotlin/app/cash/backfila/ui/pages/BackfillShowAction.kt index 2761b8b5d..49fe24917 100644 --- a/service/src/main/kotlin/app/cash/backfila/ui/pages/BackfillShowAction.kt +++ b/service/src/main/kotlin/app/cash/backfila/ui/pages/BackfillShowAction.kt @@ -157,6 +157,12 @@ class BackfillShowAction @Inject constructor( scope = ThScope.col +"""ETA""" } + if (backfill.state == BackfillState.PAUSED) { + th(classes = "py-3 pl-8 pr-0 text-right font-semibold") { + scope = ThScope.col + +"""Actions""" + } + } } } tbody { @@ -198,6 +204,16 @@ class BackfillShowAction @Inject constructor( } } } + if (backfill.state == BackfillState.PAUSED) { + td("py-5 pl-8 pr-0 text-right align-top") { + a( + href = EditPartitionCursorAction.path(id, partition.name), + classes = "text-indigo-600 hover:text-indigo-900", + ) { + +"Edit Cursor" + } + } + } } } } diff --git a/service/src/main/kotlin/app/cash/backfila/ui/pages/EditPartitionCursorAction.kt b/service/src/main/kotlin/app/cash/backfila/ui/pages/EditPartitionCursorAction.kt new file mode 100644 index 000000000..a38c3e2e8 --- /dev/null +++ b/service/src/main/kotlin/app/cash/backfila/ui/pages/EditPartitionCursorAction.kt @@ -0,0 +1,154 @@ +package app.cash.backfila.ui.pages + +import app.cash.backfila.dashboard.GetBackfillStatusAction +import app.cash.backfila.service.persistence.BackfilaDb +import app.cash.backfila.ui.components.DashboardPageLayout +import javax.inject.Inject +import javax.inject.Singleton +import kotlinx.html.ButtonType +import kotlinx.html.FormMethod +import kotlinx.html.InputType +import kotlinx.html.a +import kotlinx.html.button +import kotlinx.html.div +import kotlinx.html.form +import kotlinx.html.h1 +import kotlinx.html.input +import kotlinx.html.label +import kotlinx.html.p +import misk.hibernate.Query +import misk.hibernate.Transacter +import misk.security.authz.Authenticated +import misk.tailwind.Link +import misk.web.Get +import misk.web.PathParam +import misk.web.Response +import misk.web.ResponseBody +import misk.web.ResponseContentType +import misk.web.actions.WebAction +import misk.web.mediatype.MediaTypes + +@Singleton +class EditPartitionCursorAction @Inject constructor( + private val getBackfillStatusAction: GetBackfillStatusAction, + private val dashboardPageLayout: DashboardPageLayout, + @BackfilaDb private val transacter: Transacter, + private val queryFactory: Query.Factory, +) : WebAction { + + @Get("/backfills/{id}/partitions/{partitionName}/edit-cursor") + @ResponseContentType(MediaTypes.TEXT_HTML) + @Authenticated(capabilities = ["users"]) + fun get( + @PathParam id: Long, + @PathParam partitionName: String, + ): Response { + val backfill = getBackfillStatusAction.status(id) + + val partition = backfill.partitions.find { it.name == partitionName } + ?: throw IllegalArgumentException("Partition not found") + + // Take a snapshot of current cursor for validation + val cursorSnapshot = partition.pkey_cursor + + return Response( + dashboardPageLayout.newBuilder() + .title("Edit Cursor - Partition $partitionName") + .breadcrumbLinks( + Link("Backfill #$id", BackfillShowAction.path(id)), + Link("Edit Cursor", path(id, partitionName)), + ) + .buildHtmlResponseBody { + div("space-y-6 max-w-2xl mx-auto py-8") { + h1("text-xl font-semibold") { + +"Edit Cursor for Partition: $partitionName" + } + + div("rounded-md bg-yellow-50 p-4 mb-6") { + div("flex") { + div("flex-shrink-0") { + // Warning icon + div("h-5 w-5 text-yellow-400") { + +"⚠️" + } + } + div("ml-3") { + h1("text-sm font-medium text-yellow-800") { + +"Warning: Editing cursors can be dangerous" + } + div("mt-2 text-sm text-yellow-700") { + p { + +"Make sure you understand the implications of changing the cursor position. Records between the old and new cursor positions may be skipped or processed multiple times." + } + } + } + } + } + + form { + method = FormMethod.post + action = EditPartitionCursorHandlerAction.path(id, partitionName) + + input { + type = InputType.hidden + name = "cursor_snapshot" + value = cursorSnapshot ?: "" + } + + div("space-y-4") { + div { + label("block text-sm font-medium text-gray-700") { + htmlFor = "current_cursor" + +"Current Cursor" + } + div("mt-1") { + input(classes = "block w-full rounded-md border-0 py-1.5 text-gray-900 shadow-sm ring-1 ring-inset ring-gray-300 placeholder:text-gray-400 focus:ring-2 focus:ring-inset focus:ring-indigo-600 sm:text-sm sm:leading-6") { + type = InputType.text + attributes["id"] = "current_cursor" + value = cursorSnapshot ?: "Not started" + disabled = true + } + } + } + + div { + label("block text-sm font-medium text-gray-700") { + htmlFor = "new_cursor" + +"New Cursor" + } + div("mt-1") { + input(classes = "block w-full rounded-md border-0 py-1.5 text-gray-900 shadow-sm ring-1 ring-inset ring-gray-300 placeholder:text-gray-400 focus:ring-2 focus:ring-inset focus:ring-indigo-600 sm:text-sm sm:leading-6") { + type = InputType.text + name = "new_cursor" + attributes["id"] = "new_cursor" + value = cursorSnapshot ?: "" + required = true + } + } + p("mt-2 text-sm text-gray-500") { + +"Enter the new cursor value. This must be a valid UTF-8 string." + } + } + + div("flex justify-end gap-3") { + a(href = BackfillShowAction.path(id), classes = "rounded-md bg-white px-3 py-2 text-sm font-semibold text-gray-900 shadow-sm ring-1 ring-inset ring-gray-300 hover:bg-gray-50") { + +"Cancel" + } + button(classes = "rounded-md bg-indigo-600 px-3 py-2 text-sm font-semibold text-white shadow-sm hover:bg-indigo-500 focus-visible:outline focus-visible:outline-2 focus-visible:outline-offset-2 focus-visible:outline-indigo-600") { + type = ButtonType.submit + +"Update Cursor" + } + } + } + } + } + }, + ) + } + + companion object { + fun path(id: Long, partitionName: String): String { + return "/backfills/$id/partitions/$partitionName/edit-cursor" + } + } +} diff --git a/service/src/main/kotlin/app/cash/backfila/ui/pages/EditPartitionCursorHandlerAction.kt b/service/src/main/kotlin/app/cash/backfila/ui/pages/EditPartitionCursorHandlerAction.kt new file mode 100644 index 000000000..bb8480f78 --- /dev/null +++ b/service/src/main/kotlin/app/cash/backfila/ui/pages/EditPartitionCursorHandlerAction.kt @@ -0,0 +1,89 @@ +package app.cash.backfila.ui.pages + +import app.cash.backfila.dashboard.GetBackfillStatusAction +import app.cash.backfila.service.persistence.BackfilaDb +import app.cash.backfila.service.persistence.BackfillState +import app.cash.backfila.service.persistence.RunPartitionQuery +import java.net.HttpURLConnection +import java.net.URLEncoder +import javax.inject.Inject +import javax.inject.Singleton +import misk.exceptions.BadRequestException +import misk.hibernate.Query +import misk.hibernate.Transacter +import misk.hibernate.newQuery +import misk.security.authz.Authenticated +import misk.web.FormValue +import misk.web.PathParam +import misk.web.Post +import misk.web.RequestContentType +import misk.web.Response +import misk.web.ResponseBody +import misk.web.actions.WebAction +import misk.web.mediatype.MediaTypes +import misk.web.toResponseBody +import okhttp3.Headers +import okio.ByteString.Companion.encodeUtf8 + +@Singleton +class EditPartitionCursorHandlerAction @Inject constructor( + private val getBackfillStatusAction: GetBackfillStatusAction, + @BackfilaDb private val transacter: Transacter, + private val queryFactory: Query.Factory, +) : WebAction { + + @Post("/backfills/{id}/partitions/{partitionName}/edit-cursor") + @RequestContentType(MediaTypes.APPLICATION_FORM_URLENCODED) + @Authenticated(capabilities = ["users"]) + fun post( + @PathParam id: Long, + @PathParam partitionName: String, + @FormValue cursorSnapshot: String, + @FormValue newCursor: String, + ): Response { + // Validate UTF-8 + try { + newCursor.toByteArray(Charsets.UTF_8).toString(Charsets.UTF_8) + } catch (e: Exception) { + throw BadRequestException("New cursor must be valid UTF-8") + } + + // Verify backfill state and cursor hasn't changed + val backfill = getBackfillStatusAction.status(id) + if (backfill.state != BackfillState.PAUSED) { + throw BadRequestException("Backfill must be paused to edit cursors") + } + + val partition = backfill.partitions.find { it.name == partitionName } + ?: throw BadRequestException("Partition not found") + + if (partition.pkey_cursor != cursorSnapshot) { + throw BadRequestException("Cursor has changed since edit form was loaded") + } + + // Update the cursor + transacter.transaction { session -> + queryFactory.newQuery() + .partitionId(partition.id) + .uniqueResult(session) + ?.let { partitionRecord -> + partitionRecord.pkey_cursor = newCursor.encodeUtf8() + session.save(partitionRecord) + } ?: throw BadRequestException("Partition not found") + } + + return Response( + body = "".toResponseBody(), + headers = Headers.Builder() + .add("Location", BackfillShowAction.path(id)) + .build(), + statusCode = HttpURLConnection.HTTP_MOVED_TEMP, + ) + } + + companion object { + fun path(id: Long, partitionName: String): String { + return "/backfills/$id/partitions/${URLEncoder.encode(partitionName, "UTF-8")}/edit-cursor" + } + } +} From 5d73d872ebf687eb1931c81c4e01aa2a6f5c97f8 Mon Sep 17 00:00:00 2001 From: Jeff Hwang Date: Mon, 5 May 2025 12:33:22 -0400 Subject: [PATCH 2/9] add modifying partition end points and UI --- .../dashboard/EditPartitionCursorAction.kt | 150 ++++++++++++++++++ .../EditPartitionCursorHandlerAction.kt | 123 ++++++++++++++ .../backfila/ui/pages/BackfillShowAction.kt | 1 + 3 files changed, 274 insertions(+) create mode 100644 service/src/main/kotlin/app/cash/backfila/dashboard/EditPartitionCursorAction.kt create mode 100644 service/src/main/kotlin/app/cash/backfila/dashboard/EditPartitionCursorHandlerAction.kt diff --git a/service/src/main/kotlin/app/cash/backfila/dashboard/EditPartitionCursorAction.kt b/service/src/main/kotlin/app/cash/backfila/dashboard/EditPartitionCursorAction.kt new file mode 100644 index 000000000..59cd78f19 --- /dev/null +++ b/service/src/main/kotlin/app/cash/backfila/dashboard/EditPartitionCursorAction.kt @@ -0,0 +1,150 @@ +package app.cash.backfila.dashboard + +import app.cash.backfila.ui.components.DashboardPageLayout +import app.cash.backfila.ui.pages.BackfillShowAction +import javax.inject.Inject +import javax.inject.Singleton +import kotlinx.html.ButtonType +import kotlinx.html.FormMethod +import kotlinx.html.InputType +import kotlinx.html.a +import kotlinx.html.button +import kotlinx.html.div +import kotlinx.html.form +import kotlinx.html.h1 +import kotlinx.html.input +import kotlinx.html.label +import kotlinx.html.p +import misk.security.authz.Authenticated +import misk.tailwind.Link +import misk.web.Get +import misk.web.PathParam +import misk.web.Response +import misk.web.ResponseBody +import misk.web.ResponseContentType +import misk.web.actions.WebAction +import misk.web.mediatype.MediaTypes + +@Singleton +class EditPartitionCursorAction @Inject constructor( + private val getBackfillStatusAction: GetBackfillStatusAction, + private val dashboardPageLayout: DashboardPageLayout, +) : WebAction { + + @Get(PATH) + @ResponseContentType(MediaTypes.TEXT_HTML) + @Authenticated(capabilities = ["users"]) + fun get( + @PathParam id: Long, + @PathParam partitionName: String, + ): Response { + val backfill = getBackfillStatusAction.status(id) + + val partition = backfill.partitions.find { it.name == partitionName } + ?: throw IllegalArgumentException("Partition not found") + + // Take a snapshot of current cursor for validation + val cursorSnapshot = partition.pkey_cursor + + return Response( + dashboardPageLayout.newBuilder() + .title("Edit Cursor - Partition $partitionName") + .breadcrumbLinks( + Link("Backfill #$id", BackfillShowAction.path(id)), + Link("Edit Cursor", path(id, partitionName)), + ) + .buildHtmlResponseBody { + div("space-y-6 max-w-2xl mx-auto py-8") { + h1("text-xl font-semibold") { + +"Edit Cursor for Partition: $partitionName" + } + + div("rounded-md bg-yellow-50 p-4 mb-6") { + div("flex") { + div("flex-shrink-0") { + // Warning icon + div("h-5 w-5 text-yellow-400") { + +"⚠️" + } + } + div("ml-3") { + h1("text-sm font-medium text-yellow-800") { + +"Warning: Editing cursors can be dangerous" + } + div("mt-2 text-sm text-yellow-700") { + p { + +"Make sure you understand the implications of changing the cursor position. Records between the old and new cursor positions may be skipped or processed multiple times." + } + } + } + } + } + + form { + method = FormMethod.get + action = EditPartitionCursorHandlerAction.path(id, partitionName) + + input { + type = InputType.hidden + name = "cursor_snapshot" + value = cursorSnapshot ?: "" + } + + div("space-y-4") { + div { + label("block text-sm font-medium text-gray-700") { + htmlFor = "current_cursor" + +"Current Cursor" + } + div("mt-1") { + input(classes = "block w-full rounded-md border-0 py-1.5 text-gray-900 shadow-sm ring-1 ring-inset ring-gray-300 placeholder:text-gray-400 focus:ring-2 focus:ring-inset focus:ring-indigo-600 sm:text-sm sm:leading-6") { + type = InputType.text + attributes["id"] = "current_cursor" + value = cursorSnapshot ?: "Not started" + disabled = true + } + } + } + + div { + label("block text-sm font-medium text-gray-700") { + htmlFor = "new_cursor" + +"New Cursor" + } + div("mt-1") { + input(classes = "block w-full rounded-md border-0 py-1.5 text-gray-900 shadow-sm ring-1 ring-inset ring-gray-300 placeholder:text-gray-400 focus:ring-2 focus:ring-inset focus:ring-indigo-600 sm:text-sm sm:leading-6") { + type = InputType.text + name = "new_cursor" + attributes["id"] = "new_cursor" + value = cursorSnapshot ?: "" + required = true + } + } + p("mt-2 text-sm text-gray-500") { + +"Enter the new cursor value. This must be a valid UTF-8 string." + } + } + + div("flex justify-end gap-3") { + a(href = BackfillShowAction.path(id), classes = "rounded-md bg-white px-3 py-2 text-sm font-semibold text-gray-900 shadow-sm ring-1 ring-inset ring-gray-300 hover:bg-gray-50") { + +"Cancel" + } + button(classes = "rounded-md bg-indigo-600 px-3 py-2 text-sm font-semibold text-white shadow-sm hover:bg-indigo-500 focus-visible:outline focus-visible:outline-2 focus-visible:outline-offset-2 focus-visible:outline-indigo-600") { + type = ButtonType.submit + +"Update Cursor" + } + } + } + } + } + }, + ) + } + + companion object { + private const val PATH = "/backfills/{id}/{partitions}/{partitionName}/edit-cursor" + fun path(id: Long, partitionName: String) = PATH + .replace("{id}", id.toString()) + .replace("{partitionName}", partitionName) + } +} diff --git a/service/src/main/kotlin/app/cash/backfila/dashboard/EditPartitionCursorHandlerAction.kt b/service/src/main/kotlin/app/cash/backfila/dashboard/EditPartitionCursorHandlerAction.kt new file mode 100644 index 000000000..eb8419e9c --- /dev/null +++ b/service/src/main/kotlin/app/cash/backfila/dashboard/EditPartitionCursorHandlerAction.kt @@ -0,0 +1,123 @@ +package app.cash.backfila.dashboard + +import app.cash.backfila.service.persistence.BackfilaDb +import app.cash.backfila.service.persistence.BackfillState +import app.cash.backfila.service.persistence.RunPartitionQuery +import app.cash.backfila.ui.components.DashboardPageLayout +import app.cash.backfila.ui.pages.BackfillShowAction +import javax.inject.Inject +import javax.inject.Singleton +import kotlinx.html.div +import kotlinx.html.h1 +import kotlinx.html.p +import misk.exceptions.BadRequestException +import misk.hibernate.Query +import misk.hibernate.Transacter +import misk.hibernate.newQuery +import misk.scope.ActionScoped +import misk.security.authz.Authenticated +import misk.tailwind.Link +import misk.web.Get +import misk.web.HttpCall +import misk.web.PathParam +import misk.web.Response +import misk.web.ResponseBody +import misk.web.ResponseContentType +import misk.web.actions.WebAction +import misk.web.mediatype.MediaTypes +import okio.ByteString.Companion.encodeUtf8 + +@Singleton +class EditPartitionCursorHandlerAction @Inject constructor( + private val getBackfillStatusAction: GetBackfillStatusAction, + @BackfilaDb private val transacter: Transacter, + private val queryFactory: Query.Factory, + private val httpCall: ActionScoped, + private val dashboardPageLayout: DashboardPageLayout, +) : WebAction { + + @Get(PATH) + @ResponseContentType(MediaTypes.TEXT_HTML) + @Authenticated(capabilities = ["users"]) + fun get( + @PathParam id: Long, + @PathParam partitionName: String, + ): Response { + val request = httpCall.get().asOkHttpRequest() + val cursorSnapshot = request.url.queryParameter("cursor_snapshot") + val newCursor = request.url.queryParameter("new_cursor") + + // Validate UTF-8 + try { + newCursor?.toByteArray(Charsets.UTF_8)?.toString(Charsets.UTF_8) + } catch (e: Exception) { + throw BadRequestException("New cursor must be valid UTF-8") + } + + // Verify backfill state and cursor hasn't changed + val backfill = getBackfillStatusAction.status(id) + if (backfill.state != BackfillState.PAUSED) { + throw BadRequestException("Backfill must be paused to edit cursors") + } + + val partition = backfill.partitions.find { it.name == partitionName } + ?: throw BadRequestException("Partition not found") + + if (partition.pkey_cursor != cursorSnapshot) { + throw BadRequestException("Cursor has changed since edit form was loaded") + } + + // Update the cursor + transacter.transaction { session -> + queryFactory.newQuery() + .partitionId(partition.id) + .uniqueResult(session) + ?.let { partitionRecord -> + partitionRecord.pkey_cursor = newCursor?.encodeUtf8() + session.save(partitionRecord) + } ?: throw BadRequestException("Partition not found") + } + + // Return success page with updated form + return Response( + dashboardPageLayout.newBuilder() + .title("Edit Cursor - Partition $partitionName") + .breadcrumbLinks( + Link("Backfill #$id", BackfillShowAction.path(id)), + Link("Edit Cursor", path(id, partitionName)), + ) + .buildHtmlResponseBody { + div("space-y-6 max-w-2xl mx-auto py-8") { + // Success message + div("rounded-md bg-green-50 p-4 mb-6") { + div("flex") { + div("flex-shrink-0") { + // Success icon (checkmark) + div("h-5 w-5 text-green-400") { + +"✓" + } + } + div("ml-3") { + h1("text-sm font-medium text-green-800") { + +"Success" + } + div("mt-2 text-sm text-green-700") { + p { + +"Cursor has been updated successfully." + } + } + } + } + } + } + }, + ) + } + + companion object { + private const val PATH = "/backfills/{id}/{partitionName}/edit-cursor" + fun path(id: Long, partitionName: String) = PATH + .replace("{id}", id.toString()) + .replace("{partitionName}", partitionName) + } +} diff --git a/service/src/main/kotlin/app/cash/backfila/ui/pages/BackfillShowAction.kt b/service/src/main/kotlin/app/cash/backfila/ui/pages/BackfillShowAction.kt index 49fe24917..bee3eda3d 100644 --- a/service/src/main/kotlin/app/cash/backfila/ui/pages/BackfillShowAction.kt +++ b/service/src/main/kotlin/app/cash/backfila/ui/pages/BackfillShowAction.kt @@ -1,5 +1,6 @@ package app.cash.backfila.ui.pages +import app.cash.backfila.dashboard.EditPartitionCursorAction import app.cash.backfila.dashboard.GetBackfillStatusAction import app.cash.backfila.dashboard.GetBackfillStatusResponse import app.cash.backfila.dashboard.ViewLogsAction From 64f542df30c5a73dc34f61e22baabbe3dde6c115 Mon Sep 17 00:00:00 2001 From: Jeff Hwang Date: Mon, 5 May 2025 12:37:11 -0400 Subject: [PATCH 3/9] remove extra files --- .../dashboard/BackfilaWebActionsModule.kt | 2 - .../ui/pages/EditPartitionCursorAction.kt | 154 ------------------ .../pages/EditPartitionCursorHandlerAction.kt | 89 ---------- 3 files changed, 245 deletions(-) delete mode 100644 service/src/main/kotlin/app/cash/backfila/ui/pages/EditPartitionCursorAction.kt delete mode 100644 service/src/main/kotlin/app/cash/backfila/ui/pages/EditPartitionCursorHandlerAction.kt diff --git a/service/src/main/kotlin/app/cash/backfila/dashboard/BackfilaWebActionsModule.kt b/service/src/main/kotlin/app/cash/backfila/dashboard/BackfilaWebActionsModule.kt index a5f30daac..97ff0adb9 100644 --- a/service/src/main/kotlin/app/cash/backfila/dashboard/BackfilaWebActionsModule.kt +++ b/service/src/main/kotlin/app/cash/backfila/dashboard/BackfilaWebActionsModule.kt @@ -1,7 +1,5 @@ package app.cash.backfila.dashboard -import app.cash.backfila.ui.pages.EditPartitionCursorAction -import app.cash.backfila.ui.pages.EditPartitionCursorHandlerAction import misk.inject.KAbstractModule import misk.web.WebActionModule diff --git a/service/src/main/kotlin/app/cash/backfila/ui/pages/EditPartitionCursorAction.kt b/service/src/main/kotlin/app/cash/backfila/ui/pages/EditPartitionCursorAction.kt deleted file mode 100644 index a38c3e2e8..000000000 --- a/service/src/main/kotlin/app/cash/backfila/ui/pages/EditPartitionCursorAction.kt +++ /dev/null @@ -1,154 +0,0 @@ -package app.cash.backfila.ui.pages - -import app.cash.backfila.dashboard.GetBackfillStatusAction -import app.cash.backfila.service.persistence.BackfilaDb -import app.cash.backfila.ui.components.DashboardPageLayout -import javax.inject.Inject -import javax.inject.Singleton -import kotlinx.html.ButtonType -import kotlinx.html.FormMethod -import kotlinx.html.InputType -import kotlinx.html.a -import kotlinx.html.button -import kotlinx.html.div -import kotlinx.html.form -import kotlinx.html.h1 -import kotlinx.html.input -import kotlinx.html.label -import kotlinx.html.p -import misk.hibernate.Query -import misk.hibernate.Transacter -import misk.security.authz.Authenticated -import misk.tailwind.Link -import misk.web.Get -import misk.web.PathParam -import misk.web.Response -import misk.web.ResponseBody -import misk.web.ResponseContentType -import misk.web.actions.WebAction -import misk.web.mediatype.MediaTypes - -@Singleton -class EditPartitionCursorAction @Inject constructor( - private val getBackfillStatusAction: GetBackfillStatusAction, - private val dashboardPageLayout: DashboardPageLayout, - @BackfilaDb private val transacter: Transacter, - private val queryFactory: Query.Factory, -) : WebAction { - - @Get("/backfills/{id}/partitions/{partitionName}/edit-cursor") - @ResponseContentType(MediaTypes.TEXT_HTML) - @Authenticated(capabilities = ["users"]) - fun get( - @PathParam id: Long, - @PathParam partitionName: String, - ): Response { - val backfill = getBackfillStatusAction.status(id) - - val partition = backfill.partitions.find { it.name == partitionName } - ?: throw IllegalArgumentException("Partition not found") - - // Take a snapshot of current cursor for validation - val cursorSnapshot = partition.pkey_cursor - - return Response( - dashboardPageLayout.newBuilder() - .title("Edit Cursor - Partition $partitionName") - .breadcrumbLinks( - Link("Backfill #$id", BackfillShowAction.path(id)), - Link("Edit Cursor", path(id, partitionName)), - ) - .buildHtmlResponseBody { - div("space-y-6 max-w-2xl mx-auto py-8") { - h1("text-xl font-semibold") { - +"Edit Cursor for Partition: $partitionName" - } - - div("rounded-md bg-yellow-50 p-4 mb-6") { - div("flex") { - div("flex-shrink-0") { - // Warning icon - div("h-5 w-5 text-yellow-400") { - +"⚠️" - } - } - div("ml-3") { - h1("text-sm font-medium text-yellow-800") { - +"Warning: Editing cursors can be dangerous" - } - div("mt-2 text-sm text-yellow-700") { - p { - +"Make sure you understand the implications of changing the cursor position. Records between the old and new cursor positions may be skipped or processed multiple times." - } - } - } - } - } - - form { - method = FormMethod.post - action = EditPartitionCursorHandlerAction.path(id, partitionName) - - input { - type = InputType.hidden - name = "cursor_snapshot" - value = cursorSnapshot ?: "" - } - - div("space-y-4") { - div { - label("block text-sm font-medium text-gray-700") { - htmlFor = "current_cursor" - +"Current Cursor" - } - div("mt-1") { - input(classes = "block w-full rounded-md border-0 py-1.5 text-gray-900 shadow-sm ring-1 ring-inset ring-gray-300 placeholder:text-gray-400 focus:ring-2 focus:ring-inset focus:ring-indigo-600 sm:text-sm sm:leading-6") { - type = InputType.text - attributes["id"] = "current_cursor" - value = cursorSnapshot ?: "Not started" - disabled = true - } - } - } - - div { - label("block text-sm font-medium text-gray-700") { - htmlFor = "new_cursor" - +"New Cursor" - } - div("mt-1") { - input(classes = "block w-full rounded-md border-0 py-1.5 text-gray-900 shadow-sm ring-1 ring-inset ring-gray-300 placeholder:text-gray-400 focus:ring-2 focus:ring-inset focus:ring-indigo-600 sm:text-sm sm:leading-6") { - type = InputType.text - name = "new_cursor" - attributes["id"] = "new_cursor" - value = cursorSnapshot ?: "" - required = true - } - } - p("mt-2 text-sm text-gray-500") { - +"Enter the new cursor value. This must be a valid UTF-8 string." - } - } - - div("flex justify-end gap-3") { - a(href = BackfillShowAction.path(id), classes = "rounded-md bg-white px-3 py-2 text-sm font-semibold text-gray-900 shadow-sm ring-1 ring-inset ring-gray-300 hover:bg-gray-50") { - +"Cancel" - } - button(classes = "rounded-md bg-indigo-600 px-3 py-2 text-sm font-semibold text-white shadow-sm hover:bg-indigo-500 focus-visible:outline focus-visible:outline-2 focus-visible:outline-offset-2 focus-visible:outline-indigo-600") { - type = ButtonType.submit - +"Update Cursor" - } - } - } - } - } - }, - ) - } - - companion object { - fun path(id: Long, partitionName: String): String { - return "/backfills/$id/partitions/$partitionName/edit-cursor" - } - } -} diff --git a/service/src/main/kotlin/app/cash/backfila/ui/pages/EditPartitionCursorHandlerAction.kt b/service/src/main/kotlin/app/cash/backfila/ui/pages/EditPartitionCursorHandlerAction.kt deleted file mode 100644 index bb8480f78..000000000 --- a/service/src/main/kotlin/app/cash/backfila/ui/pages/EditPartitionCursorHandlerAction.kt +++ /dev/null @@ -1,89 +0,0 @@ -package app.cash.backfila.ui.pages - -import app.cash.backfila.dashboard.GetBackfillStatusAction -import app.cash.backfila.service.persistence.BackfilaDb -import app.cash.backfila.service.persistence.BackfillState -import app.cash.backfila.service.persistence.RunPartitionQuery -import java.net.HttpURLConnection -import java.net.URLEncoder -import javax.inject.Inject -import javax.inject.Singleton -import misk.exceptions.BadRequestException -import misk.hibernate.Query -import misk.hibernate.Transacter -import misk.hibernate.newQuery -import misk.security.authz.Authenticated -import misk.web.FormValue -import misk.web.PathParam -import misk.web.Post -import misk.web.RequestContentType -import misk.web.Response -import misk.web.ResponseBody -import misk.web.actions.WebAction -import misk.web.mediatype.MediaTypes -import misk.web.toResponseBody -import okhttp3.Headers -import okio.ByteString.Companion.encodeUtf8 - -@Singleton -class EditPartitionCursorHandlerAction @Inject constructor( - private val getBackfillStatusAction: GetBackfillStatusAction, - @BackfilaDb private val transacter: Transacter, - private val queryFactory: Query.Factory, -) : WebAction { - - @Post("/backfills/{id}/partitions/{partitionName}/edit-cursor") - @RequestContentType(MediaTypes.APPLICATION_FORM_URLENCODED) - @Authenticated(capabilities = ["users"]) - fun post( - @PathParam id: Long, - @PathParam partitionName: String, - @FormValue cursorSnapshot: String, - @FormValue newCursor: String, - ): Response { - // Validate UTF-8 - try { - newCursor.toByteArray(Charsets.UTF_8).toString(Charsets.UTF_8) - } catch (e: Exception) { - throw BadRequestException("New cursor must be valid UTF-8") - } - - // Verify backfill state and cursor hasn't changed - val backfill = getBackfillStatusAction.status(id) - if (backfill.state != BackfillState.PAUSED) { - throw BadRequestException("Backfill must be paused to edit cursors") - } - - val partition = backfill.partitions.find { it.name == partitionName } - ?: throw BadRequestException("Partition not found") - - if (partition.pkey_cursor != cursorSnapshot) { - throw BadRequestException("Cursor has changed since edit form was loaded") - } - - // Update the cursor - transacter.transaction { session -> - queryFactory.newQuery() - .partitionId(partition.id) - .uniqueResult(session) - ?.let { partitionRecord -> - partitionRecord.pkey_cursor = newCursor.encodeUtf8() - session.save(partitionRecord) - } ?: throw BadRequestException("Partition not found") - } - - return Response( - body = "".toResponseBody(), - headers = Headers.Builder() - .add("Location", BackfillShowAction.path(id)) - .build(), - statusCode = HttpURLConnection.HTTP_MOVED_TEMP, - ) - } - - companion object { - fun path(id: Long, partitionName: String): String { - return "/backfills/$id/partitions/${URLEncoder.encode(partitionName, "UTF-8")}/edit-cursor" - } - } -} From 622b24b27f8dc77860b9d859c664e9b720b1f2e1 Mon Sep 17 00:00:00 2001 From: jeffhwang-sq Date: Mon, 5 May 2025 13:08:06 -0400 Subject: [PATCH 4/9] Add "Cancelled" Button for Paused Backfills (#442) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This change introduces a Cancel button to the UI for backfills that are currently paused. Users can now manually transition a paused backfill to a CANCELLED state, providing better control over workflows that are no longer needed. Changes Include: - Added a cancel button in the backfill details view (only shown for paused backfills). - Updated run partition table state column and backfillruns table state column, handling to correctly mark backfills as CANCELLED when triggered. Next Pr will create a new deleted column to support hiding the backfill that are soft deleted. https://github.com/user-attachments/assets/b0072300-d0ef-47cb-946e-b52f97e97073 ![Screenshot 2025-04-28 at 2 49 28 PM](https://github.com/user-attachments/assets/5def7593-345c-4ab0-9cc1-2547ca534134) --- .../proto/app/cash/backfila/service.proto | 1 + .../backfila/api/CheckBackfillStatusAction.kt | 1 + .../dashboard/CancelBackfillAction.kt | 62 +++++++++++++++++++ .../service/listener/AuditClientListener.kt | 14 +++++ .../service/listener/BackfillRunListener.kt | 1 + .../backfila/service/listener/SlackHelper.kt | 9 +++ .../service/persistence/BackfillState.kt | 1 + .../service/persistence/DbBackfillRun.kt | 1 + .../backfila/service/runner/BackfillRunner.kt | 7 ++- .../BackfillShowButtonHandlerAction.kt | 16 +++-- .../backfila/ui/pages/BackfillShowAction.kt | 46 +++++++++++++- .../resources/migrations/v017__backfila.sql | 2 + .../resources/migrations/v018__backfila.sql | 2 + 13 files changed, 157 insertions(+), 6 deletions(-) create mode 100644 service/src/main/kotlin/app/cash/backfila/dashboard/CancelBackfillAction.kt create mode 100644 service/src/main/resources/migrations/v017__backfila.sql create mode 100644 service/src/main/resources/migrations/v018__backfila.sql diff --git a/client/src/main/proto/app/cash/backfila/service.proto b/client/src/main/proto/app/cash/backfila/service.proto index ee43d3048..a4659464c 100644 --- a/client/src/main/proto/app/cash/backfila/service.proto +++ b/client/src/main/proto/app/cash/backfila/service.proto @@ -102,5 +102,6 @@ message CheckBackfillStatusResponse { PAUSED = 1; RUNNING = 2; COMPLETE = 3; + CANCELLED = 4; } } \ No newline at end of file diff --git a/service/src/main/kotlin/app/cash/backfila/api/CheckBackfillStatusAction.kt b/service/src/main/kotlin/app/cash/backfila/api/CheckBackfillStatusAction.kt index ba1ce553f..4614616e2 100644 --- a/service/src/main/kotlin/app/cash/backfila/api/CheckBackfillStatusAction.kt +++ b/service/src/main/kotlin/app/cash/backfila/api/CheckBackfillStatusAction.kt @@ -37,6 +37,7 @@ class CheckBackfillStatusAction @Inject constructor( BackfillState.PAUSED -> Status.PAUSED BackfillState.RUNNING -> Status.RUNNING BackfillState.COMPLETE -> Status.COMPLETE + BackfillState.CANCELLED -> Status.CANCELLED } } } diff --git a/service/src/main/kotlin/app/cash/backfila/dashboard/CancelBackfillAction.kt b/service/src/main/kotlin/app/cash/backfila/dashboard/CancelBackfillAction.kt new file mode 100644 index 000000000..726da21f7 --- /dev/null +++ b/service/src/main/kotlin/app/cash/backfila/dashboard/CancelBackfillAction.kt @@ -0,0 +1,62 @@ +package app.cash.backfila.dashboard + +import app.cash.backfila.service.listener.BackfillRunListener +import app.cash.backfila.service.persistence.BackfilaDb +import app.cash.backfila.service.persistence.BackfillState +import app.cash.backfila.service.persistence.DbBackfillRun +import app.cash.backfila.service.persistence.DbEventLog +import javax.inject.Inject +import misk.MiskCaller +import misk.exceptions.BadRequestException +import misk.hibernate.Id +import misk.hibernate.Query +import misk.hibernate.Transacter +import misk.hibernate.load +import misk.scope.ActionScoped +import misk.security.authz.Authenticated +import misk.web.PathParam +import misk.web.Post +import misk.web.RequestContentType +import misk.web.ResponseContentType +import misk.web.actions.WebAction +import misk.web.mediatype.MediaTypes + +class CancelBackfillAction @Inject constructor( + @BackfilaDb private val transacter: Transacter, + private val queryFactory: Query.Factory, + private val backfillRunListeners: Set, + private val caller: @JvmSuppressWildcards ActionScoped, +) : WebAction { + @Post("/backfill/cancel/{id}") + @RequestContentType(MediaTypes.APPLICATION_JSON) + @ResponseContentType(MediaTypes.APPLICATION_JSON) + @Authenticated(capabilities = ["users"]) + fun cancel( + @PathParam id: Long, + ) { + transacter.transaction { session -> + val backfillRun = session.load(Id(id)) + + if (backfillRun.state != BackfillState.PAUSED) { + throw BadRequestException("Cannot cancel a ${backfillRun.state.name.lowercase()} backfill") + } + + // Update state in run_partitions table + backfillRun.setState(session, queryFactory, BackfillState.CANCELLED) + + // Log the cancellation event + session.save( + DbEventLog( + backfillRun.id, + partition_id = null, + user = caller.get()?.user, + type = DbEventLog.Type.STATE_CHANGE, + message = "backfill cancelled", + ), + ) + } + + // Notify listeners + backfillRunListeners.forEach { it.runCancelled(Id(id), caller.get()?.principal ?: "") } + } +} diff --git a/service/src/main/kotlin/app/cash/backfila/service/listener/AuditClientListener.kt b/service/src/main/kotlin/app/cash/backfila/service/listener/AuditClientListener.kt index de96f6ff6..ed9a23eb6 100644 --- a/service/src/main/kotlin/app/cash/backfila/service/listener/AuditClientListener.kt +++ b/service/src/main/kotlin/app/cash/backfila/service/listener/AuditClientListener.kt @@ -71,6 +71,20 @@ internal class AuditClientListener @Inject constructor( ) } + override fun runCancelled(id: Id, user: String) { + val (backfillName, serviceName, description) = transacter.transaction { session -> + val run = session.load(id) + AuditEventInputs(run.registered_backfill.name, serviceName(run), "Backfill cancelled by $user ${dryRunPrefix(run)}${nameAndId(run)}") + } + auditClient.logEvent( + target = backfillName, + description = description, + requestorLDAP = user, + applicationName = serviceName, + detailURL = idUrl(id), + ) + } + private fun serviceName(run: DbBackfillRun) = if (run.service.variant == "default") { run.service.registry_name } else { diff --git a/service/src/main/kotlin/app/cash/backfila/service/listener/BackfillRunListener.kt b/service/src/main/kotlin/app/cash/backfila/service/listener/BackfillRunListener.kt index 0d345b299..03506c240 100644 --- a/service/src/main/kotlin/app/cash/backfila/service/listener/BackfillRunListener.kt +++ b/service/src/main/kotlin/app/cash/backfila/service/listener/BackfillRunListener.kt @@ -8,4 +8,5 @@ interface BackfillRunListener { fun runPaused(id: Id, user: String) fun runErrored(id: Id) fun runCompleted(id: Id) + fun runCancelled(id: Id, user: String) } diff --git a/service/src/main/kotlin/app/cash/backfila/service/listener/SlackHelper.kt b/service/src/main/kotlin/app/cash/backfila/service/listener/SlackHelper.kt index 53c945bc9..28b2754c1 100644 --- a/service/src/main/kotlin/app/cash/backfila/service/listener/SlackHelper.kt +++ b/service/src/main/kotlin/app/cash/backfila/service/listener/SlackHelper.kt @@ -52,6 +52,15 @@ class SlackHelper @Inject constructor( slackClient.postMessage("Backfila", ":backfila:", message, channel) } + override fun runCancelled(id: Id, user: String) { + val (message, channel) = transacter.transaction { session -> + val run = session.load(id) + val message = ":backfila_cancel:${dryRunEmoji(run)} ${nameAndId(run)} canceled by @$user" + message to run.service.slack_channel + } + slackClient.postMessage("Backfila", ":backfila:", message, channel) + } + private fun nameAndId(run: DbBackfillRun) = "[${deployment.name}] ${run.service.registry_name} (${run.service.variant}) `${run.registered_backfill.name}` " + "(${idLink(run.id)})" diff --git a/service/src/main/kotlin/app/cash/backfila/service/persistence/BackfillState.kt b/service/src/main/kotlin/app/cash/backfila/service/persistence/BackfillState.kt index 1d16ffb5d..8dcaaae5e 100644 --- a/service/src/main/kotlin/app/cash/backfila/service/persistence/BackfillState.kt +++ b/service/src/main/kotlin/app/cash/backfila/service/persistence/BackfillState.kt @@ -4,4 +4,5 @@ enum class BackfillState { PAUSED, RUNNING, COMPLETE, + CANCELLED, } diff --git a/service/src/main/kotlin/app/cash/backfila/service/persistence/DbBackfillRun.kt b/service/src/main/kotlin/app/cash/backfila/service/persistence/DbBackfillRun.kt index 3bc8239e1..2e6d4fea8 100644 --- a/service/src/main/kotlin/app/cash/backfila/service/persistence/DbBackfillRun.kt +++ b/service/src/main/kotlin/app/cash/backfila/service/persistence/DbBackfillRun.kt @@ -135,6 +135,7 @@ class DbBackfillRun() : DbUnsharded, DbTimestampedEntity { fun setState(session: Session, queryFactory: Query.Factory, state: BackfillState) { // State can't be changed after being completed. checkState(this.state != BackfillState.COMPLETE) + this.state = state // Set the state of all the partitions that are not complete val query = session.hibernateSession.createQuery( diff --git a/service/src/main/kotlin/app/cash/backfila/service/runner/BackfillRunner.kt b/service/src/main/kotlin/app/cash/backfila/service/runner/BackfillRunner.kt index 5a86c2752..93dd5fd1c 100644 --- a/service/src/main/kotlin/app/cash/backfila/service/runner/BackfillRunner.kt +++ b/service/src/main/kotlin/app/cash/backfila/service/runner/BackfillRunner.kt @@ -171,7 +171,12 @@ class BackfillRunner private constructor( // Now that state is stored, check if we should exit. if (dbRunPartition.run_state != BackfillState.RUNNING) { - logger.info { "Backfill is no longer in RUNNING state, stopping runner ${logLabel()}" } + val stateChange = when (dbRunPartition.run_state) { + BackfillState.PAUSED -> "paused" + BackfillState.CANCELLED -> "cancelled" + else -> dbRunPartition.run_state.name.lowercase() + } + logger.info { "Backfill is no longer in RUNNING state (now $stateChange), stopping runner ${logLabel()}" } running = false return@transaction } diff --git a/service/src/main/kotlin/app/cash/backfila/ui/actions/BackfillShowButtonHandlerAction.kt b/service/src/main/kotlin/app/cash/backfila/ui/actions/BackfillShowButtonHandlerAction.kt index 8bd47e3e4..426734dae 100644 --- a/service/src/main/kotlin/app/cash/backfila/ui/actions/BackfillShowButtonHandlerAction.kt +++ b/service/src/main/kotlin/app/cash/backfila/ui/actions/BackfillShowButtonHandlerAction.kt @@ -1,5 +1,6 @@ package app.cash.backfila.ui.actions +import app.cash.backfila.dashboard.CancelBackfillAction import app.cash.backfila.dashboard.StartBackfillAction import app.cash.backfila.dashboard.StartBackfillRequest import app.cash.backfila.dashboard.StopBackfillAction @@ -31,6 +32,7 @@ class BackfillShowButtonHandlerAction @Inject constructor( private val startBackfillAction: StartBackfillAction, private val stopBackfillAction: StopBackfillAction, private val updateBackfillAction: UpdateBackfillAction, + private val cancelBackfillAction: CancelBackfillAction, ) : WebAction { @Get(PATH) @ResponseContentType(MediaTypes.TEXT_HTML) @@ -44,10 +46,16 @@ class BackfillShowButtonHandlerAction @Inject constructor( if (!field_id.isNullOrBlank()) { when (field_id) { "state" -> { - if (field_value == BackfillState.PAUSED.name) { - stopBackfillAction.stop(id.toLong(), StopBackfillRequest()) - } else if (field_value == BackfillState.RUNNING.name) { - startBackfillAction.start(id.toLong(), StartBackfillRequest()) + when (field_value) { + BackfillState.PAUSED.name -> { + stopBackfillAction.stop(id.toLong(), StopBackfillRequest()) + } + BackfillState.RUNNING.name -> { + startBackfillAction.start(id.toLong(), StartBackfillRequest()) + } + BackfillState.CANCELLED.name -> { + cancelBackfillAction.cancel(id.toLong()) + } } } diff --git a/service/src/main/kotlin/app/cash/backfila/ui/pages/BackfillShowAction.kt b/service/src/main/kotlin/app/cash/backfila/ui/pages/BackfillShowAction.kt index 89e10985b..d206bbf4f 100644 --- a/service/src/main/kotlin/app/cash/backfila/ui/pages/BackfillShowAction.kt +++ b/service/src/main/kotlin/app/cash/backfila/ui/pages/BackfillShowAction.kt @@ -284,6 +284,7 @@ class BackfillShowAction @Inject constructor( /* Value of the button click is provided through the button.href field. */ val button: Link? = null, val updateFieldId: String? = null, + val cancelButton: Link? = null, ) private fun getStateButton(state: BackfillState): Link? { @@ -292,8 +293,9 @@ class BackfillShowAction @Inject constructor( label = START_STATE_BUTTON_LABEL, href = BackfillState.RUNNING.name, ) - + // COMPLETE and CANCELLED represent final states. BackfillState.COMPLETE -> null + BackfillState.CANCELLED -> null else -> Link( label = PAUSE_STATE_BUTTON_LABEL, href = BackfillState.PAUSED.name, @@ -301,12 +303,23 @@ class BackfillShowAction @Inject constructor( } } + private fun getCancelButton(state: BackfillState): Link? { + return when (state) { + BackfillState.PAUSED -> Link( + label = CANCEL_STATE_BUTTON_LABEL, + href = BackfillState.CANCELLED.name, + ) + else -> null + } + } + private fun GetBackfillStatusResponse.toConfigurationRows(id: Long) = listOf( DescriptionListRow( label = "State", description = state.name, button = getStateButton(state), updateFieldId = "state", + cancelButton = getCancelButton(state), ), DescriptionListRow( label = "Dry Run", @@ -512,6 +525,36 @@ class BackfillShowAction @Inject constructor( +button.label } } + + // Add cancel button if present + it.cancelButton?.let { cancelButton -> + span("ml-2") { + form { + action = BackfillShowButtonHandlerAction.path(id) + + it.updateFieldId?.let { + input { + type = InputType.hidden + name = "field_id" + value = it + } + + input { + type = InputType.hidden + name = "field_value" + value = cancelButton.href + } + } + + button( + classes = "rounded-full bg-red-600 px-3 py-1.5 text-sm font-semibold text-white shadow-sm hover:bg-red-500 focus-visible:outline focus-visible:outline-2 focus-visible:outline-offset-2 focus-visible:outline-red-600", + ) { + type = ButtonType.submit + +cancelButton.label + } + } + } + } } } } @@ -563,6 +606,7 @@ class BackfillShowAction @Inject constructor( const val START_STATE_BUTTON_LABEL = "Start" const val PAUSE_STATE_BUTTON_LABEL = "Pause" + const val CANCEL_STATE_BUTTON_LABEL = "Cancel" const val UPDATE_BUTTON_LABEL = "Update" const val VIEW_LOGS_BUTTON_LABEL = "View Logs" } diff --git a/service/src/main/resources/migrations/v017__backfila.sql b/service/src/main/resources/migrations/v017__backfila.sql new file mode 100644 index 000000000..bc0e3be3e --- /dev/null +++ b/service/src/main/resources/migrations/v017__backfila.sql @@ -0,0 +1,2 @@ +ALTER TABLE backfill_runs + MODIFY COLUMN `state` enum('PAUSED','RUNNING','COMPLETE','CANCELLED') NOT NULL; \ No newline at end of file diff --git a/service/src/main/resources/migrations/v018__backfila.sql b/service/src/main/resources/migrations/v018__backfila.sql new file mode 100644 index 000000000..608482323 --- /dev/null +++ b/service/src/main/resources/migrations/v018__backfila.sql @@ -0,0 +1,2 @@ +ALTER TABLE run_partitions + MODIFY COLUMN `run_state` enum('PAUSED','RUNNING','COMPLETE','CANCELLED') NOT NULL; \ No newline at end of file From 583babf59f3354d3c3753353d1fab3934ff5437a Mon Sep 17 00:00:00 2001 From: notmikedavis <105745614+notmikedavis@users.noreply.github.com> Date: Mon, 5 May 2025 17:38:11 -0500 Subject: [PATCH 5/9] Improve BatchWriteItem retry handling (#449) ## Problem The DynamoDB BatchWriteItem API often has transient failures with unprocessed items that cause entire Backfila batches to fail. These could be more granularly retried within a run to avoid Backfila getting stuck on a batch. ## Solution Improve the retry mechanism to better align with [AWS BatchWriteItem best practices](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchWriteItem.html): - Collect unprocessed items across all batches and retry them together - Use exponential backoff with jitter to handle throttling - Only count towards retry limit when no progress is made - Provide more detailed error reporting --- .../UpdateInPlaceDynamoDbBackfill.kt | 116 ++++++++++++++---- 1 file changed, 89 insertions(+), 27 deletions(-) diff --git a/client-dynamodb-v2/src/main/kotlin/app/cash/backfila/client/dynamodbv2/UpdateInPlaceDynamoDbBackfill.kt b/client-dynamodb-v2/src/main/kotlin/app/cash/backfila/client/dynamodbv2/UpdateInPlaceDynamoDbBackfill.kt index bd353ed29..505827022 100644 --- a/client-dynamodb-v2/src/main/kotlin/app/cash/backfila/client/dynamodbv2/UpdateInPlaceDynamoDbBackfill.kt +++ b/client-dynamodb-v2/src/main/kotlin/app/cash/backfila/client/dynamodbv2/UpdateInPlaceDynamoDbBackfill.kt @@ -3,60 +3,122 @@ package app.cash.backfila.client.dynamodbv2 import app.cash.backfila.client.BackfillConfig import software.amazon.awssdk.services.dynamodb.DynamoDbClient import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest +import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse import software.amazon.awssdk.services.dynamodb.model.PutRequest import software.amazon.awssdk.services.dynamodb.model.WriteRequest +import kotlin.math.min +import kotlin.math.pow /** * A base class that may make it easier to mutate the items in the DynamoDB store. * - * If saving fails the batch will fail. If succeeded rows are part of a failed batch they will be - * retried so implementations must be idempotent. + * Implements retry logic with exponential backoff for unprocessed items from BatchWriteItem operations. + * Failed items from all batches are collected and retried together to optimize throughput. + * The retry counter only starts when we stop making progress (no items processed in a round). + * Implementations must be idempotent as items may be retried multiple times. */ abstract class UpdateInPlaceDynamoDbBackfill( val dynamoDbClient: DynamoDbClient, - ) : DynamoDbBackfill() { companion object { // DynamoDB BatchWriteItem API has a limit of 25 items per request private const val BATCH_SIZE_LIMIT = 25 + + // Retry configuration + private const val MAX_RETRY_ATTEMPTS = 10 + private const val BASE_BACKOFF_MS = 50L + private const val MAX_BACKOFF_MS = 5000L } override fun runBatch(items: List<@JvmSuppressWildcards I>, config: BackfillConfig

) { - val itemsToSave = mutableListOf() - for (item in items) { - val saveItem = runOne(item, config) - if (saveItem) { - itemsToSave += item - } - } + val itemsToSave = items.filter { runOne(it, config) } if (itemsToSave.isNotEmpty()) { - // Process items in chunks of 25 or less - itemsToSave.chunked(BATCH_SIZE_LIMIT).forEach { chunk -> - val batchRequest = BatchWriteItemRequest.builder() - .requestItems( - mapOf( - dynamoDbTable.tableName() to chunk.map { - WriteRequest.builder().putRequest( - PutRequest.builder().item( - dynamoDbTable.tableSchema().itemToMap(it, true), - ).build(), - ).build() - }, - ), - ).build() + var unprocessedItems = itemsToSave + var stuckRetryCount = 0 + var totalAttempts = 0 + + while (unprocessedItems.isNotEmpty()) { + if (totalAttempts > 0) { + // Calculate backoff time with exponential increase and jitter + val backoffAttempts = stuckRetryCount.coerceAtLeast(1) // Use at least 1 for backoff calc + val baseWait = min( + MAX_BACKOFF_MS.toDouble(), + BASE_BACKOFF_MS * 2.0.pow(backoffAttempts.toDouble()) + ).toLong() + val jitter = (Math.random() * 0.1 * baseWait).toLong() + Thread.sleep(baseWait + jitter) + } + + // Process all items in BATCH_SIZE_LIMIT chunks, collect all unprocessed + val stillUnprocessed = mutableListOf() + + unprocessedItems.chunked(BATCH_SIZE_LIMIT).forEach { chunk -> + val writeRequests = createWriteRequests(chunk) + val batchRequest = BatchWriteItemRequest.builder() + .requestItems(mapOf(dynamoDbTable.tableName() to writeRequests)) + .build() + + val response = dynamoDbClient.batchWriteItem(batchRequest) + stillUnprocessed.addAll(getUnprocessedItems(response)) + } + + totalAttempts++ - val failedBatch = dynamoDbClient.batchWriteItem(batchRequest) - require(!failedBatch.hasUnprocessedItems() || !failedBatch.unprocessedItems().isNotEmpty()) { - "failed to save items: $failedBatch" + // Check if we made any progress + if (stillUnprocessed.size == unprocessedItems.size) { + // No progress made, increment stuck counter + stuckRetryCount++ + if (stuckRetryCount >= MAX_RETRY_ATTEMPTS) { + throw DynamoDbBatchWriteException( + """Failed to make progress after $MAX_RETRY_ATTEMPTS attempts. + |Total attempts: $totalAttempts + |Initial batch size: ${itemsToSave.size} + |Remaining unprocessed items: ${stillUnprocessed.size}""".trimMargin() + ) + } + } else { + // Made some progress, reset stuck counter + stuckRetryCount = 0 } + + unprocessedItems = stillUnprocessed } } } + private fun createWriteRequests(items: List): List { + return items.map { item -> + WriteRequest.builder() + .putRequest( + PutRequest.builder() + .item(dynamoDbTable.tableSchema().itemToMap(item, true)) + .build() + ) + .build() + } + } + + private fun getUnprocessedItems(response: BatchWriteItemResponse): List { + if (!response.hasUnprocessedItems() || response.unprocessedItems().isEmpty()) { + return emptyList() + } + + return response.unprocessedItems()[dynamoDbTable.tableName()] + ?.mapNotNull { writeRequest -> + // Convert WriteRequest back to original item type using the table schema + writeRequest.putRequest()?.item()?.let { + dynamoDbTable.tableSchema().mapToItem(it) + } + } + ?: emptyList() + } + /** * Called for each matching record. Returns true to save the item after returning; false to not * save the item. */ abstract fun runOne(item: I, config: BackfillConfig

): Boolean } + +class DynamoDbBatchWriteException(message: String) : RuntimeException(message) From d8f791d15d717c19fe224d2d6477ddb56c123ff1 Mon Sep 17 00:00:00 2001 From: notmikedavis <105745614+notmikedavis@users.noreply.github.com> Date: Wed, 7 May 2025 07:49:32 -0500 Subject: [PATCH 6/9] Add ApiCallTimeoutException handling to UpdateInPlaceDynamoDbBackfill (#452) ## Problem The DynamoDB BatchWriteItem implementation in UpdateInPlaceDynamoDbBackfill currently lacks handling for ApiCallTimeoutException. When these timeouts occur, the entire batch fails without any retry attempts, causing backfills to fail unnecessarily. ## Solution Add comprehensive timeout handling with these features: - Track and retry chunks that experience API timeouts - Use exponential backoff with jitter for retries - Only increment the timeout counter when all chunks in an iteration timeout - Reset the timeout counter if any chunk succeeds - Maintain separate counters for timeouts vs unprocessed items - Provide detailed error context through suppressed exceptions The implementation is designed to be resilient to transient timeouts while still protecting against systemic failures. It coordinates the backoff strategy between timeout retries and unprocessed item retries. --- .../UpdateInPlaceDynamoDbBackfill.kt | 79 +++++++++++++++---- 1 file changed, 64 insertions(+), 15 deletions(-) diff --git a/client-dynamodb-v2/src/main/kotlin/app/cash/backfila/client/dynamodbv2/UpdateInPlaceDynamoDbBackfill.kt b/client-dynamodb-v2/src/main/kotlin/app/cash/backfila/client/dynamodbv2/UpdateInPlaceDynamoDbBackfill.kt index 505827022..808a65e4c 100644 --- a/client-dynamodb-v2/src/main/kotlin/app/cash/backfila/client/dynamodbv2/UpdateInPlaceDynamoDbBackfill.kt +++ b/client-dynamodb-v2/src/main/kotlin/app/cash/backfila/client/dynamodbv2/UpdateInPlaceDynamoDbBackfill.kt @@ -1,6 +1,7 @@ package app.cash.backfila.client.dynamodbv2 import app.cash.backfila.client.BackfillConfig +import software.amazon.awssdk.core.exception.ApiCallTimeoutException import software.amazon.awssdk.services.dynamodb.DynamoDbClient import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse @@ -12,9 +13,14 @@ import kotlin.math.pow /** * A base class that may make it easier to mutate the items in the DynamoDB store. * - * Implements retry logic with exponential backoff for unprocessed items from BatchWriteItem operations. - * Failed items from all batches are collected and retried together to optimize throughput. - * The retry counter only starts when we stop making progress (no items processed in a round). + * Implements retry logic with exponential backoff for: + * - Unprocessed items from BatchWriteItem operations + * - API timeouts + * + * The retry counter for unprocessed items only starts when we stop making progress + * (no items processed in a round). The timeout counter only increments when all chunks + * in an iteration timeout. + * * Implementations must be idempotent as items may be retried multiple times. */ abstract class UpdateInPlaceDynamoDbBackfill( @@ -36,12 +42,14 @@ abstract class UpdateInPlaceDynamoDbBackfill( if (itemsToSave.isNotEmpty()) { var unprocessedItems = itemsToSave var stuckRetryCount = 0 + var consecutiveTimeouts = 0 var totalAttempts = 0 while (unprocessedItems.isNotEmpty()) { if (totalAttempts > 0) { // Calculate backoff time with exponential increase and jitter - val backoffAttempts = stuckRetryCount.coerceAtLeast(1) // Use at least 1 for backoff calc + // Use the larger of stuckRetryCount or consecutiveTimeouts to determine backoff + val backoffAttempts = maxOf(stuckRetryCount, consecutiveTimeouts).coerceAtLeast(1) val baseWait = min( MAX_BACKOFF_MS.toDouble(), BASE_BACKOFF_MS * 2.0.pow(backoffAttempts.toDouble()) @@ -52,21 +60,62 @@ abstract class UpdateInPlaceDynamoDbBackfill( // Process all items in BATCH_SIZE_LIMIT chunks, collect all unprocessed val stillUnprocessed = mutableListOf() + var batchSucceeded = false + var lastTimeoutException: ApiCallTimeoutException? = null + var hadTimeoutThisIteration = false + var allChunksTimedOut = true unprocessedItems.chunked(BATCH_SIZE_LIMIT).forEach { chunk -> - val writeRequests = createWriteRequests(chunk) - val batchRequest = BatchWriteItemRequest.builder() - .requestItems(mapOf(dynamoDbTable.tableName() to writeRequests)) - .build() + try { + val writeRequests = createWriteRequests(chunk) + val batchRequest = BatchWriteItemRequest.builder() + .requestItems(mapOf(dynamoDbTable.tableName() to writeRequests)) + .build() + + val response = dynamoDbClient.batchWriteItem(batchRequest) + stillUnprocessed.addAll(getUnprocessedItems(response)) + batchSucceeded = true + allChunksTimedOut = false + } catch (e: ApiCallTimeoutException) { + // On timeout, only add the current chunk to unprocessed items + stillUnprocessed.addAll(chunk) + hadTimeoutThisIteration = true + lastTimeoutException = e + } + } - val response = dynamoDbClient.batchWriteItem(batchRequest) - stillUnprocessed.addAll(getUnprocessedItems(response)) + // Handle timeout tracking + if (hadTimeoutThisIteration && allChunksTimedOut) { + consecutiveTimeouts++ + if (consecutiveTimeouts >= MAX_RETRY_ATTEMPTS) { + throw DynamoDbBatchWriteException( + """Failed due to consecutive complete timeouts after $MAX_RETRY_ATTEMPTS attempts. + |Total attempts: $totalAttempts + |Initial batch size: ${itemsToSave.size} + |Remaining unprocessed items: ${unprocessedItems.size}""".trimMargin(), + lastTimeoutException + ) + } + } else if (batchSucceeded) { + // Reset timeout counter if any chunk succeeded + consecutiveTimeouts = 0 + } + + // If we saw any timeouts but didn't hit the consecutive limit, include the last exception + // as suppressed to maintain error context + if (lastTimeoutException != null && !allChunksTimedOut) { + lastTimeoutException!!.addSuppressed( + IllegalStateException( + """Saw partial timeouts while processing batches. + |Successfully processed some chunks, continuing with retries.""".trimMargin() + ) + ) } totalAttempts++ - // Check if we made any progress - if (stillUnprocessed.size == unprocessedItems.size) { + // Check if we made any progress with unprocessed items + if (batchSucceeded && stillUnprocessed.size == unprocessedItems.size) { // No progress made, increment stuck counter stuckRetryCount++ if (stuckRetryCount >= MAX_RETRY_ATTEMPTS) { @@ -77,8 +126,8 @@ abstract class UpdateInPlaceDynamoDbBackfill( |Remaining unprocessed items: ${stillUnprocessed.size}""".trimMargin() ) } - } else { - // Made some progress, reset stuck counter + } else if (batchSucceeded) { + // Made some progress or had different number of unprocessed items stuckRetryCount = 0 } @@ -121,4 +170,4 @@ abstract class UpdateInPlaceDynamoDbBackfill( abstract fun runOne(item: I, config: BackfillConfig

): Boolean } -class DynamoDbBatchWriteException(message: String) : RuntimeException(message) +class DynamoDbBatchWriteException(message: String, cause: Throwable? = null) : RuntimeException(message, cause) From 16837ed61268c4508b641a311e3167bf015fcd3b Mon Sep 17 00:00:00 2001 From: Jeff Hwang Date: Wed, 7 May 2025 12:15:11 -0400 Subject: [PATCH 7/9] redirect users to backfill page after success --- .../EditPartitionCursorHandlerAction.kt | 44 +++---------------- 1 file changed, 7 insertions(+), 37 deletions(-) diff --git a/service/src/main/kotlin/app/cash/backfila/dashboard/EditPartitionCursorHandlerAction.kt b/service/src/main/kotlin/app/cash/backfila/dashboard/EditPartitionCursorHandlerAction.kt index eb8419e9c..42e86e163 100644 --- a/service/src/main/kotlin/app/cash/backfila/dashboard/EditPartitionCursorHandlerAction.kt +++ b/service/src/main/kotlin/app/cash/backfila/dashboard/EditPartitionCursorHandlerAction.kt @@ -7,16 +7,12 @@ import app.cash.backfila.ui.components.DashboardPageLayout import app.cash.backfila.ui.pages.BackfillShowAction import javax.inject.Inject import javax.inject.Singleton -import kotlinx.html.div -import kotlinx.html.h1 -import kotlinx.html.p import misk.exceptions.BadRequestException import misk.hibernate.Query import misk.hibernate.Transacter import misk.hibernate.newQuery import misk.scope.ActionScoped import misk.security.authz.Authenticated -import misk.tailwind.Link import misk.web.Get import misk.web.HttpCall import misk.web.PathParam @@ -25,6 +21,8 @@ import misk.web.ResponseBody import misk.web.ResponseContentType import misk.web.actions.WebAction import misk.web.mediatype.MediaTypes +import misk.web.toResponseBody +import okhttp3.Headers import okio.ByteString.Companion.encodeUtf8 @Singleton @@ -44,7 +42,7 @@ class EditPartitionCursorHandlerAction @Inject constructor( @PathParam partitionName: String, ): Response { val request = httpCall.get().asOkHttpRequest() - val cursorSnapshot = request.url.queryParameter("cursor_snapshot") + val cursorSnapshot = request.url.queryParameter("cursor_snapshot")?.takeIf { it.isNotBlank() } val newCursor = request.url.queryParameter("new_cursor") // Validate UTF-8 @@ -78,39 +76,11 @@ class EditPartitionCursorHandlerAction @Inject constructor( } ?: throw BadRequestException("Partition not found") } - // Return success page with updated form + // Redirect to backfill page return Response( - dashboardPageLayout.newBuilder() - .title("Edit Cursor - Partition $partitionName") - .breadcrumbLinks( - Link("Backfill #$id", BackfillShowAction.path(id)), - Link("Edit Cursor", path(id, partitionName)), - ) - .buildHtmlResponseBody { - div("space-y-6 max-w-2xl mx-auto py-8") { - // Success message - div("rounded-md bg-green-50 p-4 mb-6") { - div("flex") { - div("flex-shrink-0") { - // Success icon (checkmark) - div("h-5 w-5 text-green-400") { - +"✓" - } - } - div("ml-3") { - h1("text-sm font-medium text-green-800") { - +"Success" - } - div("mt-2 text-sm text-green-700") { - p { - +"Cursor has been updated successfully." - } - } - } - } - } - } - }, + body = "go to ${BackfillShowAction.path(id)}".toResponseBody(), + statusCode = 303, + headers = Headers.headersOf("Location", BackfillShowAction.path(id)), ) } From 67b774c93436c6a002d7c74caf1140612a63594c Mon Sep 17 00:00:00 2001 From: Jeff Hwang Date: Wed, 7 May 2025 23:33:47 -0400 Subject: [PATCH 8/9] add test --- .../EditPartitionCursorHandlerAction.kt | 2 - .../actions/EditPartitionEndPointTest.kt | 162 ++++++++++++++++++ 2 files changed, 162 insertions(+), 2 deletions(-) create mode 100644 service/src/test/kotlin/app/cash/backfila/actions/EditPartitionEndPointTest.kt diff --git a/service/src/main/kotlin/app/cash/backfila/dashboard/EditPartitionCursorHandlerAction.kt b/service/src/main/kotlin/app/cash/backfila/dashboard/EditPartitionCursorHandlerAction.kt index 42e86e163..9358444e6 100644 --- a/service/src/main/kotlin/app/cash/backfila/dashboard/EditPartitionCursorHandlerAction.kt +++ b/service/src/main/kotlin/app/cash/backfila/dashboard/EditPartitionCursorHandlerAction.kt @@ -3,7 +3,6 @@ package app.cash.backfila.dashboard import app.cash.backfila.service.persistence.BackfilaDb import app.cash.backfila.service.persistence.BackfillState import app.cash.backfila.service.persistence.RunPartitionQuery -import app.cash.backfila.ui.components.DashboardPageLayout import app.cash.backfila.ui.pages.BackfillShowAction import javax.inject.Inject import javax.inject.Singleton @@ -31,7 +30,6 @@ class EditPartitionCursorHandlerAction @Inject constructor( @BackfilaDb private val transacter: Transacter, private val queryFactory: Query.Factory, private val httpCall: ActionScoped, - private val dashboardPageLayout: DashboardPageLayout, ) : WebAction { @Get(PATH) diff --git a/service/src/test/kotlin/app/cash/backfila/actions/EditPartitionEndPointTest.kt b/service/src/test/kotlin/app/cash/backfila/actions/EditPartitionEndPointTest.kt new file mode 100644 index 000000000..059b95a21 --- /dev/null +++ b/service/src/test/kotlin/app/cash/backfila/actions/EditPartitionEndPointTest.kt @@ -0,0 +1,162 @@ +package app.cash.backfila.actions + +import app.cash.backfila.BackfilaTestingModule +import app.cash.backfila.api.ConfigureServiceAction +import app.cash.backfila.client.Connectors +import app.cash.backfila.dashboard.CreateBackfillAction +import app.cash.backfila.dashboard.GetBackfillStatusAction +import app.cash.backfila.dashboard.StartBackfillAction +import app.cash.backfila.dashboard.StartBackfillRequest +import app.cash.backfila.fakeCaller +import app.cash.backfila.protos.service.ConfigureServiceRequest +import app.cash.backfila.protos.service.CreateBackfillRequest +import app.cash.backfila.service.persistence.BackfilaDb +import app.cash.backfila.service.persistence.BackfillState +import app.cash.backfila.service.persistence.RunPartitionQuery +import com.google.inject.Module +import javax.inject.Inject +import misk.exceptions.BadRequestException +import misk.hibernate.Query +import misk.hibernate.Transacter +import misk.hibernate.newQuery +import misk.scope.ActionScope +import misk.testing.MiskTest +import misk.testing.MiskTestModule +import okio.ByteString.Companion.encodeUtf8 +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.assertThatThrownBy +import org.junit.jupiter.api.Test + +@MiskTest(startService = true) +class EditPartitionEndPointTest { + @Suppress("unused") + @MiskTestModule + val module: Module = BackfilaTestingModule() + + @Inject lateinit var configureServiceAction: ConfigureServiceAction + + @Inject lateinit var createBackfillAction: CreateBackfillAction + + @Inject lateinit var getBackfillStatusAction: GetBackfillStatusAction + + @Inject lateinit var startBackfillAction: StartBackfillAction + + @Inject @BackfilaDb + lateinit var transacter: Transacter + + @Inject lateinit var queryFactory: Query.Factory + + @Inject lateinit var scope: ActionScope + + /** + * Test-specific action that provides the same functionality as EditPartitionCursorHandlerAction + * but without the HTTP dependencies + */ + inner class TestEditPartitionAction { + fun editPartition(id: Long, partitionName: String, newCursor: String) { + val backfill = getBackfillStatusAction.status(id) + if (backfill.state != BackfillState.PAUSED) { + throw BadRequestException("Backfill must be paused to edit cursors") + } + + val partition = backfill.partitions.find { it.name == partitionName } + ?: throw BadRequestException("Partition not found") + + transacter.transaction { session -> + queryFactory.newQuery() + .partitionId(partition.id) + .uniqueResult(session) + ?.let { partitionRecord -> + partitionRecord.pkey_cursor = newCursor.encodeUtf8() + session.save(partitionRecord) + } ?: throw BadRequestException("Partition not found") + } + } + } + + @Test + fun `edit partition end point when paused`() { + // Setup test backfill + val backfillId = setupTestBackfill() + + // Get initial status to capture cursor snapshot + val initialStatus = getBackfillStatusAction.status(backfillId) + val partition = initialStatus.partitions[0] + requireNotNull(partition) { "Partition not found" } + + // Edit cursor + scope.fakeCaller(user = "molly") { + val testAction = TestEditPartitionAction() + val response = testAction.editPartition( + backfillId, + partition.name, + "100", + ) + + // Verify cursor was updated + val updatedStatus = getBackfillStatusAction.status(backfillId) + val updatedPartition = updatedStatus.partitions.find { it.name == partition.name } + requireNotNull(updatedPartition) { "Updated partition not found" } + assertThat(updatedPartition.pkey_cursor ?: "").isEqualTo("100") + } + } + + @Test + fun `cannot edit partition end point when running`() { + // Setup test backfill + val backfillId = setupTestBackfill() + + // Start the backfill + scope.fakeCaller(user = "molly") { + startBackfillAction.start(backfillId, StartBackfillRequest()) + } + + // Get status for cursor snapshot + val status = getBackfillStatusAction.status(backfillId) + val partition = status.partitions[0] + requireNotNull(partition) { "Partition not found" } + + // Attempt to edit cursor while running + scope.fakeCaller(user = "molly") { + val testAction = TestEditPartitionAction() + assertThatThrownBy { + testAction.editPartition( + backfillId, + partition.name, + "100", + ) + }.isInstanceOf(BadRequestException::class.java) + .hasMessageContaining("Backfill must be paused to edit cursors") + } + } + + private fun setupTestBackfill(): Long { + scope.fakeCaller(service = "deep-fryer") { + configureServiceAction.configureService( + ConfigureServiceRequest.Builder() + .backfills( + listOf( + ConfigureServiceRequest.BackfillData( + "ChickenSandwich", "Description", listOf(), null, + null, false, null, + ), + ), + ) + .connector_type(Connectors.ENVOY) + .build(), + ) + } + + val response = scope.fakeCaller(user = "molly") { + createBackfillAction.create( + "deep-fryer", + ConfigureServiceAction.RESERVED_VARIANT, + CreateBackfillRequest.Builder() + .backfill_name("ChickenSandwich") + .build(), + ) + } + + return response.backfill_run_id + } +} From 0a8d0d6e4ef112b30e61dbc4597f7a737445d034 Mon Sep 17 00:00:00 2001 From: Jeff Hwang Date: Thu, 8 May 2025 12:17:58 -0400 Subject: [PATCH 9/9] render the error to users --- .../EditPartitionCursorHandlerAction.kt | 49 ++++-- .../actions/EditPartitionEndPointTest.kt | 162 ------------------ 2 files changed, 37 insertions(+), 174 deletions(-) delete mode 100644 service/src/test/kotlin/app/cash/backfila/actions/EditPartitionEndPointTest.kt diff --git a/service/src/main/kotlin/app/cash/backfila/dashboard/EditPartitionCursorHandlerAction.kt b/service/src/main/kotlin/app/cash/backfila/dashboard/EditPartitionCursorHandlerAction.kt index 9358444e6..42a1ea473 100644 --- a/service/src/main/kotlin/app/cash/backfila/dashboard/EditPartitionCursorHandlerAction.kt +++ b/service/src/main/kotlin/app/cash/backfila/dashboard/EditPartitionCursorHandlerAction.kt @@ -3,9 +3,12 @@ package app.cash.backfila.dashboard import app.cash.backfila.service.persistence.BackfilaDb import app.cash.backfila.service.persistence.BackfillState import app.cash.backfila.service.persistence.RunPartitionQuery +import app.cash.backfila.ui.components.AlertError +import app.cash.backfila.ui.components.DashboardPageLayout import app.cash.backfila.ui.pages.BackfillShowAction import javax.inject.Inject import javax.inject.Singleton +import kotlinx.html.div import misk.exceptions.BadRequestException import misk.hibernate.Query import misk.hibernate.Transacter @@ -30,6 +33,7 @@ class EditPartitionCursorHandlerAction @Inject constructor( @BackfilaDb private val transacter: Transacter, private val queryFactory: Query.Factory, private val httpCall: ActionScoped, + private val dashboardPageLayout: DashboardPageLayout, ) : WebAction { @Get(PATH) @@ -43,38 +47,58 @@ class EditPartitionCursorHandlerAction @Inject constructor( val cursorSnapshot = request.url.queryParameter("cursor_snapshot")?.takeIf { it.isNotBlank() } val newCursor = request.url.queryParameter("new_cursor") - // Validate UTF-8 - try { - newCursor?.toByteArray(Charsets.UTF_8)?.toString(Charsets.UTF_8) - } catch (e: Exception) { - throw BadRequestException("New cursor must be valid UTF-8") + if (!isValidUtf8(newCursor)) { + return buildErrorResponse("New cursor must be valid UTF8. New Cursor: $newCursor") } - // Verify backfill state and cursor hasn't changed val backfill = getBackfillStatusAction.status(id) if (backfill.state != BackfillState.PAUSED) { - throw BadRequestException("Backfill must be paused to edit cursors") + return buildErrorResponse("Backfill must be paused. Current State: ${backfill.state}") } val partition = backfill.partitions.find { it.name == partitionName } - ?: throw BadRequestException("Partition not found") + ?: return buildErrorResponse("Partition not found: $partitionName") if (partition.pkey_cursor != cursorSnapshot) { - throw BadRequestException("Cursor has changed since edit form was loaded") + return buildErrorResponse("Cursor has changed since edit form was loaded. Current Cursor: ${partition.pkey_cursor}") } - // Update the cursor + updateCursor(partition.id, newCursor) + + return redirectToBackfillPage(id) + } + + private fun isValidUtf8(input: String?): Boolean { + return input == null || input.toByteArray(Charsets.UTF_8).contentEquals(input.toByteArray(Charsets.UTF_8)) + } + + private fun buildErrorResponse(message: String): Response { + val errorHtmlResponseBody = dashboardPageLayout.newBuilder() + .buildHtmlResponseBody { + div("py-20") { + AlertError(message = "Edit partition failed. $message", label = "Try Again", onClick = "history.back(); return false;") + } + } + return Response( + body = errorHtmlResponseBody, + statusCode = 200, + headers = Headers.headersOf("Content-Type", MediaTypes.TEXT_HTML), + ) + } + + private fun updateCursor(partitionId: Long, newCursor: String?) { transacter.transaction { session -> queryFactory.newQuery() - .partitionId(partition.id) + .partitionId(partitionId) .uniqueResult(session) ?.let { partitionRecord -> partitionRecord.pkey_cursor = newCursor?.encodeUtf8() session.save(partitionRecord) } ?: throw BadRequestException("Partition not found") } + } - // Redirect to backfill page + private fun redirectToBackfillPage(id: Long): Response { return Response( body = "go to ${BackfillShowAction.path(id)}".toResponseBody(), statusCode = 303, @@ -84,6 +108,7 @@ class EditPartitionCursorHandlerAction @Inject constructor( companion object { private const val PATH = "/backfills/{id}/{partitionName}/edit-cursor" + fun path(id: Long, partitionName: String) = PATH .replace("{id}", id.toString()) .replace("{partitionName}", partitionName) diff --git a/service/src/test/kotlin/app/cash/backfila/actions/EditPartitionEndPointTest.kt b/service/src/test/kotlin/app/cash/backfila/actions/EditPartitionEndPointTest.kt deleted file mode 100644 index 059b95a21..000000000 --- a/service/src/test/kotlin/app/cash/backfila/actions/EditPartitionEndPointTest.kt +++ /dev/null @@ -1,162 +0,0 @@ -package app.cash.backfila.actions - -import app.cash.backfila.BackfilaTestingModule -import app.cash.backfila.api.ConfigureServiceAction -import app.cash.backfila.client.Connectors -import app.cash.backfila.dashboard.CreateBackfillAction -import app.cash.backfila.dashboard.GetBackfillStatusAction -import app.cash.backfila.dashboard.StartBackfillAction -import app.cash.backfila.dashboard.StartBackfillRequest -import app.cash.backfila.fakeCaller -import app.cash.backfila.protos.service.ConfigureServiceRequest -import app.cash.backfila.protos.service.CreateBackfillRequest -import app.cash.backfila.service.persistence.BackfilaDb -import app.cash.backfila.service.persistence.BackfillState -import app.cash.backfila.service.persistence.RunPartitionQuery -import com.google.inject.Module -import javax.inject.Inject -import misk.exceptions.BadRequestException -import misk.hibernate.Query -import misk.hibernate.Transacter -import misk.hibernate.newQuery -import misk.scope.ActionScope -import misk.testing.MiskTest -import misk.testing.MiskTestModule -import okio.ByteString.Companion.encodeUtf8 -import org.assertj.core.api.Assertions.assertThat -import org.assertj.core.api.Assertions.assertThatThrownBy -import org.junit.jupiter.api.Test - -@MiskTest(startService = true) -class EditPartitionEndPointTest { - @Suppress("unused") - @MiskTestModule - val module: Module = BackfilaTestingModule() - - @Inject lateinit var configureServiceAction: ConfigureServiceAction - - @Inject lateinit var createBackfillAction: CreateBackfillAction - - @Inject lateinit var getBackfillStatusAction: GetBackfillStatusAction - - @Inject lateinit var startBackfillAction: StartBackfillAction - - @Inject @BackfilaDb - lateinit var transacter: Transacter - - @Inject lateinit var queryFactory: Query.Factory - - @Inject lateinit var scope: ActionScope - - /** - * Test-specific action that provides the same functionality as EditPartitionCursorHandlerAction - * but without the HTTP dependencies - */ - inner class TestEditPartitionAction { - fun editPartition(id: Long, partitionName: String, newCursor: String) { - val backfill = getBackfillStatusAction.status(id) - if (backfill.state != BackfillState.PAUSED) { - throw BadRequestException("Backfill must be paused to edit cursors") - } - - val partition = backfill.partitions.find { it.name == partitionName } - ?: throw BadRequestException("Partition not found") - - transacter.transaction { session -> - queryFactory.newQuery() - .partitionId(partition.id) - .uniqueResult(session) - ?.let { partitionRecord -> - partitionRecord.pkey_cursor = newCursor.encodeUtf8() - session.save(partitionRecord) - } ?: throw BadRequestException("Partition not found") - } - } - } - - @Test - fun `edit partition end point when paused`() { - // Setup test backfill - val backfillId = setupTestBackfill() - - // Get initial status to capture cursor snapshot - val initialStatus = getBackfillStatusAction.status(backfillId) - val partition = initialStatus.partitions[0] - requireNotNull(partition) { "Partition not found" } - - // Edit cursor - scope.fakeCaller(user = "molly") { - val testAction = TestEditPartitionAction() - val response = testAction.editPartition( - backfillId, - partition.name, - "100", - ) - - // Verify cursor was updated - val updatedStatus = getBackfillStatusAction.status(backfillId) - val updatedPartition = updatedStatus.partitions.find { it.name == partition.name } - requireNotNull(updatedPartition) { "Updated partition not found" } - assertThat(updatedPartition.pkey_cursor ?: "").isEqualTo("100") - } - } - - @Test - fun `cannot edit partition end point when running`() { - // Setup test backfill - val backfillId = setupTestBackfill() - - // Start the backfill - scope.fakeCaller(user = "molly") { - startBackfillAction.start(backfillId, StartBackfillRequest()) - } - - // Get status for cursor snapshot - val status = getBackfillStatusAction.status(backfillId) - val partition = status.partitions[0] - requireNotNull(partition) { "Partition not found" } - - // Attempt to edit cursor while running - scope.fakeCaller(user = "molly") { - val testAction = TestEditPartitionAction() - assertThatThrownBy { - testAction.editPartition( - backfillId, - partition.name, - "100", - ) - }.isInstanceOf(BadRequestException::class.java) - .hasMessageContaining("Backfill must be paused to edit cursors") - } - } - - private fun setupTestBackfill(): Long { - scope.fakeCaller(service = "deep-fryer") { - configureServiceAction.configureService( - ConfigureServiceRequest.Builder() - .backfills( - listOf( - ConfigureServiceRequest.BackfillData( - "ChickenSandwich", "Description", listOf(), null, - null, false, null, - ), - ), - ) - .connector_type(Connectors.ENVOY) - .build(), - ) - } - - val response = scope.fakeCaller(user = "molly") { - createBackfillAction.create( - "deep-fryer", - ConfigureServiceAction.RESERVED_VARIANT, - CreateBackfillRequest.Builder() - .backfill_name("ChickenSandwich") - .build(), - ) - } - - return response.backfill_run_id - } -}