Skip to content

Commit 1dc87b9

Browse files
bharath-techiedk2k
authored andcommitted
star tree file formats refactoring and fixing offset bug (opensearch-project#15975)
Signed-off-by: Bharathwaj G <bharath78910@gmail.com>
1 parent 734fec6 commit 1dc87b9

File tree

12 files changed

+281
-209
lines changed

12 files changed

+281
-209
lines changed

server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilder.java

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.opensearch.index.compositeindex.datacube.startree.node.InMemoryTreeNode;
3535
import org.opensearch.index.compositeindex.datacube.startree.node.StarTreeNodeType;
3636
import org.opensearch.index.compositeindex.datacube.startree.utils.SequentialDocValuesIterator;
37+
import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedNumericStarTreeValuesIterator;
3738
import org.opensearch.index.mapper.DocCountFieldMapper;
3839
import org.opensearch.index.mapper.FieldMapper;
3940
import org.opensearch.index.mapper.FieldValueConverter;
@@ -193,7 +194,9 @@ public List<SequentialDocValuesIterator> getMetricReaders(SegmentWriteState stat
193194
metricFieldInfo = getFieldInfo(metric.getField(), DocValuesType.SORTED_NUMERIC);
194195
}
195196
metricReader = new SequentialDocValuesIterator(
196-
fieldProducerMap.get(metricFieldInfo.name).getSortedNumeric(metricFieldInfo)
197+
new SortedNumericStarTreeValuesIterator(
198+
fieldProducerMap.get(metricFieldInfo.name).getSortedNumeric(metricFieldInfo)
199+
)
197200
);
198201
}
199202
metricReaders.add(metricReader);
@@ -228,7 +231,7 @@ public void build(
228231
dimensionFieldInfo = getFieldInfo(dimension, DocValuesType.SORTED_NUMERIC);
229232
}
230233
dimensionReaders[i] = new SequentialDocValuesIterator(
231-
fieldProducerMap.get(dimensionFieldInfo.name).getSortedNumeric(dimensionFieldInfo)
234+
new SortedNumericStarTreeValuesIterator(fieldProducerMap.get(dimensionFieldInfo.name).getSortedNumeric(dimensionFieldInfo))
232235
);
233236
}
234237
Iterator<StarTreeDocument> starTreeDocumentIterator = sortAndAggregateSegmentDocuments(dimensionReaders, metricReaders);
@@ -287,7 +290,7 @@ void appendDocumentsToStarTree(Iterator<StarTreeDocument> starTreeDocumentIterat
287290
}
288291
}
289292

