Skip to content

Implement docIDRunEnd() on ES819TSDBDocValuesProducer #132939

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Aug 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,11 @@ public boolean advanceExact(int target) throws IOException {
doc = target;
return true;
}

@Override
public int docIDRunEnd() throws IOException {
return maxDoc;
}
}

private abstract static class SparseBinaryDocValues extends BinaryDocValues {
Expand Down Expand Up @@ -328,6 +333,11 @@ public int advance(int target) throws IOException {
public boolean advanceExact(int target) throws IOException {
return disi.advanceExact(target);
}

@Override
public int docIDRunEnd() throws IOException {
return disi.docIDRunEnd();
}
}

@Override
Expand Down Expand Up @@ -369,6 +379,11 @@ public int advance(int target) throws IOException {
public long cost() {
return ords.cost();
}

@Override
public int docIDRunEnd() throws IOException {
return ords.docIDRunEnd();
}
};
}

Expand Down Expand Up @@ -750,6 +765,11 @@ public int advance(int target) throws IOException {
public long cost() {
return ords.cost();
}

@Override
public int docIDRunEnd() throws IOException {
return ords.docIDRunEnd();
}
};
}

Expand Down Expand Up @@ -1086,6 +1106,11 @@ public boolean advanceExact(int target) {
public long cost() {
return maxDoc;
}

@Override
public int docIDRunEnd() {
return maxDoc;
}
};
} else {
final IndexedDISI disi = new IndexedDISI(
Expand Down Expand Up @@ -1127,6 +1152,11 @@ public long cost() {
public long longValue() {
return 0L;
}

@Override
public int docIDRunEnd() throws IOException {
return disi.docIDRunEnd();
}
};
}
}
Expand Down Expand Up @@ -1178,6 +1208,11 @@ public long cost() {
return maxDoc;
}

@Override
public int docIDRunEnd() {
return maxDoc;
}

