Skip to content

Commit 1d1e85d

Browse files
Apply recent TSDB codec merge optimizations to binary values (elastic#127278) (elastic#127346)
Applies the merge optimizations from elastic#126499 and elastic#126732 to binary field types for the ES819 codec.
1 parent 2e3df0c commit 1d1e85d

File tree

5 files changed

+279
-61
lines changed

5 files changed

+279
-61
lines changed

server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/DocValuesConsumerUtil.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@
2020
*/
2121
class DocValuesConsumerUtil {
2222

23-
static final MergeStats UNSUPPORTED = new MergeStats(false, -1, -1);
23+
static final MergeStats UNSUPPORTED = new MergeStats(false, -1, -1, -1, -1);
2424

25-
record MergeStats(boolean supported, long sumNumValues, int sumNumDocsWithField) {}
25+
record MergeStats(boolean supported, long sumNumValues, int sumNumDocsWithField, int minLength, int maxLength) {}
2626

2727
static MergeStats compatibleWithOptimizedMerge(boolean optimizedMergeEnabled, MergeState mergeState, FieldInfo fieldInfo) {
2828
if (optimizedMergeEnabled == false || mergeState.needsIndexSort == false) {
@@ -38,6 +38,8 @@ static MergeStats compatibleWithOptimizedMerge(boolean optimizedMergeEnabled, Me
3838

3939
long sumNumValues = 0;
4040
int sumNumDocsWithField = 0;
41+
int minLength = Integer.MAX_VALUE;
42+
int maxLength = 0;
4143

4244
for (int i = 0; i < mergeState.docValuesProducers.length; i++) {
4345
DocValuesProducer docValuesProducer = mergeState.docValuesProducers[i];
@@ -86,6 +88,14 @@ static MergeStats compatibleWithOptimizedMerge(boolean optimizedMergeEnabled, Me
8688
}
8789
}
8890
}
91+
case BINARY -> {
92+
var entry = tsdbDocValuesProducer.binaries.get(fieldInfo.number);
93+
if (entry != null) {
94+
sumNumDocsWithField += entry.numDocsWithField;
95+
minLength = Math.min(minLength, entry.minLength);
96+
maxLength = Math.max(maxLength, entry.maxLength);
97+
}
98+
}
8999
default -> throw new IllegalStateException("unexpected doc values producer type: " + fieldInfo.getDocValuesType());
90100
}
91101
} else {
@@ -96,7 +106,7 @@ static MergeStats compatibleWithOptimizedMerge(boolean optimizedMergeEnabled, Me
96106
}
97107
}
98108

99-
return new MergeStats(true, sumNumValues, sumNumDocsWithField);
109+
return new MergeStats(true, sumNumValues, sumNumDocsWithField, minLength, maxLength);
100110
}
101111

102112
}

server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesConsumer.java

Lines changed: 130 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -247,71 +247,146 @@ public void mergeNumericField(FieldInfo mergeFieldInfo, MergeState mergeState) t
247247
}
248248
}
249249

