Skip to content

Commit e753beb

Browse files
committed
add units
1 parent a83e815 commit e753beb

File tree

32 files changed

+155
-478
lines changed

32 files changed

+155
-478
lines changed

client-base/src/main/kotlin/app/cash/backfila/client/internal/BackfilaStartupConfigurator.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ class BackfilaStartupConfigurator @Inject constructor(
7575
.description(registration.description)
7676
.parameters(parameters)
7777
.delete_by(registration.deleteBy?.toEpochMilli())
78+
.unit(registration.unit)
7879
.build()
7980
},
8081
)

client-base/src/main/kotlin/app/cash/backfila/client/spi/BackfillRegistration.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,5 @@ data class BackfillRegistration(
88
val description: String?,
99
val parametersClass: KClass<Any>,
1010
val deleteBy: Instant?,
11+
val unit: String?,
1112
)

client-base/src/test/kotlin/app/cash/backfila/client/fixedset/FixedSetBackend.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package app.cash.backfila.client.fixedset
22

3+
import app.cash.backfila.client.BackfillUnit
34
import app.cash.backfila.client.DeleteBy
45
import app.cash.backfila.client.Description
56
import app.cash.backfila.client.parseDeleteByDate
@@ -59,6 +60,7 @@ class FixedSetBackend @Inject constructor(
5960
description = it.value.findAnnotation<Description>()?.text,
6061
parametersClass = parametersClass(it.value as KClass<FixedSetBackfill<Any>>),
6162
deleteBy = it.value.findAnnotation<DeleteBy>()?.parseDeleteByDate(),
63+
unit = BackfillUnit.RECORDS.displayName,
6264
)
6365
}.toSet()
6466
}

client-dynamodb-v2/src/main/kotlin/app/cash/backfila/client/dynamodbv2/UpdateInPlaceDynamoDbBackfill.kt

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,32 @@
11
package app.cash.backfila.client.dynamodbv2
22

33
import app.cash.backfila.client.BackfillConfig
4+
import kotlin.math.min
5+
import kotlin.math.pow
46
import software.amazon.awssdk.core.exception.ApiCallTimeoutException
57
import software.amazon.awssdk.services.dynamodb.DynamoDbClient
68
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest
79
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse
810
import software.amazon.awssdk.services.dynamodb.model.PutRequest
911
import software.amazon.awssdk.services.dynamodb.model.WriteRequest
10-
import kotlin.math.min
11-
import kotlin.math.pow
1212

