Commit 3af0568

Speed up reading dimension fields in TS (elastic#128283)
When reading dimension fields in the TS command, we can skip reading values while the `tsid` remains unchanged, to improve performance. I benchmarked this change with the following query:

```
POST /_query
{
  "query": "TS metrics-hostmetricsreceiver.otel-default | WHERE @timestamp >= \"2025-05-08T18:00:08.001Z\" | STATS cpu = avg(rate(`metrics.process.cpu.time`)) BY host.name, BUCKET(@timestamp, 5 minute)"
}
```

The total query time was reduced from 51ms to 39ms, with processing time in the time-series source operator reduced from 26ms to 17ms.
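The mechanism, in rough terms: within a page, documents arrive grouped by `_tsid`, and dimension fields (such as `host.name`) are constant for a given `_tsid`, so the operator reads them once per tsid and later expands them back to one value per row using the tsid ordinals. Below is a minimal, self-contained sketch of that idea in plain Java; the `Doc` record, `readPage` method, and field names are hypothetical stand-ins for the operator's blocks and builders, not code from this commit.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified sketch (not the operator's actual classes): documents are
// assumed to be sorted by tsid, with contiguous tsid ordinals starting at 0.
class DimensionSkipSketch {

    record Doc(int tsidOrdinal, String hostName, double cpuTime) {}

    static String[] readPage(List<Doc> docsSortedByTsid) {
        List<String> hostPerTsid = new ArrayList<>();          // one value per tsid
        int[] tsidOrdinals = new int[docsSortedByTsid.size()]; // one entry per row
        double[] cpuTimes = new double[docsSortedByTsid.size()];

        int previousTsid = -1;
        for (int row = 0; row < docsSortedByTsid.size(); row++) {
            Doc doc = docsSortedByTsid.get(row);
            tsidOrdinals[row] = doc.tsidOrdinal();
            cpuTimes[row] = doc.cpuTime();          // non-dimension fields: read every row
            if (doc.tsidOrdinal() != previousTsid) {
                hostPerTsid.add(doc.hostName());    // dimension fields: read once per tsid
                previousTsid = doc.tsidOrdinal();
            }
        }

        // "Build the block": expand per-tsid dimension values back to one value per row,
        // analogous to pairing the tsid ordinals with a small dictionary of values.
        String[] hostPerRow = new String[tsidOrdinals.length];
        for (int row = 0; row < tsidOrdinals.length; row++) {
            hostPerRow[row] = hostPerTsid.get(tsidOrdinals[row]);
        }
        return hostPerRow;
    }

    public static void main(String[] args) {
        List<Doc> docs = List.of(
            new Doc(0, "host-a", 1.5),
            new Doc(0, "host-a", 2.0),
            new Doc(1, "host-b", 0.7)
        );
        System.out.println(String.join(", ", readPage(docs))); // host-a, host-a, host-b
    }
}
```

The actual change wires this into `ShardLevelFieldsReader` and `SegmentLevelFieldsReader` (see the `TimeSeriesSourceOperator.java` diff below), where dimension values are wrapped in an `OrdinalBytesRefVector` that shares the tsid ordinals instead of copying the value for every row.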
1 parent 2bf6d54 commit 3af0568

File tree

5 files changed: 92 additions and 16 deletions

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ShardContext.java

Lines changed: 6 additions & 0 deletions
@@ -53,4 +53,10 @@ public interface ShardContext {
      * Returns something to load values from this field into a {@link Block}.
      */
     BlockLoader blockLoader(String name, boolean asUnsupportedSource, MappedFieldType.FieldExtractPreference fieldExtractPreference);
+
+    /**
+     * Returns the {@link MappedFieldType} for the given field name.
+     * By default, this delegates to {@link org.elasticsearch.index.query.SearchExecutionContext#getFieldType(String)}.
+     */
+    MappedFieldType fieldType(String name);
 }
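For reference, implementations of the new method typically just delegate to the shard's `SearchExecutionContext`, as the production context in `EsPhysicalOperationProviders` does further below. A minimal sketch of such an implementer follows; the class name is hypothetical, and only the new method is shown (the rest of `ShardContext` is omitted).

```java
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.query.SearchExecutionContext;

// Hypothetical sketch (not part of this commit): resolve field types by delegating
// to the shard's SearchExecutionContext, matching the default behavior described
// in the javadoc above. Callers such as ShardLevelFieldsReader then use
// MappedFieldType#isDimension() to decide whether a field can be skipped per tsid.
class DelegatingFieldTypeLookup {
    private final SearchExecutionContext ctx;

    DelegatingFieldTypeLookup(SearchExecutionContext ctx) {
        this.ctx = ctx;
    }

    public MappedFieldType fieldType(String name) {
        return ctx.getFieldType(name); // may be null for unmapped fields
    }
}
```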

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperator.java

Lines changed: 67 additions & 13 deletions
@@ -133,11 +133,12 @@ public Page getCheckedOutput() throws IOException {
                 if (docCollector != null) {
                     blocks[blockIndex++] = docCollector.build().asBlock();
                 }
-                blocks[blockIndex++] = tsHashesBuilder.build().asBlock();
+                OrdinalBytesRefVector tsidVector = tsHashesBuilder.build();
+                blocks[blockIndex++] = tsidVector.asBlock();
                 tsHashesBuilder = new TsidBuilder(blockFactory, Math.min(remainingDocs, maxPageSize));
                 blocks[blockIndex++] = timestampsBuilder.build().asBlock();
                 timestampsBuilder = blockFactory.newLongVectorBuilder(Math.min(remainingDocs, maxPageSize));
-                System.arraycopy(fieldsReader.buildBlocks(), 0, blocks, blockIndex, fieldsToExtracts.size());
+                System.arraycopy(fieldsReader.buildBlocks(tsidVector.getOrdinalsVector()), 0, blocks, blockIndex, fieldsToExtracts.size());
                 page = new Page(currentPagePos, blocks);
                 currentPagePos = 0;
             }
@@ -217,6 +218,7 @@ void readDocsForNextPage() throws IOException {
         }
 
         private boolean readValuesForOneTsid(PriorityQueue<LeafIterator> sub) throws IOException {
+            boolean first = true;
             do {
                 LeafIterator top = sub.top();
                 currentPagePos++;
@@ -226,7 +228,8 @@ private boolean readValuesForOneTsid(PriorityQueue<LeafIterator> sub) throws IOException {
                 }
                 tsHashesBuilder.appendOrdinal();
                 timestampsBuilder.appendLong(top.timestamp);
-                fieldsReader.readValues(top.segmentOrd, top.docID);
+                fieldsReader.readValues(top.segmentOrd, top.docID, first == false);
+                first = false;
                 if (top.nextDoc()) {
                     sub.updateTop();
                 } else if (top.docID == DocIdSetIterator.NO_MORE_DOCS) {
@@ -350,6 +353,7 @@ static final class ShardLevelFieldsReader implements Releasable {
         private final BlockLoaderFactory blockFactory;
         private final SegmentLevelFieldsReader[] segments;
         private final BlockLoader[] loaders;
+        private final boolean[] dimensions;
         private final Block.Builder[] builders;
         private final StoredFieldsSpec storedFieldsSpec;
         private final SourceLoader sourceLoader;
@@ -377,10 +381,18 @@ static final class ShardLevelFieldsReader implements Releasable {
                 sourceLoader = null;
             }
             this.storedFieldsSpec = storedFieldsSpec;
+            this.dimensions = new boolean[fields.size()];
+            for (int i = 0; i < fields.size(); i++) {
+                dimensions[i] = shardContext.fieldType(fields.get(i).name()).isDimension();
+            }
         }
 
-        void readValues(int segment, int docID) throws IOException {
-            segments[segment].read(docID, builders);
+        /**
+         * For dimension fields, skips reading them when {@code nonDimensionFieldsOnly} is true,
+         * since they only need to be read once per tsid.
+         */
+        void readValues(int segment, int docID, boolean nonDimensionFieldsOnly) throws IOException {
+            segments[segment].read(docID, builders, nonDimensionFieldsOnly, dimensions);
         }
 
         void prepareForReading(int estimatedSize) throws IOException {
@@ -396,12 +408,46 @@ void prepareForReading(int estimatedSize) throws IOException {
             }
         }
 
-        Block[] buildBlocks() {
-            Block[] blocks = Block.Builder.buildAll(builders);
-            Arrays.fill(builders, null);
+        Block[] buildBlocks(IntVector tsidOrdinals) {
+            final Block[] blocks = new Block[loaders.length];
+            try {
+                for (int i = 0; i < builders.length; i++) {
+                    if (dimensions[i]) {
+                        blocks[i] = buildBlockForDimensionField(builders[i], tsidOrdinals);
+                    } else {
+                        blocks[i] = builders[i].build();
+                    }
+                }
+                Arrays.fill(builders, null);
+            } finally {
+                if (blocks.length > 0 && blocks[blocks.length - 1] == null) {
+                    Releasables.close(blocks);
+                }
+            }
             return blocks;
         }
 
+        private Block buildBlockForDimensionField(Block.Builder builder, IntVector tsidOrdinals) {
+            try (var values = builder.build()) {
+                if (values.asVector() instanceof BytesRefVector bytes) {
+                    tsidOrdinals.incRef();
+                    values.incRef();
+                    return new OrdinalBytesRefVector(tsidOrdinals, bytes).asBlock();
+                } else if (values.areAllValuesNull()) {
+                    return blockFactory.factory.newConstantNullBlock(tsidOrdinals.getPositionCount());
+                } else {
+                    final int positionCount = tsidOrdinals.getPositionCount();
+                    try (var newBuilder = values.elementType().newBlockBuilder(positionCount, blockFactory.factory)) {
+                        for (int p = 0; p < positionCount; p++) {
+                            int pos = tsidOrdinals.getInt(p);
+                            newBuilder.copyFrom(values, pos, pos + 1);
+                        }
+                        return newBuilder.build();
+                    }
+                }
+            }
+        }
+
         @Override
         public void close() {
             Releasables.close(builders);
@@ -435,10 +481,18 @@ private void reinitializeIfNeeded(SourceLoader sourceLoader, StoredFieldsSpec st
             }
         }
 
-        void read(int docId, Block.Builder[] builder) throws IOException {
+        void read(int docId, Block.Builder[] builder, boolean nonDimensionFieldsOnly, boolean[] dimensions) throws IOException {
             storedFields.advanceTo(docId);
-            for (int i = 0; i < rowStride.length; i++) {
-                rowStride[i].read(docId, storedFields, builder[i]);
+            if (nonDimensionFieldsOnly) {
+                for (int i = 0; i < rowStride.length; i++) {
+                    if (dimensions[i] == false) {
+                        rowStride[i].read(docId, storedFields, builder[i]);
+                    }
+                }
+            } else {
+                for (int i = 0; i < rowStride.length; i++) {
+                    rowStride[i].read(docId, storedFields, builder[i]);
+                }
             }
         }
     }
@@ -480,9 +534,9 @@ public void close() {
             Releasables.close(dictBuilder, ordinalsBuilder);
         }
 
-        BytesRefVector build() throws IOException {
+        OrdinalBytesRefVector build() throws IOException {
             BytesRefVector dict = null;
-            BytesRefVector result = null;
+            OrdinalBytesRefVector result = null;
             IntVector ordinals = null;
             try {
                 dict = dictBuilder.build();

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java

Lines changed: 5 additions & 0 deletions
@@ -324,5 +324,10 @@ public Optional<SortAndFormats> buildSort(List<SortBuilder<?>> sorts) {
         public String shardIdentifier() {
             return "test";
         }
+
+        @Override
+        public MappedFieldType fieldType(String name) {
+            throw new UnsupportedOperationException();
+        }
     }
 }

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorTests.java

Lines changed: 11 additions & 1 deletion
@@ -438,7 +438,17 @@ public static TimeSeriesSourceOperatorFactory createTimeSeriesSourceOperator(
         } catch (IOException e) {
             throw new UncheckedIOException(e);
         }
-        var ctx = new LuceneSourceOperatorTests.MockShardContext(reader, 0);
+        var ctx = new LuceneSourceOperatorTests.MockShardContext(reader, 0) {
+            @Override
+            public MappedFieldType fieldType(String name) {
+                for (ExtractField e : extractFields) {
+                    if (e.ft.name().equals(name)) {
+                        return e.ft;
+                    }
+                }
+                throw new IllegalArgumentException("Unknown field [" + name + "]");
+            }
+        };
         Function<ShardContext, Query> queryFunction = c -> new MatchAllDocsQuery();
 
         var fieldInfos = extractFields.stream()

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java

Lines changed: 3 additions & 2 deletions
@@ -177,7 +177,7 @@ private static class DefaultShardContextForUnmappedField extends DefaultShardCon
         }
 
         @Override
-        protected @Nullable MappedFieldType fieldType(String name) {
+        public @Nullable MappedFieldType fieldType(String name) {
             var superResult = super.fieldType(name);
             return superResult == null && name.equals(unmappedEsField.getName())
                 ? new KeywordFieldMapper.KeywordFieldType(name, false /* isIndexed */, false /* hasDocValues */, Map.of() /* meta */)
@@ -459,7 +459,8 @@ public FieldNamesFieldMapper.FieldNamesFieldType fieldNames() {
             return loader;
         }
 
-        protected @Nullable MappedFieldType fieldType(String name) {
+        @Override
+        public @Nullable MappedFieldType fieldType(String name) {
             return ctx.getFieldType(name);
         }
 