Skip to content

Conversation

@nitsanw
Copy link
Contributor

@nitsanw nitsanw commented Sep 29, 2025

patch by Nitsan Wakart; reviewed by TBD for CASSANDRA-TBD

  • EPPv1 - all new code
  • Cursor compaction integration
  • JMH benchmarks for compaction and cursor impls
  • EPPv1 - New tests
  • Existing tests tweaks for new code
  • [revert?] change the default partitioner to expand testing of new code
  • [revert?] test data used some benchmarks
  • [revert?] jmh tweak GC settings for stability
  • [revert?] javadoc typos, marking unused params
  • [revert?] clarifying comment
  • [revert?] toString improvement
  • [revert?] remove spurious keywords
  • [revert?] marking metadata collection
  • [revert?] cursor verifier
  • Exclude SAI and counter column
  • Exclude BTI and legacy versions
  • Temporarily skip very long running test

Thanks for sending a pull request! Here are some tips if you're new here:

  • Ensure you have added or run the appropriate tests for your PR.
  • Be sure to keep the PR description updated to reflect all changes.
  • Write your PR title to summarize what this PR proposes.
  • If possible, provide a concise example to reproduce the issue for a faster review.
  • Read our contributor guidelines
  • If you're making a documentation change, see our guide to documentation contribution

Commit messages should follow the following format:

<One sentence description, usually Jira title or CHANGES.txt summary>

<Optional lengthier description (context on patch)>

patch by <Authors>; reviewed by <Reviewers> for CASSANDRA-#####

Co-authored-by: Name1 <email1>
Co-authored-by: Name2 <email2>

The Cassandra Jira

Nitsan Wakart and others added 2 commits August 28, 2025 14:26
patch by Nitsan Wakart; reviewed by TBD for CASSANDRA-TBD