1313
/**
1414
* A base class that may make it easier to mutate the items in the DynamoDB store.
1515
*
1616
* Implements retry logic with exponential backoff for:
1717
* - Unprocessed items from BatchWriteItem operations
1818
* - API timeouts
19-
*
20-
* The retry counter for unprocessed items only starts when we stop making progress
21-
* (no items processed in a round). The timeout counter only increments when all chunks
19+
* * The retry counter for unprocessed items only starts when we stop making progress * (no items processed in a round). The timeout counter only increments when all chunks
2220
* in an iteration timeout.
23-
*
24-
* Implementations must be idempotent as items may be retried multiple times.
21+
* * Implementations must be idempotent as items may be retried multiple times.
2522
*/
2623
abstract class UpdateInPlaceDynamoDbBackfill<I : Any, P : Any>(
2724
val dynamoDbClient: DynamoDbClient,
2825
) : DynamoDbBackfill<I, P>() {
2926
companion object {
3027
// DynamoDB BatchWriteItem API has a limit of 25 items per request
3128
private const val BATCH_SIZE_LIMIT = 25
32-
29+
3330
// Retry configuration
3431
private const val MAX_RETRY_ATTEMPTS = 10
3532
private const val BASE_BACKOFF_MS = 50L
@@ -38,7 +35,7 @@ abstract class UpdateInPlaceDynamoDbBackfill<I : Any, P : Any>(
3835

3936
override fun runBatch(items: List<@JvmSuppressWildcards I>, config: BackfillConfig<P>) {
4037
val itemsToSave = items.filter { runOne(it, config) }
41-
38+
4239
if (itemsToSave.isNotEmpty()) {
4340
var unprocessedItems = itemsToSave
4441
var stuckRetryCount = 0
@@ -52,7 +49,7 @@ abstract class UpdateInPlaceDynamoDbBackfill<I : Any, P : Any>(
5249
val backoffAttempts = maxOf(stuckRetryCount, consecutiveTimeouts).coerceAtLeast(1)
5350
val baseWait = min(
5451
MAX_BACKOFF_MS.toDouble(),
55-
BASE_BACKOFF_MS * 2.0.pow(backoffAttempts.toDouble())
52+
BASE_BACKOFF_MS * 2.0.pow(backoffAttempts.toDouble()),
5653
).toLong()
5754
val jitter = (Math.random() * 0.1 * baseWait).toLong()
5855
Thread.sleep(baseWait + jitter)
@@ -64,7 +61,7 @@ abstract class UpdateInPlaceDynamoDbBackfill<I : Any, P : Any>(
6461
var lastTimeoutException: ApiCallTimeoutException? = null
6562
var hadTimeoutThisIteration = false
6663
var allChunksTimedOut = true
67-
64+
6865
unprocessedItems.chunked(BATCH_SIZE_LIMIT).forEach { chunk ->
6966
try {
7067
val writeRequests = createWriteRequests(chunk)
@@ -92,28 +89,30 @@ abstract class UpdateInPlaceDynamoDbBackfill<I : Any, P : Any>(
9289
"""Failed due to consecutive complete timeouts after $MAX_RETRY_ATTEMPTS attempts.
9390
|Total attempts: $totalAttempts
9491
|Initial batch size: ${itemsToSave.size}
95-
|Remaining unprocessed items: ${unprocessedItems.size}""".trimMargin(),
96-
lastTimeoutException
92+
|Remaining unprocessed items: ${unprocessedItems.size}
93+
""".trimMargin(),
94+
lastTimeoutException,
9795
)
9896
}
9997
} else if (batchSucceeded) {
10098
// Reset timeout counter if any chunk succeeded
10199
consecutiveTimeouts = 0
102100
}
103-
101+
104102
// If we saw any timeouts but didn't hit the consecutive limit, include the last exception
105103
// as suppressed to maintain error context
106104
if (lastTimeoutException != null && !allChunksTimedOut) {
107105
lastTimeoutException!!.addSuppressed(
108106
IllegalStateException(
109107
"""Saw partial timeouts while processing batches.
110-
|Successfully processed some chunks, continuing with retries.""".trimMargin()
111-
)
108+
|Successfully processed some chunks, continuing with retries.
109+
""".trimMargin(),
110+
),
112111
)
113112
}
114113

115114
totalAttempts++
116-
115+
117116
// Check if we made any progress with unprocessed items
118117
if (batchSucceeded && stillUnprocessed.size == unprocessedItems.size) {
119118
// No progress made, increment stuck counter
@@ -123,7 +122,8 @@ abstract class UpdateInPlaceDynamoDbBackfill<I : Any, P : Any>(
123122
"""Failed to make progress after $MAX_RETRY_ATTEMPTS attempts.
124123
|Total attempts: $totalAttempts
125124
|Initial batch size: ${itemsToSave.size}
126-
|Remaining unprocessed items: ${stillUnprocessed.size}""".trimMargin()
125+
|Remaining unprocessed items: ${stillUnprocessed.size}
126+
""".trimMargin(),
127127
)
128128
}
129129
} else if (batchSucceeded) {
@@ -142,7 +142,7 @@ abstract class UpdateInPlaceDynamoDbBackfill<I : Any, P : Any>(
142142
.putRequest(
143143
PutRequest.builder()
144144
.item(dynamoDbTable.tableSchema().itemToMap(item, true))
145-
.build()
145+
.build(),
146146
)
147147
.build()
148148
}
@@ -156,7 +156,7 @@ abstract class UpdateInPlaceDynamoDbBackfill<I : Any, P : Any>(
156156
return response.unprocessedItems()[dynamoDbTable.tableName()]
157157
?.mapNotNull { writeRequest ->
158158
// Convert WriteRequest back to original item type using the table schema
159-
writeRequest.putRequest()?.item()?.let {
159+
writeRequest.putRequest()?.item()?.let {
160160
dynamoDbTable.tableSchema().mapToItem(it)
161161
}
162162
}

client-dynamodb-v2/src/main/kotlin/app/cash/backfila/client/dynamodbv2/internal/DynamoDbBackend.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package app.cash.backfila.client.dynamodbv2.internal
22

3+
import app.cash.backfila.client.BackfillUnit
34
import app.cash.backfila.client.DeleteBy
45
import app.cash.backfila.client.Description
56
import app.cash.backfila.client.dynamodbv2.DynamoDbBackfill
@@ -64,6 +65,7 @@ class DynamoDbBackend @Inject constructor(
6465
description = it.value.findAnnotation<Description>()?.text,
6566
parametersClass = parametersClass(it.value as KClass<DynamoDbBackfill<Any, Any>>),
6667
deleteBy = it.value.findAnnotation<DeleteBy>()?.parseDeleteByDate(),
68+
unit = BackfillUnit.SEGMENTS.displayName,
6769
)
6870
}.toSet()
6971
}

client-dynamodb/src/main/kotlin/app/cash/backfila/client/dynamodb/internal/DynamoDbBackend.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package app.cash.backfila.client.dynamodb.internal
22

3+
import app.cash.backfila.client.BackfillUnit
34
import app.cash.backfila.client.DeleteBy
45
import app.cash.backfila.client.Description
56
import app.cash.backfila.client.dynamodb.DynamoDbBackfill
@@ -64,6 +65,7 @@ class DynamoDbBackend @Inject constructor(
6465
description = it.value.findAnnotation<Description>()?.text,
6566
parametersClass = parametersClass(it.value as KClass<DynamoDbBackfill<Any, Any>>),
6667
deleteBy = it.value.findAnnotation<DeleteBy>()?.parseDeleteByDate(),
68+
unit = BackfillUnit.SEGMENTS.displayName,
6769
)
6870
}.toSet()
6971
}

client-jooq/src/main/kotlin/app/cash/backfila/client/jooq/internal/JooqBackend.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package app.cash.backfila.client.jooq.internal
22

3+
import app.cash.backfila.client.BackfillUnit
34
import app.cash.backfila.client.DeleteBy
45
import app.cash.backfila.client.Description
56
import app.cash.backfila.client.jooq.ForJooqBackend
@@ -37,6 +38,7 @@ class JooqBackend @Inject constructor(
3738
description = it.value.findAnnotation<Description>()?.text,
3839
parametersClass = parametersClass(it.value as KClass<JooqBackfill<*, Any>>),
3940
deleteBy = it.value.findAnnotation<DeleteBy>()?.parseDeleteByDate(),
41+
unit = BackfillUnit.RECORDS.displayName,
4042
)
4143
}.toSet()
4244
}

client-misk-hibernate/src/main/kotlin/app/cash/backfila/client/misk/hibernate/internal/HibernateBackend.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package app.cash.backfila.client.misk.hibernate.internal
22

3+
import app.cash.backfila.client.BackfillUnit
34
import app.cash.backfila.client.DeleteBy
45
import app.cash.backfila.client.Description
56
import app.cash.backfila.client.misk.hibernate.ForHibernateBackend
@@ -57,6 +58,7 @@ internal class HibernateBackend @Inject constructor(
5758
description = it.value.findAnnotation<Description>()?.text,
5859
parametersClass = parametersClass(it.value as KClass<HibernateBackfill<*, *, Any>>),
5960
deleteBy = it.value.findAnnotation<DeleteBy>()?.parseDeleteByDate(),
61+
unit = BackfillUnit.RECORDS.displayName,
6062
)
6163
}.toSet()
6264
}

client-s3/src/main/kotlin/app/cash/backfila/client/s3/internal/S3DatasourceBackend.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package app.cash.backfila.client.s3.internal
22

3+
import app.cash.backfila.client.BackfillUnit
34
import app.cash.backfila.client.DeleteBy
45
import app.cash.backfila.client.Description
56
import app.cash.backfila.client.parseDeleteByDate
@@ -56,6 +57,7 @@ class S3DatasourceBackend @Inject constructor(
5657
description = it.value.findAnnotation<Description>()?.text,
5758
parametersClass = parametersClass(it.value as KClass<S3DatasourceBackfill<Any, Any>>),
5859
deleteBy = it.value.findAnnotation<DeleteBy>()?.parseDeleteByDate(),
60+
unit = BackfillUnit.BYTES.displayName,
5961
)
6062
}.toSet()
6163
}

client-sqldelight/src/main/kotlin/app/cash/backfila/client/sqldelight/SqlDelightDatasourceBackfillModule.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package app.cash.backfila.client.sqldelight
22

3+
import app.cash.backfila.client.BackfillUnit
34
import app.cash.backfila.client.DeleteBy
45
import app.cash.backfila.client.Description
56
import app.cash.backfila.client.parseDeleteByDate
@@ -28,6 +29,7 @@ class SqlDelightDatasourceBackfillModule<T : SqlDelightDatasourceBackfill<*, *,
2829
description = backfillClass.findAnnotation<Description>()?.text,
2930
parametersClass = parametersClass(),
3031
deleteBy = backfillClass.findAnnotation<DeleteBy>()?.parseDeleteByDate(),
32+
unit = BackfillUnit.RECORDS.displayName,
3133
)
3234

3335
override fun configure() {

0 commit comments

Comments
 (0)