22
22
import java .util .ArrayDeque ;
23
23
import java .util .ArrayList ;
24
24
import java .util .Deque ;
25
+ import java .util .Iterator ;
25
26
import java .util .List ;
27
+ import java .util .Optional ;
26
28
27
29
/**
28
30
* Find spikes, dips and change point in a list of values.
@@ -35,20 +37,21 @@ public class ChangePointOperator implements Operator {
35
37
36
38
public static final int INPUT_VALUE_COUNT_LIMIT = 1000 ;
37
39
38
- public record Factory (int channel , String sourceText , int sourceLine , int sourceColumn ) implements OperatorFactory {
40
+ public record Factory (int metricChannel , Optional < Integer > partitionChannel , String sourceText , int sourceLine , int sourceColumn ) implements OperatorFactory {
39
41
@ Override
40
42
public Operator get (DriverContext driverContext ) {
41
- return new ChangePointOperator (driverContext , channel , sourceText , sourceLine , sourceColumn );
43
+ return new ChangePointOperator (driverContext , metricChannel , partitionChannel , sourceText , sourceLine , sourceColumn );
42
44
}
43
45
44
46
@ Override
45
47
public String describe () {
46
- return " ChangePointOperator[channel=" + channel + "]" ;
48
+ return ChangePointOperator . describe ( metricChannel , partitionChannel ) ;
47
49
}
48
50
}
49
51
50
52
private final DriverContext driverContext ;
51
- private final int channel ;
53
+ private final int metricChannel ;
54
+ private final Optional <Integer > partitionChannel ;
52
55
private final String sourceText ;
53
56
private final int sourceLine ;
54
57
private final int sourceColumn ;
@@ -60,9 +63,10 @@ public String describe() {
60
63
61
64
// TODO: make org.elasticsearch.xpack.esql.core.tree.Source available here
62
65
// (by modularizing esql-core) and use that instead of the individual fields.
63
- public ChangePointOperator (DriverContext driverContext , int channel , String sourceText , int sourceLine , int sourceColumn ) {
66
+ public ChangePointOperator (DriverContext driverContext , int metricChannel , Optional < Integer > partitionChannel , String sourceText , int sourceLine , int sourceColumn ) {
64
67
this .driverContext = driverContext ;
65
- this .channel = channel ;
68
+ this .metricChannel = metricChannel ;
69
+ this .partitionChannel = partitionChannel ;
66
70
this .sourceText = sourceText ;
67
71
this .sourceLine = sourceLine ;
68
72
this .sourceColumn = sourceColumn ;
@@ -105,61 +109,134 @@ public Page getOutput() {
105
109
}
106
110
107
111
private void createOutputPages () {
112
+ int maxValuesCount = checkValueCounts ();
113
+ List <MlAggsHelper .DoubleBucketValues > bucketValuesPerPartition = checkNullAndMultivalued (maxValuesCount );
114
+
115
+ List <ChangeType > changeTypes = new ArrayList <>();
116
+ for (MlAggsHelper .DoubleBucketValues bucketValues : bucketValuesPerPartition ) {
117
+ ChangeType changeType = ChangePointDetector .getChangeType (bucketValues );
118
+ if (changeType instanceof ChangeType .Indeterminable indeterminable ) {
119
+ warnings (false ).registerException (new IllegalArgumentException (indeterminable .getReason ()));
120
+ }
121
+ changeTypes .add (changeType );
122
+ }
123
+
124
+ insertChangePoints (changeTypes );
125
+ }
126
+
127
+ private int checkValueCounts () {
128
+ int maxValuesCount = 0 ;
108
129
int valuesCount = 0 ;
109
- for (Page page : inputPages ) {
110
- valuesCount += page .getPositionCount ();
130
+ String lastPartitionFieldValue = null ;
131
+ for (Page inputPage : inputPages ) {
132
+ String currentPartitionFieldValue = getCurrentPartitionFieldValue (inputPage );
133
+ if (lastPartitionFieldValue != null ) {
134
+ if (currentPartitionFieldValue .equals (lastPartitionFieldValue ) == false ) {
135
+ valuesCount = 0 ;
136
+ }
137
+ }
138
+ lastPartitionFieldValue = currentPartitionFieldValue ;
139
+ valuesCount += inputPage .getPositionCount ();
140
+ maxValuesCount = Math .max (maxValuesCount , valuesCount );
111
141
}
112
- boolean tooManyValues = valuesCount > INPUT_VALUE_COUNT_LIMIT ;
142
+ boolean tooManyValues = maxValuesCount > INPUT_VALUE_COUNT_LIMIT ;
113
143
if (tooManyValues ) {
114
- valuesCount = INPUT_VALUE_COUNT_LIMIT ;
144
+ warnings (true ).registerException (
145
+ new IllegalArgumentException ("too many values; keeping only first " + INPUT_VALUE_COUNT_LIMIT + " values" )
146
+ );
147
+ maxValuesCount = INPUT_VALUE_COUNT_LIMIT ;
115
148
}
149
+ return maxValuesCount ;
150
+ }
116
151
117
- List <Double > values = new ArrayList <>(valuesCount );
118
- List <Integer > bucketIndexes = new ArrayList <>(valuesCount );
119
- int valuesIndex = 0 ;
152
+ private List <MlAggsHelper .DoubleBucketValues > checkNullAndMultivalued (int maxValuesCount ) {
153
+ List <MlAggsHelper .DoubleBucketValues > result = new ArrayList <>();
154
+ List <Double > values = new ArrayList <>(maxValuesCount );
155
+ List <Integer > bucketIndexes = new ArrayList <>(maxValuesCount );
120
156
boolean hasNulls = false ;
121
157
boolean hasMultivalued = false ;
158
+ String lastPartitionFieldValue = null ;
122
159
for (Page inputPage : inputPages ) {
123
- Block inputBlock = inputPage .getBlock (channel );
124
- for (int i = 0 ; i < inputBlock .getPositionCount () && valuesIndex < valuesCount ; i ++) {
160
+ String currentPartitionFieldValue = getCurrentPartitionFieldValue (inputPage );
161
+ if (lastPartitionFieldValue != null ) {
162
+ if (currentPartitionFieldValue .equals (lastPartitionFieldValue ) == false ) {
163
+ MlAggsHelper .DoubleBucketValues bucketValues = new MlAggsHelper .DoubleBucketValues (
164
+ null ,
165
+ values .stream ().mapToDouble (Double ::doubleValue ).toArray (),
166
+ bucketIndexes .stream ().mapToInt (Integer ::intValue ).toArray ()
167
+ );
168
+ result .add (bucketValues );
169
+
170
+ values = new ArrayList <>(maxValuesCount );
171
+ bucketIndexes = new ArrayList <>(maxValuesCount );
172
+ }
173
+ }
174
+ lastPartitionFieldValue = currentPartitionFieldValue ;
175
+ Block inputBlock = inputPage .getBlock (metricChannel );
176
+ for (int i = 0 , valuesIndex = 0 ; i < inputBlock .getPositionCount () && valuesIndex < maxValuesCount ; i ++, valuesIndex ++) {
125
177
Object value = BlockUtils .toJavaObject (inputBlock , i );
126
178
if (value == null ) {
127
179
hasNulls = true ;
128
- valuesIndex ++;
129
180
} else if (value instanceof List <?>) {
130
181
hasMultivalued = true ;
131
- valuesIndex ++;
132
182
} else {
133
183
values .add (((Number ) value ).doubleValue ());
134
- bucketIndexes .add (valuesIndex ++ );
184
+ bucketIndexes .add (valuesIndex );
135
185
}
136
186
}
137
187
}
188
+ // Handle last partition separately
189
+ if (lastPartitionFieldValue != null ) {
190
+ MlAggsHelper .DoubleBucketValues bucketValues = new MlAggsHelper .DoubleBucketValues (
191
+ null ,
192
+ values .stream ().mapToDouble (Double ::doubleValue ).toArray (),
193
+ bucketIndexes .stream ().mapToInt (Integer ::intValue ).toArray ()
194
+ );
195
+ result .add (bucketValues );
196
+ }
138
197
139
- MlAggsHelper .DoubleBucketValues bucketValues = new MlAggsHelper .DoubleBucketValues (
140
- null ,
141
- values .stream ().mapToDouble (Double ::doubleValue ).toArray (),
142
- bucketIndexes .stream ().mapToInt (Integer ::intValue ).toArray ()
143
- );
144
- ChangeType changeType = ChangePointDetector .getChangeType (bucketValues );
145
- int changePointIndex = changeType .changePoint ();
198
+ if (hasNulls ) {
199
+ warnings (true ).registerException (new IllegalArgumentException ("values contain nulls; skipping them" ));
200
+ }
201
+ if (hasMultivalued ) {
202
+ warnings (true ).registerException (
203
+ new IllegalArgumentException (
204
+ "values contains multivalued entries; skipping them (please consider reducing them with e.g. MV_AVG or MV_SUM)"
205
+ )
206
+ );
207
+ }
208
+ return result ;
209
+ }
146
210
211
+ private void insertChangePoints (Iterable <ChangeType > changeTypes ) {
212
+ Iterator <ChangeType > changeTypesIterator = changeTypes .iterator ();
213
+ ChangeType changeType = changeTypesIterator .next ();
147
214
BlockFactory blockFactory = driverContext .blockFactory ();
148
215
int pageStartIndex = 0 ;
216
+ String lastPartitionFieldValue = null ;
149
217
while (inputPages .isEmpty () == false ) {
150
218
Page inputPage = inputPages .peek ();
151
219
Page outputPage ;
152
220
Block changeTypeBlock = null ;
153
221
Block changePvalueBlock = null ;
154
222
boolean success = false ;
223
+
224
+ String currentPartitionFieldValue = getCurrentPartitionFieldValue (inputPage );
225
+ if (lastPartitionFieldValue != null ) {
226
+ if (currentPartitionFieldValue .equals (lastPartitionFieldValue ) == false ) {
227
+ pageStartIndex = 0 ;
228
+ }
229
+ }
230
+ lastPartitionFieldValue = currentPartitionFieldValue ;
231
+
155
232
try {
156
- if (pageStartIndex <= changePointIndex && changePointIndex < pageStartIndex + inputPage .getPositionCount ()) {
233
+ if (pageStartIndex <= changeType . changePoint () && changeType . changePoint () < pageStartIndex + inputPage .getPositionCount ()) {
157
234
try (
158
235
BytesRefBlock .Builder changeTypeBlockBuilder = blockFactory .newBytesRefBlockBuilder (inputPage .getPositionCount ());
159
236
DoubleBlock .Builder pvalueBlockBuilder = blockFactory .newDoubleBlockBuilder (inputPage .getPositionCount ())
160
237
) {
161
238
for (int i = 0 ; i < inputPage .getPositionCount (); i ++) {
162
- if (pageStartIndex + i == changePointIndex ) {
239
+ if (pageStartIndex + i == changeType . changePoint () ) {
163
240
changeTypeBlockBuilder .appendBytesRef (new BytesRef (changeType .getWriteableName ()));
164
241
pvalueBlockBuilder .appendDouble (changeType .pValue ());
165
242
} else {
@@ -170,6 +247,9 @@ private void createOutputPages() {
170
247
changeTypeBlock = changeTypeBlockBuilder .build ();
171
248
changePvalueBlock = pvalueBlockBuilder .build ();
172
249
}
250
+ if (changeTypesIterator .hasNext ()) {
251
+ changeType = changeTypesIterator .next ();
252
+ }
173
253
} else {
174
254
changeTypeBlock = blockFactory .newConstantNullBlock (inputPage .getPositionCount ());
175
255
changePvalueBlock = blockFactory .newConstantNullBlock (inputPage .getPositionCount ());
@@ -187,24 +267,14 @@ private void createOutputPages() {
187
267
outputPages .add (outputPage );
188
268
pageStartIndex += inputPage .getPositionCount ();
189
269
}
270
+ }
190
271
191
- if (changeType instanceof ChangeType .Indeterminable indeterminable ) {
192
- warnings (false ).registerException (new IllegalArgumentException (indeterminable .getReason ()));
193
- }
194
- if (tooManyValues ) {
195
- warnings (true ).registerException (
196
- new IllegalArgumentException ("too many values; keeping only first " + INPUT_VALUE_COUNT_LIMIT + " values" )
197
- );
198
- }
199
- if (hasNulls ) {
200
- warnings (true ).registerException (new IllegalArgumentException ("values contain nulls; skipping them" ));
201
- }
202
- if (hasMultivalued ) {
203
- warnings (true ).registerException (
204
- new IllegalArgumentException (
205
- "values contains multivalued entries; skipping them (please consider reducing them with e.g. MV_AVG or MV_SUM)"
206
- )
207
- );
272
+ private String getCurrentPartitionFieldValue (Page inputPage ) {
273
+ assert partitionChannel .isPresent ();
274
+ assert inputPage .getPositionCount () > 0 ;
275
+ try (var block = inputPage .getBlock (partitionChannel .get ()).filter (0 )) {
276
+ BytesRef partition = ((BytesRefBlock ) block ).getBytesRef (0 , new BytesRef ());
277
+ return partition .utf8ToString ();
208
278
}
209
279
}
210
280
@@ -220,7 +290,11 @@ public void close() {
220
290
221
291
@ Override
222
292
public String toString () {
223
- return "ChangePointOperator[channel=" + channel + "]" ;
293
+ return describe (metricChannel , partitionChannel );
294
+ }
295
+
296
+ private static String describe (int metricChannel , Optional <Integer > partitionChannel ) {
297
+ return "ChangePointOperator[metricChannel=" + metricChannel + (partitionChannel .isPresent () ? ", partitionChannel=" + partitionChannel .get () : "" ) + "]" ;
224
298
}
225
299
226
300
private Warnings warnings (boolean onlyWarnings ) {
0 commit comments