Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,26 +1,23 @@
package app.cash.backfila.client.dynamodbv2

import app.cash.backfila.client.BackfillConfig
import kotlin.math.min
import kotlin.math.pow
import software.amazon.awssdk.core.exception.ApiCallTimeoutException
import software.amazon.awssdk.services.dynamodb.DynamoDbClient
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse
import software.amazon.awssdk.services.dynamodb.model.PutRequest
import software.amazon.awssdk.services.dynamodb.model.WriteRequest
import kotlin.math.min
import kotlin.math.pow

/**
* A base class that may make it easier to mutate the items in the DynamoDB store.
*
* Implements retry logic with exponential backoff for:
* - Unprocessed items from BatchWriteItem operations
* - API timeouts
*
* The retry counter for unprocessed items only starts when we stop making progress
* (no items processed in a round). The timeout counter only increments when all chunks
* The retry counter for unprocessed items only starts when we stop making progress * (no items processed in a round). The timeout counter only increments when all chunks
* in an iteration timeout.
*
* Implementations must be idempotent as items may be retried multiple times.
*/
abstract class UpdateInPlaceDynamoDbBackfill<I : Any, P : Any>(
Expand All @@ -29,7 +26,7 @@ abstract class UpdateInPlaceDynamoDbBackfill<I : Any, P : Any>(
companion object {
// DynamoDB BatchWriteItem API has a limit of 25 items per request
private const val BATCH_SIZE_LIMIT = 25

// Retry configuration
private const val MAX_RETRY_ATTEMPTS = 10
private const val BASE_BACKOFF_MS = 50L
Expand All @@ -38,7 +35,7 @@ abstract class UpdateInPlaceDynamoDbBackfill<I : Any, P : Any>(

override fun runBatch(items: List<@JvmSuppressWildcards I>, config: BackfillConfig<P>) {
val itemsToSave = items.filter { runOne(it, config) }

if (itemsToSave.isNotEmpty()) {
var unprocessedItems = itemsToSave
var stuckRetryCount = 0
Expand All @@ -52,7 +49,7 @@ abstract class UpdateInPlaceDynamoDbBackfill<I : Any, P : Any>(
val backoffAttempts = maxOf(stuckRetryCount, consecutiveTimeouts).coerceAtLeast(1)
val baseWait = min(
MAX_BACKOFF_MS.toDouble(),
BASE_BACKOFF_MS * 2.0.pow(backoffAttempts.toDouble())
BASE_BACKOFF_MS * 2.0.pow(backoffAttempts.toDouble()),
).toLong()
val jitter = (Math.random() * 0.1 * baseWait).toLong()
Thread.sleep(baseWait + jitter)
Expand All @@ -64,7 +61,7 @@ abstract class UpdateInPlaceDynamoDbBackfill<I : Any, P : Any>(
var lastTimeoutException: ApiCallTimeoutException? = null
var hadTimeoutThisIteration = false
var allChunksTimedOut = true

unprocessedItems.chunked(BATCH_SIZE_LIMIT).forEach { chunk ->
try {
val writeRequests = createWriteRequests(chunk)
Expand Down Expand Up @@ -92,28 +89,30 @@ abstract class UpdateInPlaceDynamoDbBackfill<I : Any, P : Any>(
"""Failed due to consecutive complete timeouts after $MAX_RETRY_ATTEMPTS attempts.
|Total attempts: $totalAttempts
|Initial batch size: ${itemsToSave.size}
|Remaining unprocessed items: ${unprocessedItems.size}""".trimMargin(),
lastTimeoutException
|Remaining unprocessed items: ${unprocessedItems.size}
""".trimMargin(),
lastTimeoutException,
)
}
} else if (batchSucceeded) {
// Reset timeout counter if any chunk succeeded
consecutiveTimeouts = 0
}

// If we saw any timeouts but didn't hit the consecutive limit, include the last exception
// as suppressed to maintain error context
if (lastTimeoutException != null && !allChunksTimedOut) {
lastTimeoutException!!.addSuppressed(
IllegalStateException(
"""Saw partial timeouts while processing batches.
|Successfully processed some chunks, continuing with retries.""".trimMargin()
)
|Successfully processed some chunks, continuing with retries.
""".trimMargin(),
),
)
}

totalAttempts++

// Check if we made any progress with unprocessed items
if (batchSucceeded && stillUnprocessed.size == unprocessedItems.size) {
// No progress made, increment stuck counter
Expand All @@ -123,7 +122,8 @@ abstract class UpdateInPlaceDynamoDbBackfill<I : Any, P : Any>(
"""Failed to make progress after $MAX_RETRY_ATTEMPTS attempts.
|Total attempts: $totalAttempts
|Initial batch size: ${itemsToSave.size}
|Remaining unprocessed items: ${stillUnprocessed.size}""".trimMargin()
|Remaining unprocessed items: ${stillUnprocessed.size}
""".trimMargin(),
)
}
} else if (batchSucceeded) {
Expand All @@ -142,7 +142,7 @@ abstract class UpdateInPlaceDynamoDbBackfill<I : Any, P : Any>(
.putRequest(
PutRequest.builder()
.item(dynamoDbTable.tableSchema().itemToMap(item, true))
.build()
.build(),
)
.build()
}
Expand All @@ -156,7 +156,7 @@ abstract class UpdateInPlaceDynamoDbBackfill<I : Any, P : Any>(
return response.unprocessedItems()[dynamoDbTable.tableName()]
?.mapNotNull { writeRequest ->
// Convert WriteRequest back to original item type using the table schema
writeRequest.putRequest()?.item()?.let {
writeRequest.putRequest()?.item()?.let {
dynamoDbTable.tableSchema().mapToItem(it)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,10 @@ class BackfillShowAction @Inject constructor(
td("hidden py-5 pl-8 pr-0 text-right align-top tabular-nums text-gray-700 sm:table-cell") {
when {
partition.state != BackfillState.RUNNING -> +"-"
!partition.precomputing_done -> +"Computing..."
partition.matching_records_per_minute == null ||
partition.matching_records_per_minute <= 0 -> +"Calculating..."
partition.matching_records_per_minute <= 0 -> {
if (!partition.precomputing_done) { +"Computing..." } else { +"Calculating..." }
}
else -> +"""${partition.matching_records_per_minute} #/m"""
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package app.cash.backfila.ui.components
import app.cash.backfila.BackfilaTestingModule
import com.google.inject.Provider
import jakarta.inject.Inject
import javax.servlet.http.HttpServletRequest
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import misk.Action
import misk.MiskCaller
import misk.api.HttpRequest
Expand All @@ -19,9 +22,6 @@ import misk.web.dashboard.BaseDashboardModule
import misk.web.v2.DashboardPageLayout
import okhttp3.HttpUrl.Companion.toHttpUrl
import org.junit.jupiter.api.Test
import javax.servlet.http.HttpServletRequest
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith

@MiskTest
class DashboardPageLayoutTest {
Expand All @@ -42,8 +42,8 @@ class DashboardPageLayoutTest {
actionScope.enter(
mapOf(
HttpCall::class.toKey() to fakeHttpCall,
MiskCaller::class.toKey() to fakeMiskCaller
)
MiskCaller::class.toKey() to fakeMiskCaller,
),
).use {
// No exception thrown on correct usage
layout.get().newBuilder().build()
Expand Down