Skip to content

Commit 8f18277

Browse files
Adding fast refresh setting during index creation (#1074)
* Adding fast refresh setting during index creation Signed-off-by: Dennis Toepker <toepkerd@amazon.com> * abstracting refresh setting outside of OSclient implementation Signed-off-by: Dennis Toepker <toepkerd@amazon.com> * adjusting settings to match that of opensearch index settings Signed-off-by: Dennis Toepker <toepkerd@amazon.com> * addressing comment Signed-off-by: Dennis Toepker <toepkerd@amazon.com> * adding malformed settings unit test Signed-off-by: Dennis Toepker <toepkerd@amazon.com> * fix formatting issues Signed-off-by: Dennis Toepker <toepkerd@amazon.com> * misc dev changes (unpolished, for testing purposes) Signed-off-by: Dennis Toepker <toepkerd@amazon.com> * adding fast refresh check Signed-off-by: Dennis Toepker <toepkerd@amazon.com> * formatting Signed-off-by: Dennis Toepker <toepkerd@amazon.com> * adding inspect exception Signed-off-by: Dennis Toepker <toepkerd@amazon.com> * formatting Signed-off-by: Dennis Toepker <toepkerd@amazon.com> * adjusting setting name Signed-off-by: Dennis Toepker <toepkerd@amazon.com> * cleanup Signed-off-by: Dennis Toepker <toepkerd@amazon.com> * even more cleanup Signed-off-by: Dennis Toepker <toepkerd@amazon.com> * even more cleanup Signed-off-by: Dennis Toepker <toepkerd@amazon.com> * runback 1 Signed-off-by: Dennis Toepker <toepkerd@amazon.com> * runback 2 Signed-off-by: Dennis Toepker <toepkerd@amazon.com> * adding malformed settings docker image test Signed-off-by: Dennis Toepker <toepkerd@amazon.com> * polishing malformed settings and mappings docker image tests Signed-off-by: Dennis Toepker <toepkerd@amazon.com> * cleanup Signed-off-by: Dennis Toepker <toepkerd@amazon.com> --------- Signed-off-by: Dennis Toepker <toepkerd@amazon.com> Co-authored-by: Dennis Toepker <toepkerd@amazon.com>
1 parent f236ec7 commit 8f18277

File tree

5 files changed

+128
-6
lines changed

5 files changed

+128
-6
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.apache.spark.opensearch.index
7+
8+
import scala.collection.JavaConverters._
9+
10+
import org.opensearch.flint.core.FlintOptions
11+
12+
import org.apache.spark.opensearch.table.OpenSearchCatalogSuite
13+
import org.apache.spark.sql.{FlintJob, OSClient}
14+
15+
class OpenSearchIndexITSuite extends OpenSearchCatalogSuite {
16+
17+
var osClient: OSClient = _
18+
val indexName = "test_index"
19+
20+
override def beforeAll(): Unit = {
21+
super.beforeAll()
22+
osClient = new OSClient(new FlintOptions(openSearchOptions.asJava))
23+
}
24+
25+
protected override def afterEach(): Unit = {
26+
super.afterEach()
27+
deleteTestIndex(indexName)
28+
}
29+
30+
test("FlintJobExecutor creating index with malformed settings should fail") {
31+
val mappings =
32+
"""{
33+
| "properties": {
34+
| "accountId": {
35+
| "type": "keyword"
36+
| },
37+
| "eventName": {
38+
| "type": "keyword"
39+
| },
40+
| "eventSource": {
41+
| "type": "keyword"
42+
| }
43+
| }
44+
|}""".stripMargin
45+
46+
// invalid JSON
47+
val settings =
48+
"""{
49+
| "index": {
50+
| "refresh_interval": "1s"
51+
|}""".stripMargin
52+
53+
val result = FlintJob.createResultIndex(osClient, indexName, mappings, settings)
54+
result match {
55+
case Left(str) => assert(str == s"Failed to create result index $indexName")
56+
case Right(_) => fail("passing in invalid settings did not fail")
57+
}
58+
}
59+
60+
test("FlintJobExecutor creating index with malformed mappings should fail") {
61+
val osClient = new OSClient(new FlintOptions(openSearchOptions.asJava))
62+
63+
// invalid JSON
64+
val mappings =
65+
"""{
66+
| "properties": {
67+
| "accountId": {
68+
| "type": "keyword"
69+
| },
70+
| "eventName": {
71+
| "type": "keyword"
72+
| },
73+
| "eventSource": {
74+
| "type": "keyword"
75+
| }
76+
|}""".stripMargin
77+
78+
val settings =
79+
"""{
80+
| "index": {
81+
| "refresh_interval": "1s"
82+
| }
83+
|}""".stripMargin
84+
85+
val result = FlintJob.createResultIndex(osClient, indexName, mappings, settings)
86+
result match {
87+
case Left(str) => assert(str == s"Failed to create result index $indexName")
88+
case Right(_) => fail("passing in invalid mappings did not fail")
89+
}
90+
}
91+
}

integ-test/src/integration/scala/org/apache/spark/sql/JobTest.scala

+10
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ trait JobTest extends Logging { self: OpenSearchSuite =>
6565

6666
Thread.sleep(2000) // 2 seconds
6767
}
68+
verifyFastRefresh(resultIndex)
6869
if (System.currentTimeMillis() - startTime >= timeoutMillis) {
6970
assert(
7071
false,
@@ -85,4 +86,13 @@ trait JobTest extends Logging { self: OpenSearchSuite =>
8586
// \\s+ is a regular expression that matches one or more whitespace characters, including spaces, tabs, and newlines.
8687
s.replaceAll("\\s+", " ")
8788
} // Replace all whitespace characters with empty string
89+
90+
def verifyFastRefresh(resultIndex: String): Unit = {
91+
val fastRefreshSettingKey = "index.refresh_interval"
92+
val fastRefreshSettingVal = "1s"
93+
94+
val settings = getIndexSettings(resultIndex)
95+
assert(settings.hasValue(fastRefreshSettingKey))
96+
assert(settings.get(fastRefreshSettingKey) == fastRefreshSettingVal)
97+
}
8898
}

integ-test/src/integration/scala/org/opensearch/flint/OpenSearchSuite.scala

+9
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import org.opensearch.action.index.IndexRequest
1414
import org.opensearch.action.support.WriteRequest.RefreshPolicy
1515
import org.opensearch.client.{RequestOptions, RestClient, RestHighLevelClient}
1616
import org.opensearch.client.indices.{CreateIndexRequest, GetIndexRequest}
17+
import org.opensearch.common.settings.Settings
1718
import org.opensearch.common.xcontent.XContentType
1819
import org.opensearch.testcontainers.OpenSearchContainer
1920
import org.scalatest.{BeforeAndAfterAll, Suite}
@@ -309,4 +310,12 @@ trait OpenSearchSuite extends BeforeAndAfterAll {
309310
s"bulk index docs to $index failed: ${response.buildFailureMessage()}")
310311
}
311312
}
313+
314+
def getIndexSettings(index: String): Settings = {
315+
val getIndexResponse =
316+
openSearchClient.indices().get(new GetIndexRequest(index), RequestOptions.DEFAULT)
317+
val indexToSettings = getIndexResponse.getSettings
318+
assume(indexToSettings.containsKey(index), s"index $index not found")
319+
indexToSettings.get(index)
320+
}
312321
}

spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala

+14-4
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,15 @@ trait FlintJobExecutor {
108108
}
109109
}""".stripMargin
110110

111+
// Fast refresh index setting for OpenSearch index. Eliminates index refresh as a source
112+
// of latency for interactive queries.
113+
val resultIndexSettings =
114+
"""{
115+
"index": {
116+
"refresh_interval": "1s"
117+
}
118+
}""".stripMargin
119+
111120
// Define the data schema
112121
val schema = StructType(
113122
Seq(
@@ -199,7 +208,7 @@ trait FlintJobExecutor {
199208
if (osClient.doesIndexExist(resultIndex)) {
200209
writeData(resultData, resultIndex, refreshPolicy)
201210
} else {
202-
createResultIndex(osClient, resultIndex, resultIndexMapping)
211+
createResultIndex(osClient, resultIndex, resultIndexMapping, resultIndexSettings)
203212
writeData(resultData, resultIndex, refreshPolicy)
204213
}
205214
}
@@ -375,7 +384,7 @@ trait FlintJobExecutor {
375384
case e: IllegalStateException
376385
if e.getCause != null &&
377386
e.getCause.getMessage.contains("index_not_found_exception") =>
378-
createResultIndex(osClient, resultIndex, resultIndexMapping)
387+
createResultIndex(osClient, resultIndex, resultIndexMapping, resultIndexSettings)
379388
case e: InterruptedException =>
380389
val error = s"Interrupted by the main thread: ${e.getMessage}"
381390
Thread.currentThread().interrupt() // Preserve the interrupt status
@@ -391,10 +400,11 @@ trait FlintJobExecutor {
391400
def createResultIndex(
392401
osClient: OSClient,
393402
resultIndex: String,
394-
mapping: String): Either[String, Unit] = {
403+
mapping: String,
404+
settings: String): Either[String, Unit] = {
395405
try {
396406
logInfo(s"create $resultIndex")
397-
osClient.createIndex(resultIndex, mapping)
407+
osClient.createIndex(resultIndex, mapping, settings)
398408
logInfo(s"create $resultIndex successfully")
399409
Right(())
400410
} catch {

spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala

+4-2
Original file line numberDiff line numberDiff line change
@@ -61,16 +61,18 @@ class OSClient(val flintOptions: FlintOptions) extends Logging {
6161
* the name of the index
6262
* @param mapping
6363
* the mapping of the index
64+
* @param settings
65+
* * the index settings as a JSON string
6466
* @return
6567
* use Either for representing success or failure. A Right value indicates success, while a
6668
* Left value indicates an error.
6769
*/
68-
def createIndex(osIndexName: String, mapping: String): Unit = {
70+
def createIndex(osIndexName: String, mapping: String, settings: String): Unit = {
6971
logInfo(s"create $osIndexName")
7072

7173
using(flintClient.createClient()) { client =>
7274
val request = new CreateIndexRequest(osIndexName)
73-
request.mapping(mapping, XContentType.JSON)
75+
request.mapping(mapping, XContentType.JSON).settings(settings, XContentType.JSON)
7476

7577
try {
7678
client.createIndex(request, RequestOptions.DEFAULT)

0 commit comments

Comments
 (0)