Skip to content

Commit 20022d6

Browse files
committed
adding _source to index_mappings
Signed-off-by: Kai Huang <ahkcs@amazon.com>
1 parent 1a67dc6 commit 20022d6

File tree

7 files changed

+135
-6
lines changed

7 files changed

+135
-6
lines changed

flint-commons/src/main/scala/org/opensearch/flint/common/metadata/FlintMetadata.scala

+19-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,11 @@ case class FlintMetadata(
4141
*/
4242
latestLogEntry: Option[FlintMetadataLogEntry] = None,
4343
/** Optional Flint index settings. TODO: move elsewhere? */
44-
indexSettings: Option[String]) {
44+
indexSettings: Option[String],
45+
/** Optional Flint index mappings. */
46+
indexMappings: Option[String],
47+
/** Whether the `_source` field is enabled in the Flint index mappings. */
48+
indexMappingsSourceEnabled: Boolean) {
4549

4650
require(version != null, "version is required")
4751
require(name != null, "name is required")
@@ -69,6 +73,8 @@ object FlintMetadata {
6973
private var latestId: Option[String] = None
7074
private var latestLogEntry: Option[FlintMetadataLogEntry] = None
7175
private var indexSettings: Option[String] = None
76+
private var indexMappings: Option[String] = None
77+
private var indexMappingsSourceEnabled = true
7278

7379
def version(version: FlintVersion): this.type = {
7480
this.version = version
@@ -131,6 +137,16 @@ object FlintMetadata {
131137
this
132138
}
133139

140+
def indexMappings(indexMappings: String): this.type = {
141+
this.indexMappings = Option(indexMappings)
142+
this
143+
}
144+
145+
def indexMappingsSourceEnabled(indexMappingsSourceEnabled: Boolean): this.type = {
146+
this.indexMappingsSourceEnabled = indexMappingsSourceEnabled
147+
this
148+
}
149+
134150
// Build method to create the FlintMetadata instance
135151
def build(): FlintMetadata = {
136152
FlintMetadata(
@@ -143,6 +159,8 @@ object FlintMetadata {
143159
properties = properties,
144160
schema = schema,
145161
indexSettings = indexSettings,
162+
indexMappings = indexMappings,
163+
indexMappingsSourceEnabled = indexMappingsSourceEnabled,
146164
latestId = latestId,
147165
latestLogEntry = latestLogEntry)
148166
}

flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchIndexMetadataService.scala

+16-1
Original file line numberDiff line numberDiff line change
@@ -133,9 +133,16 @@ object FlintOpenSearchIndexMetadataService {
133133
optionalObjectField(builder, "options", metadata.options)
134134
optionalObjectField(builder, "properties", metadata.properties)
135135
}
136-
}
136+
// extract index mappings options from metadata.options
137+
// turn into object from string
138+
// obj[_source] exists -> set the following
137139

140+
// mapping fields options / others
141+
}
138142
// Add _source (enabled flag) and properties (schema) fields
143+
objectField(builder, "_source") {
144+
builder.field("enabled", metadata.indexMappingsSourceEnabled)
145+
}
139146
builder.field("properties", metadata.schema)
140147
})
141148
} catch {
@@ -191,6 +198,14 @@ object FlintOpenSearchIndexMetadataService {
191198
}
192199
}
193200
}
201+
case "_source" =>
202+
parseObjectField(parser) { (parser, innerFieldName) =>
203+
{
204+
innerFieldName match {
205+
case "enabled" => builder.indexMappingsSourceEnabled(parser.booleanValue())
206+
case _ => // Handle other fields as needed
207+
}
208+
}}
194209
case "properties" =>
195210
builder.schema(parser.map())
196211
case _ => // Ignore other fields, for instance, dynamic.

flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala

+34
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,40 @@ object FlintSparkIndex extends Logging {
180180
builder.indexSettings(settings.get)
181181
}
182182

