Commit 09c510c

(dsl): Support specifying index argument in bulk requests (#646)
1 parent e32e07e commit 09c510c

File tree

12 files changed: +800 −191 lines changed

README.md

Lines changed: 27 additions & 4 deletions

@@ -129,12 +129,35 @@ ElasticQuery.range(User.age).gte(18).lt(100)

ZIO Elastic requests like `Create`, `CreateOrUpdate`, `CreateWithId`, and `DeleteById` are bulkable requests.
For bulkable requests, you can use the `bulk` API, which accepts request types that inherit the `Bulkable` trait.
When performing bulk operations, you can specify the target index at the request level for each individual operation.
However, you can also define a global index for the entire bulk request if all operations within that request target the same index. If a global index is specified for the bulk request, but an individual operation within that same bulk request also explicitly defines its own index, the individual operation's index takes precedence for that specific operation.

```scala
val indexName: IndexName         = IndexName("users-index")
val specificIndexName: IndexName = IndexName("specific-users-index")

// Each operation specifies its own index.
ElasticRequest.bulk(
  ElasticRequest.create[User](index = indexName, doc = User(1, "John Doe")),
  ElasticRequest.create[User](index = indexName, id = DocumentId("documentId2"), doc = User(2, "Jane Doe")),
  ElasticRequest.upsert[User](index = indexName, id = DocumentId("documentId3"), doc = User(3, "Richard Roe")),
  ElasticRequest.deleteById(index = indexName, id = DocumentId("documentId2"))
)

// A global index is provided for the whole bulk request.
ElasticRequest.bulk(
  index = indexName,
  ElasticRequest.create[User](doc = User(1, "John Doe")),
  ElasticRequest.upsert[User](id = DocumentId("documentId2"), doc = User(2, "Jane Doe")),
  ElasticRequest.deleteById(id = DocumentId("documentId3"))
)

// The upsert's own index overrides the global one for that operation.
ElasticRequest.bulk(
  index = indexName,
  ElasticRequest.create[User](doc = User(20, "Eve")),
  ElasticRequest.upsert[User](index = specificIndexName, id = DocumentId("docId4"), doc = User(21, "Charlie")),
  ElasticRequest.deleteById(id = DocumentId("docId5"))
)
```
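The precedence rule described above can be illustrated with a small, self-contained sketch (the `BulkOp` type and `effectiveIndex` helper are hypothetical names for illustration, not part of the library's API):

```scala
// Hypothetical sketch of the precedence rule, not the library's internals:
// a per-operation index, when present, wins over the bulk-level default.
final case class BulkOp(opIndex: Option[String], id: String)

def effectiveIndex(globalIndex: Option[String], op: BulkOp): Option[String] =
  op.opIndex.orElse(globalIndex)

// effectiveIndex(Some("users-index"), BulkOp(None, "1"))
//   -> Some("users-index") (falls back to the global index)
// effectiveIndex(Some("users-index"), BulkOp(Some("specific-users-index"), "2"))
//   -> Some("specific-users-index") (the operation's own index wins)
```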

Lines changed: 40 additions & 0 deletions

@@ -0,0 +1,40 @@

---
id: elastic_aggregation_sampler
title: "Sampler Aggregation"
---

The Sampler aggregation is a single-bucket aggregation that returns a sample of the documents that fall into the aggregation scope. It is particularly useful when you want to run sub-aggregations on a representative sample of documents rather than on the entire dataset.

To use the `Sampler` aggregation, import the following:
```scala
import zio.elasticsearch.aggregation.SamplerAggregation
import zio.elasticsearch.ElasticAggregation.samplerAggregation
```

A Sampler aggregation must always have at least one sub-aggregation.
You can create a Sampler aggregation with an initial sub-aggregation using the `samplerAggregation` method this way:
```scala
import zio.elasticsearch.ElasticAggregation.avgAggregation

val aggregation: SamplerAggregation = samplerAggregation(
  name = "samplerAggregation",
  subAgg = avgAggregation(name = "avgRating", field = Document.intField)
)
```

If you want to add another sub-aggregation, you can use the `withSubAgg` method:
```scala
import zio.elasticsearch.ElasticAggregation.maxAggregation

val aggregationWithMultipleSubAggs: SamplerAggregation = samplerAggregation(
  name = "samplerAggregation",
  subAgg = avgAggregation(name = "avgRating", field = Document.intField)
).withSubAgg(maxAggregation(name = "maxAggregation", field = Document.intField))
```

By default, the `shard_size` parameter for a Sampler aggregation is set to 100, meaning each shard returns at most 100 documents to be sampled.
If you want to change the `shard_size`, you can use the `maxDocumentsPerShard` method:
```scala
val aggregationWithShardSize: SamplerAggregation = samplerAggregation(
  name = "samplerAggregation",
  subAgg = avgAggregation(name = "avgRating", field = Document.intField)
).maxDocumentsPerShard(500)
```

You can find more detailed information about the `Sampler` aggregation in the official Elasticsearch documentation [here](https://www.elastic.co/docs/reference/aggregations/search-aggregations-bucket-sampler-aggregation).
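For orientation, a sampler aggregation with a sub-aggregation corresponds to Elasticsearch query JSON along these lines (a sketch based on the Elasticsearch aggregation docs; the exact payload this library emits may differ):

```json
{
  "aggs": {
    "samplerAggregation": {
      "sampler": { "shard_size": 500 },
      "aggs": {
        "avgRating": { "avg": { "field": "intField" } }
      }
    }
  }
}
```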

docs/overview/requests/elastic_request_bulk.md

Lines changed: 33 additions & 9 deletions

@@ -9,34 +9,58 @@

In order to use the `Bulk` request import the following:
```scala
import zio.elasticsearch.ElasticRequest.BulkRequest
import zio.elasticsearch.ElasticRequest.bulk
// this import is required for using `IndexName` and `DocumentId`
import zio.elasticsearch._
```

You can create a `Bulk` request using the `bulk` method, which offers two main ways to handle indices:

1. With a global index:

This approach is ideal when most (or all) of your bulk operations target the same index. You provide a default index for the entire bulk request as the first argument. Any individual operation within this `bulk` request that doesn't explicitly specify its own index will automatically use this global index.
_**Important: If an individual operation does specify its own index, that individual index always takes precedence over the global one for that specific operation.**_
```scala
val index = IndexName("my-global-index")

val document1 = new Document(id = DocumentId("111"), intField = 1, stringField = "stringField1")
val document2 = new Document(id = DocumentId("222"), intField = 2, stringField = "stringField2")

val request: BulkRequest = bulk(
  index = index,
  requests = create(doc = document1),
  upsert(id = DocumentId("111"), doc = document2)
)
```

2. Without a global index:

Choose this method when your bulk operations frequently target different indices, or when you prefer to explicitly define the index for every single operation. When using this variant, each individual `BulkableRequest` must specify its own index.
```scala
val index1 = IndexName("first-index")
val index2 = IndexName("second-index")

val document1 = new Document(id = DocumentId("111"), intField = 1, stringField = "stringField1")
val document2 = new Document(id = DocumentId("222"), intField = 2, stringField = "stringField2")

val request: BulkRequest = bulk(
  requests = create(index = index1, doc = document1),
  upsert(index = index2, id = DocumentId("111"), doc = document2)
)
```

If you want to change the `refresh`, you can use the `refresh`, `refreshFalse` or `refreshTrue` method:
```scala
val requestWithRefresh: BulkRequest = bulk(requests = create(index = index, doc = document1), upsert(index = index, id = DocumentId("111"), doc = document2)).refresh(true)
val requestWithRefreshFalse: BulkRequest = bulk(requests = create(index = index, doc = document1), upsert(index = index, id = DocumentId("111"), doc = document2)).refreshFalse
val requestWithRefreshTrue: BulkRequest = bulk(requests = create(index = index, doc = document1), upsert(index = index, id = DocumentId("111"), doc = document2)).refreshTrue
```

If you want to change the `routing`, you can use the `routing` method:
```scala
// this import is required for using `Routing` also
import zio.elasticsearch._

val requestWithRouting: BulkRequest = bulk(requests = create(index = index, doc = document1), upsert(index = index, id = DocumentId("111"), doc = document2)).routing(Routing("routing"))
```

You can find more information about the `Bulk` request [here](https://www.elastic.co/guide/en/elasticsearch/reference/7.17/docs-bulk.html).
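At the wire level this corresponds to the Elasticsearch `_bulk` API, where a default index can be supplied in the URL path (e.g. `POST /my-global-index/_bulk`) and any action line carrying its own `_index` field overrides that default. A rough NDJSON sketch under that assumption (based on the Elasticsearch bulk API docs, with an upsert-style operation shown as an `index` action; this is not necessarily the exact payload this library serializes):

```json
{ "create": { } }
{ "id": "111", "intField": 1, "stringField": "stringField1" }
{ "index": { "_index": "second-index", "_id": "222" } }
{ "id": "222", "intField": 2, "stringField": "stringField2" }
```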

modules/example/src/main/scala/example/RepositoriesElasticsearch.scala

Lines changed: 10 additions & 1 deletion

```diff
@@ -55,6 +55,13 @@ final case class RepositoriesElasticsearch(elasticsearch: Elasticsearch) {
     )
   } yield ()
 
+  def upsertBulk(organization: String, repositories: Chunk[GitHubRepo]): Task[Unit] =
+    for {
+      routing     <- routingOf(organization)
+      bulkRequests = repositories.map(repo => ElasticRequest.upsert(DocumentId(repo.id), repo).routing(routing))
+      _           <- elasticsearch.execute(ElasticRequest.bulk(Index, bulkRequests: _*))
+    } yield ()
+
   def upsert(id: String, repository: GitHubRepo): Task[Unit] =
     for {
       routing <- routingOf(repository.organization)
@@ -74,7 +81,6 @@ final case class RepositoriesElasticsearch(elasticsearch: Elasticsearch) {
 
   private def routingOf(value: String): IO[IllegalArgumentException, Routing.Type] =
     Routing.make(value).toZIO.mapError(e => new IllegalArgumentException(e))
-
 }
 
 object RepositoriesElasticsearch {
@@ -91,6 +97,9 @@ object RepositoriesElasticsearch {
   def createAll(repositories: Chunk[GitHubRepo]): RIO[RepositoriesElasticsearch, Unit] =
     ZIO.serviceWithZIO[RepositoriesElasticsearch](_.createAll(repositories))
 
+  def upsertBulk(organization: String, repositories: Chunk[GitHubRepo]): RIO[RepositoriesElasticsearch, Unit] =
+    ZIO.serviceWithZIO[RepositoriesElasticsearch](_.upsertBulk(organization, repositories))
+
   def upsert(id: String, repository: GitHubRepo): RIO[RepositoriesElasticsearch, Unit] =
     ZIO.serviceWithZIO[RepositoriesElasticsearch](_.upsert(id, repository))
```

modules/example/src/main/scala/example/api/Repositories.scala

Lines changed: 22 additions & 1 deletion

```diff
@@ -17,7 +17,6 @@
 package example.api
 
 import example.{GitHubRepo, RepositoriesElasticsearch}
-import zio.ZIO
 import zio.elasticsearch._
 import zio.elasticsearch.query.ElasticQuery
 import zio.elasticsearch.request.{CreationOutcome, DeletionOutcome}
@@ -29,7 +28,9 @@ import zio.http.Status.{
 }
 import zio.http.{Method, _}
 import zio.json.EncoderOps
+import zio.schema.Schema
 import zio.schema.codec.JsonCodec.{Configuration => JsonCodecConfig, JsonDecoder}
+import zio.{Chunk, ZIO}
 
 import CompoundOperator._
 import FilterOperator._
@@ -69,6 +70,26 @@ object Repositories {
         }
       }
     }.orDie,
+    Method.POST / BasePath / string("organization") / "bulk-upsert" -> handler {
+      (organization: String, req: Request) =>
+        req.body.asString
+          .map(JsonDecoder.decode[Chunk[GitHubRepo]](Schema.chunk(GitHubRepo.schema), _, JsonCodecConfig.default))
+          .flatMap {
+            case Left(jsonError) =>
+              ZIO.succeed(Response.json(ErrorResponse.fromReasons(jsonError.message).toJson).status(HttpBadRequest))
+            case Right(repositories) =>
+              RepositoriesElasticsearch
+                .upsertBulk(organization, repositories)
+                .map(_ => Response.status(HttpNoContent))
+                .catchAll(e =>
+                  ZIO.succeed(
+                    Response
+                      .json(ErrorResponse.fromReasons(s"Bulk operation failed: ${e.getMessage}").toJson)
+                      .status(HttpBadRequest)
+                  )
+                )
+          }
+    }.orDie,
     Method.POST / BasePath / "search" -> handler { (req: Request) =>
       req.body.asString
         .map(JsonDecoder.decode[Criteria](Criteria.schema, _, JsonCodecConfig.default))
```

modules/example/zio-elasticsearch-example.postman_collection.json

Lines changed: 36 additions & 0 deletions

@@ -57,6 +57,42 @@

The following request item is added to the collection:

```json
{
  "name": "Bulk Upsert Repositories",
  "request": {
    "method": "POST",
    "header": [
      {
        "key": "Content-Type",
        "value": "application/json"
      }
    ],
    "body": {
      "mode": "raw",
      "raw": "[\n {\n \"id\": \"repo-bulk-1\",\n \"organization\": \"lambdaworks\",\n \"name\": \"bulk-repo-one\",\n \"url\": \"https://github.com/lambdaworks/bulk-repo-one\",\n \"description\": \"First repository added via bulk upsert.\",\n \"lastCommitAt\": \"2023-01-01T10:00:00.000\",\n \"stars\": 100,\n \"forks\": 10\n },\n {\n \"id\": \"repo-bulk-2\",\n \"organization\": \"lambdaworks\",\n \"name\": \"bulk-repo-two\",\n \"url\": \"https://github.com/lambdaworks/bulk-repo-two\",\n \"description\": \"Second repository added via bulk upsert.\",\n \"lastCommitAt\": \"2023-01-02T11:00:00.000\",\n \"stars\": 200,\n \"forks\": 20\n },\n {\n \"id\": \"1234567\",\n \"organization\": \"lambdaworks\",\n \"name\": \"scurl-detector-updated\",\n \"url\": \"https://github.com/lambdaworks/scurl-detector\",\n \"description\": \"Scala library that detects and extracts URLs from text (UPDATED via bulk).\",\n \"lastCommitAt\": \"2023-01-03T12:00:00.000\",\n \"stars\": 150,\n \"forks\": 15\n }\n]",
      "options": {
        "raw": {
          "language": "json"
        }
      }
    },
    "url": {
      "raw": "http://localhost:{{HTTP_PORT}}/api/repositories/lambdaworks/bulk-upsert",
      "protocol": "http",
      "host": [
        "localhost"
      ],
      "port": "{{HTTP_PORT}}",
      "path": [
        "api",
        "repositories",
        "lambdaworks",
        "bulk-upsert"
      ]
    }
  },
  "response": []
}
```

0 commit comments
