Adding _source and schema merging to index_mappings #1101

Merged: 28 commits, May 13, 2025

Commits (the diff below shows changes from 19 of the 28 commits)
518e1c4  Fix antlr4 parser issues (#1094)  (LantaoJin, Apr 2, 2025)
cab2bc6  adding _source to index_mappings  (ahkcs, Apr 9, 2025)
985aa67  syntax fix  (ahkcs, Apr 11, 2025)
729f560  Apply scalafmt  (ahkcs, Apr 14, 2025)
62acaa4  Added index_mapping as an option in index.md, applied scalafmtAll  (ahkcs, Apr 14, 2025)
388efda  improve readability  (ahkcs, Apr 14, 2025)
453f389  Removed index_mappings from FlintMetaData.scala, Modified index.md  (ahkcs, Apr 15, 2025)
2d6b207  removed indexMappingsSourceEnabled from FlintMetadata.scala  (ahkcs, Apr 15, 2025)
7018252  removed indexMappingsSourceEnabled from FlintMetadata.scala  (ahkcs, Apr 15, 2025)
9574fe4  removed indexMappingsSourceEnabled from FlintMetadata.scala  (ahkcs, Apr 15, 2025)
2e2f75e  Removed indexMappingsSourceEnabled from FlintMetadata.scala and remov…  (ahkcs, Apr 15, 2025)
4c60e6d  Added some test cases to test serialzie() and fixed some formatting i…  (ahkcs, Apr 16, 2025)
50e822d  Added some test cases for FlintOpenSearchIndexMetadataServiceSuite.scala  (ahkcs, Apr 16, 2025)
567883b  Added schema merging to index_mappings, added some test cases  (ahkcs, Apr 16, 2025)
3d77b65  updated test cases  (ahkcs, Apr 18, 2025)
cefb025  Minor format fix  (ahkcs, Apr 18, 2025)
fb33ac0  minor fixes  (ahkcs, Apr 21, 2025)
e24846f  added nested schema merging logic, moved mergeSchema to serialize, up…  (ahkcs, Apr 24, 2025)
cc42e9d  updated some comments  (ahkcs, Apr 24, 2025)
9609c5f  fixed some formatting issues based on the comments  (ahkcs, May 5, 2025)
dac419c  fixed syntax issue  (ahkcs, May 5, 2025)
c4f1f3a  syntax issue  (ahkcs, May 6, 2025)
b8618f2  syntax issue  (ahkcs, May 6, 2025)
2e91150  fixed the FlintSparkSkippingIndexITSuite  (ahkcs, May 6, 2025)
3daf1a5  fixing schema merging limitation  (ahkcs, May 8, 2025)
9f2532d  less scala/java conversion  (ahkcs, May 12, 2025)
200e7b5  style fix  (ahkcs, May 12, 2025)
db83f0f  fix unnecessary casting  (ahkcs, May 12, 2025)
docs/index.md (2 additions, 0 deletions)
@@ -394,6 +394,7 @@ User can provide the following options in `WITH` clause of create statement:
+ `watermark_delay`: a string as a time expression for how late data can arrive and still be processed, e.g. 1 minute, 10 seconds. This is required by auto and incremental refresh on a materialized view if its query contains aggregation.
+ `output_mode`: a mode string that describes how data will be written to the streaming sink. If unspecified, the default append mode will be applied.
+ `index_settings`: a JSON string as index settings for the OpenSearch index that will be created. Please follow the format in the OpenSearch documentation. If unspecified, default OpenSearch index settings will be applied.
+ `index_mappings`: a JSON string specifying additional OpenSearch index mappings, such as metadata fields (e.g., `_source`) or mapping parameters (e.g., `enabled`, `index`). This allows customizing certain parts of the index mappings. The base mappings are automatically generated; if unspecified, only the defaults will be applied. Refer to [OpenSearch metadata fields](https://docs.opensearch.org/docs/latest/field-types/metadata-fields/source/) and [mapping parameters](https://docs.opensearch.org/docs/latest/field-types/mapping-parameters/index/) for supported options.
+ `id_expression`: an expression string that generates an ID column to guarantee idempotency when the index refresh job restarts or retries. If an empty string is provided, no ID column will be generated.
+ `extra_options`: a JSON string as extra options passed directly to the Spark streaming source and sink APIs. Use the qualified source table name (because there could be multiple) and "sink", e.g. '{"sink": "{key: val}", "table1": {key: val}}'

@@ -407,6 +408,7 @@ WITH (
watermark_delay = '1 Second',
output_mode = 'complete',
index_settings = '{"number_of_shards": 2, "number_of_replicas": 3}',
index_mappings = '{ "_source": { "enabled": false } }',
id_expression = "sha1(concat_ws('\0',startTime,status))",
extra_options = '{"spark_catalog.default.alb_logs": {"maxFilesPerTrigger": "1"}}'
)
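
Beyond `_source`, the same option can carry per-field mapping parameters that are merged into the auto-generated schema (see `mergeSchema` below). A hedged sketch, with a hypothetical field name `clientIp`:

index_mappings = '{ "properties": { "clientIp": { "index": false } } }'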

FlintOpenSearchIndexMetadataService.scala

@@ -7,16 +7,18 @@ package org.opensearch.flint.core.storage

import java.util

import scala.collection.JavaConverters._
import scala.collection.convert.ImplicitConversions.`map AsScala`

import org.opensearch.client.RequestOptions
import org.opensearch.client.indices.{GetIndexRequest, GetIndexResponse, PutMappingRequest}
import org.opensearch.common.xcontent.{XContentParser, XContentType}
import org.opensearch.flint.common.FlintVersion
import org.opensearch.flint.common.metadata.{FlintIndexMetadataService, FlintMetadata}
import org.opensearch.flint.core.FlintOptions
import org.opensearch.flint.core.IRestHighLevelClient
import org.opensearch.flint.core.metadata.FlintJsonHelper._
import org.slf4j.LoggerFactory

import org.apache.spark.internal.Logging

@@ -99,7 +101,7 @@ class FlintOpenSearchIndexMetadataService(options: FlintOptions)
}

object FlintOpenSearchIndexMetadataService {

private val logger = LoggerFactory.getLogger(this.getClass)
def serialize(metadata: FlintMetadata): String = {
serialize(metadata, true)
}
Expand Down Expand Up @@ -134,9 +136,21 @@ object FlintOpenSearchIndexMetadataService {
optionalObjectField(builder, "properties", metadata.properties)
}
}
// Add _source field (emitted only when explicitly disabled via index_mappings)
val indexMappingsOpt =
Option(metadata.options.get("index_mappings")).map(_.asInstanceOf[String])
val sourceEnabled = extractSourceEnabled(indexMappingsOpt)
if (!sourceEnabled) {
objectField(builder, "_source") {
builder.field("enabled", sourceEnabled)
}
}
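// Illustration (assumed option value): with index_mappings = '{"_source": {"enabled": false}}'
// the serialized mappings gain "_source": {"enabled": false}; if the option is absent or
// enabled is true, no _source entry is written and OpenSearch's default applies.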

// Add properties (schema) field, merging mapping parameters from index_mappings
val tempSchema = metadata.schema.asScala.toMap
val tempOptions = metadata.options.asScala.toMap
val schema = mergeSchema(tempSchema, tempOptions).asJava
builder.field("properties", schema)
})
} catch {
case e: Exception =>
Expand Down Expand Up @@ -191,6 +205,7 @@ object FlintOpenSearchIndexMetadataService {
}
}
}
case "_source" => parser.skipChildren()
case "properties" =>
builder.schema(parser.map())
case _ => // Ignore other fields, for instance, dynamic.
@@ -203,4 +218,169 @@ object FlintOpenSearchIndexMetadataService {
throw new IllegalStateException("Failed to parse metadata JSON", e)
}
}

def extractSourceEnabled(indexMappingsJsonOpt: Option[String]): Boolean = {
var sourceEnabled: Boolean = true

indexMappingsJsonOpt.foreach { jsonStr =>
try {
parseJson(jsonStr) { (parser, fieldName) =>
fieldName match {
case "_source" =>
parseObjectField(parser) { (parser, innerFieldName) =>
innerFieldName match {
case "enabled" =>
sourceEnabled = parser.booleanValue()
return sourceEnabled
case _ => // Ignore
}
}
case _ => // Ignore
}
}
} catch {
case _: Exception => // Ignore malformed index_mappings JSON and keep the default
}
}

sourceEnabled
}
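
// Usage sketch (hypothetical inputs); the default is true, matching OpenSearch:
// extractSourceEnabled(Some("""{"_source": {"enabled": false}}""")) // false
// extractSourceEnabled(None) // true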

/**
* Merges mapping parameters from the `index_mappings` index option into the existing schema.
* If the option contains mapping parameters for fields that exist in allFieldTypes, those
* configurations are merged into the corresponding field entries.
*
* @param allFieldTypes
* Map of field names to their type/configuration details
* @param options
* Map of Flint index options, which may contain an `index_mappings` JSON string
* @return
* Merged map with combined mapping parameters
*/
def mergeSchema(
allFieldTypes: Map[String, AnyRef],
options: Map[String, AnyRef]): Map[String, AnyRef] = {
val indexMappingsOpt = options.get("index_mappings").flatMap {
case s: String => Some(s)
case _ => None
}

var result = allFieldTypes

// Track mappings from leaf name to configuration properties
var fieldConfigs = Map.empty[String, Map[String, AnyRef]]

indexMappingsOpt.foreach { jsonStr =>
try {
// Extract nested field configurations - key is the leaf name
parseJson(jsonStr) { (parser, fieldName) =>
fieldName match {
case "_source" =>
parser.skipChildren() // Skip _source section

case "properties" =>
// Process properties recursively to extract field configs
fieldConfigs = extractNestedProperties(parser)

case _ =>
parser.skipChildren() // Skip other fields
}
}

// Apply extracted configurations to schema while preserving structure
result = result.map { case (fullFieldName, fieldType) =>
val leafFieldName = extractLeafFieldName(fullFieldName)

if (fieldConfigs.contains(leafFieldName)) {
// We have config for this leaf field name
fieldType match {
case existingConfig: java.util.Map[_, _] =>
val mergedConfig = new java.util.HashMap[String, AnyRef](
existingConfig.asInstanceOf[java.util.Map[String, AnyRef]])

// Add/overwrite with new config values
fieldConfigs(leafFieldName).foreach { case (k, v) =>
mergedConfig.put(k, v)
}

// Return the updated field with its original key
(fullFieldName, mergedConfig)

case _ =>
// If field type isn't a map, keep it unchanged
(fullFieldName, fieldType)
}
} else {
// No config for this field, keep it unchanged
(fullFieldName, fieldType)
}
}
} catch {
case ex: Exception =>
logger.error(s"Error merging schema: ${ex.getMessage}")
}
}

result
}
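
// Worked example (hypothetical field names): mapping parameters are matched by leaf
// field name, so a nested index_mappings entry updates the flattened schema key:
//   allFieldTypes: "aws.vpc.count" -> {"type": "long"}
//   options: "index_mappings" ->
//     '{"properties": {"aws": {"properties": {"vpc": {"properties": {"count": {"index": false}}}}}}}'
//   result: "aws.vpc.count" -> {"type": "long", "index": false}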

/**
* Recursively extracts mapping parameters from a nested `properties` structure. Returns a
* map keyed by leaf field name to that field's configuration.
*/
private def extractNestedProperties(
parser: XContentParser): Map[String, Map[String, AnyRef]] = {
var fieldConfigs = Map.empty[String, Map[String, AnyRef]]

parseObjectField(parser) { (parser, fieldName) =>
var fieldConfig = Map.empty[String, AnyRef]
var hasNestedProperties = false

parseObjectField(parser) { (parser, propName) =>
propName match {
case "properties" =>
// This field has nested properties - recurse
hasNestedProperties = true
val nestedConfigs = extractNestedProperties(parser)
fieldConfigs ++= nestedConfigs

case "type" =>
fieldConfig += ("type" -> parser.text().asInstanceOf[AnyRef])

case "format" =>
fieldConfig += ("format" -> parser.text().asInstanceOf[AnyRef])

case "index" =>
fieldConfig += ("index" -> java.lang.Boolean
.valueOf(parser.booleanValue())
.asInstanceOf[AnyRef])

case _ =>
// Skip any unrecognized properties
parser.skipChildren()
}
}

// If this is a leaf field (no nested properties), add its config
if (!hasNestedProperties && fieldConfig.nonEmpty) {
fieldConfigs += (fieldName -> fieldConfig)
}
}

fieldConfigs
}
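
// Sketch of the flattening (hypothetical input), parsing the object under "properties":
//   {"aws": {"properties": {"vpc": {"properties": {"count": {"index": false}}}}}}
//     => Map("count" -> Map("index" -> false))
// Note that configs are keyed by leaf name only, so two leaves sharing a name share one config.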

/**
* Extracts the leaf field name from a potentially nested field path. For example:
* "aws.vpc.count" -> "count"
*/
private def extractLeafFieldName(fullFieldPath: String): String = {
val lastDotIndex = fullFieldPath.lastIndexOf('.')
if (lastDotIndex >= 0) {
fullFieldPath.substring(lastDotIndex + 1)
} else {
fullFieldPath
}
}
}