Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
dd8aef1
CEP-45: Bulk transfer
aratno Sep 2, 2025
92b8c4f
Fix compaction of bulk transfers with existing SSTables clearing tran…
aratno Oct 17, 2025
0effc82
Fix TrackerTest for new serialization
aratno Oct 17, 2025
1cb1252
Fix LegacySSTableTest
aratno Oct 17, 2025
689c49f
Fix setup in RepairMessageVerbHandlerOutOfRangeTest, likely impacting…
aratno Oct 17, 2025
6a86a96
Fix CoordinatorLogTest
aratno Oct 17, 2025
44634c4
Fix ReadRepairQueryTester, since tracked keyspaces always perform rea…
aratno Oct 20, 2025
a733dc7
PR Feedback: only isolate streamed SSTables for StreamOperation.TRACK…
aratno Oct 23, 2025
55a9cb8
Refactor tests
aratno Oct 23, 2025
e898f46
Fix test failure due to offsets contained in reconciled and unreconci…
aratno Oct 23, 2025
7636f66
Refactor local vs remote activation, improve tests
aratno Oct 28, 2025
c7c07dd
Fix promotion of transferred SSTables into the repaired set
aratno Oct 28, 2025
390ed74
Fix outOutOfRange test
aratno Oct 28, 2025
5bc04bb
Add failing test for activation IDs spreading across token ranges by …
aratno Oct 29, 2025
d5eb0e2
Fix test by only including activation IDs within the read range
aratno Oct 29, 2025
8b9d683
PR feedback: two-phase commit naming conventions, tests and fixes for…
aratno Nov 3, 2025
19bb035
Update enum serializer to save a few bytes
aratno Nov 3, 2025
d303b95
Move CooordinatedTransfers to its own class
aratno Nov 3, 2025
c2b89bc
PROPOSE -> PREPARE
aratno Nov 3, 2025
9739ba6
Add paranoid check to only add activation IDs for reads on tracked ke…
aratno Nov 3, 2025
803f476
Add fuzz tests for ActivatedTransfers
aratno Nov 4, 2025
a07628b
Mutual exclusion for shard range changes
aratno Nov 4, 2025
e17184b
Fix propagation of failed commit
aratno Nov 4, 2025
082584a
Fix wrapping ranges, intersects coverage
aratno Nov 4, 2025
fa549e9
Improve threading and tests, handle cleanup of failed streams
aratno Nov 5, 2025
b2ef3d5
Retry activation on failed transfer commit
aratno Nov 6, 2025
8d4360e
Fix cleanup of transfers that failed at PREPARE
aratno Nov 6, 2025
a184e8f
WIP: Fix test failure due to synchronized transfer activation causing…
aratno Nov 7, 2025
83a12f3
Cleanup fixes, ensured BulkTransfersTest all still passes
aratno Nov 7, 2025
3e2c34d
Improve threading, run directly on anti-entropy
aratno Nov 7, 2025
25a1f04
Fix synchronization in LocalTransfers
aratno Nov 11, 2025
5280ee9
Separate mutations and transfers in CoordinatorLogOffsets API
aratno Nov 11, 2025
2b9c86a
Defensive copy of transfers on initialization
aratno Nov 11, 2025
4df5522
Add tests for LocalTransfers
aratno Nov 12, 2025
b952624
Fix initializer
aratno Nov 13, 2025
0d7d22d
PR feedback
aratno Nov 17, 2025
56b1f0d
PR feedback
aratno Nov 17, 2025
3de59cb
Make pending directories range-aware
aratno Nov 18, 2025
5aa347b
PR feedback
aratno Nov 18, 2025
f35fea3
Use existing Throwables.unchecked for conversions
aratno Nov 19, 2025
9b2fc35
Rebase fixes: cep-45-mutation-tracking
aratno Nov 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -914,7 +914,7 @@ public List<String> importNewSSTables(Set<String> srcPaths, boolean resetLevel,
.build());
}

