Skip to content

Commit c641f1a

Browse files
committed
Support Range aggregation
1 parent 09c510c commit c641f1a

File tree

9 files changed

+413
-2
lines changed

9 files changed

+413
-2
lines changed
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
---
2+
id: elastic_aggregation_range
3+
title: "Range Aggregation"
4+
---
5+
6+
The `Range` aggregation is a multi-value aggregation enables the user to define a set of ranges. During the aggregation process, the values extraced from each document will be checked against each bucket range.
7+
8+
In order to use the `Range` aggregation import the following:
9+
```scala
10+
import zio.elasticsearch.aggregation.RangeAggregation
11+
import zio.elasticsearch.ElasticAggregation.RangeAggregation
12+
```
13+
14+
You can create a `Range` aggregation using the `rangeAggregation` method this way:
15+
```scala
16+
val aggregation: RangeAggregation = rangeAggregation(name = "rangeAggregation", field = "testField", range = SingleRange.to(23.9))
17+
```
18+
19+
You can create a [type-safe](https://lambdaworks.github.io/zio-elasticsearch/overview/overview_zio_prelude_schema) `Range` aggregation using the `rangeAggregation` method this way:
20+
```scala
21+
// Document.intField must be number value, because of Min aggregation
22+
val aggregation: RangeAggregation = rangeAggregation(name = "rangeAggregation", field = Document.intField, range = SingleRange.to(23.9))
23+
```
24+
25+
// TODO: check this
26+
If you want to change the `missing` parameter, you can use `missing` method:
27+
```scala
28+
val aggregationWithMissing: MinAggregation = minAggregation(name = "minAggregation", field = Document.intField).missing(10.0)
29+
```
30+
// TODO: check this
31+
If you want to add aggregation (on the same level), you can use `withAgg` method:
32+
```scala
33+
val multipleAggregations: MultipleAggregations = minAggregation(name = "minAggregation1", field = Document.intField).withAgg(minAggregation(name = "minAggregation2", field = Document.doubleField))
34+
```
35+
36+
You can find more information about `Range` aggregation [here](https://www.elastic.co/guide/en/elasticsearch/reference/7.17/search-aggregations-bucket-range-aggregation.html).

modules/integration/src/test/scala/zio/elasticsearch/HttpExecutorSpec.scala

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import zio.elasticsearch.ElasticAggregation._
2121
import zio.elasticsearch.ElasticHighlight.highlight
2222
import zio.elasticsearch.ElasticQuery.{script => _, _}
2323
import zio.elasticsearch.ElasticSort.sortBy
24-
import zio.elasticsearch.aggregation.AggregationOrder
24+
import zio.elasticsearch.aggregation.{AggregationOrder, SingleRange}
2525
import zio.elasticsearch.data.GeoPoint
2626
import zio.elasticsearch.domain.{PartialTestDocument, TestDocument, TestSubDocument}
2727
import zio.elasticsearch.executor.Executor
@@ -33,7 +33,7 @@ import zio.elasticsearch.query.sort.SortOrder._
3333
import zio.elasticsearch.query.sort.SourceType.NumberType
3434
import zio.elasticsearch.query.{Distance, FunctionScoreBoostMode, FunctionScoreFunction, InnerHits}
3535
import zio.elasticsearch.request.{CreationOutcome, DeletionOutcome}
36-
import zio.elasticsearch.result.{FilterAggregationResult, Item, MaxAggregationResult, UpdateByQueryResult}
36+
import zio.elasticsearch.result._
3737
import zio.elasticsearch.script.{Painless, Script}
3838
import zio.json.ast.Json.{Arr, Str}
3939
import zio.schema.codec.JsonCodec
@@ -256,6 +256,49 @@ object HttpExecutorSpec extends IntegrationSpec {
256256
Executor.execute(ElasticRequest.createIndex(firstSearchIndex)),
257257
Executor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
258258
),
259+
test("aggregate using range aggregation") {
260+
checkOnce(genDocumentId, genTestDocument, genDocumentId, genTestDocument) {
261+
(firstDocumentId, firstDocument, secondDocumentId, secondDocument) =>
262+
for {
263+
_ <- Executor.execute(ElasticRequest.deleteByQuery(firstSearchIndex, matchAll))
264+
_ <- Executor.execute(
265+
ElasticRequest
266+
.upsert[TestDocument](firstSearchIndex, firstDocumentId, firstDocument.copy(intField = 120))
267+
)
268+
_ <-
269+
Executor.execute(
270+
ElasticRequest
271+
.upsert[TestDocument](firstSearchIndex, secondDocumentId, secondDocument.copy(intField = 180))
272+
.refreshTrue
273+
)
274+
aggregation = rangeAggregation(
275+
name = "aggregationInt",
276+
field = TestDocument.intField,
277+
range = SingleRange(from = 100.0, to = 200.0)
278+
)
279+
aggsRes <-
280+
Executor
281+
.execute(ElasticRequest.aggregate(selectors = firstSearchIndex, aggregation = aggregation))
282+
.asRangeAggregation("aggregationInt")
283+
} yield assert(aggsRes.head)(
284+
equalTo(
285+
RegularRangeAggregationResult(
286+
Chunk(
287+
RegularRangeAggregationBucketResult(
288+
key = "100.0-200.0",
289+
from = Some(100.0),
290+
to = Some(200.0),
291+
docCount = 2
292+
)
293+
)
294+
)
295+
)
296+
)
297+
}
298+
} @@ around(
299+
Executor.execute(ElasticRequest.createIndex(firstSearchIndex)),
300+
Executor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
301+
),
259302
test("aggregate using percentile ranks aggregation") {
260303
val expectedResult = Map("500.0" -> 55.55555555555555, "600.0" -> 100.0)
261304
checkOnce(genDocumentId, genTestDocument, genDocumentId, genTestDocument, genDocumentId, genTestDocument) {

modules/library/src/main/scala/zio/elasticsearch/ElasticAggregation.scala

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package zio.elasticsearch
1818

1919
import zio.Chunk
20+
//import zio.elasticsearch.aggregation.{Range => IRange, SingleRange => Range, _}
2021
import zio.elasticsearch.aggregation._
2122
import zio.elasticsearch.query.ElasticQuery
2223
import zio.elasticsearch.script.Script
@@ -216,6 +217,34 @@ object ElasticAggregation {
216217
final def minAggregation(name: String, field: String): MinAggregation =
217218
Min(name = name, field = field, missing = None)
218219

220+
// TODO: Add docs
221+
final def rangeAggregation[A: Numeric](
222+
name: String,
223+
field: Field[_, A],
224+
range: SingleRange,
225+
ranges: SingleRange*
226+
): RangeAggregation =
227+
Range(
228+
name = name,
229+
field = field.toString,
230+
ranges = Chunk.fromIterable(ranges.prepended(range)),
231+
keyed = None
232+
)
233+
234+
// TODO: Add docs
235+
final def rangeAggregation(
236+
name: String,
237+
field: String,
238+
range: SingleRange,
239+
ranges: SingleRange*
240+
): RangeAggregation =
241+
Range(
242+
name = name,
243+
field = field,
244+
ranges = Chunk.fromIterable(ranges.prepended(range)),
245+
keyed = None
246+
)
247+
219248
/**
220249
* Constructs a type-safe instance of [[zio.elasticsearch.aggregation.MissingAggregation]] using the specified
221250
* parameters.

modules/library/src/main/scala/zio/elasticsearch/aggregation/Aggregations.scala

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,64 @@ private[elasticsearch] final case class Min(name: String, field: String, missing
241241
}
242242
}
243243

244+
private[elasticsearch] final case class SingleRange(
245+
from: Option[Double],
246+
to: Option[Double],
247+
key: Option[String]
248+
) { self =>
249+
def from(value: Double): SingleRange = self.copy(from = Some(value))
250+
def to(value: Double): SingleRange = self.copy(to = Some(value))
251+
}
252+
253+
object SingleRange {
254+
255+
def from(value: Double): SingleRange =
256+
SingleRange(from = Some(value), to = None, key = None)
257+
258+
def to(value: Double): SingleRange =
259+
SingleRange(from = None, to = Some(value), key = None)
260+
261+
def apply(from: Double, to: Double): SingleRange =
262+
SingleRange(from = Some(from), to = Some(to), key = None)
263+
264+
}
265+
266+
sealed trait RangeAggregation extends SingleElasticAggregation with WithAgg {
267+
268+
def keyed(value: Boolean): Range
269+
}
270+
271+
private[elasticsearch] final case class Range(
272+
name: String,
273+
field: String,
274+
ranges: Chunk[SingleRange],
275+
keyed: Option[Boolean]
276+
) extends RangeAggregation { self =>
277+
278+
def keyed(value: Boolean): Range =
279+
self.copy(keyed = Some(value))
280+
281+
def withAgg(agg: SingleElasticAggregation): MultipleAggregations =
282+
multipleAggregations.aggregations(self, agg)
283+
284+
private[elasticsearch] def toJson: Json = {
285+
val keyedJson: Json = keyed.fold(Obj())(m => Obj("keyed" -> m.toJson))
286+
287+
Obj(
288+
name -> Obj(
289+
"range" -> (Obj(
290+
"field" -> field.toJson,
291+
"ranges" -> Arr(ranges.map { r =>
292+
r.from.fold(Obj())(m => Obj("from" -> m.toJson)) merge
293+
r.to.fold(Obj())(m => Obj("to" -> m.toJson)) merge
294+
r.key.fold(Obj())(m => Obj("key" -> m.toJson))
295+
})
296+
) merge keyedJson)
297+
)
298+
)
299+
}
300+
}
301+
244302
sealed trait MissingAggregation extends SingleElasticAggregation with WithAgg
245303

246304
private[elasticsearch] final case class Missing(name: String, field: String) extends MissingAggregation { self =>

modules/library/src/main/scala/zio/elasticsearch/executor/response/AggregationResponse.scala

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,30 @@ object AggregationResponse {
8181
MaxAggregationResult(value)
8282
case MinAggregationResponse(value) =>
8383
MinAggregationResult(value)
84+
case RegularRangeAggregationResponse(buckets) =>
85+
RegularRangeAggregationResult(
86+
buckets.map(b =>
87+
RegularRangeAggregationBucketResult(
88+
key = b.key,
89+
to = b.to,
90+
from = b.from,
91+
docCount = b.docCount
92+
)
93+
)
94+
)
95+
case KeyedRangeAggregationResponse(buckets) =>
96+
KeyedRangeAggregationResult(
97+
buckets.map { case (k, v) =>
98+
(
99+
k,
100+
KeyedRangeAggregationBucketResult(
101+
to = v.to,
102+
from = v.from,
103+
docCount = v.docCount
104+
)
105+
)
106+
}
107+
)
84108
case MissingAggregationResponse(value) =>
85109
MissingAggregationResult(value)
86110
case PercentileRanksAggregationResponse(values) =>
@@ -297,6 +321,43 @@ private[elasticsearch] object MinAggregationResponse {
297321
implicit val decoder: JsonDecoder[MinAggregationResponse] = DeriveJsonDecoder.gen[MinAggregationResponse]
298322
}
299323

324+
private[elasticsearch] sealed trait RangeAggregationResponse extends AggregationResponse
325+
private[elasticsearch] object RangeAggregationBucketsResponse {
326+
implicit val RangeAggregationResponseDecoder: JsonDecoder[RangeAggregationResponse] =
327+
DeriveJsonDecoder.gen[RangeAggregationResponse]
328+
}
329+
330+
private[elasticsearch] final case class RegularRangeAggregationBucketResponse(
331+
key: String,
332+
to: Option[Double],
333+
from: Option[Double],
334+
@jsonField("doc_count")
335+
docCount: Int
336+
)
337+
private[elasticsearch] object RegularRangeAggregationBucketResponse {
338+
implicit val RegularRangeAggregationBucketResponseDecoder: JsonDecoder[RegularRangeAggregationBucketResponse] =
339+
DeriveJsonDecoder.gen[RegularRangeAggregationBucketResponse]
340+
}
341+
342+
private[elasticsearch] final case class KeyedRangeAggregationBucketResponse(
343+
to: Option[Double],
344+
from: Option[Double],
345+
@jsonField("doc_count")
346+
docCount: Int
347+
)
348+
private[elasticsearch] object KeyedRangeAggregationBucketResponse {
349+
implicit val KeyedRangeAggregationBucketResponseDecoder: JsonDecoder[KeyedRangeAggregationBucketResponse] =
350+
DeriveJsonDecoder.gen[KeyedRangeAggregationBucketResponse]
351+
}
352+
353+
private[elasticsearch] final case class RegularRangeAggregationResponse(
354+
buckets: Chunk[RegularRangeAggregationBucketResponse]
355+
) extends RangeAggregationResponse
356+
357+
private[elasticsearch] final case class KeyedRangeAggregationResponse(
358+
buckets: Map[String, KeyedRangeAggregationBucketResponse]
359+
) extends RangeAggregationResponse
360+
300361
private[elasticsearch] final case class MissingAggregationResponse(@jsonField("doc_count") docCount: Int)
301362
extends AggregationResponse
302363

modules/library/src/main/scala/zio/elasticsearch/package.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,9 @@ package object elasticsearch extends IndexNameNewtype with IndexPatternNewtype w
120120
def asMinAggregation(name: String): RIO[R, Option[MinAggregationResult]] =
121121
aggregationAs[MinAggregationResult](name)
122122

123+
def asRangeAggregation(name: String): RIO[R, Option[RangeAggregationResult]] =
124+
aggregationAs[RangeAggregationResult](name)
125+
123126
/**
124127
* Executes the [[ElasticRequest.SearchRequest]] or the [[ElasticRequest.SearchAndAggregateRequest]].
125128
*

modules/library/src/main/scala/zio/elasticsearch/result/AggregationResult.scala

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,30 @@ final case class MaxAggregationResult private[elasticsearch] (value: Double) ext
5757

5858
final case class MinAggregationResult private[elasticsearch] (value: Double) extends AggregationResult
5959

60+
private[elasticsearch] sealed trait RangeAggregationResult extends AggregationResult
61+
62+
private[elasticsearch] final case class RegularRangeAggregationBucketResult(
63+
key: String,
64+
to: Option[Double],
65+
from: Option[Double],
66+
docCount: Int
67+
)
68+
69+
private[elasticsearch] final case class KeyedRangeAggregationBucketResult(
70+
to: Option[Double],
71+
from: Option[Double],
72+
docCount: Int
73+
)
74+
75+
private[elasticsearch] final case class RegularRangeAggregationResult(
76+
buckets: Chunk[RegularRangeAggregationBucketResult]
77+
) extends RangeAggregationResult
78+
79+
private[elasticsearch] final case class KeyedRangeAggregationResult(
80+
buckets: Map[String, KeyedRangeAggregationBucketResult]
81+
) extends RangeAggregationResult
82+
83+
6084
final case class MissingAggregationResult private[elasticsearch] (docCount: Int) extends AggregationResult
6185

6286
final case class PercentileRanksAggregationResult private[elasticsearch] (values: Map[String, Double])

0 commit comments

Comments
 (0)