5
5
6
6
package org .opensearch .flint .spark
7
7
8
- import java .util .concurrent .Executors
9
-
10
8
import scala .collection .JavaConverters ._
11
- import scala .concurrent .{ExecutionContext , Future }
12
9
13
10
import org .opensearch .action .admin .indices .settings .put .UpdateSettingsRequest
14
11
import org .opensearch .client .RequestOptions
@@ -21,7 +18,6 @@ import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
21
18
import org .scalatest .matchers .should .Matchers
22
19
23
20
import org .apache .spark .sql .Row
24
- import org .apache .spark .sql .functions .col
25
21
26
22
class FlintSparkIndexSqlITSuite extends FlintSparkSuite with Matchers {
27
23
@@ -38,13 +34,6 @@ class FlintSparkIndexSqlITSuite extends FlintSparkSuite with Matchers {
38
34
private val testMvIndex = s " spark_catalog.default. $testMvIndexShortName"
39
35
private val testMvFlintIndex = FlintSparkMaterializedView .getFlintIndexName(testMvIndex)
40
36
41
- /**
42
- * Creates a custom `ExecutionContext` with a fixed thread pool for handling asynchronous
43
- * operations in tests. TODO: move to base class if more tests require this.
44
- */
45
- implicit val executionContext : ExecutionContext =
46
- ExecutionContext .fromExecutor(Executors .newFixedThreadPool(1 ))
47
-
48
37
override def beforeAll (): Unit = {
49
38
super .beforeAll()
50
39
createTimeSeriesTable(testTableQualifiedName)
@@ -135,7 +124,7 @@ class FlintSparkIndexSqlITSuite extends FlintSparkSuite with Matchers {
135
124
outputError shouldBe empty
136
125
137
126
// Trigger next micro batch after 5 seconds with index readonly
138
- Future {
127
+ new Thread (() => {
139
128
Thread .sleep(5000 )
140
129
openSearchClient
141
130
.indices()
@@ -145,7 +134,7 @@ class FlintSparkIndexSqlITSuite extends FlintSparkSuite with Matchers {
145
134
RequestOptions .DEFAULT )
146
135
sql(
147
136
s " INSERT INTO $testTableQualifiedName VALUES (TIMESTAMP '2023-10-01 04:00:00', 'F', 25, 'Vancouver') " )
148
- }
137
+ }).start()
149
138
150
139
// Await to store exception and verify if it's as expected
151
140
flint.flintIndexMonitor.awaitMonitor(Some (testSkippingFlintIndex))
0 commit comments