Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
9368327
Add cursor based optimized compaction path (WIP)
Jun 22, 2025
759dc2c
Merge branch 'trunk' into compaction-work-pr-prep
nitsanw Sep 29, 2025
d75d59c
Fix MEGABYTE constant
Oct 9, 2025
35ea514
Fix 0x11/0x10 should be 0b11/0b10
Oct 14, 2025
33b9e47
Introduce ENABLE_CURSOR_COMPACTION controls via CassandraRelevantProp…
Oct 14, 2025
d46511d
Revert `matched` comments left to track stats tracking
Oct 14, 2025
b417274
Revert change to RandomAccessReader and match row skipping logic from…
Oct 14, 2025
85379f1
Improve CompactionCursor javadoc
Oct 14, 2025
097b995
Extract `isSupported` from constructor
Oct 14, 2025
1af0f0e
Fix Trasnactions/Transformations
Oct 15, 2025
7576864
Add comment and rename `sortForPartitionMerge` -> `prepareForPartitio…
Oct 15, 2025
2205e48
Typo: `preturbed` -> `perturbed`
Oct 15, 2025
dea15c9
Javadoc for bubbleInsertToPreSorted and minor refactor
Oct 15, 2025
5d200ef
Typo: passed -> past
Oct 15, 2025
1a1c761
Remove redundant TODOs
Oct 15, 2025
b9dc6db
Revert 'unused' params
Oct 20, 2025
5accdd5
Rename `ElementDescriptor` -> `UnfilteredDescriptor` (and fallout)
Oct 20, 2025
17be5f4
Remove unused parameter
Oct 20, 2025
3b0f0a9
Remove unused method
Oct 20, 2025
ae65b7e
Fix indentation
Oct 20, 2025
62837f9
Move SSTableCursorPipeUtil to benchmarks
Oct 20, 2025
4064d32
Rename `partitionLength` back to `finishResult` and clarify comment
Oct 20, 2025
797ace1
Remove unused methods
Oct 20, 2025
855a740
Improve bubbleInsertElementToPreSorted, delay element insert
Oct 21, 2025
beef9ad
Remove redundant cursor status check
Oct 21, 2025
b19b959
Simplify deletion merging loop, clarify partitionDeletion variable names
Oct 21, 2025
f5718b3
Neaten up SSTableCursorReader
Oct 22, 2025
87a1c1c
Dead code removal
Oct 22, 2025
ff5eff1
Revert making classes public
Oct 22, 2025
ed48fe5
Transform LivenessInfo into an interface
Oct 22, 2025
a8bc083
Fix javadoc
Oct 22, 2025
be8b4e3
Fix intellij warnings
Oct 22, 2025
011213a
Explicitly split DeletionTime implementations
Oct 29, 2025
4d74fce
Rely on nextElementEquality in findMergeLimit
Oct 30, 2025
b9f5802
Refactor prepareAndSortForMerge code
Oct 30, 2025
308a3b3
Move merge limit == 0 out of mergeRows
Oct 31, 2025
562e437
Add TODO for clustering read/skip
Oct 31, 2025
37104f6
Simplify ClusteringComparator code and remove redundant code
Nov 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/java/org/apache/cassandra/db/Clustering.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,16 +133,16 @@ public String toString(TableMetadata metadata)
/**
* Serializer for Clustering object.
* <p>
* Because every clustering in a given table must have the same size (ant that size cannot actually change once the table
* Because every clustering in a given table must have the same size (and that size cannot actually change once the table
* has been defined), we don't record that size.
*/
public static class Serializer
{
public void serialize(Clustering<?> clustering, DataOutputPlus out, int version, List<AbstractType<?>> types) throws IOException
public void serialize(Clustering<?> clustering, DataOutputPlus out, int unused, List<AbstractType<?>> types) throws IOException
{
assert clustering != STATIC_CLUSTERING : "We should never serialize a static clustering";
assert clustering.size() == types.size() : "Invalid clustering for the table: " + clustering;
ClusteringPrefix.serializer.serializeValuesWithoutSize(clustering, out, version, types);
ClusteringPrefix.serializer.serializeValuesWithoutSize(clustering, out, unused, types);
}

public ByteBuffer serialize(Clustering<?> clustering, int version, List<AbstractType<?>> types)
Expand All @@ -158,9 +158,9 @@ public ByteBuffer serialize(Clustering<?> clustering, int version, List<Abstract
}
}

public long serializedSize(Clustering<?> clustering, int version, List<AbstractType<?>> types)
public long serializedSize(Clustering<?> clustering, int unused, List<AbstractType<?>> types)
{
return ClusteringPrefix.serializer.valuesWithoutSizeSerializedSize(clustering, version, types);
return ClusteringPrefix.serializer.valuesWithoutSizeSerializedSize(clustering, unused, types);
}

public void skip(DataInputPlus in, int version, List<AbstractType<?>> types) throws IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,11 @@ default String toString(ClusteringComparator comparator)

public static class Serializer
{
public <T> void serialize(ClusteringBoundOrBoundary<T> bound, DataOutputPlus out, int version, List<AbstractType<?>> types) throws IOException
public <T> void serialize(ClusteringBoundOrBoundary<T> bound, DataOutputPlus out, int unused, List<AbstractType<?>> types) throws IOException
{
out.writeByte(bound.kind().ordinal());
out.writeShort(bound.size());
ClusteringPrefix.serializer.serializeValuesWithoutSize(bound, out, version, types);
ClusteringPrefix.serializer.serializeValuesWithoutSize(bound, out, unused, types);
}

public <T> long serializedSize(ClusteringBoundOrBoundary<T> bound, int version, List<AbstractType<?>> types)
Expand Down
129 changes: 126 additions & 3 deletions src/java/org/apache/cassandra/db/ClusteringComparator.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,17 @@
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;

import org.apache.cassandra.io.sstable.ClusteringDescriptor;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.ByteBufferAccessor;
import org.apache.cassandra.db.marshal.ValueAccessor;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.serializers.MarshalException;

import org.apache.cassandra.io.sstable.IndexInfo;
import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.bytecomparable.ByteComparable;
import org.apache.cassandra.utils.bytecomparable.ByteSource;
import org.apache.cassandra.utils.vint.VIntCoding;

import static org.apache.cassandra.utils.bytecomparable.ByteSource.EXCLUDED;
import static org.apache.cassandra.utils.bytecomparable.ByteSource.NEXT_COMPONENT;
Expand Down Expand Up @@ -156,6 +159,126 @@ public <V1, V2> int compare(ClusteringPrefix<V1> c1, ClusteringPrefix<V2> c2)
return s1 < s2 ? c1.kind().comparedToClustering : -c2.kind().comparedToClustering;
}

/**
 * Compares two serialized clustering descriptors by their clustering values, falling back to
 * their prefix kinds when the value prefixes are equal.
 */
public static int compare(ClusteringDescriptor c1, ClusteringDescriptor c2)
{
    int size1 = c1.clusteringColumnsBound();
    int size2 = c2.clusteringColumnsBound();

    // Compare the shared prefix of clustering values first.
    int valueCmp = compare(c1.clusteringTypes(), c1.clusteringBuffer(), c2.clusteringBuffer(), Math.min(size1, size2));
    if (valueCmp != 0)
        return valueCmp;

    // Same number of components: the prefix kind alone decides the order.
    if (size1 == size2)
        return ClusteringPrefix.Kind.compare(c1.clusteringKind(), c2.clusteringKind());

    // Different sizes: the shorter prefix sorts before/after the longer one depending on its kind.
    return size1 < size2 ? c1.clusteringKind().comparedToClustering : -c2.clusteringKind().comparedToClustering;
}

/**
 * Compares two fully-specified serialized clusterings (all {@code types.length} columns present
 * in each buffer). Delegates to the bounded variant.
 */
public static int compare(AbstractType<?>[] types, ByteBuffer c1, ByteBuffer c2)
{
    return compare(types, c1, c2, types.length);
}

/**
 * Compares the first {@code size} clustering values encoded in the two buffers, using the wire
 * format produced by {@code ClusteringPrefix.Serializer.serializeValuesWithoutSize}: an unsigned
 * vint header block every 32 columns (2 bits per column — 0b10 = null, 0b01 = empty, 0b00 =
 * present), each present value preceded by a vint length unless its type is fixed-length.
 * <p>
 * Per-column ordering is: present &gt; empty &gt; null. Buffer positions and limits are restored
 * before returning, even on exception.
 *
 * @throws IllegalArgumentException if an encoded value length runs past a buffer's limit
 */
private static int compare(AbstractType<?>[] types, ByteBuffer c1, ByteBuffer c2, int size)
{
    long clusteringBlock1 = 0;
    long clusteringBlock2 = 0;
    final int position1 = c1.position();
    final int position2 = c2.position();
    final int limit1 = c1.limit();
    final int limit2 = c2.limit();
    try
    {
        int ofst1 = position1;
        int ofst2 = position2;
        for (int clusteringIndex = 0; clusteringIndex < size; clusteringIndex++)
        {
            // A 64-bit header block carries presence bits for 32 columns (2 bits each).
            if (clusteringIndex % 32 == 0)
            {
                clusteringBlock1 = VIntCoding.getUnsignedVInt(c1, ofst1, limit1);
                ofst1 += VIntCoding.computeUnsignedVIntSize(clusteringBlock1);
                clusteringBlock2 = VIntCoding.getUnsignedVInt(c2, ofst2, limit2);
                ofst2 += VIntCoding.computeUnsignedVIntSize(clusteringBlock2);
            }

            AbstractType<?> type = types[clusteringIndex];

            // Fix: masks must be binary 0b11 (low two bits of the shifted header), not hex 0x11.
            boolean v1Present = (clusteringBlock1 & 0b11) == 0;
            boolean v2Present = (clusteringBlock2 & 0b11) == 0;

            if (v1Present && v2Present)
            {
                boolean isByteOrderComparable = type.isByteOrderComparable;
                int vlen1, vlen2;
                if (type.isValueLengthFixed())
                {
                    vlen1 = vlen2 = type.valueLengthIfFixed();
                }
                else
                {
                    vlen1 = VIntCoding.getUnsignedVInt32(c1, ofst1, limit1);
                    ofst1 += VIntCoding.computeUnsignedVIntSize(vlen1);
                    vlen2 = VIntCoding.getUnsignedVInt32(c2, ofst2, limit2);
                    ofst2 += VIntCoding.computeUnsignedVIntSize(vlen2);
                }
                int v1Limit = ofst1 + vlen1;
                if (v1Limit > limit1)
                    throw new IllegalArgumentException("Value limit exceeds buffer limit.");
                c1.position(ofst1).limit(v1Limit);
                int v2Limit = ofst2 + vlen2;
                if (v2Limit > limit2)
                    throw new IllegalArgumentException("Value limit exceeds buffer limit.");
                c2.position(ofst2).limit(v2Limit);
                int cmp = isByteOrderComparable ?
                          ByteBufferUtil.compareUnsigned(c1, c2) :
                          type.compareCustom(c1, ByteBufferAccessor.instance, c2, ByteBufferAccessor.instance);
                if (cmp != 0)
                    return cmp;
                // Restore outer limits before advancing past the just-compared values.
                c1.limit(limit1);
                c2.limit(limit2);
                ofst1 += vlen1;
                ofst2 += vlen2;
            }
            // present > not present
            // NOTE(review): for non-byte-order-comparable types an empty value is not guaranteed
            // to sort below every present value by type.compareCustom — confirm this invariant.
            else if (v1Present && !v2Present)
            {
                return 1;
            }
            else if (!v1Present && v2Present)
            {
                return -1;
            }
            else
            {
                // Neither value present: the null bit (0b10) distinguishes null from empty.
                // Fix: mask is 0b10 (not 0x10) and the bit SET means null, so test != 0;
                // with == 0 the "empty > null" ordering below would be inverted.
                boolean v1Null = (clusteringBlock1 & 0b10) != 0;
                boolean v2Null = (clusteringBlock2 & 0b10) != 0;
                // empty > null
                if (!v1Null && v2Null)
                {
                    return 1;
                }
                else if (v1Null && !v2Null)
                {
                    return -1;
                }
                // empty == empty / null == null, continue...
            }
            // Consume this column's two presence bits.
            clusteringBlock1 = clusteringBlock1 >>> 2;
            clusteringBlock2 = clusteringBlock2 >>> 2;
        }
    }
    finally
    {
        // Always restore caller-visible buffer state.
        c1.position(position1).limit(limit1);
        c2.position(position2).limit(limit2);
    }
    return 0;
}

public <V1, V2> int compare(Clustering<V1> c1, Clustering<V2> c2)
{
return compare(c1, c2, size());
Expand Down
18 changes: 9 additions & 9 deletions src/java/org/apache/cassandra/db/ClusteringPrefix.java
Original file line number Diff line number Diff line change
Expand Up @@ -425,18 +425,18 @@ public default String clusteringString(List<AbstractType<?>> types)

public static class Serializer
{
public void serialize(ClusteringPrefix<?> clustering, DataOutputPlus out, int version, List<AbstractType<?>> types) throws IOException
public void serialize(ClusteringPrefix<?> clustering, DataOutputPlus out, int unused, List<AbstractType<?>> types) throws IOException
{
// We shouldn't serialize static clusterings
assert clustering.kind() != Kind.STATIC_CLUSTERING;
if (clustering.kind() == Kind.CLUSTERING)
{
out.writeByte(clustering.kind().ordinal());
Clustering.serializer.serialize((Clustering<?>)clustering, out, version, types);
Clustering.serializer.serialize((Clustering<?>)clustering, out, unused, types);
}
else
{
ClusteringBoundOrBoundary.serializer.serialize((ClusteringBoundOrBoundary<?>)clustering, out, version, types);
ClusteringBoundOrBoundary.serializer.serialize((ClusteringBoundOrBoundary<?>)clustering, out, unused, types);
}
}

Expand All @@ -462,17 +462,17 @@ public ClusteringPrefix<byte[]> deserialize(DataInputPlus in, int version, List<
return ClusteringBoundOrBoundary.serializer.deserializeValues(in, kind, version, types);
}

public long serializedSize(ClusteringPrefix<?> clustering, int version, List<AbstractType<?>> types)
public long serializedSize(ClusteringPrefix<?> clustering, int unused, List<AbstractType<?>> types)
{
// We shouldn't serialize static clusterings
assert clustering.kind() != Kind.STATIC_CLUSTERING;
if (clustering.kind() == Kind.CLUSTERING)
return 1 + Clustering.serializer.serializedSize((Clustering<?>)clustering, version, types);
return 1 + Clustering.serializer.serializedSize((Clustering<?>)clustering, unused, types);
else
return ClusteringBoundOrBoundary.serializer.serializedSize((ClusteringBoundOrBoundary<?>)clustering, version, types);
return ClusteringBoundOrBoundary.serializer.serializedSize((ClusteringBoundOrBoundary<?>)clustering, unused, types);
}

<V> void serializeValuesWithoutSize(ClusteringPrefix<V> clustering, DataOutputPlus out, int version, List<AbstractType<?>> types) throws IOException
<V> void serializeValuesWithoutSize(ClusteringPrefix<V> clustering, DataOutputPlus out, int unused, List<AbstractType<?>> types) throws IOException
{
int offset = 0;
int clusteringSize = clustering.size();
Expand All @@ -496,7 +496,7 @@ <V> void serializeValuesWithoutSize(ClusteringPrefix<V> clustering, DataOutputPl
}
}

<V> long valuesWithoutSizeSerializedSize(ClusteringPrefix<V> clustering, int version, List<AbstractType<?>> types)
<V> long valuesWithoutSizeSerializedSize(ClusteringPrefix<V> clustering, int unused, List<AbstractType<?>> types)
{
long result = 0;
int offset = 0;
Expand All @@ -519,7 +519,7 @@ <V> long valuesWithoutSizeSerializedSize(ClusteringPrefix<V> clustering, int ver
return result;
}

byte[][] deserializeValuesWithoutSize(DataInputPlus in, int size, int version, List<AbstractType<?>> types) throws IOException
public byte[][] deserializeValuesWithoutSize(DataInputPlus in, int size, int version, List<AbstractType<?>> types) throws IOException
{
// Callers of this method should handle the case where size = 0 (in all case we want to return a special value anyway).
assert size > 0;
Expand Down
5 changes: 5 additions & 0 deletions src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -2323,6 +2323,11 @@ public boolean shouldIgnoreGcGraceForKey(DecoratedKey dk)
return partitionKeySetIgnoreGcGrace.contains(dk);
}

/**
 * Returns whether at least one partition key has been flagged to ignore gc_grace
 * (i.e. the ignore-gc-grace key set is non-empty).
 */
public boolean shouldIgnoreGcGraceForAnyKey()
{
    boolean noKeysFlagged = partitionKeySetIgnoreGcGrace.isEmpty();
    return !noKeysFlagged;
}

public static Iterable<ColumnFamilyStore> all()
{
List<Iterable<ColumnFamilyStore>> stores = new ArrayList<>(Schema.instance.getKeyspaces().size());
Expand Down
3 changes: 3 additions & 0 deletions src/java/org/apache/cassandra/db/Columns.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.rows.ColumnData;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.SerializationHelper;
import org.apache.cassandra.db.rows.UnfilteredSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.schema.ColumnMetadata;
Expand Down Expand Up @@ -529,6 +531,7 @@ public void serializeSubset(Collection<ColumnMetadata> columns, Columns superset
int supersetCount = superset.size();
if (columnCount == supersetCount)
{
/** This is prevented by caller for row serialization: {@link UnfilteredSerializer#serializeRowBody(Row, int, SerializationHelper, DataOutputPlus)}*/
out.writeUnsignedVInt32(0);
}
else if (supersetCount < 64)
Expand Down
4 changes: 2 additions & 2 deletions src/java/org/apache/cassandra/db/DecoratedKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public ByteSource asComparableBytes(Version version)
// The OSS50 version avoids this by adding a terminator.
return ByteSource.withTerminatorMaybeLegacy(version,
ByteSource.END_OF_STREAM,
token.asComparableBytes(version),
getToken().asComparableBytes(version),
keyComparableBytes(version));
}

Expand All @@ -127,7 +127,7 @@ public ByteComparable asComparableBound(boolean before)

return ByteSource.withTerminator(
before ? ByteSource.LT_NEXT_COMPONENT : ByteSource.GT_NEXT_COMPONENT,
token.asComparableBytes(version),
getToken().asComparableBytes(version),
keyComparableBytes(version));
};
}
Expand Down
8 changes: 4 additions & 4 deletions src/java/org/apache/cassandra/db/DeletionPurger.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@

public interface DeletionPurger
{
public static final DeletionPurger PURGE_ALL = (ts, ldt) -> true;
DeletionPurger PURGE_ALL = (ts, ldt) -> true;

public boolean shouldPurge(long timestamp, long localDeletionTime);
boolean shouldPurge(long timestamp, long localDeletionTime);

public default boolean shouldPurge(DeletionTime dt)
default boolean shouldPurge(DeletionTime dt)
{
return !dt.isLive() && shouldPurge(dt.markedForDeleteAt(), dt.localDeletionTime());
}

public default boolean shouldPurge(LivenessInfo liveness, long nowInSec)
default boolean shouldPurge(LivenessInfo liveness, long nowInSec)
{
return !liveness.isLive(nowInSec) && shouldPurge(liveness.timestamp(), liveness.localExpirationTime());
}
Expand Down
Loading