-
Notifications
You must be signed in to change notification settings - Fork 3.8k
CASSANDRA-20383 #4428
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: cep-45-mutation-tracking
Are you sure you want to change the base?
CASSANDRA-20383 #4428
Changes from all commits
dd8aef1
92b8c4f
0effc82
1cb1252
689c49f
6a86a96
44634c4
a733dc7
55a9cb8
e898f46
7636f66
c7c07dd
390ed74
5bc04bb
d5eb0e2
8b9d683
19bb035
d303b95
c2b89bc
9739ba6
803f476
a07628b
e17184b
082584a
fa549e9
b2ef3d5
8d4360e
a184e8f
83a12f3
3e2c34d
25a1f04
5280ee9
2b9c86a
4df5522
b952624
0d7d22d
56b1f0d
3de59cb
5aa347b
f35fea3
9b2fc35
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,21 +18,37 @@ | |
| package org.apache.cassandra.db; | ||
|
|
||
| import java.nio.ByteBuffer; | ||
| import java.util.HashSet; | ||
| import java.util.Iterator; | ||
| import java.util.Set; | ||
| import java.util.concurrent.TimeUnit; | ||
|
|
||
| import javax.annotation.concurrent.NotThreadSafe; | ||
|
|
||
| import com.google.common.annotations.VisibleForTesting; | ||
| import com.google.common.base.Preconditions; | ||
|
|
||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import org.apache.cassandra.db.ColumnFamilyStore.ViewFragment; | ||
| import org.apache.cassandra.db.filter.DataLimits; | ||
| import org.apache.cassandra.index.Index; | ||
| import org.apache.cassandra.io.sstable.format.SSTableReader; | ||
| import org.apache.cassandra.replication.ActivatedTransfers; | ||
| import org.apache.cassandra.replication.ShortMutationId; | ||
| import org.apache.cassandra.schema.TableMetadata; | ||
| import org.apache.cassandra.tracing.Tracing; | ||
| import org.apache.cassandra.utils.MonotonicClock; | ||
| import org.apache.cassandra.utils.concurrent.OpOrder; | ||
|
|
||
| import static org.apache.cassandra.utils.MonotonicClock.Global.preciseTime; | ||
|
|
||
| @NotThreadSafe | ||
| public class ReadExecutionController implements AutoCloseable | ||
aratno marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| { | ||
| private static final Logger logger = LoggerFactory.getLogger(ReadExecutionController.class); | ||
|
|
||
| private static final long NO_SAMPLING = Long.MIN_VALUE; | ||
|
|
||
| // For every reads | ||
|
|
@@ -50,6 +66,13 @@ public class ReadExecutionController implements AutoCloseable | |
| private final RepairedDataInfo repairedDataInfo; | ||
| private long oldestUnrepairedTombstone = Long.MAX_VALUE; | ||
|
|
||
| /** | ||
| * Track bulk transfers involved in the read, so we can do read reconciliation. These come from the | ||
| * {@link ViewFragment}, not the SSTable read path, so bloom filters and short-circuiting SSTable scans will still | ||
| * include the total set of relevant bulk transfers. | ||
| */ | ||
aratno marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| private Set<ShortMutationId> activationIds = null; | ||
|
|
||
| ReadExecutionController(ReadCommand command, | ||
| OpOrder.Group baseOp, | ||
| TableMetadata baseMetadata, | ||
|
|
@@ -243,4 +266,29 @@ private void addSample() | |
| if (cfs != null) | ||
| cfs.metric.topLocalReadQueryTime.addSample(cql, timeMicros); | ||
| } | ||
|
|
||
| public void addActivationIds(Iterable<SSTableReader> sstables) | ||
| { | ||
| Preconditions.checkState(metadata().replicationType().isTracked()); | ||
| if (activationIds == null) | ||
| activationIds = new HashSet<>(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we ever call
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We do actually access the same ReadExecutionController across threads, when an IN clause has multiple local partitions. I'll update this path to make it threadsafe. Not exactly the same, but similar to: CASSANDRA-19427
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looked more closely, and the case I was thinking of isn't across threads. Even when we're in |
||
| for (SSTableReader sstable : sstables) | ||
| { | ||
| ActivatedTransfers transfers = sstable.getCoordinatorLogOffsets().transfers(); | ||
| if (transfers.isEmpty()) | ||
| continue; | ||
| logger.trace("Adding overlapping IDs to read keyRange {}", command.dataRange.keyRange); | ||
| transfers.forEachIntersecting(command.dataRange.keyRange, id -> { | ||
| String msg = String.format("Found overlapping activation ID %s for queried range %s", id, command.dataRange.keyRange); | ||
| Tracing.trace(msg); | ||
| logger.debug(msg); | ||
| activationIds.add(id); | ||
| }); | ||
| } | ||
| } | ||
|
|
||
| public Iterator<ShortMutationId> getActivationIds() | ||
| { | ||
| return activationIds == null ? null : activationIds.iterator(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we ever call
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's fine to have tracked reads with no intersecting transfers, so we shouldn't explode on unset |
||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -44,6 +44,7 @@ | |
| import org.apache.cassandra.io.sstable.format.SSTableFormat.Components; | ||
| import org.apache.cassandra.io.sstable.format.SSTableReader; | ||
| import org.apache.cassandra.io.util.File; | ||
| import org.apache.cassandra.replication.MutationTrackingService; | ||
| import org.apache.cassandra.schema.TableMetadata; | ||
| import org.apache.cassandra.service.ActiveRepairService; | ||
| import org.apache.cassandra.service.StorageService; | ||
|
|
@@ -80,11 +81,8 @@ synchronized List<String> importNewSSTables(Options options) | |
| UUID importID = UUID.randomUUID(); | ||
| logger.info("[{}] Loading new SSTables for {}/{}: {}", importID, cfs.getKeyspaceName(), cfs.getTableName(), options); | ||
|
|
||
| // This will be supported in the future | ||
| TableMetadata metadata = cfs.metadata(); | ||
| if (metadata.replicationType() != null && metadata.replicationType().isTracked()) | ||
| throw new IllegalStateException("Can't import into tables with mutation tracking enabled"); | ||
|
|
||
| boolean isTracked = metadata.replicationType().isTracked(); | ||
| List<Pair<Directories.SSTableLister, String>> listers = getSSTableListers(options.srcPaths); | ||
|
|
||
| Set<Descriptor> currentDescriptors = new HashSet<>(); | ||
|
|
@@ -183,7 +181,10 @@ synchronized List<String> importNewSSTables(Options options) | |
| Descriptor newDescriptor = cfs.getUniqueDescriptorFor(entry.getKey(), targetDir); | ||
| maybeMutateMetadata(entry.getKey(), options); | ||
| movedSSTables.add(new MovedSSTable(newDescriptor, entry.getKey(), entry.getValue())); | ||
| SSTableReader sstable = SSTableReader.moveAndOpenSSTable(cfs, entry.getKey(), newDescriptor, entry.getValue(), options.copyData); | ||
| // Don't move tracked SSTables, since that will move them to the live set on bounce | ||
| SSTableReader sstable = isTracked | ||
| ? SSTableReader.open(cfs, oldDescriptor, metadata.ref) | ||
| : SSTableReader.moveAndOpenSSTable(cfs, oldDescriptor, newDescriptor, entry.getValue(), options.copyData); | ||
| newSSTablesPerDirectory.add(sstable); | ||
| } | ||
| catch (Throwable t) | ||
|
|
@@ -233,12 +234,14 @@ synchronized List<String> importNewSSTables(Options options) | |
| if (!cfs.indexManager.validateSSTableAttachedIndexes(newSSTables, false, options.validateIndexChecksum)) | ||
| cfs.indexManager.buildSSTableAttachedIndexesBlocking(newSSTables); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There are a few different scenarios here when an SAI index is present: 1.) SSTables are imported, but SAI components are not imported alongside them. In this case, 2.) SSTables are imported w/ SAI components as well. In this case, we don't build anything, as validation should pass, and everything is where it should be. In the tracked case, we need to stream, so I think one of two things can happen. The first is that we're not streaming entire files, in which case SAI indexes are going to have to be built on the receiving end. I think this should actually work right now, because the observer framework/listener stuff builds the index as the SSTable is written. However, I'm worried about the full file streaming case, because for that to work, the SAI components need to be added to the manifest before the stream starts. Looking at what's downstream of Anyway, I speculate. We just need a test for this. We don't have CASSANDRA-20374 yet in this branch, of course, since it's not merged yet, but testing just the streaming and index building part (can just check index queryability) wouldn't be that bad. |
||
|
|
||
| cfs.getTracker().addSSTables(newSSTables); | ||
| if (isTracked) | ||
| TrackedBulkTransfer.execute(cfs.keyspace.getName(), newSSTables); | ||
| else | ||
| cfs.getTracker().addSSTables(newSSTables); | ||
|
|
||
| for (SSTableReader reader : newSSTables) | ||
| { | ||
| if (options.invalidateCaches && cfs.isRowCacheEnabled()) | ||
| invalidateCachesForSSTable(reader); | ||
| } | ||
| } | ||
| catch (Throwable t) | ||
| { | ||
|
|
@@ -250,6 +253,17 @@ synchronized List<String> importNewSSTables(Options options) | |
| return failedDirectories; | ||
| } | ||
|
|
||
| /** | ||
| * TODO: Support user-defined consistency level for import, for import with replicas down | ||
| */ | ||
| private static class TrackedBulkTransfer | ||
| { | ||
| private static void execute(String keyspace, Set<SSTableReader> sstables) | ||
| { | ||
| MutationTrackingService.instance.executeTransfers(keyspace, sstables, ConsistencyLevel.ALL); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Check the state of this node and throws an {@link InterruptedException} if it is currently draining | ||
| * | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,6 +27,7 @@ | |
|
|
||
| import com.google.common.annotations.VisibleForTesting; | ||
| import com.google.common.base.Function; | ||
| import com.google.common.base.Preconditions; | ||
| import com.google.common.base.Predicate; | ||
| import com.google.common.base.Predicates; | ||
| import com.google.common.collect.Iterables; | ||
|
|
@@ -61,6 +62,7 @@ | |
| import org.apache.cassandra.notifications.TableDroppedNotification; | ||
| import org.apache.cassandra.notifications.TablePreScrubNotification; | ||
| import org.apache.cassandra.notifications.TruncationNotification; | ||
| import org.apache.cassandra.replication.ImmutableCoordinatorLogOffsets; | ||
| import org.apache.cassandra.utils.Pair; | ||
| import org.apache.cassandra.utils.Throwables; | ||
| import org.apache.cassandra.utils.TimeUUID; | ||
|
|
@@ -270,6 +272,20 @@ public void updateInitialSSTableSize(Iterable<SSTableReader> sstables) | |
|
|
||
| public void addSSTables(Collection<SSTableReader> sstables) | ||
| { | ||
| Preconditions.checkState(!cfstore.metadata().replicationType().isTracked()); | ||
| addSSTablesInternal(sstables, false, true, true); | ||
| } | ||
|
|
||
| public void addSSTablesTracked(Collection<SSTableReader> sstables) | ||
| { | ||
| Preconditions.checkState(cfstore.metadata().replicationType().isTracked()); | ||
| for (SSTableReader sstable : sstables) | ||
| { | ||
| ImmutableCoordinatorLogOffsets logOffsets = sstable.getCoordinatorLogOffsets(); | ||
| Preconditions.checkState(logOffsets.mutations().isEmpty()); | ||
| Preconditions.checkState(!logOffsets.transfers().isEmpty()); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Couple thoughts: 1.) Would
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure what isTransfer would represent, given that we'll have ICLO containing a mix of mutations and transfers after compaction has happened. I can make this throw a "real" exception though, with a message containing logOffsets. |
||
| } | ||
|
|
||
| addSSTablesInternal(sstables, false, true, true); | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -402,4 +402,4 @@ public boolean apply(T t) | |
| } | ||
| }; | ||
| } | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.