Descriptor getUniqueDescriptorFor(Descriptor descriptor, File targetDirectory)
public Descriptor getUniqueDescriptorFor(Descriptor descriptor, File targetDirectory)
{
Descriptor newDescriptor;
do
Expand Down
43 changes: 37 additions & 6 deletions src/java/org/apache/cassandra/db/Directories.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.apache.cassandra.service.snapshot.SnapshotManifest;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.TimeUUID;

import static org.apache.cassandra.utils.LocalizeString.toLowerCaseLocalized;

Expand Down Expand Up @@ -115,6 +116,7 @@ public class Directories

public static final String BACKUPS_SUBDIR = "backups";
public static final String SNAPSHOT_SUBDIR = "snapshots";
public static final String PENDING_SUBDIR = "pending";
public static final String TMP_SUBDIR = "tmp";
public static final String SECONDARY_INDEX_NAME_SEPARATOR = ".";

Expand Down Expand Up @@ -316,13 +318,8 @@ public File getLocationForDisk(DataDirectory dataDirectory)
{
if (dataDirectory != null)
for (File dir : dataPaths)
{
// Note that we must compare absolute paths (not canonical) here since keyspace directories might be symlinks
Path dirPath = dir.toAbsolute().toPath();
Path locationPath = dataDirectory.location.toAbsolute().toPath();
if (dirPath.startsWith(locationPath))
if (dataDirectory.contains(dir))
return dir;
}
return null;
}

Expand Down Expand Up @@ -727,6 +724,33 @@ public static File getSnapshotSchemaFile(File snapshotDir)
return new File(snapshotDir, "schema.cql");
}

@VisibleForTesting
public Set<File> getPendingLocations()
{
Set<File> result = new HashSet<>();
for (DataDirectory dataDirectory : dataDirectories.getAllDirectories())
{
for (File dir : dataPaths)
{
if (!dataDirectory.contains(dir))
continue;
result.add(getOrCreate(dir, PENDING_SUBDIR));
}
}
return result;
}

public File getPendingLocationForDisk(DataDirectory dataDirectory, TimeUUID planId)
{
for (File dir : dataPaths)
{
if (!dataDirectory.contains(dir))
continue;
return getOrCreate(dir, PENDING_SUBDIR, planId.toString());
}
throw new RuntimeException("Could not find pending location");
}

public static File getBackupsDirectory(Descriptor desc)
{
return getBackupsDirectory(desc.directory);
Expand Down Expand Up @@ -815,6 +839,13 @@ public DataDirectory(Path location)
this.location = new File(location);
}

public boolean contains(File file)
{
// Note that we must compare absolute paths (not canonical) here since keyspace directories might be symlinks
Path path = file.toAbsolute().toPath();
return path.startsWith(location.toAbsolute().toPath());
}

public long getAvailableSpace()
{
long availableSpace = PathUtils.tryGetSpace(location.toPath(), FileStore::getUsableSpace) - DatabaseDescriptor.getMinFreeSpacePerDriveInBytes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,8 @@ protected void recordLatency(TableMetrics metric, long latencyNanos)
public UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadExecutionController controller)
{
ColumnFamilyStore.ViewFragment view = cfs.select(View.selectLive(dataRange().keyRange()));
if (cfs.metadata().replicationType().isTracked())
controller.addActivationIds(view.sstables);
Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), dataRange().keyRange().getString(metadata().partitionKeyType));

// fetch data from current memtable, historical memtables, and SSTables in the correct order.
Expand Down
48 changes: 48 additions & 0 deletions src/java/org/apache/cassandra/db/ReadExecutionController.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,37 @@
package org.apache.cassandra.db;

import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import javax.annotation.concurrent.NotThreadSafe;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.db.ColumnFamilyStore.ViewFragment;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.replication.ActivatedTransfers;
import org.apache.cassandra.replication.ShortMutationId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.MonotonicClock;
import org.apache.cassandra.utils.concurrent.OpOrder;

import static org.apache.cassandra.utils.MonotonicClock.Global.preciseTime;

