11
11
import org .apache .logging .log4j .LogManager ;
12
12
import org .apache .logging .log4j .Logger ;
13
13
import org .apache .logging .log4j .message .ParameterizedMessage ;
14
- import org .apache .lucene .index .CorruptIndexException ;
15
14
import org .opensearch .ExceptionsHelper ;
16
- import org .opensearch .OpenSearchCorruptionException ;
17
15
import org .opensearch .action .support .ChannelActionListener ;
18
16
import org .opensearch .cluster .ClusterChangedEvent ;
19
17
import org .opensearch .cluster .ClusterStateListener ;
24
22
import org .opensearch .common .lifecycle .AbstractLifecycleComponent ;
25
23
import org .opensearch .common .settings .Settings ;
26
24
import org .opensearch .common .util .CancellableThreads ;
27
- import org .opensearch .common .util .concurrent .AbstractRunnable ;
28
25
import org .opensearch .common .util .concurrent .ConcurrentCollections ;
29
26
import org .opensearch .core .action .ActionListener ;
30
27
import org .opensearch .core .index .shard .ShardId ;
33
30
import org .opensearch .index .shard .IndexEventListener ;
34
31
import org .opensearch .index .shard .IndexShard ;
35
32
import org .opensearch .index .shard .IndexShardState ;
36
- import org .opensearch .index .store .Store ;
37
33
import org .opensearch .indices .IndicesService ;
38
34
import org .opensearch .indices .recovery .FileChunkRequest ;
39
35
import org .opensearch .indices .recovery .ForceSyncRequest ;
61
57
import static org .opensearch .indices .replication .SegmentReplicationSourceService .Actions .UPDATE_VISIBLE_CHECKPOINT ;
62
58
63
59
/**
64
- * Service class that orchestrates replication events on replicas.
60
+ * Service class that handles incoming checkpoints to initiate replication events on replicas.
65
61
*
66
62
* @opensearch.internal
67
63
*/
@@ -72,17 +68,14 @@ public class SegmentReplicationTargetService extends AbstractLifecycleComponent
72
68
private final ThreadPool threadPool ;
73
69
private final RecoverySettings recoverySettings ;
74
70
75
- private final ReplicationCollection <SegmentReplicationTarget > onGoingReplications ;
76
-
77
- private final Map <ShardId , SegmentReplicationState > completedReplications = ConcurrentCollections .newConcurrentMap ();
78
-
79
71
private final SegmentReplicationSourceFactory sourceFactory ;
80
72
81
73
protected final Map <ShardId , ReplicationCheckpoint > latestReceivedCheckpoint = ConcurrentCollections .newConcurrentMap ();
82
74
83
75
private final IndicesService indicesService ;
84
76
private final ClusterService clusterService ;
85
77
private final TransportService transportService ;
78
+ private final SegmentReplicator replicator ;
86
79
87
80
/**
88
81
* The internal actions
@@ -94,6 +87,7 @@ public static class Actions {
94
87
public static final String FORCE_SYNC = "internal:index/shard/replication/segments_sync" ;
95
88
}
96
89
90
+ @ Deprecated
97
91
public SegmentReplicationTargetService (
98
92
final ThreadPool threadPool ,
99
93
final RecoverySettings recoverySettings ,
@@ -113,6 +107,7 @@ public SegmentReplicationTargetService(
113
107
);
114
108
}
115
109
110
+ @ Deprecated
116
111
public SegmentReplicationTargetService (
117
112
final ThreadPool threadPool ,
118
113
final RecoverySettings recoverySettings ,
@@ -121,14 +116,34 @@ public SegmentReplicationTargetService(
121
116
final IndicesService indicesService ,
122
117
final ClusterService clusterService ,
123
118
final ReplicationCollection <SegmentReplicationTarget > ongoingSegmentReplications
119
+ ) {
120
+ this (
121
+ threadPool ,
122
+ recoverySettings ,
123
+ transportService ,
124
+ sourceFactory ,
125
+ indicesService ,
126
+ clusterService ,
127
+ new SegmentReplicator (threadPool )
128
+ );
129
+ }
130
+
131
+ public SegmentReplicationTargetService (
132
+ final ThreadPool threadPool ,
133
+ final RecoverySettings recoverySettings ,
134
+ final TransportService transportService ,
135
+ final SegmentReplicationSourceFactory sourceFactory ,
136
+ final IndicesService indicesService ,
137
+ final ClusterService clusterService ,
138
+ final SegmentReplicator replicator
124
139
) {
125
140
this .threadPool = threadPool ;
126
141
this .recoverySettings = recoverySettings ;
127
- this .onGoingReplications = ongoingSegmentReplications ;
128
142
this .sourceFactory = sourceFactory ;
129
143
this .indicesService = indicesService ;
130
144
this .clusterService = clusterService ;
131
145
this .transportService = transportService ;
146
+ this .replicator = replicator ;
132
147
133
148
transportService .registerRequestHandler (
134
149
Actions .FILE_CHUNK ,
@@ -154,7 +169,7 @@ protected void doStart() {
154
169
@ Override
155
170
protected void doStop () {
156
171
if (DiscoveryNode .isDataNode (clusterService .getSettings ())) {
157
- assert onGoingReplications .size () == 0 : "Replication collection should be empty on shutdown" ;
172
+ assert replicator .size () == 0 : "Replication collection should be empty on shutdown" ;
158
173
clusterService .removeListener (this );
159
174
}
160
175
}
@@ -199,7 +214,7 @@ public void clusterChanged(ClusterChangedEvent event) {
199
214
@ Override
200
215
public void beforeIndexShardClosed (ShardId shardId , @ Nullable IndexShard indexShard , Settings indexSettings ) {
201
216
if (indexShard != null && indexShard .indexSettings ().isSegRepEnabledOrRemoteNode ()) {
202
- onGoingReplications . cancelForShard (indexShard .shardId (), "Shard closing" );
217
+ replicator . cancel (indexShard .shardId (), "Shard closing" );
203
218
latestReceivedCheckpoint .remove (shardId );
204
219
}
205
220
}
@@ -224,7 +239,7 @@ public void shardRoutingChanged(IndexShard indexShard, @Nullable ShardRouting ol
224
239
&& indexShard .indexSettings ().isSegRepEnabledOrRemoteNode ()
225
240
&& oldRouting .primary () == false
226
241
&& newRouting .primary ()) {
227
- onGoingReplications . cancelForShard (indexShard .shardId (), "Shard has been promoted to primary" );
242
+ replicator . cancel (indexShard .shardId (), "Shard has been promoted to primary" );
228
243
latestReceivedCheckpoint .remove (indexShard .shardId ());
229
244
}
230
245
}
@@ -234,17 +249,15 @@ public void shardRoutingChanged(IndexShard indexShard, @Nullable ShardRouting ol
234
249
*/
235
250
@ Nullable
236
251
public SegmentReplicationState getOngoingEventSegmentReplicationState (ShardId shardId ) {
237
- return Optional .ofNullable (onGoingReplications .getOngoingReplicationTarget (shardId ))
238
- .map (SegmentReplicationTarget ::state )
239
- .orElse (null );
252
+ return Optional .ofNullable (replicator .get (shardId )).map (SegmentReplicationTarget ::state ).orElse (null );
240
253
}
241
254
242
255
/**
243
256
* returns SegmentReplicationState of latest completed segment replication events.
244
257
*/
245
258
@ Nullable
246
259
public SegmentReplicationState getlatestCompletedEventSegmentReplicationState (ShardId shardId ) {
247
- return completedReplications . get (shardId );
260
+ return replicator . getCompleted (shardId );
248
261
}
249
262
250
263
/**
@@ -257,11 +270,11 @@ public SegmentReplicationState getSegmentReplicationState(ShardId shardId) {
257
270
}
258
271
259
272
public ReplicationRef <SegmentReplicationTarget > get (long replicationId ) {
260
- return onGoingReplications .get (replicationId );
273
+ return replicator .get (replicationId );
261
274
}
262
275
263
276
public SegmentReplicationTarget get (ShardId shardId ) {
264
- return onGoingReplications . getOngoingReplicationTarget (shardId );
277
+ return replicator . get (shardId );
265
278
}
266
279
267
280
/**
@@ -285,7 +298,7 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe
285
298
// checkpoint to be replayed once the shard is Active.
286
299
if (replicaShard .state ().equals (IndexShardState .STARTED ) == true ) {
287
300
// Checks if received checkpoint is already present and ahead then it replaces old received checkpoint
288
- SegmentReplicationTarget ongoingReplicationTarget = onGoingReplications . getOngoingReplicationTarget (replicaShard .shardId ());
301
+ SegmentReplicationTarget ongoingReplicationTarget = replicator . get (replicaShard .shardId ());
289
302
if (ongoingReplicationTarget != null ) {
290
303
if (ongoingReplicationTarget .getCheckpoint ().getPrimaryTerm () < receivedCheckpoint .getPrimaryTerm ()) {
291
304
logger .debug (
@@ -504,28 +517,12 @@ public SegmentReplicationTarget startReplication(
504
517
final ReplicationCheckpoint checkpoint ,
505
518
final SegmentReplicationListener listener
506
519
) {
507
- final SegmentReplicationTarget target = new SegmentReplicationTarget (
508
- indexShard ,
509
- checkpoint ,
510
- sourceFactory .get (indexShard ),
511
- listener
512
- );
513
- startReplication (target );
514
- return target ;
520
+ return replicator .startReplication (indexShard , checkpoint , sourceFactory .get (indexShard ), listener );
515
521
}
516
522
517
523
// pkg-private for integration tests
518
524
void startReplication (final SegmentReplicationTarget target ) {
519
- final long replicationId ;
520
- try {
521
- replicationId = onGoingReplications .startSafe (target , recoverySettings .activityTimeout ());
522
- } catch (ReplicationFailedException e ) {
523
- // replication already running for shard.
524
- target .fail (e , false );
525
- return ;
526
- }
527
- logger .trace (() -> new ParameterizedMessage ("Added new replication to collection {}" , target .description ()));
528
- threadPool .generic ().execute (new ReplicationRunner (replicationId ));
525
+ replicator .startReplication (target , recoverySettings .activityTimeout ());
529
526
}
530
527
531
528
/**
@@ -550,89 +547,14 @@ default void onFailure(ReplicationState state, ReplicationFailedException e, boo
550
547
void onReplicationFailure (SegmentReplicationState state , ReplicationFailedException e , boolean sendShardFailure );
551
548
}
552
549
553
- /**
554
- * Runnable implementation to trigger a replication event.
555
- */
556
- private class ReplicationRunner extends AbstractRunnable {
557
-
558
- final long replicationId ;
559
-
560
- public ReplicationRunner (long replicationId ) {
561
- this .replicationId = replicationId ;
562
- }
563
-
564
- @ Override
565
- public void onFailure (Exception e ) {
566
- onGoingReplications .fail (replicationId , new ReplicationFailedException ("Unexpected Error during replication" , e ), false );
567
- }
568
-
569
- @ Override
570
- public void doRun () {
571
- start (replicationId );
572
- }
573
- }
574
-
575
- private void start (final long replicationId ) {
576
- final SegmentReplicationTarget target ;
577
- try (ReplicationRef <SegmentReplicationTarget > replicationRef = onGoingReplications .get (replicationId )) {
578
- // This check is for handling edge cases where the reference is removed before the ReplicationRunner is started by the
579
- // threadpool.
580
- if (replicationRef == null ) {
581
- return ;
582
- }
583
- target = replicationRef .get ();
584
- }
585
- target .startReplication (new ActionListener <>() {
586
- @ Override
587
- public void onResponse (Void o ) {
588
- logger .debug (() -> new ParameterizedMessage ("Finished replicating {} marking as done." , target .description ()));
589
- onGoingReplications .markAsDone (replicationId );
590
- if (target .state ().getIndex ().recoveredFileCount () != 0 && target .state ().getIndex ().recoveredBytes () != 0 ) {
591
- completedReplications .put (target .shardId (), target .state ());
592
- }
593
- }
594
-
595
- @ Override
596
- public void onFailure (Exception e ) {
597
- logger .debug ("Replication failed {}" , target .description ());
598
- if (isStoreCorrupt (target ) || e instanceof CorruptIndexException || e instanceof OpenSearchCorruptionException ) {
599
- onGoingReplications .fail (replicationId , new ReplicationFailedException ("Store corruption during replication" , e ), true );
600
- return ;
601
- }
602
- onGoingReplications .fail (replicationId , new ReplicationFailedException ("Segment Replication failed" , e ), false );
603
- }
604
- });
605
- }
606
-
607
- private boolean isStoreCorrupt (SegmentReplicationTarget target ) {
608
- // ensure target is not already closed. In that case
609
- // we can assume the store is not corrupt and that the replication
610
- // event completed successfully.
611
- if (target .refCount () > 0 ) {
612
- final Store store = target .store ();
613
- if (store .tryIncRef ()) {
614
- try {
615
- return store .isMarkedCorrupted ();
616
- } catch (IOException ex ) {
617
- logger .warn ("Unable to determine if store is corrupt" , ex );
618
- return false ;
619
- } finally {
620
- store .decRef ();
621
- }
622
- }
623
- }
624
- // store already closed.
625
- return false ;
626
- }
627
-
628
550
private class FileChunkTransportRequestHandler implements TransportRequestHandler <FileChunkRequest > {
629
551
630
552
// How many bytes we've copied since we last called RateLimiter.pause
631
553
final AtomicLong bytesSinceLastPause = new AtomicLong ();
632
554
633
555
@ Override
634
556
public void messageReceived (final FileChunkRequest request , TransportChannel channel , Task task ) throws Exception {
635
- try (ReplicationRef <SegmentReplicationTarget > ref = onGoingReplications . getSafe (request .recoveryId (), request .shardId ())) {
557
+ try (ReplicationRef <SegmentReplicationTarget > ref = replicator . get (request .recoveryId (), request .shardId ())) {
636
558
final SegmentReplicationTarget target = ref .get ();
637
559
final ActionListener <Void > listener = target .createOrFinishListener (channel , Actions .FILE_CHUNK , request );
638
560
target .handleFileChunk (request , target , bytesSinceLastPause , recoverySettings .replicationRateLimiter (), listener );
0 commit comments