Skip to content

Commit bfcaf5b

Browse files
committed
adding _source to index_mappings
Signed-off-by: Kai Huang <ahkcs@amazon.com>
1 parent bf36855 commit bfcaf5b

File tree

7 files changed

+120
-6
lines changed

7 files changed

+120
-6
lines changed

flint-commons/src/main/scala/org/opensearch/flint/common/metadata/FlintMetadata.scala

+19-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,11 @@ case class FlintMetadata(
4141
*/
4242
latestLogEntry: Option[FlintMetadataLogEntry] = None,
4343
/** Optional Flint index settings. TODO: move elsewhere? */
44-
indexSettings: Option[String]) {
44+
indexSettings: Option[String],
45+
/** Optional Flint index mappings. */
46+
indexMappings: Option[String],
47+
/** Optional Flint index mappings _source field. */
48+
indexMappingsSourceEnabled: Boolean) {
4549

4650
require(version != null, "version is required")
4751
require(name != null, "name is required")
@@ -69,6 +73,8 @@ object FlintMetadata {
6973
private var latestId: Option[String] = None
7074
private var latestLogEntry: Option[FlintMetadataLogEntry] = None
7175
private var indexSettings: Option[String] = None
76+
private var indexMappings: Option[String] = None
77+
private var indexMappingsSourceEnabled = true
7278

7379
def version(version: FlintVersion): this.type = {
7480
this.version = version
@@ -131,6 +137,16 @@ object FlintMetadata {
131137
this
132138
}
133139

140+
def indexMappings(indexMappings: String): this.type = {
141+
this.indexMappings = Option(indexMappings)
142+
this
143+
}
144+
145+
def indexMappingsSourceEnabled(indexMappingsSourceEnabled: Boolean): this.type = {
146+
this.indexMappingsSourceEnabled = indexMappingsSourceEnabled
147+
this
148+
}
149+
134150
// Build method to create the FlintMetadata instance
135151
def build(): FlintMetadata = {
136152
FlintMetadata(
@@ -143,6 +159,8 @@ object FlintMetadata {
143159
properties = properties,
144160
schema = schema,
145161
indexSettings = indexSettings,
162+
indexMappings = indexMappings,
163+
indexMappingsSourceEnabled = indexMappingsSourceEnabled,
146164
latestId = latestId,
147165
latestLogEntry = latestLogEntry)
148166
}

flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchIndexMetadataService.scala

+12-1
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,10 @@ object FlintOpenSearchIndexMetadataService {
134134
optionalObjectField(builder, "properties", metadata.properties)
135135
}
136136
}
137-
137+
// Add _source field to index mappings
138+
objectField(builder, "_source") {
139+
builder.field("enabled", metadata.indexMappingsSourceEnabled)
140+
}
138141
// Add properties (schema) field
139142
builder.field("properties", metadata.schema)
140143
})
@@ -191,6 +194,14 @@ object FlintOpenSearchIndexMetadataService {
191194
}
192195
}
193196
}
197+
case "_source" =>
198+
parseObjectField(parser) { (parser, innerFieldName) =>
199+
{
200+
innerFieldName match {
201+
case "enabled" => builder.indexMappingsSourceEnabled(parser.booleanValue())
202+
case _ => // Handle other fields as needed
203+
}
204+
}}
194205
case "properties" =>
195206
builder.schema(parser.map())
196207
case _ => // Ignore other fields, for instance, dynamic.

flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala

+26
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,32 @@ object FlintSparkIndex extends Logging {
180180
builder.indexSettings(settings.get)
181181
}
182182

