diff --git a/docs/index.md b/docs/index.md
index 7155396cc..1dc430140 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -394,6 +394,7 @@ User can provide the following options in `WITH` clause of create statement:
 + `watermark_delay`: a string as time expression for how late data can come and still be processed, e.g. 1 minute, 10 seconds. This is required by auto and incremental refresh on materialized view if it has aggregation in the query.
 + `output_mode`: a mode string that describes how data will be written to streaming sink. If unspecified, default append mode will be applied.
 + `index_settings`: a JSON string as index settings for OpenSearch index that will be created. Please follow the format in OpenSearch documentation. If unspecified, default OpenSearch index settings will be applied.
++ `index_mappings`: a JSON string specifying additional OpenSearch index mappings, such as metadata fields (e.g. `_source`) or mapping parameters (e.g. `enabled`, `index`). This allows customizing certain parts of the index mappings. The base mappings are generated automatically; if unspecified, only the defaults will be applied. Refer to [OpenSearch metadata fields](https://docs.opensearch.org/docs/latest/field-types/metadata-fields/source/) and [mapping parameters](https://docs.opensearch.org/docs/latest/field-types/mapping-parameters/index/) for supported options.
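+  For example, `index_mappings = '{ "properties": { "status": { "index": false } } }'` disables indexing for the `status` field while keeping its generated mapping type.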
'{"sink": "{key: val}", "table1": {key: val}}' @@ -407,6 +408,7 @@ WITH ( watermark_delay = '1 Second', output_mode = 'complete', index_settings = '{"number_of_shards": 2, "number_of_replicas": 3}', + index_mappings = '{ "_source": { "enabled": false } }', id_expression = "sha1(concat_ws('\0',startTime,status))", extra_options = '{"spark_catalog.default.alb_logs": {"maxFilesPerTrigger": "1"}}' ) diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchIndexMetadataService.scala b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchIndexMetadataService.scala index 765460da7..1886e7dee 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchIndexMetadataService.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchIndexMetadataService.scala @@ -7,11 +7,11 @@ package org.opensearch.flint.core.storage import java.util -import scala.collection.JavaConverters.mapAsJavaMapConverter +import scala.collection.JavaConverters._ import org.opensearch.client.RequestOptions import org.opensearch.client.indices.{GetIndexRequest, GetIndexResponse, PutMappingRequest} -import org.opensearch.common.xcontent.XContentType +import org.opensearch.common.xcontent.{XContentParser, XContentType} import org.opensearch.flint.common.FlintVersion import org.opensearch.flint.common.metadata.{FlintIndexMetadataService, FlintMetadata} import org.opensearch.flint.core.FlintOptions @@ -98,8 +98,7 @@ class FlintOpenSearchIndexMetadataService(options: FlintOptions) override def deleteIndexMetadata(indexName: String): Unit = {} } -object FlintOpenSearchIndexMetadataService { - +object FlintOpenSearchIndexMetadataService extends Logging { def serialize(metadata: FlintMetadata): String = { serialize(metadata, true) } @@ -134,9 +133,19 @@ object FlintOpenSearchIndexMetadataService { optionalObjectField(builder, "properties", metadata.properties) } } + // Add _source field + val indexMappingsOpt = + Option(metadata.options.get("index_mappings")).map(_.asInstanceOf[String]) + val sourceEnabled = extractSourceEnabled(indexMappingsOpt) + if (!sourceEnabled) { + objectField(builder, "_source") { + builder.field("enabled", sourceEnabled) + } + } // Add properties (schema) field - builder.field("properties", metadata.schema) + val schema = mergeSchema(metadata.schema, metadata.options) + builder.field("properties", schema) }) } catch { case e: Exception => @@ -191,6 +200,7 @@ object FlintOpenSearchIndexMetadataService { } } } + case "_source" => parser.skipChildren() case "properties" => builder.schema(parser.map()) case _ => // Ignore other fields, for instance, dynamic. @@ -203,4 +213,170 @@ object FlintOpenSearchIndexMetadataService { throw new IllegalStateException("Failed to parse metadata JSON", e) } } + + def extractSourceEnabled(indexMappingsJsonOpt: Option[String]): Boolean = { + var sourceEnabled: Boolean = true + + indexMappingsJsonOpt.foreach { jsonStr => + try { + parseJson(jsonStr) { (parser, fieldName) => + fieldName match { + case "_source" => + parseObjectField(parser) { (parser, innerFieldName) => + innerFieldName match { + case "enabled" => + sourceEnabled = parser.booleanValue() + return sourceEnabled + case _ => // Ignore + } + } + case _ => // Ignore + } + } + } catch { + case e: Exception => + logError("Error extracting _source", e) + } + } + + sourceEnabled + } + + /** + * Merges the mapping parameters from FlintSparkIndexOptions into the existing schema. 
+  def extractSourceEnabled(indexMappingsJsonOpt: Option[String]): Boolean = {
+    var sourceEnabled: Boolean = true
+
+    indexMappingsJsonOpt.foreach { jsonStr =>
+      try {
+        parseJson(jsonStr) { (parser, fieldName) =>
+          fieldName match {
+            case "_source" =>
+              parseObjectField(parser) { (parser, innerFieldName) =>
+                innerFieldName match {
+                  case "enabled" =>
+                    sourceEnabled = parser.booleanValue()
+                    return sourceEnabled
+                  case _ => // Ignore
+                }
+              }
+            case _ => // Ignore
+          }
+        }
+      } catch {
+        case e: Exception =>
+          logError("Error extracting _source", e)
+      }
+    }
+
+    sourceEnabled
+  }
+
+  /**
+   * Merges the mapping parameters from FlintSparkIndexOptions into the existing schema. If the
+   * options contain mapping parameters for fields that exist in allFieldTypes, those
+   * configurations are merged.
+   *
+   * @param allFieldTypes
+   *   Map of field names to their type/configuration details
+   * @param options
+   *   Flint index options map that may carry an index_mappings JSON string
+   * @return
+   *   Merged map with combined mapping parameters
+   */
+  def mergeSchema(
+      allFieldTypes: java.util.Map[String, AnyRef],
+      options: java.util.Map[String, AnyRef]): java.util.Map[String, AnyRef] = {
+    val indexMappingsOpt = Option(options.get("index_mappings")).flatMap {
+      case s: String => Some(s)
+      case _ => None
+    }
+
+    var result = new java.util.HashMap[String, AnyRef](allFieldTypes)
+
+    // Track mappings from leaf name to configuration properties
+    var fieldConfigs = new java.util.HashMap[String, java.util.Map[String, AnyRef]]()
+
+    indexMappingsOpt.foreach { jsonStr =>
+      try {
+        // Extract nested field configurations - key is the leaf name
+        parseJson(jsonStr) { (parser, fieldName) =>
+          fieldName match {
+            case "_source" =>
+              parser.skipChildren() // Skip _source section
+
+            case "properties" =>
+              // Process properties recursively to extract field configs
+              fieldConfigs = extractNestedProperties(parser)
+
+            case _ =>
+              parser.skipChildren() // Skip other fields
+          }
+        }
+
+        // Apply extracted configurations to schema while preserving structure
+        val newResult = new java.util.HashMap[String, AnyRef]()
+        result.forEach { (fullFieldName, fieldType) =>
+          val leafFieldName = extractLeafFieldName(fullFieldName)
+
+          if (fieldConfigs.containsKey(leafFieldName)) {
+            // We have config for this leaf field name
+            fieldType match {
+              case existingConfig: java.util.Map[String, AnyRef] =>
+                val mergedConfig = new java.util.HashMap[String, AnyRef](existingConfig)
+
+                // Add/overwrite with new config values
+                fieldConfigs.get(leafFieldName).forEach { (k, v) =>
+                  mergedConfig.put(k, v)
+                }
+
+                // Return the updated field with its original key
+                newResult.put(fullFieldName, mergedConfig)
+
+              case _ =>
+                // If field type isn't a map, keep it unchanged
+                newResult.put(fullFieldName, fieldType)
+            }
+          } else {
+            // No config for this field, keep it unchanged
+            newResult.put(fullFieldName, fieldType)
+          }
+        }
+        result = newResult
+      } catch {
+        case ex: Exception =>
+          logError("Error merging schema", ex)
+      }
+    }
+
+    result
+  }
+
+  /**
+   * Recursively extracts mapping parameters from nested properties structure. Returns a map of
+   * leaf field name to its configuration.
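+   * For example, {"aws": {"properties": {"vpc": {"properties": {"count": {"index": false}}}}}}
+   * yields "count" -> {"index": false}.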
+   */
+  private def extractNestedProperties(
+      parser: XContentParser): java.util.HashMap[String, java.util.Map[String, AnyRef]] = {
+
+    val fieldConfigs = new java.util.HashMap[String, java.util.Map[String, AnyRef]]()
+
+    parseObjectField(parser) { (parser, fieldName) =>
+      val fieldConfig = new java.util.HashMap[String, AnyRef]()
+      var hasNestedProperties = false
+
+      parseObjectField(parser) { (parser, propName) =>
+        if (propName == "properties") {
+          hasNestedProperties = true
+          val nestedConfigs = extractNestedProperties(parser)
+          nestedConfigs.forEach { (k, v) =>
+            fieldConfigs.put(k, v)
+          }
+        } else {
+          val value = parser.currentToken() match {
+            case XContentParser.Token.VALUE_STRING => parser.text()
+            case XContentParser.Token.VALUE_NUMBER => parser.numberValue()
+            case XContentParser.Token.VALUE_BOOLEAN => parser.booleanValue()
+            case XContentParser.Token.VALUE_NULL => null
+            case _ =>
+              parser.skipChildren()
+              null
+          }
+
+          if (value != null) {
+            fieldConfig.put(propName, value.asInstanceOf[AnyRef])
+          }
+        }
+      }
+
+      if (!hasNestedProperties && !fieldConfig.isEmpty) {
+        fieldConfigs.put(fieldName, fieldConfig)
+      }
+    }
+
+    fieldConfigs
+  }
+
+  /**
+   * Extracts the leaf field name from a potentially nested field path. For example:
+   * "aws.vpc.count" -> "count"
+   */
+  private def extractLeafFieldName(fullFieldPath: String): String = {
+    val lastDotIndex = fullFieldPath.lastIndexOf('.')
+    if (lastDotIndex >= 0) {
+      fullFieldPath.substring(lastDotIndex + 1)
+    } else {
+      fullFieldPath
+    }
+  }
 }
diff --git a/flint-core/src/test/scala/org/opensearch/flint/core/storage/FlintOpenSearchIndexMetadataServiceSuite.scala b/flint-core/src/test/scala/org/opensearch/flint/core/storage/FlintOpenSearchIndexMetadataServiceSuite.scala
index f1ed09531..cb0b204f1 100644
--- a/flint-core/src/test/scala/org/opensearch/flint/core/storage/FlintOpenSearchIndexMetadataServiceSuite.scala
+++ b/flint-core/src/test/scala/org/opensearch/flint/core/storage/FlintOpenSearchIndexMetadataServiceSuite.scala
@@ -5,6 +5,8 @@
 
 package org.opensearch.flint.core.storage
 
+import java.util
+
 import scala.collection.JavaConverters.mapAsJavaMapConverter
 
 import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson
@@ -16,7 +18,7 @@ import org.scalatest.matchers.should.Matchers
 class FlintOpenSearchIndexMetadataServiceSuite extends AnyFlatSpec with Matchers {
 
   /** Test Flint index meta JSON string */
-  val testMetadataJson: String = s"""
+  val testMetadataJsonWithoutIndexMapping: String = s"""
     | {
     |   "_meta": {
     |     "version": "${current()}",
@@ -38,6 +40,133 @@ class FlintOpenSearchIndexMetadataServiceSuite extends AnyFlatSpec with Matchers
     | }
     |""".stripMargin
 
+  val testMetadataJsonWithIndexMappingSourceEnabledFalse: String = s"""
+    | {
+    |   "_meta": {
+    |     "version": "${current()}",
+    |     "name": "test_index",
+    |     "kind": "test_kind",
+    |     "source": "test_source_table",
+    |     "indexedColumns": [
+    |     {
+    |       "test_field": "spark_type"
+    |     }],
+    |     "options": {
+    |       "index_mappings": "{ \\"_source\\": { \\"enabled\\": false } }"
+    |     },
+    |     "properties": {}
+    |   },
+    |   "_source": {
+    |     "enabled": false
+    |   },
+    |   "properties": {
+    |     "test_field": {
+    |       "type": "os_type"
+    |     }
+    |   }
+    | }
+    |""".stripMargin
+
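+  // When _source is enabled (the OpenSearch default), serialize omits the _source
+  // block from the mappings entirely, as in the fixture below.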
\\"enabled\\": true } }" + | }, + | "properties": {} + | }, + | "properties": { + | "test_field": { + | "type": "os_type" + | } + | } + | } + |""".stripMargin + + val testMetadataJsonOtherThanSource: String = s""" + | { + | "_meta": { + | "version": "${current()}", + | "name": "test_index", + | "kind": "test_kind", + | "source": "test_source_table", + | "indexedColumns": [ + | { + | "test_field": "spark_type" + | }], + | "options": { + | "index_mappings": "{ \\"boost\\": 1.0 }" + | }, + | "properties": {} + | }, + | "properties": { + | "test_field": { + | "type": "os_type" + | } + | } + | } + |""".stripMargin + + val testMetadataJsonOtherThanEnabled: String = s""" + | { + | "_meta": { + | "version": "${current()}", + | "name": "test_index", + | "kind": "test_kind", + | "source": "test_source_table", + | "indexedColumns": [ + | { + | "test_field": "spark_type" + | }], + | "options": { + | "index_mappings": "{ \\"_source\\": { \\"includes\\": \\"meta.*\\" } }" + | }, + | "properties": {} + | }, + | "properties": { + | "test_field": { + | "type": "os_type" + | } + | } + | } + |""".stripMargin + + val testMetadataJsonWithSourceEnabledFalseAndSchemaMerging: String = s""" + | { + | "_meta": { + | "version": "${current()}", + | "name": "test_index", + | "kind": "test_kind", + | "source": "test_source_table", + | "indexedColumns": [ + | { + | "test_field": "spark_type" + | }], + | "options": { + | "index_mappings": "{ \\"_source\\": { \\"enabled\\": false }, \\"properties\\": { \\"test_field\\": {\\"index\\": false} } }" + | }, + | "properties": {} + | }, + | "_source": { + | "enabled": false + | }, + | "properties": { + | "test_field": { + | "type": "os_type", + | "index": false + | } + | } + | } + |""".stripMargin + val testDynamic: String = s""" | { | "dynamic": "strict", @@ -77,7 +206,7 @@ class FlintOpenSearchIndexMetadataServiceSuite extends AnyFlatSpec with Matchers |""".stripMargin "constructor" should "deserialize the given JSON and assign parsed value to field" in { - Seq(testMetadataJson, testDynamic).foreach(mapping => { + Seq(testMetadataJsonWithoutIndexMapping, testDynamic).foreach(mapping => { val metadata = FlintOpenSearchIndexMetadataService.deserialize(mapping, testIndexSettingsJson) metadata.version shouldBe current() @@ -89,7 +218,74 @@ class FlintOpenSearchIndexMetadataServiceSuite extends AnyFlatSpec with Matchers }) } - "serialize" should "serialize all fields to JSON" in { + "serialize" should "serialize all fields (no _source) to JSON" in { + val builder = new FlintMetadata.Builder + builder.name("test_index") + builder.kind("test_kind") + builder.source("test_source_table") + builder.addIndexedColumn(Map[String, AnyRef]("test_field" -> "spark_type").asJava) + builder.schema(Map[String, AnyRef]("test_field" -> Map("type" -> "os_type").asJava).asJava) + + val metadata = builder.build() + FlintOpenSearchIndexMetadataService.serialize(metadata) should matchJson( + testMetadataJsonWithoutIndexMapping) + } + + "serialize" should "serialize all fields (include _source false) to JSON" in { + val builder = new FlintMetadata.Builder + builder.name("test_index") + builder.kind("test_kind") + builder.source("test_source_table") + builder.addIndexedColumn(Map[String, AnyRef]("test_field" -> "spark_type").asJava) + builder.schema(Map[String, AnyRef]("test_field" -> Map("type" -> "os_type").asJava).asJava) + + val optionsMap: java.util.Map[String, AnyRef] = + Map[String, AnyRef]("index_mappings" -> """{ "_source": { "enabled": false } }""").asJava + + builder.options(optionsMap) + + val 
+  val testMetadataJsonWithSourceEnabledFalseAndSchemaMerging: String = s"""
+    | {
+    |   "_meta": {
+    |     "version": "${current()}",
+    |     "name": "test_index",
+    |     "kind": "test_kind",
+    |     "source": "test_source_table",
+    |     "indexedColumns": [
+    |     {
+    |       "test_field": "spark_type"
+    |     }],
+    |     "options": {
+    |       "index_mappings": "{ \\"_source\\": { \\"enabled\\": false }, \\"properties\\": { \\"test_field\\": {\\"index\\": false} } }"
+    |     },
+    |     "properties": {}
+    |   },
+    |   "_source": {
+    |     "enabled": false
+    |   },
+    |   "properties": {
+    |     "test_field": {
+    |       "type": "os_type",
+    |       "index": false
+    |     }
+    |   }
+    | }
+    |""".stripMargin
+
   val testDynamic: String = s"""
     | {
     |   "dynamic": "strict",
@@ -77,7 +206,7 @@ class FlintOpenSearchIndexMetadataServiceSuite extends AnyFlatSpec with Matchers
     |""".stripMargin
 
   "constructor" should "deserialize the given JSON and assign parsed value to field" in {
-    Seq(testMetadataJson, testDynamic).foreach(mapping => {
+    Seq(testMetadataJsonWithoutIndexMapping, testDynamic).foreach(mapping => {
       val metadata =
         FlintOpenSearchIndexMetadataService.deserialize(mapping, testIndexSettingsJson)
       metadata.version shouldBe current()
@@ -89,7 +218,74 @@ class FlintOpenSearchIndexMetadataServiceSuite extends AnyFlatSpec with Matchers
     })
   }
 
-  "serialize" should "serialize all fields to JSON" in {
+  "serialize" should "serialize all fields (no _source) to JSON" in {
+    val builder = new FlintMetadata.Builder
+    builder.name("test_index")
+    builder.kind("test_kind")
+    builder.source("test_source_table")
+    builder.addIndexedColumn(Map[String, AnyRef]("test_field" -> "spark_type").asJava)
+    builder.schema(Map[String, AnyRef]("test_field" -> Map("type" -> "os_type").asJava).asJava)
+
+    val metadata = builder.build()
+    FlintOpenSearchIndexMetadataService.serialize(metadata) should matchJson(
+      testMetadataJsonWithoutIndexMapping)
+  }
+
+  "serialize" should "serialize all fields (with _source disabled) to JSON" in {
+    val builder = new FlintMetadata.Builder
+    builder.name("test_index")
+    builder.kind("test_kind")
+    builder.source("test_source_table")
+    builder.addIndexedColumn(Map[String, AnyRef]("test_field" -> "spark_type").asJava)
+    builder.schema(Map[String, AnyRef]("test_field" -> Map("type" -> "os_type").asJava).asJava)
+
+    val optionsMap: java.util.Map[String, AnyRef] =
+      Map[String, AnyRef]("index_mappings" -> """{ "_source": { "enabled": false } }""").asJava
+
+    builder.options(optionsMap)
+
+    val metadata = builder.build()
+    FlintOpenSearchIndexMetadataService.serialize(metadata) should matchJson(
+      testMetadataJsonWithIndexMappingSourceEnabledFalse)
+  }
+
+  "serialize" should "serialize all fields (with _source enabled) to JSON" in {
+    val builder = new FlintMetadata.Builder
+    builder.name("test_index")
+    builder.kind("test_kind")
+    builder.source("test_source_table")
+    builder.addIndexedColumn(Map[String, AnyRef]("test_field" -> "spark_type").asJava)
+    builder.schema(Map[String, AnyRef]("test_field" -> Map("type" -> "os_type").asJava).asJava)
+
+    val optionsMap: java.util.Map[String, AnyRef] =
+      Map[String, AnyRef]("index_mappings" -> """{ "_source": { "enabled": true } }""").asJava
+
+    builder.options(optionsMap)
+
+    val metadata = builder.build()
+    FlintOpenSearchIndexMetadataService.serialize(metadata) should matchJson(
+      testMetadataJsonWithIndexMappingSourceEnabledTrue)
+  }
+
+  "serialize" should "serialize all fields (with mapping parameters other than _source) to JSON" in {
+    val builder = new FlintMetadata.Builder
+    builder.name("test_index")
+    builder.kind("test_kind")
+    builder.source("test_source_table")
+    builder.addIndexedColumn(Map[String, AnyRef]("test_field" -> "spark_type").asJava)
+    builder.schema(Map[String, AnyRef]("test_field" -> Map("type" -> "os_type").asJava).asJava)
+
+    val optionsMap: java.util.Map[String, AnyRef] =
+      Map[String, AnyRef]("index_mappings" -> """{ "boost": 1.0 }""").asJava
+
+    builder.options(optionsMap)
+
+    val metadata = builder.build()
+    FlintOpenSearchIndexMetadataService.serialize(metadata) should matchJson(
+      testMetadataJsonOtherThanSource)
+  }
+
+  "serialize" should "serialize all fields (with _source parameters other than enabled) to JSON" in {
     val builder = new FlintMetadata.Builder
     builder.name("test_index")
     builder.kind("test_kind")
@@ -97,8 +293,34 @@ class FlintOpenSearchIndexMetadataServiceSuite extends AnyFlatSpec with Matchers
     builder.addIndexedColumn(Map[String, AnyRef]("test_field" -> "spark_type").asJava)
     builder.schema(Map[String, AnyRef]("test_field" -> Map("type" -> "os_type").asJava).asJava)
 
+    val optionsMap: java.util.Map[String, AnyRef] = Map[String, AnyRef](
+      "index_mappings" -> """{ "_source": { "includes": "meta.*" } }""").asJava
+
+    builder.options(optionsMap)
+
+    val metadata = builder.build()
+    FlintOpenSearchIndexMetadataService.serialize(metadata) should matchJson(
+      testMetadataJsonOtherThanEnabled)
+  }
+
+  "serialize" should "serialize all fields (with _source disabled and schema merging) to JSON" in {
+    val builder = new FlintMetadata.Builder
+    builder.name("test_index")
+    builder.kind("test_kind")
+    builder.source("test_source_table")
+    builder.addIndexedColumn(Map[String, AnyRef]("test_field" -> "spark_type").asJava)
+    val schema = Map[String, AnyRef](
+      "test_field" -> Map("type" -> "os_type", "index" -> false).asJava).asJava
+    builder.schema(schema)
+
+    val optionsMap: java.util.Map[String, AnyRef] = Map[String, AnyRef](
+      "index_mappings" -> """{ "_source": { "enabled": false }, "properties": { "test_field": {"index": false} } }""").asJava
+
+    builder.options(optionsMap)
+
     val metadata = builder.build()
-    FlintOpenSearchIndexMetadataService.serialize(metadata) should matchJson(testMetadataJson)
+    FlintOpenSearchIndexMetadataService.serialize(metadata) should matchJson(
+      testMetadataJsonWithSourceEnabledFalseAndSchemaMerging)
   }
 
   "serialize without spec" should "serialize all fields to JSON without adding _meta field" in {
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala
index 1ad88de6d..ab130b90c 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala
@@ -10,7 +10,7 @@ import java.util.{Collections, UUID}
 import org.json4s.{Formats, NoTypeHints}
 import org.json4s.native.JsonMethods._
 import org.json4s.native.Serialization
-import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, EXTRA_OPTIONS, ID_EXPRESSION, INCREMENTAL_REFRESH, INDEX_SETTINGS, OptionName, OUTPUT_MODE, REFRESH_INTERVAL, SCHEDULER_MODE, WATERMARK_DELAY}
+import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, EXTRA_OPTIONS, ID_EXPRESSION, INCREMENTAL_REFRESH, INDEX_MAPPINGS, INDEX_SETTINGS, OptionName, OUTPUT_MODE, REFRESH_INTERVAL, SCHEDULER_MODE, WATERMARK_DELAY}
 import org.opensearch.flint.spark.FlintSparkIndexOptions.validateOptionNames
 import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode
 import org.opensearch.flint.spark.scheduler.util.IntervalSchedulerParser
@@ -195,6 +195,7 @@ object FlintSparkIndexOptions {
     val WATERMARK_DELAY: OptionName.Value = Value("watermark_delay")
     val OUTPUT_MODE: OptionName.Value = Value("output_mode")
     val INDEX_SETTINGS: OptionName.Value = Value("index_settings")
+    val INDEX_MAPPINGS: OptionName.Value = Value("index_mappings")
     val ID_EXPRESSION: OptionName.Value = Value("id_expression")
     val EXTRA_OPTIONS: OptionName.Value = Value("extra_options")
   }
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
index 901c3006c..f5127fb68 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
@@ -54,7 +54,6 @@ case class FlintSparkCoveringIndex(
       }.toArray
     }
     val schema = generateSchema(indexedColumns).asJava
-
     val builder = metadataBuilder(this)
       .name(indexName)
       .source(tableName)
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala
index e3b09661a..d849f7d7f 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala
@@ -64,7 +64,6 @@ case class FlintSparkMaterializedView(
         Map[String, AnyRef]("columnName" -> colName, "columnType" -> colType).asJava
       }.toArray
     val schema = generateSchema(outputSchema).asJava
-
     // Convert Scala Array to Java ArrayList for consistency with OpenSearch JSON parsing.
     // OpenSearch uses Jackson, which deserializes JSON arrays into ArrayLists.
     val sourceTablesProperty = new java.util.ArrayList[String](sourceTables.toSeq.asJava)
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
index b6f21e455..248a81858 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
@@ -66,7 +66,6 @@ case class FlintSparkSkippingIndex(
         .flatMap(_.outputSchema())
         .toMap + (FILE_PATH_COLUMN -> "string")
     val schema = generateSchema(fieldTypes).asJava
-
     metadataBuilder(this)
       .name(name())
       .source(tableName)
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala
index 0791f9b7a..cf0614694 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala
@@ -11,8 +11,10 @@ import scala.collection.JavaConverters.{mapAsJavaMapConverter, mapAsScalaMapConv
 import org.json4s.{Formats, NoTypeHints}
 import org.json4s.native.JsonMethods.parse
 import org.json4s.native.Serialization
+import org.opensearch.client.RequestOptions
+import org.opensearch.client.indices.GetIndexRequest
 import org.opensearch.flint.core.FlintOptions
-import org.opensearch.flint.core.storage.FlintOpenSearchIndexMetadataService
+import org.opensearch.flint.core.storage.{FlintOpenSearchIndexMetadataService, OpenSearchClientUtils}
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
 import org.scalatest.matchers.must.Matchers.defined
@@ -144,6 +146,57 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
     (settings \ "index.number_of_replicas").extract[String] shouldBe "3"
   }
 
+  test("create covering index with index mappings _source") {
+    sql(s"""
+           | CREATE INDEX $testIndex ON $testTable ( name )
+           | WITH (
+           |   index_mappings = '{ "_source": { "enabled": false } }'
+           | )
+           |""".stripMargin)
+
+    implicit val formats: Formats = Serialization.formats(NoTypeHints)
+
+    val osIndexName = OpenSearchClientUtils.sanitizeIndexName(testFlintIndex)
+    val response =
+      openSearchClient.indices().get(new GetIndexRequest(osIndexName), RequestOptions.DEFAULT)
+
+    val mapping = response.getMappings.get(osIndexName)
+    val indexMappingsOpt = mapping.source.toString
+    val mappings = parse(indexMappingsOpt)
+    (mappings \ "_source" \ "enabled").extract[Boolean] shouldBe false
+  }
+
+  test("create covering index with index mappings schema merging") {
+    sql(s"""
+           | CREATE INDEX $testIndex ON $testTable ( name )
+           | WITH (
+           |   index_mappings = '{ "_source": { "enabled": false }, "properties": { "name": {"index": false} } }'
+           | )
+           |""".stripMargin)
+
+    val options = new FlintOptions(openSearchOptions.asJava)
+    val flintIndexMetadataService =
+      new FlintOpenSearchIndexMetadataService(options)
+    implicit val formats: Formats = Serialization.formats(NoTypeHints)
+
+    val osIndexName = OpenSearchClientUtils.sanitizeIndexName(testFlintIndex)
+    val response =
+      openSearchClient.indices().get(new GetIndexRequest(osIndexName), RequestOptions.DEFAULT)
+
+    val mapping = response.getMappings.get(osIndexName)
+    val indexMappingsOpt = mapping.source.toString
+    val mappings = parse(indexMappingsOpt)
+    (mappings \ "_source" \ "enabled").extract[Boolean] shouldBe false
+
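+    // Verify the mapping parameter from index_mappings was merged into the stored schema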
+    val flintMetadata =
+      flintIndexMetadataService.getIndexMetadata(testFlintIndex)
+    val schema = flintMetadata.schema
+    val javaMap = schema.asInstanceOf[java.util.HashMap[String, java.util.HashMap[String, Any]]]
+
+    val countMap = javaMap.get("name").asInstanceOf[java.util.Map[String, Any]]
+    countMap.get("index") shouldBe false
+  }
+
   test("create covering index with invalid option") {
     the[IllegalArgumentException] thrownBy
       sql(s"""
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala
index 71356bca6..9598afe2e 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala
@@ -13,8 +13,10 @@ import scala.collection.JavaConverters.{mapAsJavaMapConverter, mapAsScalaMapConv
 import org.json4s.{Formats, NoTypeHints}
 import org.json4s.native.JsonMethods.parse
 import org.json4s.native.Serialization
+import org.opensearch.client.RequestOptions
+import org.opensearch.client.indices.GetIndexRequest
 import org.opensearch.flint.core.FlintOptions
-import org.opensearch.flint.core.storage.FlintOpenSearchIndexMetadataService
+import org.opensearch.flint.core.storage.{FlintOpenSearchIndexMetadataService, OpenSearchClientUtils}
 import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.getFlintIndexName
 import org.scalatest.matchers.must.Matchers.{defined, have}
 import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the}
@@ -38,6 +40,15 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
       | GROUP BY TUMBLE(time, '10 Minutes')
       |""".stripMargin
 
+  private val testNestedQuery =
+    s"""
+       | SELECT
+       |   window.start AS startTime,
+       |   COUNT(*) AS `aws.vpc.count`
+       | FROM $testTable
+       | GROUP BY TUMBLE(time, '10 Minutes')
+       |""".stripMargin
+
   override def beforeEach(): Unit = {
     super.beforeAll()
     createTimeSeriesTable(testTable)
@@ -226,6 +237,96 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
     (settings \ "index.number_of_replicas").extract[String] shouldBe "2"
   }
 
+  test("create materialized view with index mappings _source") {
+    sql(s"""
+           | CREATE MATERIALIZED VIEW $testMvName
+           | AS $testQuery
+           | WITH (
+           |   index_mappings = '{ "_source": { "enabled": false } }'
+           | )
+           |""".stripMargin)
+
+    implicit val formats: Formats = Serialization.formats(NoTypeHints)
+
+    val osIndexName = OpenSearchClientUtils.sanitizeIndexName(testFlintIndex)
+    val response =
+      openSearchClient.indices().get(new GetIndexRequest(osIndexName), RequestOptions.DEFAULT)
+
+    val mapping = response.getMappings.get(osIndexName)
+    val indexMappingsOpt = mapping.source.toString
+    val mappings = parse(indexMappingsOpt)
+    (mappings \ "_source" \ "enabled").extract[Boolean] shouldBe false
+  }
+
+  test("create materialized view with index mappings schema merging") {
+    sql(s"""
+           | CREATE MATERIALIZED VIEW $testMvName
+           | AS $testQuery
+           | WITH (
+           |   index_mappings = '{ "_source": { "enabled": false }, "properties": { "count": {"index": false} } }'
+           | )
+           |""".stripMargin)
+
+    val options = new FlintOptions(openSearchOptions.asJava)
+    val flintIndexMetadataService =
+      new FlintOpenSearchIndexMetadataService(options)
+    implicit val formats: Formats = Serialization.formats(NoTypeHints)
+
+    val osIndexName = OpenSearchClientUtils.sanitizeIndexName(testFlintIndex)
+    val response =
+      openSearchClient.indices().get(new GetIndexRequest(osIndexName), RequestOptions.DEFAULT)
+
+    val mapping = response.getMappings.get(osIndexName)
+    val indexMappingsOpt = mapping.source.toString
+    val mappings = parse(indexMappingsOpt)
+    (mappings \ "_source" \ "enabled").extract[Boolean] shouldBe false
+
+    val flintMetadata =
+      flintIndexMetadataService.getIndexMetadata(testFlintIndex)
+    val schema = flintMetadata.schema
+    val javaMap = schema.asInstanceOf[java.util.HashMap[String, java.util.HashMap[String, Any]]]
+
+    val countMap = javaMap.get("count").asInstanceOf[java.util.Map[String, Any]]
+    countMap.get("index") shouldBe false
+  }
+
+  test("create materialized view with index mappings nested schema merging") {
+    sql(s"""
+           | CREATE MATERIALIZED VIEW $testMvName
+           | AS $testNestedQuery
+           | WITH (
+           |   index_mappings = '{ "_source": { "enabled": false }, "properties": {"aws":{"properties":{"vpc":{"properties":{"count":{"index":false}}}}}}}'
+           | )
+           |""".stripMargin)
+
+    val options = new FlintOptions(openSearchOptions.asJava)
+    val flintIndexMetadataService =
+      new FlintOpenSearchIndexMetadataService(options)
+    implicit val formats: Formats = Serialization.formats(NoTypeHints)
+
+    val osIndexName = OpenSearchClientUtils.sanitizeIndexName(testFlintIndex)
+    val response =
+      openSearchClient.indices().get(new GetIndexRequest(osIndexName), RequestOptions.DEFAULT)
+
+    val mapping = response.getMappings.get(osIndexName)
+    val indexMappingsOpt = mapping.source.toString
+    val mappings = parse(indexMappingsOpt)
+    (mappings \ "_source" \ "enabled").extract[Boolean] shouldBe false
+
+    val flintMetadata =
+      flintIndexMetadataService.getIndexMetadata(testFlintIndex)
+    val schema = flintMetadata.schema
+    val javaMap = schema.asInstanceOf[java.util.HashMap[String, java.util.HashMap[String, Any]]]
+
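+    // Walk the nested mapping: aws -> properties -> vpc -> properties -> count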
+    val countMap = javaMap.get("aws").asInstanceOf[java.util.Map[String, Any]]
+
+    val countMap2 = countMap.get("properties").asInstanceOf[java.util.Map[String, Any]]
+    val countMap3 = countMap2.get("vpc").asInstanceOf[java.util.Map[String, Any]]
+    val countMap4 = countMap3.get("properties").asInstanceOf[java.util.Map[String, Any]]
+    val countMap5 = countMap4.get("count").asInstanceOf[java.util.Map[String, Any]]
+    countMap5.get("index") shouldBe false
+  }
+
   test("create materialized view with full refresh") {
     sql(s"""
            | CREATE MATERIALIZED VIEW $testMvName
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
index a2a7c9799..f84083f60 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
@@ -157,7 +157,8 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
             "auto_refresh" -> "true",
             "refresh_interval" -> "1 Minute",
             "checkpoint_location" -> checkpointDir.getAbsolutePath,
-            "index_settings" -> "{\"number_of_shards\": 3,\"number_of_replicas\": 2}")),
+            "index_settings" -> "{\"number_of_shards\": 3,\"number_of_replicas\": 2}",
+            "index_mappings" -> "{\"_source\": {\"enabled\": false}}")),
         testIndex)
       .create()
@@ -176,7 +177,8 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
           |   "incremental_refresh": "false",
           |   "refresh_interval": "1 Minute",
           |   "checkpoint_location": "${checkpointDir.getAbsolutePath}",
-          |   "index_settings": "{\\"number_of_shards\\": 3,\\"number_of_replicas\\": 2}"
+          |   "index_settings": "{\\"number_of_shards\\": 3,\\"number_of_replicas\\": 2}",
+          |   "index_mappings": "{\\"_source\\": {\\"enabled\\": false}}"
           | }
           |""".stripMargin)
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala
index 2e5b4d2ab..3f30b8b3f 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala
@@ -12,8 +12,10 @@ import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson
 import org.json4s.{Formats, NoTypeHints}
 import org.json4s.native.JsonMethods.{compact, parse, render}
 import org.json4s.native.Serialization
+import org.opensearch.client.RequestOptions
+import org.opensearch.client.indices.GetIndexRequest
 import org.opensearch.flint.core.FlintOptions
-import org.opensearch.flint.core.storage.FlintOpenSearchIndexMetadataService
+import org.opensearch.flint.core.storage.{FlintOpenSearchIndexMetadataService, OpenSearchClientUtils}
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
 import org.scalatest.matchers.must.Matchers.defined
 import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the}
@@ -209,6 +211,59 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite with ExplainSuit
     (settings \ "index.number_of_replicas").extract[String] shouldBe "2"
   }
 
+  test("create skipping index with index mappings _source") {
+    sql(s"""
+           | CREATE SKIPPING INDEX ON $testTable
+           | ( year PARTITION )
+           | WITH (
+           |   index_mappings = '{ "_source": { "enabled": false } }'
+           | )
+           |""".stripMargin)
+
+    implicit val formats: Formats = Serialization.formats(NoTypeHints)
+
+    val osIndexName = OpenSearchClientUtils.sanitizeIndexName(testIndex)
+    val response =
+      openSearchClient.indices().get(new GetIndexRequest(osIndexName), RequestOptions.DEFAULT)
+
+    val mapping = response.getMappings.get(osIndexName)
+    val indexMappingsOpt = mapping.source.toString
+    val mappings = parse(indexMappingsOpt)
+    (mappings \ "_source" \ "enabled").extract[Boolean] shouldBe false
+  }
+
+  test("create skipping index with index mappings schema merging") {
+    sql(s"""
+           | CREATE SKIPPING INDEX ON $testTable
+           | ( year PARTITION )
+           | WITH (
+           |   index_mappings = '{ "_source": { "enabled": false }, "properties": { "year": {"index": false} } }'
+           | )
+           |""".stripMargin)
+
+    val options = new FlintOptions(openSearchOptions.asJava)
+    val flintIndexMetadataService =
+      new FlintOpenSearchIndexMetadataService(options)
+    implicit val formats: Formats = Serialization.formats(NoTypeHints)
+
+    val osIndexName = OpenSearchClientUtils.sanitizeIndexName(testIndex)
+    val response =
+      openSearchClient.indices().get(new GetIndexRequest(osIndexName), RequestOptions.DEFAULT)
+
+    val mapping = response.getMappings.get(osIndexName)
+    val indexMappingsOpt = mapping.source.toString
+    val mappings = parse(indexMappingsOpt)
+    (mappings \ "_source" \ "enabled").extract[Boolean] shouldBe false
+
+    val flintMetadata =
+      flintIndexMetadataService.getIndexMetadata(testIndex)
+    val schema = flintMetadata.schema
+    val javaMap = schema.asInstanceOf[java.util.HashMap[String, java.util.HashMap[String, Any]]]
+
+    val countMap = javaMap.get("year").asInstanceOf[java.util.Map[String, Any]]
+    countMap.get("index") shouldBe false
+  }
+
   Seq(
     "struct_col.field1.subfield VALUE_SET, struct_col.field2 MIN_MAX",
     "`struct_col.field1.subfield` VALUE_SET, `struct_col.field2` MIN_MAX", // ensure previous hack still works