From b309f6d98cbb8a68c65ad6967229a1eb24b3cab0 Mon Sep 17 00:00:00 2001 From: Marcus Eriksson Date: Wed, 8 Oct 2025 15:44:23 +0200 Subject: [PATCH] Avoid re-initializing underlying iterator in LazilyInitializedUnfilteredRowIterator after closing Patch by marcuse; reviewed by Aleksey Yeschenko and Branimir Lambov for CASSANDRA-20972 --- CHANGES.txt | 1 + ...azilyInitializedUnfilteredRowIterator.java | 12 +++-- .../distributed/test/DistinctReadTest.java | 47 +++++++++++++++++++ .../db/compaction/AntiCompactionTest.java | 40 ++++++++-------- .../LeveledCompactionStrategyTest.java | 8 +++- .../db/compaction/TTLExpiryTest.java | 6 ++- .../rows/ThrottledUnfilteredIteratorTest.java | 6 +-- .../io/sstable/SSTableScannerTest.java | 7 ++- 8 files changed, 96 insertions(+), 31 deletions(-) create mode 100644 test/distributed/org/apache/cassandra/distributed/test/DistinctReadTest.java diff --git a/CHANGES.txt b/CHANGES.txt index 4c983ca1e492..aebbae0693d8 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.0.6 + * Avoid re-initializing underlying iterator in LazilyInitializedUnfilteredRowIterator after closing (CASSANDRA-20972) * Flush SAI segment builder when current SSTable writer is switched (CASSANDRA-20752) * Throw RTE instead of FSError when RTE is thrown from FileUtis.write in TOCComponent (CASSANDRA-20917) * Upgrade jackson-dataformat-yaml to 2.19.2 and snakeyaml to 2.1 (CASSANDRA-18875) diff --git a/src/java/org/apache/cassandra/db/rows/LazilyInitializedUnfilteredRowIterator.java b/src/java/org/apache/cassandra/db/rows/LazilyInitializedUnfilteredRowIterator.java index 8a8b22966cbe..516b1d8cb254 100644 --- a/src/java/org/apache/cassandra/db/rows/LazilyInitializedUnfilteredRowIterator.java +++ b/src/java/org/apache/cassandra/db/rows/LazilyInitializedUnfilteredRowIterator.java @@ -32,8 +32,8 @@ public abstract class LazilyInitializedUnfilteredRowIterator extends AbstractIterator implements UnfilteredRowIterator { private final DecoratedKey partitionKey; - private UnfilteredRowIterator iterator; + private boolean closed = false; public LazilyInitializedUnfilteredRowIterator(DecoratedKey partitionKey) { @@ -97,15 +97,17 @@ protected Unfiltered computeNext() public void close() { + // don't use iterator == null as indicator if this is closed since some methods are called after the iterator is + // closed and maybeInit would re-initialize the underlying iterator in that case + closed = true; if (iterator != null) - { iterator.close(); - iterator = null; - } } public boolean isOpen() { - return iterator != null; + if (closed) + return false; + return iterator != null; // for backwards compatibility - if `maybeInit` has not been run on this class, consider it not open, for example SSTableExport seems to rely on this } } diff --git a/test/distributed/org/apache/cassandra/distributed/test/DistinctReadTest.java b/test/distributed/org/apache/cassandra/distributed/test/DistinctReadTest.java new file mode 100644 index 000000000000..5bdb0910f82f --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/DistinctReadTest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test; + +import java.io.IOException; + +import org.junit.Test; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; + +public class DistinctReadTest extends TestBaseImpl +{ + @Test + public void test() throws IOException + { + try (Cluster cluster = init(Cluster.build() + .withNodes(1) + .start())) + { + cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (id int, ck int, x int, PRIMARY KEY (id, ck))")); + cluster.coordinator(1).execute(withKeyspace("DELETE FROM %s.tbl USING TIMESTAMP 100 WHERE id = 1 AND ck < 10 "), ConsistencyLevel.ONE); + cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (id, ck, x) VALUES (1, 5, 7) USING TIMESTAMP 101"), ConsistencyLevel.ONE); + cluster.get(1).flush(KEYSPACE); + // all these failed before fix; + cluster.coordinator(1).execute(withKeyspace("select distinct id from %s.tbl where token(id) > " + Long.MIN_VALUE), ConsistencyLevel.ONE); + cluster.coordinator(1).execute(withKeyspace("select distinct id from %s.tbl where id > 0 allow filtering"), ConsistencyLevel.ONE); + cluster.coordinator(1).execute(withKeyspace("select id from %s.tbl where token(id) > " + Long.MIN_VALUE +" PER PARTITION LIMIT 1"), ConsistencyLevel.ONE); + } + } +} diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java index 97da2a4076db..2b1181e6c666 100644 --- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java @@ -169,26 +169,28 @@ private SSTableStats antiCompactRanges(ColumnFamilyStore store, RangesAtEndpoint { while (scanner.hasNext()) { - UnfilteredRowIterator row = scanner.next(); - Token token = row.partitionKey().getToken(); - if (sstable.isPendingRepair() && !sstable.isTransient()) + try (UnfilteredRowIterator row = scanner.next()) { - assertTrue(fullContains.test(token)); - assertFalse(transContains.test(token)); - stats.pendingKeys++; - } - else if (sstable.isPendingRepair() && sstable.isTransient()) - { - - assertTrue(transContains.test(token)); - assertFalse(fullContains.test(token)); - stats.transKeys++; - } - else - { - assertFalse(fullContains.test(token)); - assertFalse(transContains.test(token)); - stats.unrepairedKeys++; + Token token = row.partitionKey().getToken(); + if (sstable.isPendingRepair() && !sstable.isTransient()) + { + assertTrue(fullContains.test(token)); + assertFalse(transContains.test(token)); + stats.pendingKeys++; + } + else if (sstable.isPendingRepair() && sstable.isTransient()) + { + + assertTrue(transContains.test(token)); + assertFalse(fullContains.test(token)); + stats.transKeys++; + } + else + { + assertFalse(fullContains.test(token)); + assertFalse(transContains.test(token)); + stats.unrepairedKeys++; + } } } } diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java index ec8c8ad97509..3aeacb0fbb15 100644 --- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java @@ -51,6 +51,7 @@ import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ConfigurationException; @@ -286,7 +287,12 @@ public void testCompactionProgress() throws Exception ISSTableScanner scanner = scanners.get(0); // scan through to the end while (scanner.hasNext()) - scanner.next(); + { + try (UnfilteredRowIterator ignored = scanner.next()) + { + // just close the iterator + } + } // scanner.getCurrentPosition should be equal to total bytes of L1 sstables assertEquals(scanner.getCurrentPosition(), SSTableReader.getTotalUncompressedBytes(sstables)); diff --git a/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java b/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java index e7e97bed9988..6e1dd7fd36f4 100644 --- a/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java @@ -258,8 +258,10 @@ public void testNoExpire() throws InterruptedException, IOException assertTrue(scanner.hasNext()); while(scanner.hasNext()) { - UnfilteredRowIterator iter = scanner.next(); - assertEquals(Util.dk(noTTLKey), iter.partitionKey()); + try (UnfilteredRowIterator iter = scanner.next()) + { + assertEquals(Util.dk(noTTLKey), iter.partitionKey()); + } } scanner.close(); } diff --git a/test/unit/org/apache/cassandra/db/rows/ThrottledUnfilteredIteratorTest.java b/test/unit/org/apache/cassandra/db/rows/ThrottledUnfilteredIteratorTest.java index d2a9aa782400..3b78fffbdbe2 100644 --- a/test/unit/org/apache/cassandra/db/rows/ThrottledUnfilteredIteratorTest.java +++ b/test/unit/org/apache/cassandra/db/rows/ThrottledUnfilteredIteratorTest.java @@ -214,8 +214,6 @@ else if (ck1 == ck2 - 1) // cell tombstone { try (UnfilteredRowIterator rowIterator = scanner.next()) { - // only 1 partition data - assertFalse(scanner.hasNext()); List expectedUnfiltereds = new ArrayList<>(); rowIterator.forEachRemaining(expectedUnfiltereds::add); @@ -227,15 +225,17 @@ else if (ck1 == ck2 - 1) // cell tombstone assertTrue(scannerForThrottle.hasNext()); try (UnfilteredRowIterator rowIteratorForThrottle = scannerForThrottle.next()) { - assertFalse(scannerForThrottle.hasNext()); verifyThrottleIterator(expectedUnfiltereds, rowIteratorForThrottle, new ThrottledUnfilteredIterator(rowIteratorForThrottle, throttle), throttle); } + assertFalse(scannerForThrottle.hasNext()); } } } + // only 1 partition data + assertFalse(scanner.hasNext()); } } diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java index 73195b0617ab..136b23ff8623 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java @@ -345,7 +345,12 @@ public void testSingleDataRangeWithMovedStart() throws IOException // full range scan ISSTableScanner scanner = sstable.getScanner(); for (int i = 4; i < 10; i++) - assertEquals(toKey(i), new String(scanner.next().partitionKey().getKey().array())); + { + try (UnfilteredRowIterator row = scanner.next()) + { + assertEquals(toKey(i), new String(row.partitionKey().getKey().array())); + } + } scanner.close();