Skip to content

Commit 9763027

Browse files
committed
Introduce CHANGE_POINT ... BY ... syntax
1 parent d8aa6c7 commit 9763027

File tree

22 files changed

+2757
-1955
lines changed

22 files changed

+2757
-1955
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/ChangePointOperator.java

Lines changed: 154 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,10 @@
2222
import java.util.ArrayDeque;
2323
import java.util.ArrayList;
2424
import java.util.Deque;
25+
import java.util.Iterator;
2526
import java.util.List;
27+
import java.util.Objects;
28+
import java.util.stream.Collectors;
2629

2730
/**
2831
* Find spikes, dips and change point in a list of values.
@@ -35,20 +38,21 @@ public class ChangePointOperator implements Operator {
3538

3639
public static final int INPUT_VALUE_COUNT_LIMIT = 1000;
3740

38-
public record Factory(int channel, String sourceText, int sourceLine, int sourceColumn) implements OperatorFactory {
41+
public record Factory(int metricChannel, List<Integer> partitionChannel, String sourceText, int sourceLine, int sourceColumn) implements OperatorFactory {
3942
@Override
4043
public Operator get(DriverContext driverContext) {
41-
return new ChangePointOperator(driverContext, channel, sourceText, sourceLine, sourceColumn);
44+
return new ChangePointOperator(driverContext, metricChannel, partitionChannel, sourceText, sourceLine, sourceColumn);
4245
}
4346

4447
@Override
4548
public String describe() {
46-
return "ChangePointOperator[channel=" + channel + "]";
49+
return ChangePointOperator.describe(metricChannel, partitionChannel);
4750
}
4851
}
4952

5053
private final DriverContext driverContext;
51-
private final int channel;
54+
private final int metricChannel;
55+
private final List<Integer> partitionChannel;
5256
private final String sourceText;
5357
private final int sourceLine;
5458
private final int sourceColumn;
@@ -60,9 +64,10 @@ public String describe() {
6064

6165
// TODO: make org.elasticsearch.xpack.esql.core.tree.Source available here
6266
// (by modularizing esql-core) and use that instead of the individual fields.
63-
public ChangePointOperator(DriverContext driverContext, int channel, String sourceText, int sourceLine, int sourceColumn) {
67+
public ChangePointOperator(DriverContext driverContext, int metricChannel, List<Integer> partitionChannel, String sourceText, int sourceLine, int sourceColumn) {
6468
this.driverContext = driverContext;
65-
this.channel = channel;
69+
this.metricChannel = metricChannel;
70+
this.partitionChannel = partitionChannel;
6671
this.sourceText = sourceText;
6772
this.sourceLine = sourceLine;
6873
this.sourceColumn = sourceColumn;
@@ -105,61 +110,140 @@ public Page getOutput() {
105110
}
106111

107112
private void createOutputPages() {
108-
int valuesCount = 0;
109-
for (Page page : inputPages) {
110-
valuesCount += page.getPositionCount();
111-
}
112-
boolean tooManyValues = valuesCount > INPUT_VALUE_COUNT_LIMIT;
113-
if (tooManyValues) {
114-
valuesCount = INPUT_VALUE_COUNT_LIMIT;
113+
int maxValuesCount = 0;
114+
{
115+
int valuesCount = 0;
116+
String lastPartitionFieldValue = null;
117+
for (Page inputPage : inputPages) {
118+
String currentPartitionFieldValue = getPartitionKey(inputPage, 0);
119+
if (lastPartitionFieldValue != null) {
120+
if (Objects.equals(currentPartitionFieldValue, lastPartitionFieldValue) == false) {
121+
valuesCount = 0;
122+
}
123+
}
124+
lastPartitionFieldValue = currentPartitionFieldValue;
125+
valuesCount += inputPage.getPositionCount();
126+
maxValuesCount = Math.max(maxValuesCount, valuesCount);
127+
}
115128
}
129+
boolean tooManyValues = maxValuesCount > INPUT_VALUE_COUNT_LIMIT;
116130

117-
List<Double> values = new ArrayList<>(valuesCount);
118-
List<Integer> bucketIndexes = new ArrayList<>(valuesCount);
119-
int valuesIndex = 0;
131+
132+
List<MlAggsHelper.DoubleBucketValues> bucketValuesPerPartition = new ArrayList<>();
120133
boolean hasNulls = false;
121134
boolean hasMultivalued = false;
122-
for (Page inputPage : inputPages) {
123-
Block inputBlock = inputPage.getBlock(channel);
124-
for (int i = 0; i < inputBlock.getPositionCount() && valuesIndex < valuesCount; i++) {
125-
Object value = BlockUtils.toJavaObject(inputBlock, i);
126-
if (value == null) {
127-
hasNulls = true;
128-
valuesIndex++;
129-
} else if (value instanceof List<?>) {
130-
hasMultivalued = true;
131-
valuesIndex++;
132-
} else {
133-
values.add(((Number) value).doubleValue());
134-
bucketIndexes.add(valuesIndex++);
135+
{
136+
List<Double> values = new ArrayList<>(maxValuesCount);
137+
List<Integer> bucketIndexes = new ArrayList<>(maxValuesCount);
138+
int valuesIndex = 0;
139+
String lastPartitionFieldValue = null;
140+
for (Page inputPage : inputPages) {
141+
String currentPartitionFieldValue = getPartitionKey(inputPage, 0);
142+
if (lastPartitionFieldValue != null) {
143+
if (Objects.equals(currentPartitionFieldValue, lastPartitionFieldValue) == false) {
144+
MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(
145+
null,
146+
values.stream().mapToDouble(Double::doubleValue).toArray(),
147+
bucketIndexes.stream().mapToInt(Integer::intValue).toArray()
148+
);
149+
bucketValuesPerPartition.add(bucketValues);
150+
151+
values = new ArrayList<>(maxValuesCount);
152+
bucketIndexes = new ArrayList<>(maxValuesCount);
153+
valuesIndex = 0;
154+
}
155+
}
156+
lastPartitionFieldValue = currentPartitionFieldValue;
157+
Block inputBlock = inputPage.getBlock(metricChannel);
158+
for (int i = 0; i < inputBlock.getPositionCount() && valuesIndex < maxValuesCount; i++, valuesIndex++) {
159+
Object value = BlockUtils.toJavaObject(inputBlock, i);
160+
if (value == null) {
161+
hasNulls = true;
162+
} else if (value instanceof List<?>) {
163+
hasMultivalued = true;
164+
} else {
165+
values.add(((Number) value).doubleValue());
166+
bucketIndexes.add(valuesIndex);
167+
}
168+
}
169+
}
170+
// Handle last partition separately
171+
// if (lastPartitionFieldValue != null) {
172+
MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(
173+
null,
174+
values.stream().mapToDouble(Double::doubleValue).toArray(),
175+
bucketIndexes.stream().mapToInt(Integer::intValue).toArray()
176+
);
177+
bucketValuesPerPartition.add(bucketValues);
178+
// }
179+
}
180+
181+
List<ChangeType> changeTypes = new ArrayList<>();
182+
{
183+
for (MlAggsHelper.DoubleBucketValues bucketValues : bucketValuesPerPartition) {
184+
ChangeType changeType = ChangePointDetector.getChangeType(bucketValues);
185+
if (changeType instanceof ChangeType.Indeterminable indeterminable) {
186+
warnings(false).registerException(new IllegalArgumentException(indeterminable.getReason()));
135187
}
188+
changeTypes.add(changeType);
136189
}
137190
}
138191

139-
MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(
140-
null,
141-
values.stream().mapToDouble(Double::doubleValue).toArray(),
142-
bucketIndexes.stream().mapToInt(Integer::intValue).toArray()
143-
);
144-
ChangeType changeType = ChangePointDetector.getChangeType(bucketValues);
145-
int changePointIndex = changeType.changePoint();
192+
insertChangePoints(changeTypes);
193+
194+
if (tooManyValues) {
195+
warnings(true).registerException(
196+
new IllegalArgumentException("too many values; keeping only first " + INPUT_VALUE_COUNT_LIMIT + " values")
197+
);
198+
}
199+
if (hasNulls) {
200+
warnings(true).registerException(new IllegalArgumentException("values contain nulls; skipping them"));
201+
}
202+
if (hasMultivalued) {
203+
warnings(true).registerException(
204+
new IllegalArgumentException(
205+
"values contains multivalued entries; skipping them (please consider reducing them with e.g. MV_AVG or MV_SUM)"
206+
)
207+
);
208+
}
209+
}
146210

211+
private void insertChangePoints(Iterable<ChangeType> changeTypes) {
212+
Iterator<ChangeType> changeTypesIterator = changeTypes.iterator();
213+
ChangeType changeType = null;
214+
if (changeTypesIterator.hasNext()) {
215+
changeType = changeTypesIterator.next();
216+
}
147217
BlockFactory blockFactory = driverContext.blockFactory();
148218
int pageStartIndex = 0;
219+
String lastPartitionFieldValue = null;
149220
while (inputPages.isEmpty() == false) {
150221
Page inputPage = inputPages.peek();
151222
Page outputPage;
152223
Block changeTypeBlock = null;
153224
Block changePvalueBlock = null;
154225
boolean success = false;
226+
227+
String currentPartitionFieldValue = getPartitionKey(inputPage, 0);
228+
if (lastPartitionFieldValue != null) {
229+
if (Objects.equals(currentPartitionFieldValue, lastPartitionFieldValue) == false) {
230+
pageStartIndex = 0;
231+
if (changeTypesIterator.hasNext()) {
232+
changeType = changeTypesIterator.next();
233+
}
234+
}
235+
}
236+
lastPartitionFieldValue = currentPartitionFieldValue;
237+
155238
try {
156-
if (pageStartIndex <= changePointIndex && changePointIndex < pageStartIndex + inputPage.getPositionCount()) {
239+
// TODO: How to handle case when there are no change points
240+
if (changeType != null && pageStartIndex <= changeType.changePoint() && changeType.changePoint() < pageStartIndex + inputPage.getPositionCount()) {
157241
try (
158242
BytesRefBlock.Builder changeTypeBlockBuilder = blockFactory.newBytesRefBlockBuilder(inputPage.getPositionCount());
159243
DoubleBlock.Builder pvalueBlockBuilder = blockFactory.newDoubleBlockBuilder(inputPage.getPositionCount())
160244
) {
161245
for (int i = 0; i < inputPage.getPositionCount(); i++) {
162-
if (pageStartIndex + i == changePointIndex) {
246+
if (pageStartIndex + i == changeType.changePoint()) {
163247
changeTypeBlockBuilder.appendBytesRef(new BytesRef(changeType.getWriteableName()));
164248
pvalueBlockBuilder.appendDouble(changeType.pValue());
165249
} else {
@@ -174,8 +258,10 @@ private void createOutputPages() {
174258
changeTypeBlock = blockFactory.newConstantNullBlock(inputPage.getPositionCount());
175259
changePvalueBlock = blockFactory.newConstantNullBlock(inputPage.getPositionCount());
176260
}
177-
178-
outputPage = inputPage.appendBlocks(new Block[] { changeTypeBlock, changePvalueBlock });
261+
outputPage = inputPage.appendBlocks(new Block[]{changeTypeBlock, changePvalueBlock});
262+
if (pageStartIndex + inputPage.getPositionCount() > INPUT_VALUE_COUNT_LIMIT) {
263+
outputPage = outputPage.subPage(0, INPUT_VALUE_COUNT_LIMIT - pageStartIndex);
264+
}
179265
success = true;
180266
} finally {
181267
if (success == false) {
@@ -187,25 +273,28 @@ private void createOutputPages() {
187273
outputPages.add(outputPage);
188274
pageStartIndex += inputPage.getPositionCount();
189275
}
276+
}
190277

191-
if (changeType instanceof ChangeType.Indeterminable indeterminable) {
192-
warnings(false).registerException(new IllegalArgumentException(indeterminable.getReason()));
193-
}
194-
if (tooManyValues) {
195-
warnings(true).registerException(
196-
new IllegalArgumentException("too many values; keeping only first " + INPUT_VALUE_COUNT_LIMIT + " values")
197-
);
198-
}
199-
if (hasNulls) {
200-
warnings(true).registerException(new IllegalArgumentException("values contain nulls; skipping them"));
278+
/**
279+
* Calculates the partition key of the i-th row of the given page.
280+
*
281+
* @param page page for which the partition key should be calculated
282+
* @param i row index
283+
* @return partition key of the i-th row of the given page
284+
*/
285+
private String getPartitionKey(Page page, int i) {
286+
if (partitionChannel.isEmpty()) {
287+
return "";
201288
}
202-
if (hasMultivalued) {
203-
warnings(true).registerException(
204-
new IllegalArgumentException(
205-
"values contains multivalued entries; skipping them (please consider reducing them with e.g. MV_AVG or MV_SUM)"
206-
)
207-
);
289+
assert page.getPositionCount() > 0;
290+
StringBuilder builder = new StringBuilder();
291+
for (Integer partitionChannel : partitionChannel) {
292+
try (var block = page.getBlock(partitionChannel).filter(i)) {
293+
BytesRef partitionFieldValue = ((BytesRefBlock) block).getBytesRef(i, new BytesRef());
294+
builder.append(partitionFieldValue.utf8ToString());
295+
}
208296
}
297+
return builder.toString();
209298
}
210299

211300
@Override
@@ -220,7 +309,15 @@ public void close() {
220309

221310
@Override
222311
public String toString() {
223-
return "ChangePointOperator[channel=" + channel + "]";
312+
return describe(metricChannel, partitionChannel);
313+
}
314+
315+
private static String describe(int metricChannel, List<Integer> partitionChannel) {
316+
return "ChangePointOperator[metricChannel="
317+
+ metricChannel
318+
+ ", partitionChannels="
319+
+ partitionChannel.stream().map(c -> c.toString()).collect(Collectors.joining(",", "[", "]"))
320+
+ "]";
224321
}
225322

226323
private Warnings warnings(boolean onlyWarnings) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.compute.operator;
9+
10+
import org.apache.lucene.util.BytesRef;
11+
import org.elasticsearch.compute.data.BlockFactory;
12+
import org.elasticsearch.compute.data.BytesRefBlock;
13+
import org.elasticsearch.compute.data.DoubleBlock;
14+
import org.elasticsearch.compute.data.LongBlock;
15+
import org.elasticsearch.compute.data.Page;
16+
import org.elasticsearch.compute.test.OperatorTestCase;
17+
import org.elasticsearch.core.Tuple;
18+
import org.hamcrest.Matcher;
19+
20+
import java.util.ArrayList;
21+
import java.util.List;
22+
import java.util.Optional;
23+
24+
import static org.hamcrest.Matchers.equalTo;
25+
import static org.hamcrest.Matchers.hasSize;
26+
import static org.hamcrest.Matchers.lessThan;
27+
28+
public class ChangePointByOperatorTests extends OperatorTestCase {
29+
30+
@Override
31+
protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
32+
// size must be in [25, 1000] for ChangePoint to function correctly and detect the step change.
33+
size = Math.clamp(size, 25, 1000);
34+
List<Tuple<Long, BytesRef>> data = new ArrayList<>(size);
35+
for (int i = 0; i < size; i++) { // step change 0 -> 100
36+
data.add(Tuple.tuple(i < size / 2 ? randomLongBetween(0, 3) : randomLongBetween(100, 103), new BytesRef("prod")));
37+
}
38+
for (int i = 0; i < size; i++) { // step change 300 -> 200
39+
data.add(Tuple.tuple(i < size / 2 ? randomLongBetween(300, 303) : randomLongBetween(200, 203), new BytesRef("staging")));
40+
}
41+
for (int i = 0; i < size; i++) { // spike 50 -> 500 -> 50
42+
data.add(Tuple.tuple(i == 2 * size / 3 ? randomLongBetween(500, 503) : randomLongBetween(50, 53), new BytesRef("qa")));
43+
}
44+
return new LongBytesRefTupleBlockSourceOperator(blockFactory, data, size);
45+
}
46+
47+
@Override
48+
protected void assertSimpleOutput(List<Page> input, List<Page> output) {
49+
assertThat(output, hasSize(input.size()));
50+
int rowCount = 0;
51+
List<Tuple<Integer, String>> actualChangePoints = new ArrayList<>();
52+
for (int i = 0; i < output.size(); i++) {
53+
Page inputPage = input.get(i);
54+
Page outputPage = output.get(i);
55+
assertThat(outputPage.getPositionCount(), equalTo(inputPage.getPositionCount()));
56+
assertThat(outputPage.getBlockCount(), equalTo(4));
57+
for (int j = 0; j < outputPage.getPositionCount(); j++) {
58+
long inputValue = ((LongBlock) inputPage.getBlock(0)).getLong(j);
59+
long outputValue = ((LongBlock) outputPage.getBlock(0)).getLong(j);
60+
String inputPartition = ((BytesRefBlock) inputPage.getBlock(1)).getBytesRef(j, new BytesRef()).utf8ToString();
61+
String outputPartition = ((BytesRefBlock) outputPage.getBlock(1)).getBytesRef(j, new BytesRef()).utf8ToString();
62+
assertThat(outputValue, equalTo(inputValue));
63+
assertThat(outputPartition, equalTo(inputPartition));
64+
if (outputPage.getBlock(2).isNull(j) == false) { // change point detected at this position
65+
String type = (((BytesRefBlock) outputPage.getBlock(2)).getBytesRef(j, new BytesRef())).utf8ToString();
66+
double pvalue = ((DoubleBlock) outputPage.getBlock(3)).getDouble(j);
67+
assertThat(pvalue, lessThan(1E-9));
68+
actualChangePoints.add(Tuple.tuple(rowCount, type));
69+
} else { // no change point at this position
70+
assertThat(outputPage.getBlock(3).isNull(j), equalTo(true));
71+
}
72+
rowCount++;
73+
}
74+
}
75+
assertThat(
76+
actualChangePoints,
77+
equalTo(List.of(
78+
Tuple.tuple(rowCount / 6, "step_change"),
79+
Tuple.tuple(rowCount / 2, "step_change"),
80+
Tuple.tuple(8 * rowCount / 9, "spike"))
81+
)
82+
);
83+
}
84+
85+
@Override
86+
protected Operator.OperatorFactory simple(SimpleOptions options) {
87+
return new ChangePointOperator.Factory(0, List.of(1), null,0, 0);
88+
}
89+
90+
@Override
91+
protected Matcher<String> expectedDescriptionOfSimple() {
92+
return equalTo("ChangePointOperator[metricChannel=0, partitionChannels=[1]]");
93+
}
94+
95+
@Override
96+
protected Matcher<String> expectedToStringOfSimple() {
97+
return equalTo("ChangePointOperator[metricChannel=0, partitionChannels=[1]]");
98+
}
99+
}

0 commit comments

Comments
 (0)