250+
@Override
251+
public void mergeBinaryField(FieldInfo mergeFieldInfo, MergeState mergeState) throws IOException {
252+
var result = compatibleWithOptimizedMerge(enableOptimizedMerge, mergeState, mergeFieldInfo);
253+
if (result.supported()) {
254+
mergeBinaryField(result, mergeFieldInfo, mergeState);
255+
} else {
256+
super.mergeBinaryField(mergeFieldInfo, mergeState);
257+
}
258+
}
259+
250260
@Override
251261
public void addBinaryField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException {
252262
meta.writeInt(field.number);
253263
meta.writeByte(ES819TSDBDocValuesFormat.BINARY);
254264

255-
BinaryDocValues values = valuesProducer.getBinary(field);
256-
long start = data.getFilePointer();
257-
meta.writeLong(start); // dataOffset
258-
int numDocsWithField = 0;
259-
int minLength = Integer.MAX_VALUE;
260-
int maxLength = 0;
261-
for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) {
262-
numDocsWithField++;
263-
BytesRef v = values.binaryValue();
264-
int length = v.length;
265-
data.writeBytes(v.bytes, v.offset, v.length);
266-
minLength = Math.min(length, minLength);
267-
maxLength = Math.max(length, maxLength);
268-
}
269-
assert numDocsWithField <= maxDoc;
270-
meta.writeLong(data.getFilePointer() - start); // dataLength
271-
272-
if (numDocsWithField == 0) {
273-
meta.writeLong(-2); // docsWithFieldOffset
274-
meta.writeLong(0L); // docsWithFieldLength
275-
meta.writeShort((short) -1); // jumpTableEntryCount
276-
meta.writeByte((byte) -1); // denseRankPower
277-
} else if (numDocsWithField == maxDoc) {
278-
meta.writeLong(-1); // docsWithFieldOffset
279-
meta.writeLong(0L); // docsWithFieldLength
280-
meta.writeShort((short) -1); // jumpTableEntryCount
281-
meta.writeByte((byte) -1); // denseRankPower
282-
} else {
283-
long offset = data.getFilePointer();
284-
meta.writeLong(offset); // docsWithFieldOffset
285-
values = valuesProducer.getBinary(field);
286-
final short jumpTableEntryCount = IndexedDISI.writeBitSet(values, data, IndexedDISI.DEFAULT_DENSE_RANK_POWER);
287-
meta.writeLong(data.getFilePointer() - offset); // docsWithFieldLength
288-
meta.writeShort(jumpTableEntryCount);
289-
meta.writeByte(IndexedDISI.DEFAULT_DENSE_RANK_POWER);
290-
}
265+
if (valuesProducer instanceof TsdbDocValuesProducer tsdbValuesProducer && tsdbValuesProducer.mergeStats.supported()) {
266+
final int numDocsWithField = tsdbValuesProducer.mergeStats.sumNumDocsWithField();
267+
final int minLength = tsdbValuesProducer.mergeStats.minLength();
268+
final int maxLength = tsdbValuesProducer.mergeStats.maxLength();
291269

292-
meta.writeInt(numDocsWithField);
293-
meta.writeInt(minLength);
294-
meta.writeInt(maxLength);
295-
if (maxLength > minLength) {
296-
start = data.getFilePointer();
297-
meta.writeLong(start);
298-
meta.writeVInt(ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT);
270+
assert numDocsWithField <= maxDoc;
299271

300-
final DirectMonotonicWriter writer = DirectMonotonicWriter.getInstance(
301-
meta,
302-
data,
303-
numDocsWithField + 1,
304-
ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT
305-
);
306-
long addr = 0;
307-
writer.add(addr);
308-
values = valuesProducer.getBinary(field);
272+
BinaryDocValues values = valuesProducer.getBinary(field);
273+
long start = data.getFilePointer();
274+
meta.writeLong(start); // dataOffset
275+
276+
OffsetsAccumulator offsetsAccumulator = null;
277+
DISIAccumulator disiAccumulator = null;
278+
try {
279+
if (numDocsWithField > 0 && numDocsWithField < maxDoc) {
280+
disiAccumulator = new DISIAccumulator(dir, context, data, IndexedDISI.DEFAULT_DENSE_RANK_POWER);
281+
}
282+
283+
assert maxLength >= minLength;
284+
if (maxLength > minLength) {
285+
offsetsAccumulator = new OffsetsAccumulator(dir, context, data, numDocsWithField);
286+
}
287+
288+
for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) {
289+
BytesRef v = values.binaryValue();
290+
data.writeBytes(v.bytes, v.offset, v.length);
291+
if (disiAccumulator != null) {
292+
disiAccumulator.addDocId(doc);
293+
}
294+
if (offsetsAccumulator != null) {
295+
offsetsAccumulator.addDoc(v.length);
296+
}
297+
}
298+
meta.writeLong(data.getFilePointer() - start); // dataLength
299+
300+
if (numDocsWithField == 0) {
301+
meta.writeLong(-2); // docsWithFieldOffset
302+
meta.writeLong(0L); // docsWithFieldLength
303+
meta.writeShort((short) -1); // jumpTableEntryCount
304+
meta.writeByte((byte) -1); // denseRankPower
305+
} else if (numDocsWithField == maxDoc) {
306+
meta.writeLong(-1); // docsWithFieldOffset
307+
meta.writeLong(0L); // docsWithFieldLength
308+
meta.writeShort((short) -1); // jumpTableEntryCount
309+
meta.writeByte((byte) -1); // denseRankPower
310+
} else {
311+
long offset = data.getFilePointer();
312+
meta.writeLong(offset); // docsWithFieldOffset
313+
final short jumpTableEntryCount = disiAccumulator.build(data);
314+
meta.writeLong(data.getFilePointer() - offset); // docsWithFieldLength
315+
meta.writeShort(jumpTableEntryCount);
316+
meta.writeByte(IndexedDISI.DEFAULT_DENSE_RANK_POWER);
317+
}
318+
319+
meta.writeInt(numDocsWithField);
320+
meta.writeInt(minLength);
321+
meta.writeInt(maxLength);
322+
if (offsetsAccumulator != null) {
323+
offsetsAccumulator.build(meta, data);
324+
}
325+
} finally {
326+
IOUtils.close(disiAccumulator, offsetsAccumulator);
327+
}
328+
} else {
329+
BinaryDocValues values = valuesProducer.getBinary(field);
330+
long start = data.getFilePointer();
331+
meta.writeLong(start); // dataOffset
332+
int numDocsWithField = 0;
333+
int minLength = Integer.MAX_VALUE;
334+
int maxLength = 0;
309335
for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) {
310-
addr += values.binaryValue().length;
336+
numDocsWithField++;
337+
BytesRef v = values.binaryValue();
338+
int length = v.length;
339+
data.writeBytes(v.bytes, v.offset, v.length);
340+
minLength = Math.min(length, minLength);
341+
maxLength = Math.max(length, maxLength);
342+
}
343+
assert numDocsWithField <= maxDoc;
344+
meta.writeLong(data.getFilePointer() - start); // dataLength
345+
346+
if (numDocsWithField == 0) {
347+
meta.writeLong(-2); // docsWithFieldOffset
348+
meta.writeLong(0L); // docsWithFieldLength
349+
meta.writeShort((short) -1); // jumpTableEntryCount
350+
meta.writeByte((byte) -1); // denseRankPower
351+
} else if (numDocsWithField == maxDoc) {
352+
meta.writeLong(-1); // docsWithFieldOffset
353+
meta.writeLong(0L); // docsWithFieldLength
354+
meta.writeShort((short) -1); // jumpTableEntryCount
355+
meta.writeByte((byte) -1); // denseRankPower
356+
} else {
357+
long offset = data.getFilePointer();
358+
meta.writeLong(offset); // docsWithFieldOffset
359+
values = valuesProducer.getBinary(field);
360+
final short jumpTableEntryCount = IndexedDISI.writeBitSet(values, data, IndexedDISI.DEFAULT_DENSE_RANK_POWER);
361+
meta.writeLong(data.getFilePointer() - offset); // docsWithFieldLength
362+
meta.writeShort(jumpTableEntryCount);
363+
meta.writeByte(IndexedDISI.DEFAULT_DENSE_RANK_POWER);
364+
}
365+
366+
meta.writeInt(numDocsWithField);
367+
meta.writeInt(minLength);
368+
meta.writeInt(maxLength);
369+
if (maxLength > minLength) {
370+
start = data.getFilePointer();
371+
meta.writeLong(start);
372+
meta.writeVInt(ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT);
373+
374+
final DirectMonotonicWriter writer = DirectMonotonicWriter.getInstance(
375+
meta,
376+
data,
377+
numDocsWithField + 1,
378+
ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT
379+
);
380+
long addr = 0;
311381
writer.add(addr);
382+
values = valuesProducer.getBinary(field);
383+
for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) {
384+
addr += values.binaryValue().length;
385+
writer.add(addr);
386+
}
387+
writer.finish();
388+
meta.writeLong(data.getFilePointer() - start);
312389
}
313-
writer.finish();
314-
meta.writeLong(data.getFilePointer() - start);
315390
}
316391
}
317392