@NotThreadSafe
public class ReadExecutionController implements AutoCloseable
{
private static final Logger logger = LoggerFactory.getLogger(ReadExecutionController.class);

private static final long NO_SAMPLING = Long.MIN_VALUE;

// For every reads
Expand All @@ -50,6 +66,13 @@ public class ReadExecutionController implements AutoCloseable
private final RepairedDataInfo repairedDataInfo;
private long oldestUnrepairedTombstone = Long.MAX_VALUE;

/**
* Track bulk transfers involved in the read, so we can do read reconciliation. These come from the
* {@link ViewFragment}, not the SSTable read path, so bloom filters and short-circuiting SSTable scans will still
* include the total set of relevant bulk transfers.
*/
private Set<ShortMutationId> activationIds = null;

ReadExecutionController(ReadCommand command,
OpOrder.Group baseOp,
TableMetadata baseMetadata,
Expand Down Expand Up @@ -243,4 +266,29 @@ private void addSample()
if (cfs != null)
cfs.metric.topLocalReadQueryTime.addSample(cql, timeMicros);
}

public void addActivationIds(Iterable<SSTableReader> sstables)
{
Preconditions.checkState(metadata().replicationType().isTracked());
if (activationIds == null)
activationIds = new HashSet<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we ever call addActivationIds() multiple times during a read? If not, I would blow up if we try to.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do actually access the same ReadExecutionController across threads, when an IN clause has multiple local partitions. I'll update this path to make it threadsafe.

Not exactly the same, but similar to: CASSANDRA-19427

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looked more closely, and the case I was thinking of isn't across threads. Even when we're in SinglePartitionReadCommand.Group.executeLocally, we have a shared instance of ReadExecutionController across executions, but they're executing in serial on a single thread, so there's no concurrent access.

for (SSTableReader sstable : sstables)
{
ActivatedTransfers transfers = sstable.getCoordinatorLogOffsets().transfers();
if (transfers.isEmpty())
continue;
logger.trace("Adding overlapping IDs to read keyRange {}", command.dataRange.keyRange);
transfers.forEachIntersecting(command.dataRange.keyRange, id -> {
String msg = String.format("Found overlapping activation ID %s for queried range %s", id, command.dataRange.keyRange);
Tracing.trace(msg);
logger.debug(msg);
activationIds.add(id);
});
}
}

public Iterator<ShortMutationId> getActivationIds()
{
return activationIds == null ? null : activationIds.iterator();
Copy link
Contributor

@maedhroz maedhroz Nov 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we ever call getActivationIds() without having first set activationIds? If not, I would explode if we try to. I might also have addActivationIds() set activationIds to Collections.empty() when there are actually no IDs to let that newly created HashSet get collected.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's fine to have tracked reads with no intersecting transfers, so we shouldn't explode on unset

}
}
30 changes: 22 additions & 8 deletions src/java/org/apache/cassandra/db/SSTableImporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.cassandra.io.sstable.format.SSTableFormat.Components;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.replication.MutationTrackingService;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageService;
Expand Down Expand Up @@ -80,11 +81,8 @@ synchronized List<String> importNewSSTables(Options options)
UUID importID = UUID.randomUUID();
logger.info("[{}] Loading new SSTables for {}/{}: {}", importID, cfs.getKeyspaceName(), cfs.getTableName(), options);

// This will be supported in the future
TableMetadata metadata = cfs.metadata();
if (metadata.replicationType() != null && metadata.replicationType().isTracked())
throw new IllegalStateException("Can't import into tables with mutation tracking enabled");

boolean isTracked = metadata.replicationType().isTracked();
List<Pair<Directories.SSTableLister, String>> listers = getSSTableListers(options.srcPaths);

Set<Descriptor> currentDescriptors = new HashSet<>();
Expand Down Expand Up @@ -183,7 +181,10 @@ synchronized List<String> importNewSSTables(Options options)
Descriptor newDescriptor = cfs.getUniqueDescriptorFor(entry.getKey(), targetDir);
maybeMutateMetadata(entry.getKey(), options);
movedSSTables.add(new MovedSSTable(newDescriptor, entry.getKey(), entry.getValue()));
SSTableReader sstable = SSTableReader.moveAndOpenSSTable(cfs, entry.getKey(), newDescriptor, entry.getValue(), options.copyData);
// Don't move tracked SSTables, since that will move them to the live set on bounce
SSTableReader sstable = isTracked
? SSTableReader.open(cfs, oldDescriptor, metadata.ref)
: SSTableReader.moveAndOpenSSTable(cfs, oldDescriptor, newDescriptor, entry.getValue(), options.copyData);
newSSTablesPerDirectory.add(sstable);
}
catch (Throwable t)
Expand Down Expand Up @@ -233,12 +234,14 @@ synchronized List<String> importNewSSTables(Options options)
if (!cfs.indexManager.validateSSTableAttachedIndexes(newSSTables, false, options.validateIndexChecksum))
cfs.indexManager.buildSSTableAttachedIndexesBlocking(newSSTables);
Copy link
Contributor

