Skip to content

Commit 58de7eb

Browse files
committed
Support Range aggregation
1 parent 28d81b7 commit 58de7eb

File tree

9 files changed

+410
-3
lines changed

9 files changed

+410
-3
lines changed
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
---
2+
id: elastic_aggregation_range
3+
title: "Range Aggregation"
4+
---
5+
6+
The `Range` aggregation is a multi-value aggregation enables the user to define a set of ranges. During the aggregation process, the values extraced from each document will be checked against each bucket range.
7+
8+
In order to use the `Range` aggregation import the following:
9+
```scala
10+
import zio.elasticsearch.aggregation.RangeAggregation
11+
import zio.elasticsearch.ElasticAggregation.RangeAggregation
12+
```
13+
14+
You can create a `Range` aggregation using the `rangeAggregation` method this way:
15+
```scala
16+
val aggregation: RangeAggregation = rangeAggregation(name = "rangeAggregation", field = "testField", range = SingleRange.to(23.9))
17+
```
18+
19+
You can create a [type-safe](https://lambdaworks.github.io/zio-elasticsearch/overview/overview_zio_prelude_schema) `Range` aggregation using the `rangeAggregation` method this way:
20+
```scala
21+
// Document.intField must be number value, because of Min aggregation
22+
val aggregation: RangeAggregation = rangeAggregation(name = "rangeAggregation", field = Document.intField, range = SingleRange.to(23.9))
23+
```
24+
25+
// TODO: check this
26+
If you want to change the `missing` parameter, you can use `missing` method:
27+
```scala
28+
val aggregationWithMissing: MinAggregation = minAggregation(name = "minAggregation", field = Document.intField).missing(10.0)
29+
```
30+
// TODO: check this
31+
If you want to add aggregation (on the same level), you can use `withAgg` method:
32+
```scala
33+
val multipleAggregations: MultipleAggregations = minAggregation(name = "minAggregation1", field = Document.intField).withAgg(minAggregation(name = "minAggregation2", field = Document.doubleField))
34+
```
35+
36+
You can find more information about `Range` aggregation [here](https://www.elastic.co/guide/en/elasticsearch/reference/7.17/search-aggregations-bucket-range-aggregation.html).

modules/integration/src/test/scala/zio/elasticsearch/HttpExecutorSpec.scala

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import zio.elasticsearch.ElasticAggregation._
2121
import zio.elasticsearch.ElasticHighlight.highlight
2222
import zio.elasticsearch.ElasticQuery.{script => _, _}
2323
import zio.elasticsearch.ElasticSort.sortBy
24-
import zio.elasticsearch.aggregation.AggregationOrder
24+
import zio.elasticsearch.aggregation.{AggregationOrder, SingleRange}
2525
import zio.elasticsearch.data.GeoPoint
2626
import zio.elasticsearch.domain.{PartialTestDocument, TestDocument, TestSubDocument}
2727
import zio.elasticsearch.executor.Executor
@@ -33,7 +33,7 @@ import zio.elasticsearch.query.sort.SortOrder._
3333
import zio.elasticsearch.query.sort.SourceType.NumberType
3434
import zio.elasticsearch.query.{Distance, FunctionScoreBoostMode, FunctionScoreFunction, InnerHits}
3535
import zio.elasticsearch.request.{CreationOutcome, DeletionOutcome}
36-
import zio.elasticsearch.result.{FilterAggregationResult, Item, MaxAggregationResult, UpdateByQueryResult}
36+
import zio.elasticsearch.result._
3737
import zio.elasticsearch.script.{Painless, Script}
3838
import zio.json.ast.Json.{Arr, Str}
3939
import zio.schema.codec.JsonCodec
@@ -256,6 +256,47 @@ object HttpExecutorSpec extends IntegrationSpec {
256256
Executor.execute(ElasticRequest.createIndex(firstSearchIndex)),
257257
Executor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
258258
),
259+
test("aggregate using range aggregation") {
260+
checkOnce(genDocumentId, genTestDocument, genDocumentId, genTestDocument) {
261+
(firstDocumentId, firstDocument, secondDocumentId, secondDocument) =>
262+
for {
263+
_ <- Executor.execute(ElasticRequest.deleteByQuery(firstSearchIndex, matchAll))
264+
_ <- Executor.execute(
265+
ElasticRequest
266+
.upsert[TestDocument](firstSearchIndex, firstDocumentId, firstDocument.copy(intField = 120))
267+
)
268+
_ <-
269+
Executor.execute(
270+
ElasticRequest
271+
.upsert[TestDocument](firstSearchIndex, secondDocumentId, secondDocument.copy(intField = 180))
272+
.refreshTrue
273+
)
274+
aggregation = rangeAggregation(
275+
name = "aggregationInt",
276+
field = TestDocument.intField,
277+
range = SingleRange(from = 100.0, to = 200.0)
278+
)
279+
aggsRes <-
280+
Executor
281+
.execute(ElasticRequest.aggregate(selectors = firstSearchIndex, aggregation = aggregation))
282+
.asRangeAggregation("aggregationInt")
283+
} yield assert(aggsRes.head)(
284+
equalTo(
285+
RangeAggregationResult(
286+
bucket = RangeAggregationBucketResult(
287+
key = "100.0-200.0",
288+
from = Some(100.0),
289+
to = Some(200.0),
290+
docCount = 2
291+
)
292+
)
293+
)
294+
)
295+
}
296+
} @@ around(
297+
Executor.execute(ElasticRequest.createIndex(firstSearchIndex)),
298+
Executor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
299+
),
259300
test("aggregate using percentile ranks aggregation") {
260301
val expectedResult = Map("500.0" -> 55.55555555555555, "600.0" -> 100.0)
261302
checkOnce(genDocumentId, genTestDocument, genDocumentId, genTestDocument, genDocumentId, genTestDocument) {

modules/library/src/main/scala/zio/elasticsearch/ElasticAggregation.scala

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package zio.elasticsearch
1818

1919
import zio.Chunk
20+
//import zio.elasticsearch.aggregation.{Range => IRange, SingleRange => Range, _}
2021
import zio.elasticsearch.aggregation._
2122
import zio.elasticsearch.query.ElasticQuery
2223
import zio.elasticsearch.script.Script
@@ -216,6 +217,34 @@ object ElasticAggregation {
216217
final def minAggregation(name: String, field: String): MinAggregation =
217218
Min(name = name, field = field, missing = None)
218219

220+
// TODO: Add docs
221+
final def rangeAggregation[A: Numeric](
222+
name: String,
223+
field: Field[_, A],
224+
range: SingleRange,
225+
ranges: SingleRange*
226+
): RangeAggregation =
227+
Range(
228+
name = name,
229+
field = field.toString,
230+
ranges = Chunk.fromIterable(ranges.prepended(range)),
231+
keyed = None
232+
)
233+
234+
// TODO: Add docs
235+
final def rangeAggregation(
236+
name: String,
237+
field: String,
238+
range: SingleRange,
239+
ranges: SingleRange*
240+
): RangeAggregation =
241+
Range(
242+
name = name,
243+
field = field,
244+
ranges = Chunk.fromIterable(ranges.prepended(range)),
245+
keyed = None
246+
)
247+
219248
/**
220249
* Constructs a type-safe instance of [[zio.elasticsearch.aggregation.MissingAggregation]] using the specified
221250
* parameters.

modules/library/src/main/scala/zio/elasticsearch/aggregation/Aggregations.scala

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,64 @@ private[elasticsearch] final case class Min(name: String, field: String, missing
248248
}
249249
}
250250

251+
private[elasticsearch] final case class SingleRange(
252+
from: Option[Double],
253+
to: Option[Double],
254+
key: Option[String]
255+
) { self =>
256+
def from(value: Double): SingleRange = self.copy(from = Some(value))
257+
def to(value: Double): SingleRange = self.copy(to = Some(value))
258+
}
259+
260+
object SingleRange {
261+
262+
def from(value: Double): SingleRange =
263+
SingleRange(from = Some(value), to = None, key = None)
264+
265+
def to(value: Double): SingleRange =
266+
SingleRange(from = None, to = Some(value), key = None)
267+
268+
def apply(from: Double, to: Double): SingleRange =
269+
SingleRange(from = Some(from), to = Some(to), key = None)
270+
271+
}
272+
273+
sealed trait RangeAggregation extends SingleElasticAggregation with WithAgg {
274+
275+
def keyed(value: Boolean): Range
276+
}
277+
278+
private[elasticsearch] final case class Range(
279+
name: String,
280+
field: String,
281+
ranges: Chunk[SingleRange],
282+
keyed: Option[Boolean]
283+
) extends RangeAggregation { self =>
284+
285+
def keyed(value: Boolean): Range =
286+
self.copy(keyed = Some(value))
287+
288+
def withAgg(agg: SingleElasticAggregation): MultipleAggregations =
289+
multipleAggregations.aggregations(self, agg)
290+
291+
private[elasticsearch] def toJson: Json = {
292+
val keyedJson: Json = keyed.fold(Obj())(m => Obj("keyed" -> m.toJson))
293+
294+
Obj(
295+
name -> Obj(
296+
"range" -> (Obj(
297+
"field" -> field.toJson,
298+
"ranges" -> Arr(ranges.map { r =>
299+
r.from.fold(Obj())(m => Obj("from" -> m.toJson)) merge
300+
r.to.fold(Obj())(m => Obj("to" -> m.toJson)) merge
301+
r.key.fold(Obj())(m => Obj("key" -> m.toJson))
302+
})
303+
) merge keyedJson)
304+
)
305+
)
306+
}
307+
}
308+
251309
sealed trait MissingAggregation extends SingleElasticAggregation with WithAgg
252310

253311
private[elasticsearch] final case class Missing(name: String, field: String) extends MissingAggregation { self =>

modules/library/src/main/scala/zio/elasticsearch/executor/response/AggregationResponse.scala

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,14 @@
1717
package zio.elasticsearch.executor.response
1818

1919
import zio.Chunk
20+
import zio.elasticsearch.executor.response.RangeAggregationBucket.{
21+
KeyedRangeAggregationBuckets,
22+
RegulaRangeAggregationBuckets
23+
}
2024
import zio.elasticsearch.result._
2125
import zio.json.ast.Json
2226
import zio.json.ast.Json.Obj
23-
import zio.json.{DeriveJsonDecoder, JsonDecoder, jsonField}
27+
import zio.json.{DeriveJsonDecoder, JsonDecoder, jsonDiscriminator, jsonField, jsonHint}
2428

2529
private[elasticsearch] sealed trait AggregationBucket
2630

@@ -81,6 +85,10 @@ object AggregationResponse {
8185
MaxAggregationResult(value)
8286
case MinAggregationResponse(value) =>
8387
MinAggregationResult(value)
88+
case RangeAggregationResponse(RegulaRangeAggregationBuckets(value)) =>
89+
RegularRangeAggregationResult(value)
90+
case RangeAggregationResponse(KeyedRangeAggregationBuckets(value)) =>
91+
KeyedRangeAggregationResult(value)
8492
case MissingAggregationResponse(value) =>
8593
MissingAggregationResult(value)
8694
case PercentileRanksAggregationResponse(values) =>
@@ -297,6 +305,42 @@ private[elasticsearch] object MinAggregationResponse {
297305
implicit val decoder: JsonDecoder[MinAggregationResponse] = DeriveJsonDecoder.gen[MinAggregationResponse]
298306
}
299307

308+
private[elasticsearch] sealed trait RangeAggregationBucket extends AggregationBucket
309+
private[elasticsearch] object RangeAggregationBucket {
310+
case class RegularRangeAggregationBucket(
311+
key: Option[String],
312+
from: Option[Double],
313+
to: Option[Double],
314+
@jsonField("doc_count")
315+
docCount: Int
316+
)
317+
318+
case class KeyedRangeAggregationBucket(
319+
from: Option[Double],
320+
to: Option[Double],
321+
@jsonField("doc_count")
322+
docCount: Int
323+
)
324+
325+
implicit val RegularRangeAggregationBucketDecoder: JsonDecoder[RegularRangeAggregationBucket] =
326+
DeriveJsonDecoder.gen[RegularRangeAggregationBucket]
327+
implicit val KeyedRangeAggregationBucketDecoder: JsonDecoder[KeyedRangeAggregationBucket] =
328+
DeriveJsonDecoder.gen[KeyedRangeAggregationBucket]
329+
330+
case class RegulaRangeAggregationBuckets(chunks: Chunk[RegularRangeAggregationBucket]) extends RangeAggregationBucket
331+
332+
case class KeyedRangeAggregationBuckets(chunks: Map[String, KeyedRangeAggregationBucket])
333+
extends RangeAggregationBucket
334+
}
335+
336+
private[elasticsearch] final case class RangeAggregationResponse(
337+
buckets: RangeAggregationBucket
338+
) extends AggregationResponse
339+
340+
private[elasticsearch] object RangeAggregationResponse {
341+
implicit val decoder: JsonDecoder[RangeAggregationResponse] = DeriveJsonDecoder.gen[RangeAggregationResponse]
342+
}
343+
300344
private[elasticsearch] final case class MissingAggregationResponse(@jsonField("doc_count") docCount: Int)
301345
extends AggregationResponse
302346

modules/library/src/main/scala/zio/elasticsearch/package.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,9 @@ package object elasticsearch extends IndexNameNewtype with IndexPatternNewtype w
168168
def asMinAggregation(name: String): RIO[R, Option[MinAggregationResult]] =
169169
aggregationAs[MinAggregationResult](name)
170170

171+
def asRangeAggregation(name: String): RIO[R, Option[RangeAggregationResult[_]]] =
172+
aggregationAs[RangeAggregationResult[_]](name)
173+
171174
/**
172175
* Executes the [[ElasticRequest.SearchRequest]] or the [[ElasticRequest.SearchAndAggregateRequest]].
173176
*

modules/library/src/main/scala/zio/elasticsearch/result/AggregationResult.scala

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@
1717
package zio.elasticsearch.result
1818

1919
import zio.Chunk
20+
import zio.elasticsearch.executor.response.RangeAggregationBucket.{
21+
KeyedRangeAggregationBucket,
22+
RegularRangeAggregationBucket
23+
}
2024

2125
sealed trait AggregationResult
2226

@@ -63,6 +67,41 @@ final case class MaxAggregationResult private[elasticsearch] (value: Double) ext
6367

6468
final case class MinAggregationResult private[elasticsearch] (value: Double) extends AggregationResult
6569

70+
final case class RegularRangeAggregationBucketResult private[elasticsearch] (
71+
key: String,
72+
to: Option[Double],
73+
from: Option[Double],
74+
docCount: Int
75+
) extends AggregationResult
76+
77+
final case class KeyedRangeAggregationBucketResult private[elasticsearch] (
78+
to: Option[Double],
79+
from: Option[Double],
80+
docCount: Int
81+
) extends AggregationResult
82+
83+
private[elasticsearch] abstract class RangeAggregationResult[A](val buckets: A) extends AggregationResult
84+
85+
final case class RegularRangeAggregationResult private[elasticsearch] (
86+
override val buckets: Chunk[RegularRangeAggregationBucketResult]
87+
) extends RangeAggregationResult(buckets)
88+
private[elasticsearch] object RegularRangeAggregationResult {
89+
90+
def apply(chunks: Chunk[RegularRangeAggregationBucket]): RegularRangeAggregationResult =
91+
RegularRangeAggregationResult(chunks)
92+
93+
}
94+
95+
final case class KeyedRangeAggregationResult private[elasticsearch] (
96+
override val buckets: Map[String, KeyedRangeAggregationBucketResult]
97+
) extends RangeAggregationResult(buckets)
98+
private[elasticsearch] object KeyedRangeAggregationResult {
99+
100+
def apply(chunks: Map[String, KeyedRangeAggregationBucket]): KeyedRangeAggregationResult =
101+
KeyedRangeAggregationResult(chunks)
102+
103+
}
104+
66105
final case class MissingAggregationResult private[elasticsearch] (docCount: Int) extends AggregationResult
67106

68107
final case class PercentileRanksAggregationResult private[elasticsearch] (values: Map[String, Double])

0 commit comments

Comments
 (0)