290-
private void serializeStarTree(int numSegmentStarTreeDocument, int numStarTreeDocs) throws IOException {
293+
private void serializeStarTree(int numSegmentStarTreeDocuments, int numStarTreeDocs) throws IOException {
291294
// serialize the star tree data
292295
long dataFilePointer = dataOut.getFilePointer();
293296
StarTreeWriter starTreeWriter = new StarTreeWriter();
@@ -299,7 +302,7 @@ private void serializeStarTree(int numSegmentStarTreeDocument, int numStarTreeDo
299302
starTreeField,
300303
metricAggregatorInfos,
301304
numStarTreeNodes,
302-
numSegmentStarTreeDocument,
305+
numSegmentStarTreeDocuments,
303306
numStarTreeDocs,
304307
dataFilePointer,
305308
totalStarTreeDataLength
@@ -400,22 +403,20 @@ protected StarTreeDocument getStarTreeDocument(
400403
) throws IOException {
401404
Long[] dims = new Long[numDimensions];
402405
int i = 0;
403-
for (SequentialDocValuesIterator dimensionDocValueIterator : dimensionReaders) {
404-
dimensionDocValueIterator.nextDoc(currentDocId);
405-
Long val = dimensionDocValueIterator.value(currentDocId);
406+
for (SequentialDocValuesIterator dimensionValueIterator : dimensionReaders) {
407+
dimensionValueIterator.nextEntry(currentDocId);
408+
Long val = dimensionValueIterator.value(currentDocId);
406409
dims[i] = val;
407410
i++;
408411
}
409412
i = 0;
410413
Object[] metrics = new Object[metricReaders.size()];
411-
for (SequentialDocValuesIterator metricDocValuesIterator : metricReaders) {
412-
metricDocValuesIterator.nextDoc(currentDocId);
414+
for (SequentialDocValuesIterator metricValuesIterator : metricReaders) {
415+
metricValuesIterator.nextEntry(currentDocId);
413416
// As part of merge, we traverse the star tree doc values
414417
// The type of data stored in metric fields is different from the
415418
// actual indexing field they're based on
416-
metrics[i] = metricAggregatorInfos.get(i)
417-
.getValueAggregators()
418-
.toAggregatedValueType(metricDocValuesIterator.value(currentDocId));
419+
metrics[i] = metricAggregatorInfos.get(i).getValueAggregators().toAggregatedValueType(metricValuesIterator.value(currentDocId));
419420
i++;
420421
}
421422
return new StarTreeDocument(dims, metrics);
@@ -502,7 +503,7 @@ Long[] getStarTreeDimensionsFromSegment(int currentDocId, SequentialDocValuesIte
502503
for (int i = 0; i < numDimensions; i++) {
503504
if (dimensionReaders[i] != null) {
504505
try {
505-
dimensionReaders[i].nextDoc(currentDocId);
506+
dimensionReaders[i].nextEntry(currentDocId);
506507
} catch (IOException e) {
507508
logger.error("unable to iterate to next doc", e);
508509
throw new RuntimeException("unable to iterate to next doc", e);
@@ -530,7 +531,7 @@ private Object[] getStarTreeMetricsFromSegment(int currentDocId, List<Sequential
530531
SequentialDocValuesIterator metricStatReader = metricsReaders.get(i);
531532
if (metricStatReader != null) {
532533
try {
533-
metricStatReader.nextDoc(currentDocId);
534+
metricStatReader.nextEntry(currentDocId);
534535
} catch (IOException e) {
535536
logger.error("unable to iterate to next doc", e);
536537
throw new RuntimeException("unable to iterate to next doc", e);
@@ -672,7 +673,7 @@ private SequentialDocValuesIterator getIteratorForNumericField(
672673
SequentialDocValuesIterator sequentialDocValuesIterator;
673674
assert fieldProducerMap.containsKey(fieldInfo.name);
674675
sequentialDocValuesIterator = new SequentialDocValuesIterator(
675-
DocValues.singleton(fieldProducerMap.get(fieldInfo.name).getNumeric(fieldInfo))
676+
new SortedNumericStarTreeValuesIterator(DocValues.singleton(fieldProducerMap.get(fieldInfo.name).getNumeric(fieldInfo)))
676677
);
677678
return sequentialDocValuesIterator;
678679
}

server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OffHeapStarTreeBuilder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ Iterator<StarTreeDocument> mergeStarTrees(List<StarTreeValues> starTreeValuesSub
153153
.size()];
154154
for (int i = 0; i < dimensionsSplitOrder.size(); i++) {
155155
String dimension = dimensionsSplitOrder.get(i).getField();
156-
dimensionReaders[i] = new SequentialDocValuesIterator(starTreeValues.getDimensionDocIdSetIterator(dimension));
156+
dimensionReaders[i] = new SequentialDocValuesIterator(starTreeValues.getDimensionValuesIterator(dimension));
157157
}
158158
List<SequentialDocValuesIterator> metricReaders = new ArrayList<>();
159159
// get doc id set iterators for metrics
@@ -164,7 +164,7 @@ Iterator<StarTreeDocument> mergeStarTrees(List<StarTreeValues> starTreeValuesSub
164164
metric.getField(),
165165
metricStat.getTypeName()
166166
);
167-
metricReaders.add(new SequentialDocValuesIterator(starTreeValues.getMetricDocIdSetIterator(metricFullName)));
167+
metricReaders.add(new SequentialDocValuesIterator(starTreeValues.getMetricValuesIterator(metricFullName)));
168168
}
169169
}
170170
int currentDocId = 0;

server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ StarTreeDocument[] getSegmentsStarTreeDocuments(List<StarTreeValues> starTreeVal
138138

139139
for (int i = 0; i < dimensionsSplitOrder.size(); i++) {
140140
String dimension = dimensionsSplitOrder.get(i).getField();
141-
dimensionReaders[i] = new SequentialDocValuesIterator(starTreeValues.getDimensionDocIdSetIterator(dimension));
141+
dimensionReaders[i] = new SequentialDocValuesIterator(starTreeValues.getDimensionValuesIterator(dimension));
142142
}
143143

144144
List<SequentialDocValuesIterator> metricReaders = new ArrayList<>();
@@ -150,7 +150,7 @@ StarTreeDocument[] getSegmentsStarTreeDocuments(List<StarTreeValues> starTreeVal
150150
metric.getField(),
151151
metricStat.getTypeName()
152152
);
153-
metricReaders.add(new SequentialDocValuesIterator(starTreeValues.getMetricDocIdSetIterator(metricFullName)));
153+
metricReaders.add(new SequentialDocValuesIterator(starTreeValues.getMetricValuesIterator(metricFullName)));
154154

155155
}
156156
}

server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreeDocsFileManager.java

Lines changed: 14 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import java.io.Closeable;
2323
import java.io.IOException;
24-
import java.util.ArrayList;
2524
import java.util.LinkedHashMap;
2625
import java.util.List;
2726
import java.util.Map;
@@ -55,11 +54,9 @@ public class StarTreeDocsFileManager extends AbstractDocumentsFileManager implem
5554
private RandomAccessInput starTreeDocsFileRandomInput;
5655
private IndexOutput starTreeDocsFileOutput;
5756
private final Map<String, Integer> fileToEndDocIdMap;
58-
private final List<Integer> starTreeDocumentOffsets = new ArrayList<>();
5957
private int currentFileStartDocId;
6058
private int numReadableStarTreeDocuments;
6159
private int starTreeFileCount = -1;
62-
private int currBytes = 0;
6360
private final int fileCountMergeThreshold;
6461
private int numStarTreeDocs = 0;
6562

@@ -98,15 +95,26 @@ IndexOutput createStarTreeDocumentsFileOutput() throws IOException {
9895
public void writeStarTreeDocument(StarTreeDocument starTreeDocument, boolean isAggregatedDoc) throws IOException {
9996
assert isAggregatedDoc == true;
10097
int numBytes = writeStarTreeDocument(starTreeDocument, starTreeDocsFileOutput, true);
101-
addStarTreeDocumentOffset(numBytes);
98+
if (docSizeInBytes == -1) {
99+
docSizeInBytes = numBytes;
100+
} else {
101+
assert docSizeInBytes == numBytes;
102+
}
102103
numStarTreeDocs++;
103104
}
104105

105106
@Override
106107
public StarTreeDocument readStarTreeDocument(int docId, boolean isAggregatedDoc) throws IOException {
107108
assert isAggregatedDoc == true;
108109
ensureDocumentReadable(docId);
109-
return readStarTreeDocument(starTreeDocsFileRandomInput, starTreeDocumentOffsets.get(docId), true);
110+
return readStarTreeDocument(starTreeDocsFileRandomInput, getOffset(docId), true);
111+
}
112+
113+
/**
114+
* Returns offset for the docId based on the current file start id
115+
*/
116+
private long getOffset(int docId) {
117+
return (long) (docId - currentFileStartDocId) * docSizeInBytes;
110118
}
111119

112120
@Override
@@ -119,19 +127,10 @@ public Long getDimensionValue(int docId, int dimensionId) throws IOException {
119127
public Long[] readDimensions(int docId) throws IOException {
120128
ensureDocumentReadable(docId);
121129
Long[] dims = new Long[starTreeField.getDimensionsOrder().size()];
122-
readDimensions(dims, starTreeDocsFileRandomInput, starTreeDocumentOffsets.get(docId));
130+
readDimensions(dims, starTreeDocsFileRandomInput, getOffset(docId));
123131
return dims;
124132
}
125133

126-
private void addStarTreeDocumentOffset(int bytes) {
127-
starTreeDocumentOffsets.add(currBytes);
128-
currBytes += bytes;
129-
if (docSizeInBytes == -1) {
130-
docSizeInBytes = bytes;
131-
}
132-
assert docSizeInBytes == bytes;
133-
}
134-
135134
/**
136135
* Load the correct StarTreeDocuments file based on the docId
137136
*/
@@ -199,7 +198,6 @@ private void loadStarTreeDocumentFile(int docId) throws IOException {
199198
* If the operation is only for reading existing documents, a new file is not created.
200199
*/
201200
private void closeAndMaybeCreateNewFile(boolean shouldCreateFileForAppend, int numStarTreeDocs) throws IOException {
202-
currBytes = 0;
203201
if (starTreeDocsFileOutput != null) {
204202
fileToEndDocIdMap.put(starTreeDocsFileOutput.getName(), numStarTreeDocs);
205203
IOUtils.close(starTreeDocsFileOutput);
@@ -232,7 +230,6 @@ private void mergeFiles(int numStarTreeDocs) throws IOException {
232230
deleteOldFiles();
233231
fileToEndDocIdMap.clear();
234232
fileToEndDocIdMap.put(mergedOutput.getName(), numStarTreeDocs);
235-
resetStarTreeDocumentOffsets();
236233
}
237234
}
238235

@@ -259,17 +256,6 @@ private void deleteOldFiles() throws IOException {
259256
}
260257
}
261258

262-
/**
263-
* Reset the star tree document offsets based on the merged file
264-
*/
265-
private void resetStarTreeDocumentOffsets() {
266-
int curr = 0;
267-
for (int i = 0; i < starTreeDocumentOffsets.size(); i++) {
268-
starTreeDocumentOffsets.set(i, curr);
269-
curr += docSizeInBytes;
270-
}
271-
}
272-
273259
@Override
274260
public void close() {
275261
try {
@@ -288,7 +274,6 @@ public void close() {
288274
tmpDirectory.deleteFile(file);
289275
} catch (IOException ignored) {} // similar to IOUtils.deleteFilesIgnoringExceptions
290276
}
291-
starTreeDocumentOffsets.clear();
292277
fileToEndDocIdMap.clear();
293278
}
294279
}

0 commit comments

Comments
 (0)