server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesProducer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747

4848
final class ES819TSDBDocValuesProducer extends DocValuesProducer {
4949
final IntObjectHashMap<NumericEntry> numerics;
50-
private final IntObjectHashMap<BinaryEntry> binaries;
50+
final IntObjectHashMap<BinaryEntry> binaries;
5151
final IntObjectHashMap<SortedEntry> sorted;
5252
final IntObjectHashMap<SortedSetEntry> sortedSets;
5353
final IntObjectHashMap<SortedNumericEntry> sortedNumerics;
@@ -1318,7 +1318,7 @@ static class NumericEntry {
13181318
long valuesLength;
13191319
}
13201320

1321-
private static class BinaryEntry {
1321+
static class BinaryEntry {
13221322
long dataOffset;
13231323
long dataLength;
13241324
long docsWithFieldOffset;

server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/XDocValuesConsumer.java

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.apache.lucene.codecs.DocValuesConsumer;
1212
import org.apache.lucene.codecs.DocValuesProducer;
1313
import org.apache.lucene.index.BaseTermsEnum;
14+
import org.apache.lucene.index.BinaryDocValues;
1415
import org.apache.lucene.index.DocIDMerger;
1516
import org.apache.lucene.index.DocValues;
1617
import org.apache.lucene.index.DocValuesType;
@@ -152,6 +153,102 @@ public long longValue() throws IOException {
152153
};
153154
}
154155

156+
/** Tracks state of one binary sub-reader that we are merging */
157+
private static class BinaryDocValuesSub extends DocIDMerger.Sub {
158+
159+
final BinaryDocValues values;
160+
161+
BinaryDocValuesSub(MergeState.DocMap docMap, BinaryDocValues values) {
162+
super(docMap);
163+
this.values = values;
164+
assert values.docID() == -1;
165+
}
166+
167+
@Override
168+
public int nextDoc() throws IOException {
169+
return values.nextDoc();
170+
}
171+
}
172+
173+
/**
174+
* Merges the binary docvalues from <code>MergeState</code>.
175+
*
176+
* <p>This overload calls {@link #addBinaryField}, passing a {@link TsdbDocValuesProducer} that carries the supplied
177+
* merge stats and that merges and filters deleted documents on the fly.
178+
*/
179+
public void mergeBinaryField(MergeStats mergeStats, FieldInfo mergeFieldInfo, final MergeState mergeState) throws IOException {
180+
addBinaryField(mergeFieldInfo, new TsdbDocValuesProducer(mergeStats) {
181+
@Override
182+
public BinaryDocValues getBinary(FieldInfo fieldInfo) throws IOException {
183+
if (fieldInfo != mergeFieldInfo) {
184+
throw new IllegalArgumentException("wrong fieldInfo");
185+
}
186+
187+
List<BinaryDocValuesSub> subs = new ArrayList<>();
188+
189+
long cost = 0;
190+
for (int i = 0; i < mergeState.docValuesProducers.length; i++) {
191+
BinaryDocValues values = null;
192+
DocValuesProducer docValuesProducer = mergeState.docValuesProducers[i];
193+
if (docValuesProducer != null) {
194+
FieldInfo readerFieldInfo = mergeState.fieldInfos[i].fieldInfo(mergeFieldInfo.name);
195+
if (readerFieldInfo != null && readerFieldInfo.getDocValuesType() == DocValuesType.BINARY) {
196+
values = docValuesProducer.getBinary(readerFieldInfo);
197+
}
198+
}
199+
if (values != null) {
200+
cost += values.cost();
201+
subs.add(new BinaryDocValuesSub(mergeState.docMaps[i], values));
202+
}
203+
}
204+
205+
final DocIDMerger<BinaryDocValuesSub> docIDMerger = DocIDMerger.of(subs, mergeState.needsIndexSort);
206+
final long finalCost = cost;
207+
208+
return new BinaryDocValues() {
209+
private BinaryDocValuesSub current;
210+
private int docID = -1;
211+
212+
@Override
213+
public int docID() {
214+
return docID;
215+
}
216+
217+
@Override
218+
public int nextDoc() throws IOException {
219+
current = docIDMerger.next();
220+
if (current == null) {
221+
docID = NO_MORE_DOCS;
222+
} else {
223+
docID = current.mappedDocID;
224+
}
225+
return docID;
226+
}
227+
228+
@Override
229+
public int advance(int target) throws IOException {
230+
throw new UnsupportedOperationException();
231+
}
232+
233+
@Override
234+
public boolean advanceExact(int target) throws IOException {
235+
throw new UnsupportedOperationException();
236+
}
237+
238+
@Override
239+
public long cost() {
240+
return finalCost;
241+
}
242+
243+
@Override
244+
public BytesRef binaryValue() throws IOException {
245+
return current.values.binaryValue();
246+
}
247+
};
248+
}
249+
});
250+
}
251+
155252
/** Tracks state of one sorted numeric sub-reader that we are merging */
156253
private static class SortedNumericDocValuesSub extends DocIDMerger.Sub {
157254

0 commit comments

Comments
 (0)