183+
val mappings = index.options.indexMappings()
184+
// Parse the index mappings JSON string, if one was provided
185+
// Extract the _source "enabled" flag into the metadata builder
186+
if(mappings.isDefined) {
187+
parseJson(mappings.getOrElse("")) { (parser, fieldName) =>
188+
{
189+
fieldName match {
190+
case "_source" =>
191+
parseObjectField(parser) { (parser, innerFieldName) =>
192+
{
193+
innerFieldName match {
194+
case "enabled" => builder.indexMappingsSourceEnabled(parser.booleanValue())
195+
// case "source" => builder.source(parser.text())
196+
// case "indexedColumns" =>
197+
// parseArrayField(parser) {
198+
// builder.addIndexedColumn(parser.map())
199+
// }
200+
// case "options" => builder.options(parser.map())
201+
// case "properties" => builder.properties(parser.map())
202+
case _ => // Handle other fields as needed
203+
}
204+
}
205+
}
206+
// case "_source" =>
207+
// builder.schema(parser.map())
208+
case _ => // Ignore other fields, for instance, dynamic.
209+
}
210+
}
211+
}
212+
}
213+
if (mappings.isDefined) {
214+
builder.indexMappings(mappings.get)
215+
}
216+
183217
// Optional latest metadata log entry
184218
val latestLogEntry = index.latestLogEntry
185219
if (latestLogEntry.isDefined) {

flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala

+10-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import java.util.{Collections, UUID}
1010
import org.json4s.{Formats, NoTypeHints}
1111
import org.json4s.native.JsonMethods._
1212
import org.json4s.native.Serialization
13-
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, EXTRA_OPTIONS, ID_EXPRESSION, INCREMENTAL_REFRESH, INDEX_SETTINGS, OptionName, OUTPUT_MODE, REFRESH_INTERVAL, SCHEDULER_MODE, WATERMARK_DELAY}
13+
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, EXTRA_OPTIONS, ID_EXPRESSION, INCREMENTAL_REFRESH, INDEX_MAPPINGS, INDEX_SETTINGS, OptionName, OUTPUT_MODE, REFRESH_INTERVAL, SCHEDULER_MODE, WATERMARK_DELAY}
1414
import org.opensearch.flint.spark.FlintSparkIndexOptions.validateOptionNames
1515
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode
1616
import org.opensearch.flint.spark.scheduler.util.IntervalSchedulerParser
@@ -96,6 +96,14 @@ case class FlintSparkIndexOptions(options: Map[String, String]) {
9696
*/
9797
def indexSettings(): Option[String] = getOptionValue(INDEX_SETTINGS)
9898

99+
/**
100+
* The index mappings for the OpenSearch index to be created.
101+
*
102+
* @return
103+
* index mapping JSON
104+
*/
105+
def indexMappings(): Option[String] = getOptionValue(INDEX_MAPPINGS)
106+
99107
/**
100108
* An expression that generates unique value as index data row ID.
101109
*
@@ -195,6 +203,7 @@ object FlintSparkIndexOptions {
195203
val WATERMARK_DELAY: OptionName.Value = Value("watermark_delay")
196204
val OUTPUT_MODE: OptionName.Value = Value("output_mode")
197205
val INDEX_SETTINGS: OptionName.Value = Value("index_settings")
206+
val INDEX_MAPPINGS: OptionName.Value = Value("index_mappings")
198207
val ID_EXPRESSION: OptionName.Value = Value("id_expression")
199208
val EXTRA_OPTIONS: OptionName.Value = Value("extra_options")
200209
}

integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala

+17
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,23 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
144144
(settings \ "index.number_of_replicas").extract[String] shouldBe "3"
145145
}
146146

147+
test("create covering index with index mappings") {
148+
sql(s"""
149+
| CREATE INDEX $testIndex ON $testTable ( name )
150+
| WITH (
151+
| index_mappings = '{ "_source": { "enabled": false } }',
152+
| index_settings = '{"number_of_shards": 2, "number_of_replicas": 3}'
153+
| )
154+
|""".stripMargin)
155+
156+
// Check that the _source enabled flag from the index_mappings option is applied to the OS index mappings
157+
val flintIndexMetadataService =
158+
new FlintOpenSearchIndexMetadataService(new FlintOptions(openSearchOptions.asJava))
159+
160+
val mappingsSourceEnabled = flintIndexMetadataService.getIndexMetadata(testFlintIndex).indexMappingsSourceEnabled
161+
mappingsSourceEnabled shouldBe false
162+
}
163+
147164
test("create covering index with invalid option") {
148165
the[IllegalArgumentException] thrownBy
149166
sql(s"""

integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala

+21-3
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
211211
| CREATE MATERIALIZED VIEW $testMvName
212212
| AS $testQuery
213213
| WITH (
214-
| index_settings = '{"number_of_shards": 3, "number_of_replicas": 2}'
214+
| index_settings = '{"number_of_shards": 4, "number_of_replicas": 3}'
215215
| )
216216
|""".stripMargin)
217217

@@ -222,8 +222,26 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
222222
implicit val formats: Formats = Serialization.formats(NoTypeHints)
223223
val settings =
224224
parse(flintIndexMetadataService.getIndexMetadata(testFlintIndex).indexSettings.get)
225-
(settings \ "index.number_of_shards").extract[String] shouldBe "3"
226-
(settings \ "index.number_of_replicas").extract[String] shouldBe "2"
225+
(settings \ "index.number_of_shards").extract[String] shouldBe "4"
226+
(settings \ "index.number_of_replicas").extract[String] shouldBe "3"
227+
}
228+
229+
test("create materialized view with index mappings") {
230+
sql(s"""
231+
| CREATE MATERIALIZED VIEW $testMvName
232+
| AS $testQuery
233+
| WITH (
234+
| index_mappings = '{ "_source": { "enabled": false } }',
235+
| index_settings = '{"number_of_shards": 3, "number_of_replicas": 2}'
236+
| )
237+
|""".stripMargin)
238+
239+
// Check that the _source enabled flag from the index_mappings option is applied to the OS index mappings
240+
val flintIndexMetadataService =
241+
new FlintOpenSearchIndexMetadataService(new FlintOptions(openSearchOptions.asJava))
242+
243+
val mappingsSourceEnabled = flintIndexMetadataService.getIndexMetadata(testFlintIndex).indexMappingsSourceEnabled
244+
mappingsSourceEnabled shouldBe false
227245
}
228246

229247
test("create materialized view with full refresh") {

integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala

+18
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,24 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite with ExplainSuit
209209
(settings \ "index.number_of_replicas").extract[String] shouldBe "2"
210210
}
211211

212+
test("create skipping index with index mappings") {
213+
sql(s"""
214+
| CREATE SKIPPING INDEX ON $testTable
215+
| ( year PARTITION )
216+
| WITH (
217+
| index_mappings = '{ "_source": { "enabled": false } }',
218+
| index_settings = '{"number_of_shards": 3, "number_of_replicas": 2}'
219+
| )
220+
|""".stripMargin)
221+
222+
// Check that the _source enabled flag from the index_mappings option is applied to the OS index mappings
223+
val flintIndexMetadataService =
224+
new FlintOpenSearchIndexMetadataService(new FlintOptions(openSearchOptions.asJava))
225+
226+
val mappingsSourceEnabled = flintIndexMetadataService.getIndexMetadata(testIndex).indexMappingsSourceEnabled
227+
mappingsSourceEnabled shouldBe false
228+
}
229+
212230
Seq(
213231
"struct_col.field1.subfield VALUE_SET, struct_col.field2 MIN_MAX",
214232
"`struct_col.field1.subfield` VALUE_SET, `struct_col.field2` MIN_MAX", // ensure previous hack still works

0 commit comments

Comments
 (0)