Commit aa0a0e9

Add IT
Signed-off-by: Chen Dai <daichen@amazon.com>
1 parent dbb2d87

File tree

1 file changed (+44, -1)


integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexSqlITSuite.scala

Lines changed: 44 additions & 1 deletion
@@ -5,17 +5,24 @@
 
 package org.opensearch.flint.spark
 
+import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
+
+import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest
 import org.opensearch.client.RequestOptions
 import org.opensearch.client.indices.CreateIndexRequest
 import org.opensearch.common.xcontent.XContentType
 import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.AUTO_REFRESH
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
 import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
+import org.scalatest.matchers.should.Matchers
 
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.functions.col
 
-class FlintSparkIndexSqlITSuite extends FlintSparkSuite {
+class FlintSparkIndexSqlITSuite extends FlintSparkSuite with Matchers {
 
   private val testTableName = "index_test"
   private val testTableQualifiedName = s"spark_catalog.default.$testTableName"
@@ -99,6 +106,42 @@ class FlintSparkIndexSqlITSuite extends FlintSparkSuite {
       FlintSparkMaterializedView.getFlintIndexName("spark_catalog.other.mv2"))
   }
 
+  test("show flint indexes with extended information") {
+    // Create and refresh with all existing data
+    flint
+      .skippingIndex()
+      .onTable(testTableQualifiedName)
+      .addValueSet("name")
+      .options(FlintSparkIndexOptions(Map(AUTO_REFRESH.toString -> "true")))
+      .create()
+    flint.refreshIndex(testSkippingFlintIndex)
+    val activeJob = spark.streams.active.find(_.name == testSkippingFlintIndex)
+    awaitStreamingComplete(activeJob.get.id.toString)
+
+    // Trigger next micro batch after 5 seconds with index readonly
+    Future {
+      Thread.sleep(5000)
+      openSearchClient
+        .indices()
+        .putSettings(
+          new UpdateSettingsRequest(testSkippingFlintIndex).settings(
+            Map("index.blocks.write" -> true).asJava),
+          RequestOptions.DEFAULT)
+      sql(
+        s"INSERT INTO $testTableQualifiedName VALUES (TIMESTAMP '2023-10-01 04:00:00', 'F', 25, 'Vancouver')")
+    }
+
+    // Await and store exception as expected
+    flint.flintIndexMonitor.awaitMonitor(Some(testSkippingFlintIndex))
+
+    // Assert output contains error message
+    val df = sql("SHOW FLINT INDEX EXTENDED IN spark_catalog")
+    df.columns should contain("error")
+    df.select(col("error")).collect().head.getString(0) should include("OpenSearchException")
+
+    deleteTestIndex(testSkippingFlintIndex)
+  }
+
   test("should return empty when show flint index in empty database") {
     checkAnswer(sql(s"SHOW FLINT INDEX IN spark_catalog.default"), Seq.empty)
   }
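
Note on the error path this test exercises: it deliberately makes the Flint index read-only by setting index.blocks.write, so the next streaming micro-batch fails on write and SHOW FLINT INDEX EXTENDED can surface the resulting OpenSearchException in its "error" column. Below is a minimal standalone sketch of that read-only toggle, using the same UpdateSettingsRequest / putSettings API the test calls; the helper name setWriteBlock and the client parameter are illustrative, not part of this commit.

import scala.collection.JavaConverters._

import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest
import org.opensearch.client.{RequestOptions, RestHighLevelClient}

object IndexWriteBlockSketch {

  // Hypothetical helper (not part of this commit): block or unblock writes
  // on the given index by toggling index.blocks.write, exactly as the test
  // above does inside its Future.
  def setWriteBlock(client: RestHighLevelClient, index: String, blocked: Boolean): Unit = {
    val request = new UpdateSettingsRequest(index)
      .settings(Map("index.blocks.write" -> blocked).asJava)
    client.indices().putSettings(request, RequestOptions.DEFAULT)
  }
}

In the committed test only the blocking half is needed; cleanup happens by dropping the index via deleteTestIndex rather than by clearing the setting.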
