Skip to content

Commit 4ef0f4f

Browse files
ask-kamal-nayanKamal Nayanbowenlan-amzn
authored
Removed unnecessary user notifications for version conflict exceptions in Snapshot Management (#1413)
Co-authored-by: Kamal Nayan <askkamal@amazon.com> Co-authored-by: bowenlan-amzn <bowenlan23@gmail.com>
1 parent bd6f4e3 commit 4ef0f4f

File tree

2 files changed

+82
-0
lines changed

2 files changed

+82
-0
lines changed

src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/SMStateMachine.kt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,12 @@ package org.opensearch.indexmanagement.snapshotmanagement.engine
77

88
import org.apache.logging.log4j.LogManager
99
import org.apache.logging.log4j.Logger
10+
import org.opensearch.ExceptionsHelper
1011
import org.opensearch.action.bulk.BackoffPolicy
1112
import org.opensearch.common.settings.Settings
1213
import org.opensearch.common.unit.TimeValue
1314
import org.opensearch.commons.ConfigConstants
15+
import org.opensearch.index.engine.VersionConflictEngineException
1416
import org.opensearch.indexmanagement.IndexManagementIndices
1517
import org.opensearch.indexmanagement.opensearchapi.IndexManagementSecurityContext
1618
import org.opensearch.indexmanagement.opensearchapi.retry
@@ -215,6 +217,14 @@ class SMStateMachine(
215217
metadata = md
216218
}
217219
} catch (ex: Exception) {
220+
val unwrappedException = ExceptionsHelper.unwrapCause(ex) as Exception
221+
if (unwrappedException is VersionConflictEngineException) {
222+
// Don't throw the exception
223+
// TODO: Extract seqNo on VersionConflictException and retry updateMetadata with updated seqNo.
224+
log.error("Version conflict exception while updating metadata.", ex)
225+
return
226+
}
227+
218228
val smEx = SnapshotManagementException(ExceptionKey.METADATA_INDEXING_FAILURE, ex)
219229
log.error(smEx.message, ex)
220230
throw smEx

src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/SMStateMachineTests.kt

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,23 @@
55

66
package org.opensearch.indexmanagement.snapshotmanagement.engine
77

8+
import com.nhaarman.mockitokotlin2.any
89
import com.nhaarman.mockitokotlin2.argumentCaptor
10+
import com.nhaarman.mockitokotlin2.doAnswer
911
import com.nhaarman.mockitokotlin2.spy
1012
import com.nhaarman.mockitokotlin2.times
1113
import com.nhaarman.mockitokotlin2.verify
14+
import com.nhaarman.mockitokotlin2.whenever
1215
import kotlinx.coroutines.runBlocking
16+
import org.opensearch.OpenSearchException
1317
import org.opensearch.common.unit.TimeValue
18+
import org.opensearch.core.action.ActionListener
19+
import org.opensearch.core.action.ActionResponse
20+
import org.opensearch.core.index.shard.ShardId
21+
import org.opensearch.index.engine.VersionConflictEngineException
1422
import org.opensearch.indexmanagement.MocksTestCase
23+
import org.opensearch.indexmanagement.opensearchapi.retry
24+
import org.opensearch.indexmanagement.snapshotmanagement.SnapshotManagementException
1525
import org.opensearch.indexmanagement.snapshotmanagement.engine.states.SMState
1626
import org.opensearch.indexmanagement.snapshotmanagement.engine.states.creationTransitions
1727
import org.opensearch.indexmanagement.snapshotmanagement.engine.states.deletionTransitions
@@ -230,4 +240,66 @@ open class SMStateMachineTests : MocksTestCase() {
230240
assertEquals(1, firstValue.policyPrimaryTerm)
231241
}
232242
}
243+
244+
fun `test updateMetadata handles VersionConflictEngineException gracefully`() = runBlocking {
245+
val initialMetadata = randomSMMetadata(
246+
policySeqNo = 0,
247+
policyPrimaryTerm = 0,
248+
)
249+
val smPolicy = randomSMPolicy(
250+
seqNo = 1,
251+
primaryTerm = 1,
252+
)
253+
val updatedMetadata = randomSMMetadata(
254+
policySeqNo = 1,
255+
policyPrimaryTerm = 1,
256+
)
257+
258+
doAnswer {
259+
val listener = it.getArgument<ActionListener<ActionResponse>>(1)
260+
listener.onFailure(VersionConflictEngineException(ShardId("index", "_na_", 1), "test", "message"))
261+
}.whenever(client).index(any(), any())
262+
263+
val stateMachineSpy = spy(SMStateMachine(client, smPolicy, initialMetadata, settings, threadPool, indicesManager))
264+
265+
// Verify VersionConflictEngineException is handled gracefully
266+
try {
267+
stateMachineSpy.updateMetadata(updatedMetadata)
268+
} catch (e: Exception) {
269+
fail("VersionConflictEngineException should be handled without throwing: ${e.message}")
270+
}
271+
}
272+
273+
fun `test updateMetadata throws SnapshotManagementException for other exceptions`() = runBlocking {
274+
val initialMetadata = randomSMMetadata(
275+
policySeqNo = 0,
276+
policyPrimaryTerm = 0,
277+
)
278+
val smPolicy = randomSMPolicy(
279+
seqNo = 1,
280+
primaryTerm = 1,
281+
)
282+
val updatedMetadata = randomSMMetadata(
283+
policySeqNo = 1,
284+
policyPrimaryTerm = 1,
285+
)
286+
287+
val stateMachineSpy = spy(SMStateMachine(client, smPolicy, initialMetadata, settings, threadPool, indicesManager))
288+
289+
val openSearchException = OpenSearchException("Test exception")
290+
doAnswer {
291+
val listener = it.getArgument<ActionListener<ActionResponse>>(1)
292+
listener.onFailure(openSearchException)
293+
}.whenever(client).index(any(), any())
294+
295+
// Verify OpenSearchException is wrapped in SnapshotManagementException
296+
val thrownException = assertThrows(SnapshotManagementException::class.java) {
297+
runBlocking {
298+
stateMachineSpy.updateMetadata(updatedMetadata)
299+
}
300+
}
301+
302+
// Verify exception type and cause
303+
assertTrue(thrownException.cause is OpenSearchException)
304+
}
233305
}

0 commit comments

Comments
 (0)