@Override
public long longValue() throws IOException {
final int index = doc;
Expand Down Expand Up @@ -1286,6 +1321,11 @@ public long cost() {
return disi.cost();
}

@Override
public int docIDRunEnd() throws IOException {
return disi.docIDRunEnd();
}

@Override
public long longValue() throws IOException {
final int index = disi.index();
Expand Down Expand Up @@ -1406,6 +1446,11 @@ public long nextValue() throws IOException {
public int docValueCount() {
return count;
}

@Override
public int docIDRunEnd() {
return maxDoc;
}
};
} else {
// sparse
Expand Down Expand Up @@ -1463,6 +1508,11 @@ public int docValueCount() {
return count;
}

@Override
public int docIDRunEnd() throws IOException {
return disi.docIDRunEnd();
}

private void set() {
if (set == false) {
final int index = disi.index();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.lucene.index.LogByteSizeMergePolicy;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
Expand All @@ -49,6 +50,8 @@
import java.util.function.Supplier;
import java.util.stream.IntStream;

import static org.elasticsearch.test.ESTestCase.randomFrom;

public class ES819TSDBDocValuesFormatTests extends ES87TSDBDocValuesFormatTests {

final Codec codec = TestUtil.alwaysDocValuesFormat(new ES819TSDBDocValuesFormat());
Expand Down Expand Up @@ -959,6 +962,134 @@ private static BulkNumericDocValues getBulkNumericDocValues(LeafReader leafReade
return (BulkNumericDocValues) DocValues.unwrapSingleton(leafReader.getSortedNumericDocValues(counterField));
}

public void testDocIDEndRun() throws IOException {
String timestampField = "@timestamp";
String hostnameField = "host.name";
long baseTimestamp = 1704067200000L;

var config = getTimeSeriesIndexWriterConfig(hostnameField, timestampField);
try (var dir = newDirectory(); var iw = new IndexWriter(dir, config)) {
long counter1 = 0;

long[] gauge2Values = new long[] { -2, -4, -6, -8, -10, -12, -14, -16 };
String[] tags = new String[] { "tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8" };

// IndexedDISI stores ids in blocks of 4096. To test sparse end runs, we want a mixture of
// dense and sparse blocks, so we need the gap frequency to be larger than
// this value, but smaller than two blocks, and to index at least three blocks
int gap_frequency = 4500 + random().nextInt(2048);
int numDocs = 10000 + random().nextInt(10000);
int numHosts = numDocs / 20;

for (int i = 0; i < numDocs; i++) {
var d = new Document();

int batchIndex = i / numHosts;
String hostName = String.format(Locale.ROOT, "host-%03d", batchIndex);
long timestamp = baseTimestamp + (1000L * i);

d.add(new SortedDocValuesField(hostnameField, new BytesRef(hostName)));
// Index sorting doesn't work with NumericDocValuesField:
d.add(new SortedNumericDocValuesField(timestampField, timestamp));
d.add(new NumericDocValuesField("counter", counter1++));
if (i % gap_frequency != 0) {
d.add(new NumericDocValuesField("sparse_counter", counter1));
}

int numGauge2 = 1 + random().nextInt(8);
for (int j = 0; j < numGauge2; j++) {
d.add(new SortedNumericDocValuesField("gauge", gauge2Values[(i + j) % gauge2Values.length]));
if (i % gap_frequency != 0) {
d.add(new SortedNumericDocValuesField("sparse_gauge", gauge2Values[(i + j) % gauge2Values.length]));
}
}

d.add(new SortedDocValuesField("tag", new BytesRef(randomFrom(tags))));
if (i % gap_frequency != 0) {
d.add(new SortedDocValuesField("sparse_tag", new BytesRef(randomFrom(tags))));
}

int numTags = 1 + random().nextInt(8);
for (int j = 0; j < numTags; j++) {
d.add(new SortedSetDocValuesField("tags", new BytesRef(tags[(i + j) % tags.length])));
if (i % gap_frequency != 0) {
d.add(new SortedSetDocValuesField("sparse_tags", new BytesRef(tags[(i + j) % tags.length])));
}
}

d.add(new BinaryDocValuesField("tags_as_bytes", new BytesRef(tags[i % tags.length])));
if (i % gap_frequency != 0) {
d.add(new BinaryDocValuesField("sparse_tags_as_bytes", new BytesRef(tags[i % tags.length])));
}

iw.addDocument(d);
if (i % 100 == 0) {
iw.commit();
}
}
iw.commit();

iw.forceMerge(1);

try (var reader = DirectoryReader.open(iw)) {
assertEquals(1, reader.leaves().size());
assertEquals(numDocs, reader.maxDoc());
var leaf = reader.leaves().get(0).reader();
var hostNameDV = leaf.getSortedDocValues(hostnameField);
assertNotNull(hostNameDV);
validateRunEnd(hostNameDV);
var timestampDV = DocValues.unwrapSingleton(leaf.getSortedNumericDocValues(timestampField));
assertNotNull(timestampDV);
validateRunEnd(timestampDV);
var counterOneDV = leaf.getNumericDocValues("counter");
assertNotNull(counterOneDV);
validateRunEnd(counterOneDV);
var sparseCounter = leaf.getNumericDocValues("sparse_counter");
assertNotNull(sparseCounter);
validateRunEnd(sparseCounter);
var gaugeOneDV = leaf.getSortedNumericDocValues("gauge");
assertNotNull(gaugeOneDV);
validateRunEnd(gaugeOneDV);
var sparseGaugeDV = leaf.getSortedNumericDocValues("sparse_gauge");
assertNotNull(sparseGaugeDV);
validateRunEnd(sparseGaugeDV);
var tagDV = leaf.getSortedDocValues("tag");
assertNotNull(tagDV);
validateRunEnd(tagDV);
var sparseTagDV = leaf.getSortedDocValues("sparse_tag");
assertNotNull(sparseTagDV);
validateRunEnd(sparseTagDV);
var tagsDV = leaf.getSortedSetDocValues("tags");
assertNotNull(tagsDV);
validateRunEnd(tagsDV);
var sparseTagsDV = leaf.getSortedSetDocValues("sparse_tags");
assertNotNull(sparseTagsDV);
validateRunEnd(sparseTagsDV);
var tagBytesDV = leaf.getBinaryDocValues("tags_as_bytes");
assertNotNull(tagBytesDV);
validateRunEnd(tagBytesDV);
var sparseTagBytesDV = leaf.getBinaryDocValues("sparse_tags_as_bytes");
assertNotNull(sparseTagBytesDV);
validateRunEnd(sparseTagBytesDV);
}
}
}

private void validateRunEnd(DocIdSetIterator iterator) throws IOException {
int runCount = 0;
while (iterator.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
int runLength = iterator.docIDRunEnd() - iterator.docID() - 1;
if (runLength > 1) {
runCount++;
for (int i = 0; i < runLength; i++) {
int expected = iterator.docID() + 1;
assertEquals(expected, iterator.advance(expected));
}
}
}
assertTrue("Expected docid runs of greater than 1", runCount > 0);
}

private IndexWriterConfig getTimeSeriesIndexWriterConfig(String hostnameField, String timestampField) {
var config = new IndexWriterConfig();
if (hostnameField != null) {
Expand Down