@maedhroz maedhroz Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are a few different scenarios here when an SAI index is present:

1.) SSTables are imported, but SAI components are not imported alongside them. In this case, buildSSTableAttachedIndexesBlocking() is called to make sure that they DO exist by the time addSSTables() is called.

2.) SSTables are imported w/ SAI components as well. In this case, we don't build anything, as validation should pass, and everything is where it should be.

In the tracked case, we need to stream, so I think one of two things can happen. The first is that we're not streaming entire files, in which case SAI indexes are going to have to be built on the receiving end. I think this should actually work right now, because the observer framework/listener stuff builds the index as the SSTable is written. However, I'm worried about the full file streaming case, because for that to work, the SAI components need to be added to the manifest before the stream starts. Looking at what's downstream of SSTableReader.open() in the importer for the tracked case, it looks like it does detect existing SAI components though...so perhaps false alarm.

Anyway, I speculate. We just need a test for this. We don't have CASSANDRA-20374 yet in this branch, of course, since it's not merged yet, but testing just the streaming and index building part (can just check index queryability) wouldn't be that bad.


cfs.getTracker().addSSTables(newSSTables);
if (isTracked)
TrackedBulkTransfer.execute(cfs.keyspace.getName(), newSSTables);
else
cfs.getTracker().addSSTables(newSSTables);

for (SSTableReader reader : newSSTables)
{
if (options.invalidateCaches && cfs.isRowCacheEnabled())
invalidateCachesForSSTable(reader);
}
}
catch (Throwable t)
{
Expand All @@ -250,6 +253,17 @@ synchronized List<String> importNewSSTables(Options options)
return failedDirectories;
}

/**
* TODO: Support user-defined consistency level for import, for import with replicas down
*/
private static class TrackedBulkTransfer
{
private static void execute(String keyspace, Set<SSTableReader> sstables)
{
MutationTrackingService.instance.executeTransfers(keyspace, sstables, ConsistencyLevel.ALL);
}
}

/**
* Check the state of this node and throws an {@link InterruptedException} if it is currently draining
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,8 @@ private UnfilteredRowIterator queryMemtableAndDiskInternal(ReadableView view, Co
Tracing.trace("Acquiring sstable references");
List<SSTableReader> sstables = view.sstables();
sstables.sort(SSTableReader.maxTimestampDescending);
if (cfs.metadata().replicationType().isTracked())
controller.addActivationIds(sstables);
ClusteringIndexFilter filter = clusteringIndexFilter();
long minTimestamp = Long.MAX_VALUE;
long mostRecentPartitionTombstone = Long.MIN_VALUE;
Expand Down Expand Up @@ -997,7 +999,6 @@ private boolean queriesMulticellType()
private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ReadableView view, ColumnFamilyStore cfs, ClusteringIndexNamesFilter filter, ReadExecutionController controller)
{
Tracing.trace("Acquiring sstable references");

ImmutableBTreePartition result = null;
SSTableReadMetricsCollector metricsCollector = new SSTableReadMetricsCollector();

Expand All @@ -1020,6 +1021,11 @@ private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ReadableV
/* add the SSTables on disk */
List<SSTableReader> sstables = view.sstables();
sstables.sort(SSTableReader.maxTimestampDescending);
if (cfs.metadata().replicationType().isTracked())
{
logger.trace("Executing read against SSTables {}", sstables);
controller.addActivationIds(view.sstables());
}
// read sorted sstables
for (SSTableReader sstable : sstables)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.replication.ImmutableCoordinatorLogOffsets;
import org.apache.cassandra.replication.MutationTrackingService;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.snapshot.SnapshotManager;
import org.apache.cassandra.service.snapshot.SnapshotOptions;
Expand Down Expand Up @@ -397,6 +398,7 @@ public static ImmutableCoordinatorLogOffsets getCoordinatorLogOffsets(Set<SSTabl
ImmutableCoordinatorLogOffsets.Builder builder = new ImmutableCoordinatorLogOffsets.Builder();
for (SSTableReader sstable : sstables)
builder.addAll(sstable.getCoordinatorLogOffsets());
builder.purgeTransfers(MutationTrackingService.instance::isDurablyReconciled);
return builder.build();
}

