@@ -63,15 +63,15 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat
63
63
}
64
64
65
65
test(" build disabled metadata cache writer" ) {
66
- FlintMetadataCacheWriterBuilder
67
- .build(FlintSparkConf ()) shouldBe a[FlintDisabledMetadataCacheWriter ]
66
+ withMetadataCacheWriteDisabled {
67
+ FlintMetadataCacheWriterBuilder
68
+ .build(FlintSparkConf ()) shouldBe a[FlintDisabledMetadataCacheWriter ]
69
+ }
68
70
}
69
71
70
72
test(" build opensearch metadata cache writer" ) {
71
- withMetadataCacheWriteEnabled {
72
- FlintMetadataCacheWriterBuilder
73
- .build(FlintSparkConf ()) shouldBe a[FlintOpenSearchMetadataCacheWriter ]
74
- }
73
+ FlintMetadataCacheWriterBuilder
74
+ .build(FlintSparkConf ()) shouldBe a[FlintOpenSearchMetadataCacheWriter ]
75
75
}
76
76
77
77
test(" write metadata cache to index mappings" ) {
@@ -345,67 +345,62 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat
345
345
| """ .stripMargin)).foreach { case (refreshMode, optionsMap, expectedJson) =>
346
346
test(s " write metadata cache for $refreshMode" ) {
347
347
withExternalSchedulerEnabled {
348
- withMetadataCacheWriteEnabled {
349
- val flint : FlintSpark = new FlintSpark (spark)
350
- withTempDir { checkpointDir =>
351
- // update checkpoint_location if available in optionsMap
352
- val indexOptions = FlintSparkIndexOptions (
353
- optionsMap
354
- .get(" checkpoint_location" )
355
- .map(_ =>
356
- optionsMap.updated(" checkpoint_location" , checkpointDir.getAbsolutePath))
357
- .getOrElse(optionsMap))
358
-
359
- flint
360
- .skippingIndex()
361
- .onTable(testTable)
362
- .addMinMax(" age" )
363
- .options(indexOptions, testFlintIndex)
364
- .create()
365
-
366
- val propertiesJson = compact(render(getPropertiesJValue(testFlintIndex)))
367
- propertiesJson should matchJson(expectedJson)
368
-
369
- flint.refreshIndex(testFlintIndex)
370
- val lastRefreshTime =
371
- compact(render(getPropertiesJValue(testFlintIndex) \ " lastRefreshTime" )).toLong
372
- lastRefreshTime should be > 0L
373
- }
348
+ val flint : FlintSpark = new FlintSpark (spark)
349
+ withTempDir { checkpointDir =>
350
+ // update checkpoint_location if available in optionsMap
351
+ val indexOptions = FlintSparkIndexOptions (
352
+ optionsMap
353
+ .get(" checkpoint_location" )
354
+ .map(_ => optionsMap.updated(" checkpoint_location" , checkpointDir.getAbsolutePath))
355
+ .getOrElse(optionsMap))
356
+
357
+ flint
358
+ .skippingIndex()
359
+ .onTable(testTable)
360
+ .addMinMax(" age" )
361
+ .options(indexOptions, testFlintIndex)
362
+ .create()
363
+
364
+ val propertiesJson = compact(render(getPropertiesJValue(testFlintIndex)))
365
+ propertiesJson should matchJson(expectedJson)
366
+
367
+ flint.refreshIndex(testFlintIndex)
368
+ val lastRefreshTime =
369
+ compact(render(getPropertiesJValue(testFlintIndex) \ " lastRefreshTime" )).toLong
370
+ lastRefreshTime should be > 0L
374
371
}
375
372
}
376
373
}
377
374
}
378
375
379
376
test(" write metadata cache for auto refresh index with internal scheduler" ) {
380
- withMetadataCacheWriteEnabled {
381
- val flint : FlintSpark = new FlintSpark (spark)
382
- withTempDir { checkpointDir =>
383
- flint
384
- .skippingIndex()
385
- .onTable(testTable)
386
- .addMinMax(" age" )
387
- .options(
388
- FlintSparkIndexOptions (
389
- Map (
390
- " auto_refresh" -> " true" ,
391
- " scheduler_mode" -> " internal" ,
392
- " refresh_interval" -> " 10 Minute" ,
393
- " checkpoint_location" -> checkpointDir.getAbsolutePath)),
394
- testFlintIndex)
395
- .create()
396
-
397
- val propertiesJson = compact(render(getPropertiesJValue(testFlintIndex)))
398
- propertiesJson should matchJson(s """
377
+ val flint : FlintSpark = new FlintSpark (spark)
378
+ withTempDir { checkpointDir =>
379
+ flint
380
+ .skippingIndex()
381
+ .onTable(testTable)
382
+ .addMinMax(" age" )
383
+ .options(
384
+ FlintSparkIndexOptions (
385
+ Map (
386
+ " auto_refresh" -> " true" ,
387
+ " scheduler_mode" -> " internal" ,
388
+ " refresh_interval" -> " 10 Minute" ,
389
+ " checkpoint_location" -> checkpointDir.getAbsolutePath)),
390
+ testFlintIndex)
391
+ .create()
392
+
393
+ val propertiesJson = compact(render(getPropertiesJValue(testFlintIndex)))
394
+ propertiesJson should matchJson(s """
399
395
| {
400
396
| "metadataCacheVersion": " ${FlintMetadataCache .metadataCacheVersion}",
401
397
| "refreshInterval": 600,
402
398
| "sourceTables": [" $testTable"]
403
399
| }
404
400
| """ .stripMargin)
405
401
406
- flint.refreshIndex(testFlintIndex)
407
- compact(render(getPropertiesJValue(testFlintIndex))) should not include " lastRefreshTime"
408
- }
402
+ flint.refreshIndex(testFlintIndex)
403
+ compact(render(getPropertiesJValue(testFlintIndex))) should not include " lastRefreshTime"
409
404
}
410
405
}
411
406
0 commit comments