Contributor

@aratno aratno commented Oct 14, 2025

No description provided.

@aratno aratno requested a review from bdeggleston October 14, 2025 20:42
@aratno aratno self-assigned this Oct 14, 2025
@aratno aratno mentioned this pull request Oct 14, 2025
Member

@bdeggleston bdeggleston left a comment

You have some end-to-end tests, but the smaller components could also use more detailed testing with more cases.

I think it's ok to not have durability and GC as part of the initial implementation, but we do need follow-up tickets for them. Working out when and how to abandon transfers deserves its own ticket anyway.

{
try
{
sstable.mutateCoordinatorLogOffsetsAndReload(ImmutableCoordinatorLogOffsets.NONE);
Member

if we loosen up validation on the stream receiving side, we can remove this step, which would make it safe for use with failure recovery

Contributor Author

Let's loosen this up when we have consensus on the failure recovery specification? Finishing up my draft of that specification now, will be ready soon.

* <p>
* REVIEW: Reset back to 1 because for transfers, replicas need to know each others' shards, since transfers are
* sliced to fit within shards. Can we achieve sharding via split range ownership, instead of it being local-only?
* <p>
Member

This was added as an aid to perf testing. I think reverting it to one is fine, especially since there's a functional reason here, but we should open a jira to revisit it

cfs.getTracker().addSSTablesTracked(Collections.singleton(moved));
}

activated = true;
Member

obviously we don't have durability implemented for this yet, but we should definitely have tests validating the restart safety of partially completed activations when we do

}

Preconditions.checkState(sstable.getCoordinatorLogOffsets().isEmpty());
Preconditions.checkState(!sstable.getCoordinatorLogOffsets().transfers().isEmpty());
Member

This is a little weird from an API perspective. We're asserting that the log offsets are empty, then asserting that a collection contained in them is not empty. We should probably rename ImmutableCoordinatorLogOffsets.isEmpty to something that indicates whether we're asking if the offsets are empty or if the transfers are empty. There may also be other spots in the code where we take some action if the log offsets are empty without confirming that the transfers are also empty, so we should rethink some parts of this to make it less likely to be misused.
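
One way to make the two questions explicit is sketched below. This is only an illustration: LogOffsetsSketch, its fields, and the method names hasMutations(), hasTransfers(), and isCompletelyEmpty() are hypothetical and are not taken from the PR, and plain sets stand in for the real offset and transfer collections.

```java
import java.util.Set;

// Hypothetical stand-in for ImmutableCoordinatorLogOffsets; all names here are illustrative.
final class LogOffsetsSketch
{
    private final Set<Integer> mutationOffsets; // stand-in for per-log mutation offsets
    private final Set<String> transferIds;      // stand-in for transfer IDs

    LogOffsetsSketch(Set<Integer> mutationOffsets, Set<String> transferIds)
    {
        this.mutationOffsets = Set.copyOf(mutationOffsets);
        this.transferIds = Set.copyOf(transferIds);
    }

    // Each question gets its own name, so callers cannot confuse them
    boolean hasMutations()
    {
        return !mutationOffsets.isEmpty();
    }

    boolean hasTransfers()
    {
        return !transferIds.isEmpty();
    }

    // True only when there are neither mutation offsets nor transfers
    boolean isCompletelyEmpty()
    {
        return mutationOffsets.isEmpty() && transferIds.isEmpty();
    }
}
```

With names like these, the pair of checks above would read as "no mutations, but at least one transfer" rather than "empty, but not empty".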

@maedhroz maedhroz self-requested a review November 14, 2025 16:33
ImmutableCoordinatorLogOffsets.Builder builder = new ImmutableCoordinatorLogOffsets.Builder();
for (SSTableReader sstable : sstables)
builder.addAll(sstable.getCoordinatorLogOffsets());
builder.purgeTransfers();
Contributor

So the idea here is that transfer IDs that are durably reconciled will never trigger reconciliation anyway...so they just don't need to be in the new post-compaction SSTables? Makes sense, although I wonder if a short comment to that effect would be helpful...or...rename to purgeReconciledTransfers()...or maybe my favorite...make Predicate<ShortMutationId> pred an argument for purgeTransfers() and pass MutationTrackingService.instance::isDurablyReconciled.
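
A minimal sketch of the predicate variant, under stated assumptions: TransferPurgeSketch is a hypothetical stand-in for the builder, and String stands in for ShortMutationId. In the real code the caller would presumably pass MutationTrackingService.instance::isDurablyReconciled, as suggested above.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical stand-in for ImmutableCoordinatorLogOffsets.Builder; String replaces ShortMutationId.
final class TransferPurgeSketch
{
    private final Set<String> transferIds = new HashSet<>();

    void addAll(Set<String> ids)
    {
        transferIds.addAll(ids);
    }

    // Drop every transfer ID that the supplied predicate reports as durably reconciled.
    // Taking the predicate as an argument keeps the purge rule visible at the call site.
    void purgeTransfers(Predicate<String> isDurablyReconciled)
    {
        transferIds.removeIf(isDurablyReconciled);
    }

    Set<String> transfers()
    {
        return Set.copyOf(transferIds);
    }
}
```

Passing the predicate also makes the builder easy to unit test without a running service, since a test can supply any rule it likes.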

{
ImmutableCoordinatorLogOffsets logOffsets = sstable.getCoordinatorLogOffsets();
Preconditions.checkState(logOffsets.mutations().isEmpty());
Preconditions.checkState(!logOffsets.transfers().isEmpty());
Contributor

Couple thoughts:

1.) Would ImmutableCoordinatorLogOffsets benefit from a method like isTransfer() that we could use here? ^
2.) I'd consider actual error messages here (especially after collapsing the two checks above with isTransfer()), although if we used assertions correctly, those would be better (given this is a programmer error if it happens).

Contributor Author

I'm not sure what isTransfer would represent, given that we'll have ICLO containing a mix of mutations and transfers after compaction has happened. I can make this throw a "real" exception though, with a message containing logOffsets.
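
For illustration, the "real" exception could look roughly like the sketch below. The class and method names are hypothetical, lists of strings stand in for the mutations and transfers held by the log offsets, and the message wording is an assumption.

```java
import java.util.List;

// Hypothetical helper; lists of strings stand in for logOffsets.mutations() and logOffsets.transfers().
final class PendingTransferCheckSketch
{
    // Expect no mutations and at least one transfer; otherwise fail with a descriptive message
    static void checkPendingTransfer(List<String> mutations, List<String> transfers)
    {
        if (!mutations.isEmpty() || transfers.isEmpty())
            throw new IllegalStateException(String.format("Expected log offsets with transfers only, but got mutations=%s transfers=%s",
                                                          mutations, transfers));
    }
}
```

Folding both conditions into one check keeps a single failure message that shows the full state, which is usually what you want when diagnosing a programmer error.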


// Validate existing SSTable-attached indexes, and then build any that are missing:
if (!cfs.indexManager.validateSSTableAttachedIndexes(newSSTables, false, options.validateIndexChecksum))
cfs.indexManager.buildSSTableAttachedIndexesBlocking(newSSTables);
Contributor

@maedhroz maedhroz Nov 17, 2025

There are a few different scenarios here when an SAI index is present:

1.) SSTables are imported, but SAI components are not imported alongside them. In this case, buildSSTableAttachedIndexesBlocking() is called to make sure that they DO exist by the time addSSTables() is called.

2.) SSTables are imported w/ SAI components as well. In this case, we don't build anything, as validation should pass, and everything is where it should be.

In the tracked case, we need to stream, so I think one of two things can happen. The first is that we're not streaming entire files, in which case SAI indexes are going to have to be built on the receiving end. I think this should actually work right now, because the observer framework/listener stuff builds the index as the SSTable is written. However, I'm worried about the full file streaming case, because for that to work, the SAI components need to be added to the manifest before the stream starts. Looking at what's downstream of SSTableReader.open() in the importer for the tracked case, it looks like it does detect existing SAI components though...so perhaps false alarm.

Anyway, I speculate. We just need a test for this. We don't have CASSANDRA-20374 yet in this branch, of course, since it's not merged yet, but testing just the streaming and index building part (can just check index queryability) wouldn't be that bad.

  BULK_LOAD("Bulk Load", true, false, false),
- REPAIR("Repair", true, false, true);
+ REPAIR("Repair", true, false, true),
+ TRACKED_TRANSFER("Tracked Import", false, false, false);
Contributor

nit: Are all tracked transfers tracked imports? We seem to replicate the naming for the other instances...

Contributor Author

Right now, yes, all tracked transfers are imports. But there's some discussion about failure recovery potentially using a tracked transfer as well, so I made this separate.

Contributor Author

Just pushed naming fix, so the enum name and the human name are in agreement

return new SSTableTxnSingleStreamWriter(txn, writer);
if (session.streamOperation() == StreamOperation.TRACKED_TRANSFER)
{
Preconditions.checkState(cfs.metadata().replicationType().isTracked());
Contributor

nit: Not sure if we want a message here for when the check is violated.


cfs.addSSTables(readers);

//invalidate row and counter cache
Contributor

It makes sense that we wouldn't invalidate the caches here on a tracked import, but do we need to make sure we do it on activation?

{
Preconditions.checkState(metadata().replicationType().isTracked());
if (activationIds == null)
activationIds = new HashSet<>();
Contributor

Should we ever call addActivationIds() multiple times during a read? If not, I would blow up if we try to.

Contributor Author

We do actually access the same ReadExecutionController across threads, when an IN clause has multiple local partitions. I'll update this path to make it threadsafe.

Not exactly the same, but similar to: CASSANDRA-19427

Contributor Author

Looked more closely, and the case I was thinking of isn't across threads. Even when we're in SinglePartitionReadCommand.Group.executeLocally, we have a shared instance of ReadExecutionController across executions, but they're executing in serial on a single thread, so there's no concurrent access.


public Iterator<ShortMutationId> getActivationIds()
{
return activationIds == null ? null : activationIds.iterator();
Contributor

@maedhroz maedhroz Nov 18, 2025

Should we ever call getActivationIds() without having first set activationIds? If not, I would explode if we try to. I might also have addActivationIds() set activationIds to Collections.emptySet() when there are actually no IDs to let that newly created HashSet get collected.

Contributor Author

It's fine to have tracked reads with no intersecting transfers, so we shouldn't explode on unset.
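
A possible middle ground between the two positions is sketched below: an unset field is treated as "no intersecting transfers", callers get an empty iterator instead of null, and no set is allocated when there is nothing to add. ActivationIdsSketch is hypothetical, String stands in for ShortMutationId, and this differs from the snippet above, which returns null when unset.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

// Hypothetical stand-in for the activation ID state on the read path; String replaces ShortMutationId.
final class ActivationIdsSketch
{
    private Set<String> activationIds; // stays null until there is at least one ID to record

    void addActivationIds(Set<String> ids)
    {
        if (ids.isEmpty())
            return; // nothing to record, so avoid allocating a set at all

        if (activationIds == null)
            activationIds = new HashSet<>();
        activationIds.addAll(ids);
    }

    // Unset means "no intersecting transfers", so hand back an empty iterator rather than null
    Iterator<String> getActivationIds()
    {
        return activationIds == null ? Collections.emptyIterator() : activationIds.iterator();
    }
}
```

Like the code under review, this sketch is not thread-safe; it relies on the single-threaded access described earlier in the thread.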

public void mutateCoordinatorLogOffsets(Descriptor descriptor, ImmutableCoordinatorLogOffsets logOffsets) throws IOException
{
if (logger.isTraceEnabled())
logger.trace("Mutating {} to {}", descriptor.fileFor(Components.STATS), logOffsets);
Contributor

nit: Should we customize the trace message like we do for mutateLevel()?

while (activationIds.hasNext())
{
ShortMutationId id = activationIds.next();
builder.builderForLog(id).unreconciled.add(id.offset());
Contributor

@maedhroz maedhroz Nov 18, 2025

nit: What if we just have merge() take a Set for activationIds? I'm not sure the iterator buys us anything. (I was going to say that might let us create a new addAll() in AbstractMutable, but each ID could have a different builder I guess. It just sucks, because we might double the size of the array in add().)

public class TrackerTest
{
- private static final int COMPONENT_STATS_SIZE_BYTES = 4805;
+ private static final int COMPONENT_STATS_SIZE_BYTES = 4806;
Contributor

I've had to deal with test failures here and there around SSTable component sizes in tests, where different configurations use different SSTable formats, etc. (Just want to make sure this wouldn't be problematic later...)

… subsequent activation attempts to also time out, need to clean up extra logging
@aratno aratno force-pushed the CASSANDRA-20383-rebase-iii branch from 694fe62 to 261535d Compare November 19, 2025 20:20
@aratno aratno force-pushed the CASSANDRA-20383-rebase-iii branch from 261535d to 9b2fc35 Compare November 19, 2025 20:38