Expand Down
16 changes: 16 additions & 0 deletions src/java/org/apache/cassandra/db/lifecycle/Tracker.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
Expand Down Expand Up @@ -61,6 +62,7 @@
import org.apache.cassandra.notifications.TableDroppedNotification;
import org.apache.cassandra.notifications.TablePreScrubNotification;
import org.apache.cassandra.notifications.TruncationNotification;
import org.apache.cassandra.replication.ImmutableCoordinatorLogOffsets;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.TimeUUID;
Expand Down Expand Up @@ -270,6 +272,20 @@ public void updateInitialSSTableSize(Iterable<SSTableReader> sstables)

public void addSSTables(Collection<SSTableReader> sstables)
{
Preconditions.checkState(!cfstore.metadata().replicationType().isTracked());
addSSTablesInternal(sstables, false, true, true);
}

public void addSSTablesTracked(Collection<SSTableReader> sstables)
{
Preconditions.checkState(cfstore.metadata().replicationType().isTracked());
for (SSTableReader sstable : sstables)
{
ImmutableCoordinatorLogOffsets logOffsets = sstable.getCoordinatorLogOffsets();
Preconditions.checkState(logOffsets.mutations().isEmpty());
Preconditions.checkState(!logOffsets.transfers().isEmpty());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couple thoughts:

1.) Would ImmutableCoordinatorLogOffsets benefit from a method like isTransfer() that we could use here? ^
2.) I'd consider actual error messages here (especially after collapsing the two checks above with isTransfer()), although if we used assertions correctly, those would be better (given this is a programmer error if it happens).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what isTransfer would represent, given that we'll have ICLO containing a mix of mutations and transfers after compaction has happened. I can make this throw a "real" exception though, with a message containing logOffsets.

}

addSSTablesInternal(sstables, false, true, true);
}

Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/db/lifecycle/View.java
Original file line number Diff line number Diff line change
Expand Up @@ -402,4 +402,4 @@ public boolean apply(T t)
}
};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.cassandra.streaming.StreamReceiver;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.messages.StreamMessageHeader;
import org.apache.cassandra.utils.FBUtilities;

import static java.lang.String.format;
import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory;
Expand Down Expand Up @@ -139,7 +140,7 @@ public SSTableMultiWriter read(DataInputPlus in) throws IOException

UnaryOperator<StatsMetadata> transform = stats -> stats.mutateLevel(header.sstableLevel)
.mutateRepairedMetadata(messageHeader.repairedAt, messageHeader.pendingRepair);
String description = String.format("level %s and repairedAt time %s and pendingRepair %s",
String description = format("level %s and repairedAt time %s and pendingRepair %s",
header.sstableLevel, messageHeader.repairedAt, messageHeader.pendingRepair);
writer.descriptor().getMetadataSerializer().mutate(writer.descriptor(), description, transform);
return writer;
Expand All @@ -159,9 +160,13 @@ public SSTableMultiWriter read(DataInputPlus in) throws IOException

private File getDataDir(ColumnFamilyStore cfs, long totalSize) throws IOException
{
boolean isTracked = cfs.metadata().replicationType().isTracked();

Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize);
if (localDir == null)
throw new IOException(format("Insufficient disk space to store %s", prettyPrintMemory(totalSize)));
throw new IOException(format("Insufficient disk space to store %s", FBUtilities.prettyPrintMemory(totalSize)));
if (isTracked)
return cfs.getDirectories().getPendingLocationForDisk(localDir, session.planId());

File dir = cfs.getDirectories().getLocationForDisk(cfs.getDiskBoundaries().getCorrectDiskForKey(header.firstKey));

Expand Down
Loading