-
Notifications
You must be signed in to change notification settings - Fork 3.8k
CASSANDRA-20383 #4428
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: cep-45-mutation-tracking
Are you sure you want to change the base?
CASSANDRA-20383 #4428
Conversation
bdeggleston
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You have some end to end tests, but this could also use some testing of the smaller components in more detail / with more cases.
I think it's ok to not have durability and GC as part of the initial implementation, but we do need follow tickets for them. Working out when and how to abandon transfers deserves its own ticket anyway
src/java/org/apache/cassandra/replication/UnreconciledTransfers.java
Outdated
Show resolved
Hide resolved
src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
Outdated
Show resolved
Hide resolved
| { | ||
| try | ||
| { | ||
| sstable.mutateCoordinatorLogOffsetsAndReload(ImmutableCoordinatorLogOffsets.NONE); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we loosen up validation on the stream receiving side, we can remove this step, which would make it safe for use with failure recovery
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's loosen this up when we have consensus on the failure recovery specification? Finishing up my draft of that specification now, will be ready soon.
src/java/org/apache/cassandra/replication/MutationTrackingService.java
Outdated
Show resolved
Hide resolved
src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
Outdated
Show resolved
Hide resolved
| * <p> | ||
| * REVIEW: Reset back to 1 because for transfers, replicas need to know each others' shards, since transfers are | ||
| * sliced to fit within shards. Can we achieve sharding via split range ownership, instead of it being local-only? | ||
| * <p> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was added as an aid to perf testing. I think reverting it to one is fine, especially since there's a functional reason here, but we should open a jira to revisit it
src/java/org/apache/cassandra/replication/MutationTrackingService.java
Outdated
Show resolved
Hide resolved
| cfs.getTracker().addSSTablesTracked(Collections.singleton(moved)); | ||
| } | ||
|
|
||
| activated = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
obviously we don't have durability implemented for this yet, but we should definitely have tests validating the restart safety of partially completed activations when we do
| } | ||
|
|
||
| Preconditions.checkState(sstable.getCoordinatorLogOffsets().isEmpty()); | ||
| Preconditions.checkState(!sstable.getCoordinatorLogOffsets().transfers().isEmpty()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is a little weird from an api perspective. We're asserting that the log offsets are empty, then asserting that a collection contained in them is not empty. We should probably rename the ImmutableLogOffsets.isEmpty to something indicating that we want to know if the offsets are empty, or if the transfers are empty. There may also be other spots in the code where we take some action if the log offsets are empty without confirming that the transfers are also empty. So we should rethink some parts of this to make it less likely to be misused
src/java/org/apache/cassandra/db/compaction/CompactionTask.java
Outdated
Show resolved
Hide resolved
src/java/org/apache/cassandra/replication/ImmutableCoordinatorLogOffsets.java
Outdated
Show resolved
Hide resolved
| ImmutableCoordinatorLogOffsets.Builder builder = new ImmutableCoordinatorLogOffsets.Builder(); | ||
| for (SSTableReader sstable : sstables) | ||
| builder.addAll(sstable.getCoordinatorLogOffsets()); | ||
| builder.purgeTransfers(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So the idea here is that transfer IDs that are durably reconciled will never trigger reconciliation anyway...so they just don't need to be in the new post-compaction SSTables? Makes sense, although I wonder if a short comment to that effect would be helpful...or...rename to purgeReconciledTransfers()...or maybe by favorite...make Predicate<ShortMutationId> pred an argument for purgeTransfers() and pass MutationTrackingService.instance::isDurablyReconciled.
| { | ||
| ImmutableCoordinatorLogOffsets logOffsets = sstable.getCoordinatorLogOffsets(); | ||
| Preconditions.checkState(logOffsets.mutations().isEmpty()); | ||
| Preconditions.checkState(!logOffsets.transfers().isEmpty()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Couple thoughts:
1.) Would ImmutableCoordinatorLogOffsets benefit from a method like isTransfer() that we could use here? ^
2.) I'd consider actual error messages here (especially after collapsing the two checks above with isTransfer()), although if we used assertions correctly, those would be better (given this is a programmer error if it happens).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure what isTransfer would represent, given that we'll have ICLO containing a mix of mutations and transfers after compaction has happened. I can make this throw a "real" exception though, with a message containing logOffsets.
|
|
||
| // Validate existing SSTable-attached indexes, and then build any that are missing: | ||
| if (!cfs.indexManager.validateSSTableAttachedIndexes(newSSTables, false, options.validateIndexChecksum)) | ||
| cfs.indexManager.buildSSTableAttachedIndexesBlocking(newSSTables); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are a few different scenarios here when an SAI index is present:
1.) SSTables are imported, but SAI components are not imported alongside them. In this case, buildSSTableAttachedIndexesBlocking() is called to make sure that they DO exist by the time addSSTables() is called.
2.) SSTables are imported w/ SAI components as well. In this case, we don't build anything, as validation should pass, and everything is where it should be.
In the tracked case, we need to stream, so I think one of two things can happen. The first is that we're not streaming entire files, in which case SAI indexes are going to have to be built on the receiving end. I think this should actually work right now, because the observer framework/listener stuff builds the index as the SSTable is written. However, I'm worried about the full file streaming case, because for that to work, the SAI components need to be added to the manifest before the stream starts. Looking at what's downstream of SSTableReader.open() in the importer for the tracked case, it looks like it does detect existing SAI components though...so perhaps false alarm.
Anyway, I speculate. We just need a test for this. We don't have CASSANDRA-20374 yet in this branch, of course, since it's not merged yet, but testing just the streaming and index building part (can just check index queryability) wouldn't be that bad.
src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
Outdated
Show resolved
Hide resolved
src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java
Outdated
Show resolved
Hide resolved
| BULK_LOAD("Bulk Load", true, false, false), | ||
| REPAIR("Repair", true, false, true); | ||
| REPAIR("Repair", true, false, true), | ||
| TRACKED_TRANSFER("Tracked Import", false, false, false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Are all tracked transfers tracked imports? We seem to replicate the naming for the other instances...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right now, yes, all tracked transfers are imports. But there's some discussion about failure recovery potentially using a tracked transfer as well, so I made this separate.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just pushed naming fix, so the enum name and the human name are in agreement
| return new SSTableTxnSingleStreamWriter(txn, writer); | ||
| if (session.streamOperation() == StreamOperation.TRACKED_TRANSFER) | ||
| { | ||
| Preconditions.checkState(cfs.metadata().replicationType().isTracked()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Not sure if we want a message here if we violate
src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
Outdated
Show resolved
Hide resolved
src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
Outdated
Show resolved
Hide resolved
|
|
||
| cfs.addSSTables(readers); | ||
|
|
||
| //invalidate row and counter cache |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It makes sense that we wouldn't invalidate the caches here on a tracked import, but do we need to make sure we do it on activation?
| { | ||
| Preconditions.checkState(metadata().replicationType().isTracked()); | ||
| if (activationIds == null) | ||
| activationIds = new HashSet<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we ever call addActivationIds() multiple times during a read? If not, I would blow up if we try to.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We do actually access the same ReadExecutionController across threads, when an IN clause has multiple local partitions. I'll update this path to make it threadsafe.
Not exactly the same, but similar to: CASSANDRA-19427
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looked more closely, and the case I was thinking of isn't across threads. Even when we're in SinglePartitionReadCommand.Group.executeLocally, we have a shared instance of ReadExecutionController across executions, but they're executing in serial on a single thread, so there's no concurrent access.
|
|
||
| public Iterator<ShortMutationId> getActivationIds() | ||
| { | ||
| return activationIds == null ? null : activationIds.iterator(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we ever call getActivationIds() without having first set activationIds? If not, I would explode if we try to. I might also have addActivationIds() set activationIds to Collections.empty() when there are actually no IDs to let that newly created HashSet get collected.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's fine to have tracked reads with no intersecting transfers, so we shouldn't explode on unset
src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
Outdated
Show resolved
Hide resolved
| public void mutateCoordinatorLogOffsets(Descriptor descriptor, ImmutableCoordinatorLogOffsets logOffsets) throws IOException | ||
| { | ||
| if (logger.isTraceEnabled()) | ||
| logger.trace("Mutating {} to {}", descriptor.fileFor(Components.STATS), logOffsets); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Should we customize the trace message like we do for mutateLevel()?
| while (activationIds.hasNext()) | ||
| { | ||
| ShortMutationId id = activationIds.next(); | ||
| builder.builderForLog(id).unreconciled.add(id.offset()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: What if we Just have merge() take a Set for activationIds? I'm not sure the iterator buys us anything. (I was going to say that might let us create a new addAll() in AbstractMutable, but each ID could have a different builder I guess. It just sucks, because we might double the size of the array in add().)
| public class TrackerTest | ||
| { | ||
| private static final int COMPONENT_STATS_SIZE_BYTES = 4805; | ||
| private static final int COMPONENT_STATS_SIZE_BYTES = 4806; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've had to deal with test failures here and there around SSTable component sizes in tests, where different configurations use different SSTable formats, etc. (Just want to make sure this wouldn't be problematic later...)
test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java
Outdated
Show resolved
Hide resolved
src/java/org/apache/cassandra/replication/MutationTrackingService.java
Outdated
Show resolved
Hide resolved
… failed activation propose
… subsequent activation attempts to also time out, need to clean up extra logging
694fe62 to
261535d
Compare
261535d to
9b2fc35
Compare
No description provided.