From f9c059a6b57bc3454950704ec235909782f30a47 Mon Sep 17 00:00:00 2001
From: "github-actions[bot]"
Date: Tue, 13 May 2025 17:28:43 +0000
Subject: [PATCH] Adding _source and schema merging to index_mappings (#1101)

* Fix antlr4 parser issues (#1094)

* Fix antlr4 parser issues
Signed-off-by: Lantao Jin

* Case insensitive lexer
Signed-off-by: Lantao Jin

* revert useless change
Signed-off-by: Lantao Jin

* remove tokens file
Signed-off-by: Lantao Jin

---------

Signed-off-by: Lantao Jin
Signed-off-by: Kai Huang

* adding _source to index_mappings
Signed-off-by: Kai Huang

* syntax fix
Signed-off-by: Kai Huang

* Apply scalafmt
Signed-off-by: Kai Huang

* Added index_mapping as an option in index.md, applied scalafmtAll
Signed-off-by: Kai Huang

* improve readability
Signed-off-by: Kai Huang

* Removed index_mappings from FlintMetaData.scala, Modified index.md
Signed-off-by: Kai Huang

* removed indexMappingsSourceEnabled from FlintMetadata.scala
Signed-off-by: Kai Huang

* removed indexMappingsSourceEnabled from FlintMetadata.scala
Signed-off-by: Kai Huang

* removed indexMappingsSourceEnabled from FlintMetadata.scala
Signed-off-by: Kai Huang

* Removed indexMappingsSourceEnabled from FlintMetadata.scala and removed unnecessary code
Signed-off-by: Kai Huang

* Added some test cases to test serialize() and fixed some formatting issues
Signed-off-by: Kai Huang

* Added some test cases for FlintOpenSearchIndexMetadataServiceSuite.scala
Signed-off-by: Kai Huang

* Added schema merging to index_mappings, added some test cases
Signed-off-by: Kai Huang

* updated test cases
Signed-off-by: Kai Huang

* Minor format fix
Signed-off-by: Kai Huang

* minor fixes
Signed-off-by: Kai Huang

* added nested schema merging logic, moved mergeSchema to serialize, updated test cases, fixed some minor issues
Signed-off-by: Kai Huang

* updated some comments
Signed-off-by: Kai Huang

* fixed some formatting issues based on the comments
Signed-off-by: Kai Huang

* fixed syntax issue
Signed-off-by: Kai Huang

* syntax issue
Signed-off-by: Kai Huang

* syntax issue
Signed-off-by: Kai Huang

* fixed the FlintSparkSkippingIndexITSuite
Signed-off-by: Kai Huang

* fixing schema merging limitation
Signed-off-by: Kai Huang

* less scala/java conversion
Signed-off-by: Kai Huang

* style fix
Signed-off-by: Kai Huang

* fix unnecessary casting
Signed-off-by: Kai Huang

---------

Signed-off-by: Lantao Jin
Signed-off-by: Kai Huang
Co-authored-by: Lantao Jin
(cherry picked from commit 76d35e23b2598e6c73960ca7e6a6013ca8bbd19e)
Signed-off-by: github-actions[bot]
---
 docs/index.md                                 |   2 +
 .../FlintOpenSearchIndexMetadataService.scala | 186 +++++++++++++-
 ...tOpenSearchIndexMetadataServiceSuite.scala | 230 +++++++++++++++++-
 .../flint/spark/FlintSparkIndexOptions.scala  |   3 +-
 .../covering/FlintSparkCoveringIndex.scala    |   1 -
 .../spark/mv/FlintSparkMaterializedView.scala |   1 -
 .../skipping/FlintSparkSkippingIndex.scala    |   1 -
 .../FlintSparkCoveringIndexSqlITSuite.scala   |  55 ++++-
 ...FlintSparkMaterializedViewSqlITSuite.scala | 103 +++++++-
 .../FlintSparkSkippingIndexITSuite.scala      |   6 +-
 .../FlintSparkSkippingIndexSqlITSuite.scala   |  57 ++++-
 11 files changed, 627 insertions(+), 18 deletions(-)

diff --git a/docs/index.md b/docs/index.md
index 7155396cc..1dc430140 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -394,6 +394,7 @@ User can provide the following options in `WITH` clause of create statement:
+ `watermark_delay`: a string as time expression for how late data can come and still be processed, e.g. 1 minute, 10 seconds.
This is required by auto and incremental refresh on materialized view if it has aggregation in the query. + `output_mode`: a mode string that describes how data will be written to streaming sink. If unspecified, default append mode will be applied. + `index_settings`: a JSON string as index settings for OpenSearch index that will be created. Please follow the format in OpenSearch documentation. If unspecified, default OpenSearch index settings will be applied. ++ `index_mappings`: A JSON string specifying additional OpenSearch index mappings, such as metadata fields (e.g., _source) or mapping parameters (e.g., enabled, index, etc.). This allows customizing certain parts of the index mappings. The base mappings are automatically generated; if unspecified, only the defaults will be applied. Refer to [OpenSearch metadata fields](https://docs.opensearch.org/docs/latest/field-types/metadata-fields/source/) and [mapping parameters](https://docs.opensearch.org/docs/latest/field-types/mapping-parameters/index/) for supported options. + `id_expression`: an expression string that generates an ID column to guarantee idempotency when index refresh job restart or any retry attempt during an index refresh. If an empty string is provided, no ID column will be generated. + `extra_options`: a JSON string as extra options that can be passed to Spark streaming source and sink API directly. Use qualified source table name (because there could be multiple) and "sink", e.g. '{"sink": "{key: val}", "table1": {key: val}}' @@ -407,6 +408,7 @@ WITH ( watermark_delay = '1 Second', output_mode = 'complete', index_settings = '{"number_of_shards": 2, "number_of_replicas": 3}', + index_mappings = '{ "_source": { "enabled": false } }', id_expression = "sha1(concat_ws('\0',startTime,status))", extra_options = '{"spark_catalog.default.alb_logs": {"maxFilesPerTrigger": "1"}}' ) diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchIndexMetadataService.scala b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchIndexMetadataService.scala index 765460da7..1886e7dee 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchIndexMetadataService.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchIndexMetadataService.scala @@ -7,11 +7,11 @@ package org.opensearch.flint.core.storage import java.util -import scala.collection.JavaConverters.mapAsJavaMapConverter +import scala.collection.JavaConverters._ import org.opensearch.client.RequestOptions import org.opensearch.client.indices.{GetIndexRequest, GetIndexResponse, PutMappingRequest} -import org.opensearch.common.xcontent.XContentType +import org.opensearch.common.xcontent.{XContentParser, XContentType} import org.opensearch.flint.common.FlintVersion import org.opensearch.flint.common.metadata.{FlintIndexMetadataService, FlintMetadata} import org.opensearch.flint.core.FlintOptions @@ -98,8 +98,7 @@ class FlintOpenSearchIndexMetadataService(options: FlintOptions) override def deleteIndexMetadata(indexName: String): Unit = {} } -object FlintOpenSearchIndexMetadataService { - +object FlintOpenSearchIndexMetadataService extends Logging { def serialize(metadata: FlintMetadata): String = { serialize(metadata, true) } @@ -134,9 +133,19 @@ object FlintOpenSearchIndexMetadataService { optionalObjectField(builder, "properties", metadata.properties) } } + // Add _source field + val indexMappingsOpt = + 
Option(metadata.options.get("index_mappings")).map(_.asInstanceOf[String]) + val sourceEnabled = extractSourceEnabled(indexMappingsOpt) + if (!sourceEnabled) { + objectField(builder, "_source") { + builder.field("enabled", sourceEnabled) + } + } // Add properties (schema) field - builder.field("properties", metadata.schema) + val schema = mergeSchema(metadata.schema, metadata.options) + builder.field("properties", schema) }) } catch { case e: Exception => @@ -191,6 +200,7 @@ object FlintOpenSearchIndexMetadataService { } } } + case "_source" => parser.skipChildren() case "properties" => builder.schema(parser.map()) case _ => // Ignore other fields, for instance, dynamic. @@ -203,4 +213,170 @@ object FlintOpenSearchIndexMetadataService { throw new IllegalStateException("Failed to parse metadata JSON", e) } } + + def extractSourceEnabled(indexMappingsJsonOpt: Option[String]): Boolean = { + var sourceEnabled: Boolean = true + + indexMappingsJsonOpt.foreach { jsonStr => + try { + parseJson(jsonStr) { (parser, fieldName) => + fieldName match { + case "_source" => + parseObjectField(parser) { (parser, innerFieldName) => + innerFieldName match { + case "enabled" => + sourceEnabled = parser.booleanValue() + return sourceEnabled + case _ => // Ignore + } + } + case _ => // Ignore + } + } + } catch { + case e: Exception => + logError("Error extracting _source", e) + } + } + + sourceEnabled + } + + /** + * Merges the mapping parameters from FlintSparkIndexOptions into the existing schema. If the + * options contain mapping parameters that exist in allFieldTypes, those configurations are + * merged. + * + * @param allFieldTypes + * Map of field names to their type/configuration details + * @param options + * FlintMetadata containing potential mapping parameters + * @return + * Merged map with combined mapping parameters + */ + def mergeSchema( + allFieldTypes: java.util.Map[String, AnyRef], + options: java.util.Map[String, AnyRef]): java.util.Map[String, AnyRef] = { + val indexMappingsOpt = Option(options.get("index_mappings")).flatMap { + case s: String => Some(s) + case _ => None + } + + var result = new java.util.HashMap[String, AnyRef](allFieldTypes) + + // Track mappings from leaf name to configuration properties + var fieldConfigs = new java.util.HashMap[String, java.util.Map[String, AnyRef]]() + + indexMappingsOpt.foreach { jsonStr => + try { + // Extract nested field configurations - key is the leaf name + parseJson(jsonStr) { (parser, fieldName) => + fieldName match { + case "_source" => + parser.skipChildren() // Skip _source section + + case "properties" => + // Process properties recursively to extract field configs + fieldConfigs = extractNestedProperties(parser) + + case _ => + parser.skipChildren() // Skip other fields + } + } + + // Apply extracted configurations to schema while preserving structure + val newResult = new java.util.HashMap[String, AnyRef]() + result.forEach { (fullFieldName, fieldType) => + val leafFieldName = extractLeafFieldName(fullFieldName) + + if (fieldConfigs.containsKey(leafFieldName)) { + // We have config for this leaf field name + fieldType match { + case existingConfig: java.util.Map[String, AnyRef] => + val mergedConfig = new java.util.HashMap[String, AnyRef](existingConfig) + + // Add/overwrite with new config values + fieldConfigs.get(leafFieldName).forEach { (k, v) => + mergedConfig.put(k, v) + } + + // Return the updated field with its original key + newResult.put(fullFieldName, mergedConfig) + + case _ => + // If field type isn't a map, keep it 
unchanged + newResult.put(fullFieldName, fieldType) + } + } else { + // No config for this field, keep it unchanged + newResult.put(fullFieldName, fieldType) + } + } + result = newResult + } catch { + case ex: Exception => + logError("Error merging schema", ex) + } + } + + result + } + + /** + * Recursively extracts mapping parameters from nested properties structure. Returns a map of + * field name to its configuration. + */ + private def extractNestedProperties( + parser: XContentParser): java.util.HashMap[String, java.util.Map[String, AnyRef]] = { + + val fieldConfigs = new java.util.HashMap[String, java.util.Map[String, AnyRef]]() + + parseObjectField(parser) { (parser, fieldName) => + val fieldConfig = new java.util.HashMap[String, AnyRef]() + var hasNestedProperties = false + + parseObjectField(parser) { (parser, propName) => + if (propName == "properties") { + hasNestedProperties = true + val nestedConfigs = extractNestedProperties(parser) + nestedConfigs.forEach { (k, v) => + fieldConfigs.put(k, v) + } + } else { + val value = parser.currentToken() match { + case XContentParser.Token.VALUE_STRING => parser.text() + case XContentParser.Token.VALUE_NUMBER => parser.numberValue() + case XContentParser.Token.VALUE_BOOLEAN => parser.booleanValue() + case XContentParser.Token.VALUE_NULL => null + case _ => + parser.skipChildren() + null + } + + if (value != null) { + fieldConfig.put(propName, value.asInstanceOf[AnyRef]) + } + } + } + + if (!hasNestedProperties && !fieldConfig.isEmpty) { + fieldConfigs.put(fieldName, fieldConfig) + } + } + + fieldConfigs + } + + /** + * Extracts the leaf field name from a potentially nested field path. For example: + * "aws.vpc.count" -> "count" + */ + private def extractLeafFieldName(fullFieldPath: String): String = { + val lastDotIndex = fullFieldPath.lastIndexOf('.') + if (lastDotIndex >= 0) { + fullFieldPath.substring(lastDotIndex + 1) + } else { + fullFieldPath + } + } } diff --git a/flint-core/src/test/scala/org/opensearch/flint/core/storage/FlintOpenSearchIndexMetadataServiceSuite.scala b/flint-core/src/test/scala/org/opensearch/flint/core/storage/FlintOpenSearchIndexMetadataServiceSuite.scala index f1ed09531..cb0b204f1 100644 --- a/flint-core/src/test/scala/org/opensearch/flint/core/storage/FlintOpenSearchIndexMetadataServiceSuite.scala +++ b/flint-core/src/test/scala/org/opensearch/flint/core/storage/FlintOpenSearchIndexMetadataServiceSuite.scala @@ -5,6 +5,8 @@ package org.opensearch.flint.core.storage +import java.util + import scala.collection.JavaConverters.mapAsJavaMapConverter import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson @@ -16,7 +18,7 @@ import org.scalatest.matchers.should.Matchers class FlintOpenSearchIndexMetadataServiceSuite extends AnyFlatSpec with Matchers { /** Test Flint index meta JSON string */ - val testMetadataJson: String = s""" + val testMetadataJsonWithoutIndexMapping: String = s""" | { | "_meta": { | "version": "${current()}", @@ -38,6 +40,133 @@ class FlintOpenSearchIndexMetadataServiceSuite extends AnyFlatSpec with Matchers | } |""".stripMargin + val testMetadataJsonWithIndexMappingSourceEnabledFalse: String = s""" + | { + | "_meta": { + | "version": "${current()}", + | "name": "test_index", + | "kind": "test_kind", + | "source": "test_source_table", + | "indexedColumns": [ + | { + | "test_field": "spark_type" + | }], + | "options": { + | "index_mappings": "{ \\"_source\\": { \\"enabled\\": false } }" + | }, + | "properties": {} + | }, + | "_source": { + | "enabled": false + | }, + | "properties": { + | 
"test_field": { + | "type": "os_type" + | } + | } + | } + |""".stripMargin + + val testMetadataJsonWithIndexMappingSourceEnabledTrue: String = s""" + | { + | "_meta": { + | "version": "${current()}", + | "name": "test_index", + | "kind": "test_kind", + | "source": "test_source_table", + | "indexedColumns": [ + | { + | "test_field": "spark_type" + | }], + | "options": { + | "index_mappings": "{ \\"_source\\": { \\"enabled\\": true } }" + | }, + | "properties": {} + | }, + | "properties": { + | "test_field": { + | "type": "os_type" + | } + | } + | } + |""".stripMargin + + val testMetadataJsonOtherThanSource: String = s""" + | { + | "_meta": { + | "version": "${current()}", + | "name": "test_index", + | "kind": "test_kind", + | "source": "test_source_table", + | "indexedColumns": [ + | { + | "test_field": "spark_type" + | }], + | "options": { + | "index_mappings": "{ \\"boost\\": 1.0 }" + | }, + | "properties": {} + | }, + | "properties": { + | "test_field": { + | "type": "os_type" + | } + | } + | } + |""".stripMargin + + val testMetadataJsonOtherThanEnabled: String = s""" + | { + | "_meta": { + | "version": "${current()}", + | "name": "test_index", + | "kind": "test_kind", + | "source": "test_source_table", + | "indexedColumns": [ + | { + | "test_field": "spark_type" + | }], + | "options": { + | "index_mappings": "{ \\"_source\\": { \\"includes\\": \\"meta.*\\" } }" + | }, + | "properties": {} + | }, + | "properties": { + | "test_field": { + | "type": "os_type" + | } + | } + | } + |""".stripMargin + + val testMetadataJsonWithSourceEnabledFalseAndSchemaMerging: String = s""" + | { + | "_meta": { + | "version": "${current()}", + | "name": "test_index", + | "kind": "test_kind", + | "source": "test_source_table", + | "indexedColumns": [ + | { + | "test_field": "spark_type" + | }], + | "options": { + | "index_mappings": "{ \\"_source\\": { \\"enabled\\": false }, \\"properties\\": { \\"test_field\\": {\\"index\\": false} } }" + | }, + | "properties": {} + | }, + | "_source": { + | "enabled": false + | }, + | "properties": { + | "test_field": { + | "type": "os_type", + | "index": false + | } + | } + | } + |""".stripMargin + val testDynamic: String = s""" | { | "dynamic": "strict", @@ -77,7 +206,7 @@ class FlintOpenSearchIndexMetadataServiceSuite extends AnyFlatSpec with Matchers |""".stripMargin "constructor" should "deserialize the given JSON and assign parsed value to field" in { - Seq(testMetadataJson, testDynamic).foreach(mapping => { + Seq(testMetadataJsonWithoutIndexMapping, testDynamic).foreach(mapping => { val metadata = FlintOpenSearchIndexMetadataService.deserialize(mapping, testIndexSettingsJson) metadata.version shouldBe current() @@ -89,7 +218,74 @@ class FlintOpenSearchIndexMetadataServiceSuite extends AnyFlatSpec with Matchers }) } - "serialize" should "serialize all fields to JSON" in { + "serialize" should "serialize all fields (no _source) to JSON" in { + val builder = new FlintMetadata.Builder + builder.name("test_index") + builder.kind("test_kind") + builder.source("test_source_table") + builder.addIndexedColumn(Map[String, AnyRef]("test_field" -> "spark_type").asJava) + builder.schema(Map[String, AnyRef]("test_field" -> Map("type" -> "os_type").asJava).asJava) + + val metadata = builder.build() + FlintOpenSearchIndexMetadataService.serialize(metadata) should matchJson( + testMetadataJsonWithoutIndexMapping) + } + + "serialize" should "serialize all fields (include _source false) to JSON" in { + val builder = new FlintMetadata.Builder + builder.name("test_index") + 
builder.kind("test_kind") + builder.source("test_source_table") + builder.addIndexedColumn(Map[String, AnyRef]("test_field" -> "spark_type").asJava) + builder.schema(Map[String, AnyRef]("test_field" -> Map("type" -> "os_type").asJava).asJava) + + val optionsMap: java.util.Map[String, AnyRef] = + Map[String, AnyRef]("index_mappings" -> """{ "_source": { "enabled": false } }""").asJava + + builder.options(optionsMap) + + val metadata = builder.build() + FlintOpenSearchIndexMetadataService.serialize(metadata) should matchJson( + testMetadataJsonWithIndexMappingSourceEnabledFalse) + } + + "serialize" should "serialize all fields (include _source true) to JSON" in { + val builder = new FlintMetadata.Builder + builder.name("test_index") + builder.kind("test_kind") + builder.source("test_source_table") + builder.addIndexedColumn(Map[String, AnyRef]("test_field" -> "spark_type").asJava) + builder.schema(Map[String, AnyRef]("test_field" -> Map("type" -> "os_type").asJava).asJava) + + val optionsMap: java.util.Map[String, AnyRef] = + Map[String, AnyRef]("index_mappings" -> """{ "_source": { "enabled": true } }""").asJava + + builder.options(optionsMap) + + val metadata = builder.build() + FlintOpenSearchIndexMetadataService.serialize(metadata) should matchJson( + testMetadataJsonWithIndexMappingSourceEnabledTrue) + } + + "serialize" should "serialize all fields (include other things than _source) to JSON" in { + val builder = new FlintMetadata.Builder + builder.name("test_index") + builder.kind("test_kind") + builder.source("test_source_table") + builder.addIndexedColumn(Map[String, AnyRef]("test_field" -> "spark_type").asJava) + builder.schema(Map[String, AnyRef]("test_field" -> Map("type" -> "os_type").asJava).asJava) + + val optionsMap: java.util.Map[String, AnyRef] = + Map[String, AnyRef]("index_mappings" -> """{ "boost": 1.0 }""").asJava + + builder.options(optionsMap) + + val metadata = builder.build() + FlintOpenSearchIndexMetadataService.serialize(metadata) should matchJson( + testMetadataJsonOtherThanSource) + } + + "serialize" should "serialize all fields (include other things than enabled) to JSON" in { val builder = new FlintMetadata.Builder builder.name("test_index") builder.kind("test_kind") @@ -97,8 +293,34 @@ class FlintOpenSearchIndexMetadataServiceSuite extends AnyFlatSpec with Matchers builder.addIndexedColumn(Map[String, AnyRef]("test_field" -> "spark_type").asJava) builder.schema(Map[String, AnyRef]("test_field" -> Map("type" -> "os_type").asJava).asJava) + val optionsMap: java.util.Map[String, AnyRef] = Map[String, AnyRef]( + "index_mappings" -> """{ "_source": { "includes": "meta.*" } }""").asJava + + builder.options(optionsMap) + + val metadata = builder.build() + FlintOpenSearchIndexMetadataService.serialize(metadata) should matchJson( + testMetadataJsonOtherThanEnabled) + } + + "serialize" should "serialize all fields (include _source and schema merging) to JSON" in { + val builder = new FlintMetadata.Builder + builder.name("test_index") + builder.kind("test_kind") + builder.source("test_source_table") + builder.addIndexedColumn(Map[String, AnyRef]("test_field" -> "spark_type").asJava) + val schema = Map[String, AnyRef]( + "test_field" -> Map("type" -> "os_type", "index" -> false).asJava).asJava + builder.schema(schema) + + val optionsMap: java.util.Map[String, AnyRef] = Map[String, AnyRef]( + "index_mappings" -> """{ "_source": { "enabled": false }, "properties": { "test_field": {"index": false} } }""").asJava + + builder.options(optionsMap) + val metadata = 
builder.build() - FlintOpenSearchIndexMetadataService.serialize(metadata) should matchJson(testMetadataJson) + FlintOpenSearchIndexMetadataService.serialize(metadata) should matchJson( + testMetadataJsonWithSourceEnabledFalseAndSchemaMerging) } "serialize without spec" should "serialize all fields to JSON without adding _meta field" in { diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala index 1ad88de6d..ab130b90c 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala @@ -10,7 +10,7 @@ import java.util.{Collections, UUID} import org.json4s.{Formats, NoTypeHints} import org.json4s.native.JsonMethods._ import org.json4s.native.Serialization -import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, EXTRA_OPTIONS, ID_EXPRESSION, INCREMENTAL_REFRESH, INDEX_SETTINGS, OptionName, OUTPUT_MODE, REFRESH_INTERVAL, SCHEDULER_MODE, WATERMARK_DELAY} +import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, EXTRA_OPTIONS, ID_EXPRESSION, INCREMENTAL_REFRESH, INDEX_MAPPINGS, INDEX_SETTINGS, OptionName, OUTPUT_MODE, REFRESH_INTERVAL, SCHEDULER_MODE, WATERMARK_DELAY} import org.opensearch.flint.spark.FlintSparkIndexOptions.validateOptionNames import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode import org.opensearch.flint.spark.scheduler.util.IntervalSchedulerParser @@ -195,6 +195,7 @@ object FlintSparkIndexOptions { val WATERMARK_DELAY: OptionName.Value = Value("watermark_delay") val OUTPUT_MODE: OptionName.Value = Value("output_mode") val INDEX_SETTINGS: OptionName.Value = Value("index_settings") + val INDEX_MAPPINGS: OptionName.Value = Value("index_mappings") val ID_EXPRESSION: OptionName.Value = Value("id_expression") val EXTRA_OPTIONS: OptionName.Value = Value("extra_options") } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala index 901c3006c..f5127fb68 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala @@ -54,7 +54,6 @@ case class FlintSparkCoveringIndex( }.toArray } val schema = generateSchema(indexedColumns).asJava - val builder = metadataBuilder(this) .name(indexName) .source(tableName) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala index e3b09661a..d849f7d7f 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala @@ -64,7 +64,6 @@ case class FlintSparkMaterializedView( Map[String, AnyRef]("columnName" -> colName, "columnType" -> colType).asJava }.toArray val schema = generateSchema(outputSchema).asJava - // Convert Scala Array to Java ArrayList for consistency with OpenSearch JSON parsing. 
// OpenSearch uses Jackson, which deserializes JSON arrays into ArrayLists. val sourceTablesProperty = new java.util.ArrayList[String](sourceTables.toSeq.asJava) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala index b6f21e455..248a81858 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala @@ -66,7 +66,6 @@ case class FlintSparkSkippingIndex( .flatMap(_.outputSchema()) .toMap + (FILE_PATH_COLUMN -> "string") val schema = generateSchema(fieldTypes).asJava - metadataBuilder(this) .name(name()) .source(tableName) diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala index 0791f9b7a..cf0614694 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala @@ -11,8 +11,10 @@ import scala.collection.JavaConverters.{mapAsJavaMapConverter, mapAsScalaMapConv import org.json4s.{Formats, NoTypeHints} import org.json4s.native.JsonMethods.parse import org.json4s.native.Serialization +import org.opensearch.client.RequestOptions +import org.opensearch.client.indices.GetIndexRequest import org.opensearch.flint.core.FlintOptions -import org.opensearch.flint.core.storage.FlintOpenSearchIndexMetadataService +import org.opensearch.flint.core.storage.{FlintOpenSearchIndexMetadataService, OpenSearchClientUtils} import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName import org.scalatest.matchers.must.Matchers.defined @@ -144,6 +146,57 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { (settings \ "index.number_of_replicas").extract[String] shouldBe "3" } + test("create covering index with index mappings _source") { + sql(s""" + | CREATE INDEX $testIndex ON $testTable ( name ) + | WITH ( + | index_mappings = '{ "_source": { "enabled": false } }' + | ) + |""".stripMargin) + + implicit val formats: Formats = Serialization.formats(NoTypeHints) + + val osIndexName = OpenSearchClientUtils.sanitizeIndexName(testFlintIndex) + val response = + openSearchClient.indices().get(new GetIndexRequest(osIndexName), RequestOptions.DEFAULT) + + val mapping = response.getMappings.get(osIndexName) + val indexMappingsOpt = mapping.source.toString + val mappings = parse(indexMappingsOpt) + (mappings \ "_source" \ "enabled").extract[Boolean] shouldBe false + } + + test("create covering index with index mappings schema merging") { + sql(s""" + | CREATE INDEX $testIndex ON $testTable ( name ) + | WITH ( + | index_mappings = '{ "_source": { "enabled": false }, "properties": { "name": {"index": false} } }' + | ) + |""".stripMargin) + + val options = new FlintOptions(openSearchOptions.asJava) + val flintIndexMetadataService = + new FlintOpenSearchIndexMetadataService(options) + implicit val formats: Formats = Serialization.formats(NoTypeHints) + + val osIndexName = OpenSearchClientUtils.sanitizeIndexName(testFlintIndex) + val response = + 
openSearchClient.indices().get(new GetIndexRequest(osIndexName), RequestOptions.DEFAULT) + + val mapping = response.getMappings.get(osIndexName) + val indexMappingsOpt = mapping.source.toString + val mappings = parse(indexMappingsOpt) + (mappings \ "_source" \ "enabled").extract[Boolean] shouldBe false + + val flintMetadata = + flintIndexMetadataService.getIndexMetadata(testFlintIndex) + val schema = flintMetadata.schema + val javaMap = schema.asInstanceOf[java.util.HashMap[String, java.util.HashMap[String, Any]]] + + val countMap = javaMap.get("name").asInstanceOf[java.util.Map[String, Any]] + countMap.get("index") shouldBe false + } + test("create covering index with invalid option") { the[IllegalArgumentException] thrownBy sql(s""" diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala index 71356bca6..9598afe2e 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala @@ -13,8 +13,10 @@ import scala.collection.JavaConverters.{mapAsJavaMapConverter, mapAsScalaMapConv import org.json4s.{Formats, NoTypeHints} import org.json4s.native.JsonMethods.parse import org.json4s.native.Serialization +import org.opensearch.client.RequestOptions +import org.opensearch.client.indices.GetIndexRequest import org.opensearch.flint.core.FlintOptions -import org.opensearch.flint.core.storage.FlintOpenSearchIndexMetadataService +import org.opensearch.flint.core.storage.{FlintOpenSearchIndexMetadataService, OpenSearchClientUtils} import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.getFlintIndexName import org.scalatest.matchers.must.Matchers.{defined, have} import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the} @@ -38,6 +40,15 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { | GROUP BY TUMBLE(time, '10 Minutes') |""".stripMargin + private val testNestedQuery = + s""" + | SELECT + | window.start AS startTime, + | COUNT(*) AS `aws.vpc.count` + | FROM $testTable + | GROUP BY TUMBLE(time, '10 Minutes') + |""".stripMargin + override def beforeEach(): Unit = { super.beforeAll() createTimeSeriesTable(testTable) @@ -226,6 +237,96 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { (settings \ "index.number_of_replicas").extract[String] shouldBe "2" } + test("create materialized view with index mappings _source") { + sql(s""" + | CREATE MATERIALIZED VIEW $testMvName + | AS $testQuery + | WITH ( + | index_mappings = '{ "_source": { "enabled": false } }' + | ) + |""".stripMargin) + + implicit val formats: Formats = Serialization.formats(NoTypeHints) + + val osIndexName = OpenSearchClientUtils.sanitizeIndexName(testFlintIndex) + val response = + openSearchClient.indices().get(new GetIndexRequest(osIndexName), RequestOptions.DEFAULT) + + val mapping = response.getMappings.get(osIndexName) + val indexMappingsOpt = mapping.source.toString + val mappings = parse(indexMappingsOpt) + (mappings \ "_source" \ "enabled").extract[Boolean] shouldBe false + } + + test("create materialized view with index mappings schema merging") { + sql(s""" + | CREATE MATERIALIZED VIEW $testMvName + | AS $testQuery + | WITH ( + | index_mappings = '{ "_source": { "enabled": false }, "properties": { "count": {"index": false} } }' + | ) + 
|""".stripMargin)
+
+    val options = new FlintOptions(openSearchOptions.asJava)
+    val flintIndexMetadataService =
+      new FlintOpenSearchIndexMetadataService(options)
+    implicit val formats: Formats = Serialization.formats(NoTypeHints)
+
+    val osIndexName = OpenSearchClientUtils.sanitizeIndexName(testFlintIndex)
+    val response =
+      openSearchClient.indices().get(new GetIndexRequest(osIndexName), RequestOptions.DEFAULT)
+
+    val mapping = response.getMappings.get(osIndexName)
+    val indexMappingsOpt = mapping.source.toString
+    val mappings = parse(indexMappingsOpt)
+    (mappings \ "_source" \ "enabled").extract[Boolean] shouldBe false
+
+    val flintMetadata =
+      flintIndexMetadataService.getIndexMetadata(testFlintIndex)
+    val schema = flintMetadata.schema
+    val javaMap = schema.asInstanceOf[java.util.HashMap[String, java.util.HashMap[String, Any]]]
+
+    val countMap = javaMap.get("count").asInstanceOf[java.util.Map[String, Any]]
+    countMap.get("index") shouldBe false
+  }
+
+  test("create materialized view with index mappings nested schema merging") {
+    sql(s"""
+         | CREATE MATERIALIZED VIEW $testMvName
+         | AS $testNestedQuery
+         | WITH (
+         |   index_mappings = '{ "_source": { "enabled": false }, "properties": {"aws":{"properties":{"vpc":{"properties":{"count":{"index":false}}}}}}}'
+         | )
+         |""".stripMargin)
+
+    val options = new FlintOptions(openSearchOptions.asJava)
+    val flintIndexMetadataService =
+      new FlintOpenSearchIndexMetadataService(options)
+    implicit val formats: Formats = Serialization.formats(NoTypeHints)
+
+    val osIndexName = OpenSearchClientUtils.sanitizeIndexName(testFlintIndex)
+    val response =
+      openSearchClient.indices().get(new GetIndexRequest(osIndexName), RequestOptions.DEFAULT)
+
+    val mapping = response.getMappings.get(osIndexName)
+    val indexMappingsOpt = mapping.source.toString
+    val mappings = parse(indexMappingsOpt)
+    (mappings \ "_source" \ "enabled").extract[Boolean] shouldBe false
+
+    val flintMetadata =
+      flintIndexMetadataService.getIndexMetadata(testFlintIndex)
+    val schema = flintMetadata.schema
+    val javaMap = schema.asInstanceOf[java.util.HashMap[String, java.util.HashMap[String, Any]]]
+
+    val countMap = javaMap.get("aws").asInstanceOf[java.util.Map[String, Any]]
+
+    val countMap2 = countMap.get("properties").asInstanceOf[java.util.Map[String, Any]]
+    val countMap3 = countMap2.get("vpc").asInstanceOf[java.util.Map[String, Any]]
+    val countMap4 = countMap3.get("properties").asInstanceOf[java.util.Map[String, Any]]
+    val countMap5 = countMap4.get("count").asInstanceOf[java.util.Map[String, Any]]
+    countMap5.get("index") shouldBe false
+  }
+
   test("create materialized view with full refresh") {
     sql(s"""
       | CREATE MATERIALIZED VIEW $testMvName
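The nested test above relies on `mergeSchema` matching mapping parameters by leaf field name: the MV column `aws.vpc.count` is serialized as a single dotted key, while the `index_mappings` option spells the same field as nested `properties`. A rough sketch of that behavior, assuming flint-core on the classpath (the `"long"` field type is illustrative):

```scala
import scala.collection.JavaConverters._

import org.opensearch.flint.core.storage.FlintOpenSearchIndexMetadataService

// Schema as generated for the MV above: one dotted key, which OpenSearch
// later expands into nested objects.
val schema: java.util.Map[String, AnyRef] =
  Map[String, AnyRef](
    "aws.vpc.count" -> Map[String, AnyRef]("type" -> "long").asJava).asJava

// Same index_mappings value as the WITH option in the test above.
val options: java.util.Map[String, AnyRef] = Map[String, AnyRef](
  "index_mappings" ->
    """{"properties":{"aws":{"properties":{"vpc":{"properties":{"count":{"index":false}}}}}}}""").asJava

val merged = FlintOpenSearchIndexMetadataService.mergeSchema(schema, options)
// merged.get("aws.vpc.count") now holds both "type" -> "long" and "index" -> false.
// Caveat: extracted configurations are keyed by leaf name, so any other field
// whose path ends in ".count" would receive the same parameters.
```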
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
index a2a7c9799..f84083f60 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
@@ -157,7 +157,8 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
           "auto_refresh" -> "true",
           "refresh_interval" -> "1 Minute",
           "checkpoint_location" -> checkpointDir.getAbsolutePath,
-          "index_settings" -> "{\"number_of_shards\": 3,\"number_of_replicas\": 2}")),
+          "index_settings" -> "{\"number_of_shards\": 3,\"number_of_replicas\": 2}",
+          "index_mappings" -> "{\"_source\": {\"enabled\": false}}")),
         testIndex)
       .create()
 
@@ -176,7 +177,8 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
            |     "incremental_refresh": "false",
            |     "refresh_interval": "1 Minute",
            |     "checkpoint_location": "${checkpointDir.getAbsolutePath}",
-           |     "index_settings": "{\\"number_of_shards\\": 3,\\"number_of_replicas\\": 2}"
+           |     "index_settings": "{\\"number_of_shards\\": 3,\\"number_of_replicas\\": 2}",
+           |     "index_mappings": "{\\"_source\\": {\\"enabled\\": false}}"
            |   }
            |""".stripMargin)

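For reference, the `_source` behavior asserted in these suites reduces to a small contract on `extractSourceEnabled`. A sketch of the expected results, mirroring the unit tests and assuming flint-core on the classpath:

```scala
import org.opensearch.flint.core.storage.FlintOpenSearchIndexMetadataService.extractSourceEnabled

// Source storage stays enabled when the option is absent...
assert(extractSourceEnabled(None))
// ...or when index_mappings configures something other than "enabled".
assert(extractSourceEnabled(Some("""{ "_source": { "includes": "meta.*" } }""")))
assert(extractSourceEnabled(Some("""{ "boost": 1.0 }""")))
// Only an explicit "enabled": false turns the _source field off.
assert(!extractSourceEnabled(Some("""{ "_source": { "enabled": false } }""")))
```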
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala
index 2e5b4d2ab..3f30b8b3f 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala
@@ -12,8 +12,10 @@ import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson
 import org.json4s.{Formats, NoTypeHints}
 import org.json4s.native.JsonMethods.{compact, parse, render}
 import org.json4s.native.Serialization
+import org.opensearch.client.RequestOptions
+import org.opensearch.client.indices.GetIndexRequest
 import org.opensearch.flint.core.FlintOptions
-import org.opensearch.flint.core.storage.FlintOpenSearchIndexMetadataService
+import org.opensearch.flint.core.storage.{FlintOpenSearchIndexMetadataService, OpenSearchClientUtils}
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
 import org.scalatest.matchers.must.Matchers.defined
 import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the}
@@ -209,6 +211,59 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite with ExplainSuit
     (settings \ "index.number_of_replicas").extract[String] shouldBe "2"
   }
 
+  test("create skipping index with index mappings _source") {
+    sql(s"""
+           | CREATE SKIPPING INDEX ON $testTable
+           | ( year PARTITION )
+           | WITH (
+           |   index_mappings = '{ "_source": { "enabled": false } }'
+           | )
+           |""".stripMargin)
+
+    implicit val formats: Formats = Serialization.formats(NoTypeHints)
+
+    val osIndexName = OpenSearchClientUtils.sanitizeIndexName(testIndex)
+    val response =
+      openSearchClient.indices().get(new GetIndexRequest(osIndexName), RequestOptions.DEFAULT)
+
+    val mapping = response.getMappings.get(osIndexName)
+    val indexMappingsOpt = mapping.source.toString
+    val mappings = parse(indexMappingsOpt)
+    (mappings \ "_source" \ "enabled").extract[Boolean] shouldBe false
+  }
+
+  test("create skipping index with index mappings schema merging") {
+    sql(s"""
+           | CREATE SKIPPING INDEX ON $testTable
+           | ( year PARTITION )
+           | WITH (
+           |   index_mappings = '{ "_source": { "enabled": false }, "properties": { "year": {"index": false} } }'
+           | )
+           |""".stripMargin)
+
+    val options = new FlintOptions(openSearchOptions.asJava)
+    val flintIndexMetadataService =
+      new FlintOpenSearchIndexMetadataService(options)
+    implicit val formats: Formats = Serialization.formats(NoTypeHints)
+
+    val osIndexName = OpenSearchClientUtils.sanitizeIndexName(testIndex)
+    val response =
+      openSearchClient.indices().get(new GetIndexRequest(osIndexName), RequestOptions.DEFAULT)
+
+    val mapping = response.getMappings.get(osIndexName)
+    val indexMappingsOpt = mapping.source.toString
+    val mappings = parse(indexMappingsOpt)
+    (mappings \ "_source" \ "enabled").extract[Boolean] shouldBe false
+
+    val flintMetadata =
+      flintIndexMetadataService.getIndexMetadata(testIndex)
+    val schema = flintMetadata.schema
+    val javaMap = schema.asInstanceOf[java.util.HashMap[String, java.util.HashMap[String, Any]]]
+
+    val countMap = javaMap.get("year").asInstanceOf[java.util.Map[String, Any]]
+    countMap.get("index") shouldBe false
+  }
+
   Seq(
     "struct_col.field1.subfield VALUE_SET, struct_col.field2 MIN_MAX",
     "`struct_col.field1.subfield` VALUE_SET, `struct_col.field2` MIN_MAX", // ensure previous hack still works
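Putting the pieces together: with `index_mappings` set as an index option, `serialize()` emits the `_source` block and merges the extra mapping parameters into the generated `properties`. A minimal sketch mirroring the unit tests above (the `"os_type"` value is a placeholder type, as in the suite; assumes flint-core on the classpath):

```scala
import scala.collection.JavaConverters._

import org.opensearch.flint.common.metadata.FlintMetadata
import org.opensearch.flint.core.storage.FlintOpenSearchIndexMetadataService

val builder = new FlintMetadata.Builder
builder.name("test_index")
builder.kind("test_kind")
builder.source("test_source_table")
builder.schema(Map[String, AnyRef]("test_field" -> Map("type" -> "os_type").asJava).asJava)
builder.options(
  Map[String, AnyRef]("index_mappings" ->
    """{ "_source": { "enabled": false }, "properties": { "test_field": {"index": false} } }""").asJava)

// The serialized mappings contain "_source": {"enabled": false} and a
// "test_field" entry carrying both the generated "type" and the merged "index": false.
println(FlintOpenSearchIndexMetadataService.serialize(builder.build()))
```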