|
15 | 15 | import org.elasticsearch.action.RoutingMissingException;
|
16 | 16 | import org.elasticsearch.action.index.IndexRequest;
|
17 | 17 | import org.elasticsearch.cluster.metadata.IndexMetadata;
|
| 18 | +import org.elasticsearch.cluster.metadata.IndexReshardingMetadata; |
| 19 | +import org.elasticsearch.cluster.metadata.IndexReshardingState; |
18 | 20 | import org.elasticsearch.cluster.metadata.MappingMetadata;
|
19 | 21 | import org.elasticsearch.common.ParsingException;
|
20 | 22 | import org.elasticsearch.common.Strings;
|
@@ -73,11 +75,13 @@ public static IndexRouting fromIndexMetadata(IndexMetadata metadata) {
|
73 | 75 | protected final String indexName;
|
74 | 76 | private final int routingNumShards;
|
75 | 77 | private final int routingFactor;
|
| 78 | + private final IndexReshardingMetadata indexReshardingMetadata; |
76 | 79 |
|
77 | 80 | private IndexRouting(IndexMetadata metadata) {
|
78 | 81 | this.indexName = metadata.getIndex().getName();
|
79 | 82 | this.routingNumShards = metadata.getRoutingNumShards();
|
80 | 83 | this.routingFactor = metadata.getRoutingFactor();
|
| 84 | + this.indexReshardingMetadata = metadata.getReshardingMetadata(); |
81 | 85 | }
|
82 | 86 |
|
83 | 87 | /**
|
@@ -149,6 +153,23 @@ private static int effectiveRoutingToHash(String effectiveRouting) {
|
149 | 153 | */
|
150 | 154 | public void checkIndexSplitAllowed() {}
|
151 | 155 |
|
| 156 | + /** |
| 157 | + * If this index is in the process of resharding, and the shard to which this request is being routed, |
| 158 | + * is a target shard that is not yet in HANDOFF state, then route it to the source shard. |
| 159 | + * @param shardId shardId to which the current document is routed based on hashing |
| 160 | + * @return Updated shardId |
| 161 | + */ |
| 162 | + protected final int rerouteIfResharding(int shardId) { |
| 163 | + if (indexReshardingMetadata != null && indexReshardingMetadata.getSplit().isTargetShard(shardId)) { |
| 164 | + assert indexReshardingMetadata.isSplit() : "Index resharding state is not a split"; |
| 165 | + if (indexReshardingMetadata.getSplit() |
| 166 | + .targetStateAtLeast(shardId, IndexReshardingState.Split.TargetShardState.HANDOFF) == false) { |
| 167 | + return indexReshardingMetadata.getSplit().sourceShard(shardId); |
| 168 | + } |
| 169 | + } |
| 170 | + return shardId; |
| 171 | + } |
| 172 | + |
152 | 173 | private abstract static class IdAndRoutingOnly extends IndexRouting {
|
153 | 174 | private final boolean routingRequired;
|
154 | 175 | private final IndexVersion creationVersion;
|
@@ -195,19 +216,22 @@ public int indexShard(String id, @Nullable String routing, XContentType sourceTy
|
195 | 216 | throw new IllegalStateException("id is required and should have been set by process");
|
196 | 217 | }
|
197 | 218 | checkRoutingRequired(id, routing);
|
198 |
| - return shardId(id, routing); |
| 219 | + int shardId = shardId(id, routing); |
| 220 | + return rerouteIfResharding(shardId); |
199 | 221 | }
|
200 | 222 |
|
201 | 223 | @Override
|
202 | 224 | public int updateShard(String id, @Nullable String routing) {
|
203 | 225 | checkRoutingRequired(id, routing);
|
204 |
| - return shardId(id, routing); |
| 226 | + int shardId = shardId(id, routing); |
| 227 | + return rerouteIfResharding(shardId); |
205 | 228 | }
|
206 | 229 |
|
207 | 230 | @Override
|
208 | 231 | public int deleteShard(String id, @Nullable String routing) {
|
209 | 232 | checkRoutingRequired(id, routing);
|
210 |
| - return shardId(id, routing); |
| 233 | + int shardId = shardId(id, routing); |
| 234 | + return rerouteIfResharding(shardId); |
211 | 235 | }
|
212 | 236 |
|
213 | 237 | @Override
|
@@ -314,7 +338,8 @@ public int indexShard(String id, @Nullable String routing, XContentType sourceTy
|
314 | 338 | assert Transports.assertNotTransportThread("parsing the _source can get slow");
|
315 | 339 | checkNoRouting(routing);
|
316 | 340 | hash = hashSource(sourceType, source).buildHash(IndexRouting.ExtractFromSource::defaultOnEmpty);
|
317 |
| - return hashToShardId(hash); |
| 341 | + int shardId = hashToShardId(hash); |
| 342 | + return (rerouteIfResharding(shardId)); |
318 | 343 | }
|
319 | 344 |
|
320 | 345 | public String createId(XContentType sourceType, BytesReference source, byte[] suffix) {
|
@@ -454,13 +479,15 @@ public int updateShard(String id, @Nullable String routing) {
|
454 | 479 | @Override
|
455 | 480 | public int deleteShard(String id, @Nullable String routing) {
|
456 | 481 | checkNoRouting(routing);
|
457 |
| - return idToHash(id); |
| 482 | + int shardId = idToHash(id); |
| 483 | + return (rerouteIfResharding(shardId)); |
458 | 484 | }
|
459 | 485 |
|
460 | 486 | @Override
|
461 | 487 | public int getShard(String id, @Nullable String routing) {
|
462 | 488 | checkNoRouting(routing);
|
463 |
| - return idToHash(id); |
| 489 | + int shardId = idToHash(id); |
| 490 | + return (rerouteIfResharding(shardId)); |
464 | 491 | }
|
465 | 492 |
|
466 | 493 | private void checkNoRouting(@Nullable String routing) {
|
|
0 commit comments