183+
// Optional index mappings
184+
val mappings = index.options.indexMappings()
185+
// parse from json to object
186+
// extract sourceEnabled
187+
if (mappings.isDefined) {
188+
parseJson(mappings.getOrElse("")) { (parser, fieldName) =>
189+
{
190+
fieldName match {
191+
case "_source" =>
192+
parseObjectField(parser) { (parser, innerFieldName) =>
193+
{
194+
innerFieldName match {
195+
case "enabled" => builder.indexMappingsSourceEnabled(parser.booleanValue())
196+
case _ => // Handle other fields as needed
197+
}
198+
}
199+
}
200+
case _ => // Ignore other fields, for instance, dynamic.
201+
}
202+
}
203+
}
204+
}
205+
if (mappings.isDefined) {
206+
builder.indexMappings(mappings.get)
207+
}
208+
183209
// Optional latest metadata log entry
184210
val latestLogEntry = index.latestLogEntry
185211
if (latestLogEntry.isDefined) {

flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala

+10-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import java.util.{Collections, UUID}
1010
import org.json4s.{Formats, NoTypeHints}
1111
import org.json4s.native.JsonMethods._
1212
import org.json4s.native.Serialization
13-
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, EXTRA_OPTIONS, ID_EXPRESSION, INCREMENTAL_REFRESH, INDEX_SETTINGS, OptionName, OUTPUT_MODE, REFRESH_INTERVAL, SCHEDULER_MODE, WATERMARK_DELAY}
13+
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, EXTRA_OPTIONS, ID_EXPRESSION, INCREMENTAL_REFRESH, INDEX_MAPPINGS, INDEX_SETTINGS, OptionName, OUTPUT_MODE, REFRESH_INTERVAL, SCHEDULER_MODE, WATERMARK_DELAY}
1414
import org.opensearch.flint.spark.FlintSparkIndexOptions.validateOptionNames
1515
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode
1616
import org.opensearch.flint.spark.scheduler.util.IntervalSchedulerParser
@@ -96,6 +96,14 @@ case class FlintSparkIndexOptions(options: Map[String, String]) {
9696
*/
9797
def indexSettings(): Option[String] = getOptionValue(INDEX_SETTINGS)
9898

99+
/**
100+
* The index mappings for OpenSearch index created.
101+
*
102+
* @return
103+
* index mapping JSON
104+
*/
105+
def indexMappings(): Option[String] = getOptionValue(INDEX_MAPPINGS)
106+
99107
/**
100108
* An expression that generates unique value as index data row ID.
101109
*
@@ -195,6 +203,7 @@ object FlintSparkIndexOptions {
195203
val WATERMARK_DELAY: OptionName.Value = Value("watermark_delay")
196204
val OUTPUT_MODE: OptionName.Value = Value("output_mode")
197205
val INDEX_SETTINGS: OptionName.Value = Value("index_settings")
206+
val INDEX_MAPPINGS: OptionName.Value = Value("index_mappings")
198207
val ID_EXPRESSION: OptionName.Value = Value("id_expression")
199208
val EXTRA_OPTIONS: OptionName.Value = Value("extra_options")
200209
}

integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala

+16
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,22 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
144144
(settings \ "index.number_of_replicas").extract[String] shouldBe "3"
145145
}
146146

147+
test("create covering index with index mappings") {
148+
sql(s"""
149+
| CREATE INDEX $testIndex ON $testTable ( name )
150+
| WITH (
151+
| index_mappings = '{ "_source": { "enabled": false } }'
152+
| )
153+
|""".stripMargin)
154+
155+
// Check if the _source in index mappings option is set to OS index mappings
156+
val flintIndexMetadataService =
157+
new FlintOpenSearchIndexMetadataService(new FlintOptions(openSearchOptions.asJava))
158+
159+
val mappingsSourceEnabled = flintIndexMetadataService.getIndexMetadata(testFlintIndex).indexMappingsSourceEnabled
160+
mappingsSourceEnabled shouldBe false
161+
}
162+
147163
test("create covering index with invalid option") {
148164
the[IllegalArgumentException] thrownBy
149165
sql(s"""

integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala

+20-3
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
211211
| CREATE MATERIALIZED VIEW $testMvName
212212
| AS $testQuery
213213
| WITH (
214-
| index_settings = '{"number_of_shards": 3, "number_of_replicas": 2}'
214+
| index_settings = '{"number_of_shards": 4, "number_of_replicas": 3}'
215215
| )
216216
|""".stripMargin)
217217

@@ -222,8 +222,25 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
222222
implicit val formats: Formats = Serialization.formats(NoTypeHints)
223223
val settings =
224224
parse(flintIndexMetadataService.getIndexMetadata(testFlintIndex).indexSettings.get)
225-
(settings \ "index.number_of_shards").extract[String] shouldBe "3"
226-
(settings \ "index.number_of_replicas").extract[String] shouldBe "2"
225+
(settings \ "index.number_of_shards").extract[String] shouldBe "4"
226+
(settings \ "index.number_of_replicas").extract[String] shouldBe "3"
227+
}
228+
229+
test("create materialized view with index mappings") {
230+
sql(s"""
231+
| CREATE MATERIALIZED VIEW $testMvName
232+
| AS $testQuery
233+
| WITH (
234+
| index_mappings = '{ "_source": { "enabled": false } }'
235+
| )
236+
|""".stripMargin)
237+
238+
// Check if the _source in index mappings option is set to OS index mappings
239+
val flintIndexMetadataService =
240+
new FlintOpenSearchIndexMetadataService(new FlintOptions(openSearchOptions.asJava))
241+
242+
val mappingsSourceEnabled = flintIndexMetadataService.getIndexMetadata(testFlintIndex).indexMappingsSourceEnabled
243+
mappingsSourceEnabled shouldBe false
227244
}
228245

229246
test("create materialized view with full refresh") {

integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala

+17
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,23 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite with ExplainSuit
209209
(settings \ "index.number_of_replicas").extract[String] shouldBe "2"
210210
}
211211

212+
test("create skipping index with index mappings") {
213+
sql(s"""
214+
| CREATE SKIPPING INDEX ON $testTable
215+
| ( year PARTITION )
216+
| WITH (
217+
| index_mappings = '{ "_source": { "enabled": false } }'
218+
| )
219+
|""".stripMargin)
220+
221+
// Check if the _source in index mappings option is set to OS index mappings
222+
val flintIndexMetadataService =
223+
new FlintOpenSearchIndexMetadataService(new FlintOptions(openSearchOptions.asJava))
224+
225+
val mappingsSourceEnabled = flintIndexMetadataService.getIndexMetadata(testIndex).indexMappingsSourceEnabled
226+
mappingsSourceEnabled shouldBe false
227+
}
228+
212229
Seq(
213230
"struct_col.field1.subfield VALUE_SET, struct_col.field2 MIN_MAX",
214231
"`struct_col.field1.subfield` VALUE_SET, `struct_col.field2` MIN_MAX", // ensure previous hack still works

0 commit comments

Comments
 (0)