- EPPv1 - all new code
- Cursor compaction integration
- JMH benchmarks for compaction and cursor impls
- EPPv1 - New tests
- Existing tests tweaks for new code
- [revert?] change the default partitioner to expand testing of new code
- [revert?] test data used some benchmarks
- [revert?] jmh tweak GC settings for stability
- [revert?] javadoc typos, marking unused params
- [revert?] clarifying comment
- [revert?] toString improvement
- [revert?] remove spurious keywords
- [revert?] marking metadata collection
- [revert?] cursor verifier
- Exclude SAI and counter column
- Exclude BTI and legacy versions
- Temporarily skip very long running test
@nitsanw nitsanw changed the title Add cursor based optimized compaction path (WIP) CASSANDRA-20918 Add cursor-based low allocation optimized compaction implementation Sep 29, 2025
@blambov blambov self-requested a review September 30, 2025 07:40
{
protected static final Logger logger = LoggerFactory.getLogger(CompactionTask.class);
public static final int MEGABYTE = 1024 * 1024 * 1024;
public static final boolean CURSOR_COMPACTION_ENABLED = SystemProperties.getBoolean("cassandra.enable_cursor_compaction", () -> true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you move this to Config.java? One advantage of having it there is that the AST tests (like SingleNodeTableWalkTest) that generate random config would be able to exercise it. That's our best path to coverage with lots of different schemas and configurations.

If you can locally do a longer run of that test with cursor-compaction enabled, that would be useful too. That would be done via overriding StatefulASTBase#clusterConfig with the new config set.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also related to testing, we need to be running all tests both with this feature enabled as well as disabled.

Let's make sure that among test, test-oa and test-latest we have at least one that is running with cursor compaction and one without.

Copy link
Contributor Author

@nitsanw nitsanw Oct 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make sure that among test, test-oa and test-latest we have at least one that is running with cursor compaction and one without.

How do I do that?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Look at the testlist-xxx definitions in build.xml. E.g. adding a line to set this flag to false to testlist-oa should be enough.

{
protected static final Logger logger = LoggerFactory.getLogger(CompactionTask.class);
public static final int MEGABYTE = 1024 * 1024 * 1024;
public static final boolean CURSOR_COMPACTION_ENABLED = SystemProperties.getBoolean("cassandra.enable_cursor_compaction", () -> true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also related to testing, we need to be running all tests both with this feature enabled as well as disabled.

Let's make sure that among test, test-oa and test-latest we have at least one that is running with cursor compaction and one without.

Copy link
Contributor

@blambov blambov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It must be stated that this approach that bundles all the steps of the processing in one single file will be quite difficult to maintain and keep in sync with the combination of iterators and transformations that we use in other parts of the code such as the query path. However, once we have reached a point of stability for a piece of functionality where we do not expect it to change significantly for a long time, it does makes sense to unpack the code and present it in a way that makes its execution as direct as possible, and this patch is a good such representation of the compaction process.

Personally, I am very unhappy about switching to mutable, pooled and reused objects, which are significantly more unwieldy and error prone, especially in contexts where concurrent access can occur. It seems this is becoming a necessity if we need to achieve acceptable performance with the current state of our heap usage, but we still need to very carefully separate the mutable versions of concepts from the immutable ones used throughout the code base. Suddenly making a DeletionTime mutable is not an acceptable change.

First batch of targeted comments below, mainly going over CompactionCursor.java.

else if (e instanceof CompactionInterruptedException)
throw (CompactionInterruptedException) e;
else
throw new IllegalStateException(e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this conversion addressing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Defensively checking for incorrect exception types

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer not to change the existing behaviour (which wraps these into RuntimeException instead of IllegalStateException), as I don't know what may be relying on this wrapping (including customer tools).

* data that is not read often, so compaction "pro-actively" fix such index entries. This is mainly
* an optimization).</li>
* </ul>
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please update the JavaDoc and explain the approach.

Copy link
Contributor

@blambov blambov Nov 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well the unchanged part of the doc isn't quite right. This is not creating a compacted iterator, an iterator of any kind, or even a cursor of any kind.

The class pulls data from multiple cursors and pushes it into a given writer. This is fundamentally different from the way compaction works in other code. Please explain the difference, because it is critical to understanding the class and being able to work on it.

Copy link
Contributor

@blambov blambov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Next batch of comments.

}
if (!UnfilteredSerializer.hasAllColumns(flags))
{
// TODO: re-implement GC free
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DataStax's branch has an implementation of it.

}

public void updateClusteringValues(ClusteringDescriptor newClustering) {
if (newClustering == null || newClustering.clusteringKind().isBoundary())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you need to copy the comment from updateClusteringValuesByBoundOrBoundary to explain skipping boundaries.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure which comment you mean

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason for doing nothing on isBoundary is not obvious. This is the comment I am referring to:

        // In a SSTable, every opening marker will be closed, so the start of a range tombstone marker will never be
        // the maxClustering (the corresponding close might though) and there is no point in doing the comparison
        // (and vice-versa for the close). By the same reasoning, a boundary will never be either the min or max
        // clustering, and we can save on comparisons.

Map<MetadataType, MetadataComponent> components = new EnumMap<>(MetadataType.class);
components.put(MetadataType.VALIDATION, new ValidationMetadata(partitioner, bloomFilterFPChance));
Slice coveredClustering;
if (minClusteringDescriptor.clusteringKind() != ClusteringPrefix.Kind.EXCL_START_BOUND) // min is end only if the descriptors are unused
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The minimum can certainly be EXCL_START_BOUND when it is used, if a partition starts with a range tombstone. The maximum, on the other hand, can't.

If you want to do this by a single operation (and also remove the minClusteringDescriptor.clusteringColumnsBound() == 0 check in updateClusteringValues), you can change the uninitialized min kind to SSTABLE_UPPER_BOUND, because that won't ever be given to updateClusteringValues.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is mirroring existing code AFAICT:

if (minClustering == ClusteringBound.MAX_START)
                minClustering = clustering;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MAX_START also has an empty values array, while this line only checks the kind of clustering (i.e. the && minClusteringDescriptor.clusteringColumnsBound() == 0 part is missing here). The additional check is necessary, as the code as it stands will do the wrong thing for sstables whose lower limit is a range tombstone.

Copy link
Contributor

@blambov blambov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Next batch of comments.

import org.apache.cassandra.dht.Murmur3Partitioner;
import org.jctools.util.UnsafeAccess;

public class ReusableLongToken extends Murmur3Partitioner.LongToken
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: This shouldn't need to be public.

import org.apache.cassandra.utils.ByteArrayUtil;
import org.apache.cassandra.utils.ByteBufferUtil;

public class ReusableDecoratedKey extends DecoratedKey
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class is pretty hacky. It shouldn't be hard to move the support for reusable tokens to the partitioner (throwing exceptions for all except Murmur and local).

return state;
}

private int checkNextFlags() throws IOException
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The caller of this method appears to be pretty well aware what kind of flags/state it expects this to be called in. Would it make sense to split it into checkNext(Partition|Unfiltered|Cell)Flags?

appendBIGIndex(partitionKey, partitionKeyLength, partitionStart, headerLength, partitionDeletionTime, partitionEnd);
}

private void appendBIGIndex(byte[] key, int keyLength, long partitionStart, int headerLength, DeletionTime partitionDeletionTime, long partitionEnd) throws IOException
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it not easy to modify and reuse the index building code from BigFormatPartitionWriter? The duplication here seems quite unnecessary.

private final int indexBlockThreshold;


private SSTableCursorWriter(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class should be split into a common SortedTableCursorWriter, with format-specific subclasses that instantiate the index builders it uses, and placed into the correct per-format packages.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently only BIG is supported, so I held off here. I think the index code split can be postponed to the time when the BTI format is supported. It may also be interesting to explore splitting the SSTable write phase and indexing phase so as to allow more flexibility in composing the phases (e.g. index on the fly/index per partition write/index at end of write, parallel index pass etc).

@Replaces(oldName = "enable_drop_compact_storage", converter = Converters.IDENTITY, deprecated = true)
public volatile boolean drop_compact_storage_enabled = false;

public boolean enable_cursor_compaction = ENABLE_CURSOR_COMPACTION.getBoolean();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like the enable_X style of naming is deprecated. Could you rename this (as well as the getter and system property) to the preferred cursor_compaction_enabled?

else if (e instanceof CompactionInterruptedException)
throw (CompactionInterruptedException) e;
else
throw new IllegalStateException(e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer not to change the existing behaviour (which wraps these into RuntimeException instead of IllegalStateException), as I don't know what may be relying on this wrapping (including customer tools).

* data that is not read often, so compaction "pro-actively" fix such index entries. This is mainly
* an optimization).</li>
* </ul>
*/
Copy link
Contributor

@blambov blambov Nov 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well the unchanged part of the doc isn't quite right. This is not creating a compacted iterator, an iterator of any kind, or even a cursor of any kind.

The class pulls data from multiple cursors and pushes it into a given writer. This is fundamentally different from the way compaction works in other code. Please explain the difference, because it is critical to understanding the class and being able to work on it.

public static boolean isSupported(AbstractCompactionStrategy.ScannerList scanners, AbstractCompactionController controller)
{
TableMetadata metadata = controller.cfs.metadata();
if (metadata.getTableDirectoryName().contains("system") ||
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use SchemaConstants.isSystemKeyspace(table.keyspace) or some of the other variations in SchemaConstants for this check.

What makes the check necessary? Why isn't the partitioner class check sufficient?


for (SSTableReader reader : scanner.getBackingSSTables()) {
Version version = reader.descriptor.version;
if (!(version.format instanceof BigFormat))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The input sstable format and the output format are not the same thing (we can be in the middle of an upgrade). This cursor's restriction is that it can't write the bti format, which we can't check by going through the source sstables -- we need to use DatabaseDescriptor.getSelectedSSTableFormat() instead.

{
if (DatabaseDescriptor.enableCursorCompaction()) {
try {
if (CompactionCursor.isSupported(scanners, controller))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should still log whether we are doing cursor compaction.

* <ul>
* <li>PARTITION_START - Partition header is loaded in preparation for merge</li>
* <li>begining of unfiltered/end of partition - header is loaded, list is sorted after this point</li>
* <li>DONE - need to be reset</li>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not clear at all why the cursor "need to be reset". Also, we will keep resetting finished cursors again and again if e.g. only one scanner remains.

Could you explain both points?

Or maybe add an intermediate state that this method advances to DONE? Alternatively, only recognize the end of the file in the PARTITION_START processing, which we may need to do anyway to handle the state where a file is immediately exhausted (which is more likely to happen for partial scanners)?

* Sorts the cursors array in preparation for partition merge. This assumes cursors are in one of 3 states:
* <ul>
* <li>PARTITION_START - Partition header is loaded in preparation for merge</li>
* <li>begining of unfiltered/end of partition - header is loaded, list is sorted after this point</li>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you explain why? Something in the sense of "If the cursor was used in the processing of the previous partition, its state would have advanced to PARTITION_START or DONE. Otherwise, it would remain positioned after the partition header, in one of these states."

/**
* Sorts the cursors array in preparation for partition merge. This assumes cursors are in one of 3 states:
* <ul>
* <li>PARTITION_START - Partition header is loaded in preparation for merge</li>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be "needs to be loaded"?

if (!mergedDeletion.isLive() && !purger.shouldPurge(mergedDeletion))
{
toWritePartitionDeletion = mergedDeletion;
maybeSwitchWriter(compactionAwareWriter);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this may be slightly less efficient, but I would prefer the writing to be done by the caller, rechecking isLive on the purged deletion, for clarity.

Alternatively, rename the method to something that clarifies that it may also write the partition header.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants