[Backport 0.7] Adding _source and schema merging to index_mappings #1155

Merged: 1 commit, May 14, 2025
docs/index.md (2 additions, 0 deletions)
@@ -394,6 +394,7 @@ Users can provide the following options in the `WITH` clause of the create statement:
+ `watermark_delay`: a time expression string for how late data can arrive and still be processed, e.g. 1 minute, 10 seconds. This is required by auto and incremental refresh on a materialized view if its query contains an aggregation.
+ `output_mode`: a mode string that describes how data will be written to the streaming sink. If unspecified, the default append mode will be applied.
+ `index_settings`: a JSON string of index settings for the OpenSearch index that will be created. Please follow the format in the OpenSearch documentation. If unspecified, default OpenSearch index settings will be applied.
+ `index_mappings`: a JSON string specifying additional OpenSearch index mappings, such as metadata fields (e.g., `_source`) or mapping parameters (e.g., `enabled`, `index`). This allows customizing certain parts of the index mappings. The base mappings are generated automatically; if unspecified, only the defaults will be applied. Refer to [OpenSearch metadata fields](https://docs.opensearch.org/docs/latest/field-types/metadata-fields/source/) and [mapping parameters](https://docs.opensearch.org/docs/latest/field-types/mapping-parameters/index/) for supported options. See the example mapping after the create statement below.
+ `id_expression`: an expression string that generates an ID column to guarantee idempotency when the index refresh job restarts or retries during an index refresh. If an empty string is provided, no ID column will be generated.
+ `extra_options`: a JSON string of extra options that can be passed directly to the Spark streaming source and sink APIs. Use the qualified source table name as the key (because there could be multiple source tables) and "sink" for the sink, e.g. '{"sink": "{key: val}", "table1": {key: val}}'

@@ -407,6 +408,7 @@ WITH (
watermark_delay = '1 Second',
output_mode = 'complete',
index_settings = '{"number_of_shards": 2, "number_of_replicas": 3}',
index_mappings = '{ "_source": { "enabled": false } }',
id_expression = "sha1(concat_ws('\0',startTime,status))",
extra_options = '{"spark_catalog.default.alb_logs": {"maxFilesPerTrigger": "1"}}'
)
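
With `_source` disabled as above, the index that Flint creates would carry a mapping along these lines (an illustrative sketch, not output from the PR; the field names `startTime` and `status` are taken from the example and their types are assumed):

```json
{
  "mappings": {
    "_source": {
      "enabled": false
    },
    "properties": {
      "startTime": { "type": "date" },
      "status": { "type": "integer" }
    }
  }
}
```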
FlintOpenSearchIndexMetadataService.scala
@@ -7,11 +7,11 @@ package org.opensearch.flint.core.storage

import java.util

- import scala.collection.JavaConverters.mapAsJavaMapConverter
+ import scala.collection.JavaConverters._

import org.opensearch.client.RequestOptions
import org.opensearch.client.indices.{GetIndexRequest, GetIndexResponse, PutMappingRequest}
- import org.opensearch.common.xcontent.XContentType
+ import org.opensearch.common.xcontent.{XContentParser, XContentType}
import org.opensearch.flint.common.FlintVersion
import org.opensearch.flint.common.metadata.{FlintIndexMetadataService, FlintMetadata}
import org.opensearch.flint.core.FlintOptions
@@ -98,8 +98,7 @@ class FlintOpenSearchIndexMetadataService(options: FlintOptions)
override def deleteIndexMetadata(indexName: String): Unit = {}
}

- object FlintOpenSearchIndexMetadataService {
+ object FlintOpenSearchIndexMetadataService extends Logging {
def serialize(metadata: FlintMetadata): String = {
serialize(metadata, true)
}
@@ -134,9 +133,19 @@ object FlintOpenSearchIndexMetadataService {
optionalObjectField(builder, "properties", metadata.properties)
}
}
+ // Emit the _source metadata field only when the user disabled it via the
+ // index_mappings option; OpenSearch enables _source by default
+ val indexMappingsOpt =
+   Option(metadata.options.get("index_mappings")).map(_.asInstanceOf[String])
+ val sourceEnabled = extractSourceEnabled(indexMappingsOpt)
+ if (!sourceEnabled) {
+   objectField(builder, "_source") {
+     builder.field("enabled", sourceEnabled)
+   }
+ }

  // Add properties (schema) field
- builder.field("properties", metadata.schema)
+ val schema = mergeSchema(metadata.schema, metadata.options)
+ builder.field("properties", schema)
})
} catch {
case e: Exception =>
@@ -191,6 +200,7 @@ object FlintOpenSearchIndexMetadataService {
}
}
}
case "_source" => parser.skipChildren()
case "properties" =>
builder.schema(parser.map())
case _ => // Ignore other fields, for instance, dynamic.
@@ -203,4 +213,170 @@ object FlintOpenSearchIndexMetadataService {
throw new IllegalStateException("Failed to parse metadata JSON", e)
}
}

def extractSourceEnabled(indexMappingsJsonOpt: Option[String]): Boolean = {
var sourceEnabled: Boolean = true

indexMappingsJsonOpt.foreach { jsonStr =>
try {
parseJson(jsonStr) { (parser, fieldName) =>
fieldName match {
case "_source" =>
parseObjectField(parser) { (parser, innerFieldName) =>
innerFieldName match {
case "enabled" =>
sourceEnabled = parser.booleanValue()
return sourceEnabled
case _ => // Ignore
}
}
case _ => // Ignore
}
}
} catch {
case e: Exception =>
logError("Error extracting _source", e)
}
}

sourceEnabled
}
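
  // Usage sketch (editor's illustration, not part of the PR): the helper
  // defaults to true when the option is absent or unparsable.
  //
  //   extractSourceEnabled(Some("""{ "_source": { "enabled": false } }""")) // => false
  //   extractSourceEnabled(Some("""{ "_source": { "enabled": true } }"""))  // => true
  //   extractSourceEnabled(None)                                            // => true (default)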

/**
 * Merges mapping parameters from the index_mappings option into the existing schema. If the
 * options contain mapping parameters for fields that exist in allFieldTypes, those
 * configurations are merged in.
 *
 * @param allFieldTypes
 *   map of field names to their type/configuration details
 * @param options
 *   index options map that may carry an `index_mappings` JSON string
 * @return
 *   merged map with combined mapping parameters
 */
def mergeSchema(
allFieldTypes: java.util.Map[String, AnyRef],
options: java.util.Map[String, AnyRef]): java.util.Map[String, AnyRef] = {
val indexMappingsOpt = Option(options.get("index_mappings")).flatMap {
case s: String => Some(s)
case _ => None
}

var result = new java.util.HashMap[String, AnyRef](allFieldTypes)

// Track mappings from leaf name to configuration properties
var fieldConfigs = new java.util.HashMap[String, java.util.Map[String, AnyRef]]()

indexMappingsOpt.foreach { jsonStr =>
try {
// Extract nested field configurations - key is the leaf name
parseJson(jsonStr) { (parser, fieldName) =>
fieldName match {
case "_source" =>
parser.skipChildren() // Skip _source section

case "properties" =>
// Process properties recursively to extract field configs
fieldConfigs = extractNestedProperties(parser)

case _ =>
parser.skipChildren() // Skip other fields
}
}

// Apply extracted configurations to schema while preserving structure
val newResult = new java.util.HashMap[String, AnyRef]()
result.forEach { (fullFieldName, fieldType) =>
val leafFieldName = extractLeafFieldName(fullFieldName)

if (fieldConfigs.containsKey(leafFieldName)) {
// We have config for this leaf field name
fieldType match {
case existingConfig: java.util.Map[String, AnyRef] =>
val mergedConfig = new java.util.HashMap[String, AnyRef](existingConfig)

// Add/overwrite with new config values
fieldConfigs.get(leafFieldName).forEach { (k, v) =>
mergedConfig.put(k, v)
}

// Return the updated field with its original key
newResult.put(fullFieldName, mergedConfig)

case _ =>
// If field type isn't a map, keep it unchanged
newResult.put(fullFieldName, fieldType)
}
} else {
// No config for this field, keep it unchanged
newResult.put(fullFieldName, fieldType)
}
}
result = newResult
} catch {
case ex: Exception =>
logError("Error merging schema", ex)
}
}

result
}
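
  // Illustration (editor's sketch, not part of the PR): a flattened schema entry
  // picks up mapping parameters whose leaf name matches, e.g. with
  //   allFieldTypes:  { "aws.vpc.count" -> { "type" -> "integer" } }
  //   index_mappings: { "properties": { "aws": { "properties": { "vpc": {
  //                     "properties": { "count": { "index": false } } } } } } }
  // the merged result is
  //   { "aws.vpc.count" -> { "type" -> "integer", "index" -> false } }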

/**
 * Recursively extracts mapping parameters from a nested properties structure. Returns a map
 * of leaf field name to its configuration.
 */
private def extractNestedProperties(
parser: XContentParser): java.util.HashMap[String, java.util.Map[String, AnyRef]] = {

val fieldConfigs = new java.util.HashMap[String, java.util.Map[String, AnyRef]]()

parseObjectField(parser) { (parser, fieldName) =>
val fieldConfig = new java.util.HashMap[String, AnyRef]()
var hasNestedProperties = false

parseObjectField(parser) { (parser, propName) =>
if (propName == "properties") {
hasNestedProperties = true
val nestedConfigs = extractNestedProperties(parser)
nestedConfigs.forEach { (k, v) =>
fieldConfigs.put(k, v)
}
} else {
val value = parser.currentToken() match {
case XContentParser.Token.VALUE_STRING => parser.text()
case XContentParser.Token.VALUE_NUMBER => parser.numberValue()
case XContentParser.Token.VALUE_BOOLEAN => parser.booleanValue()
case XContentParser.Token.VALUE_NULL => null
case _ =>
parser.skipChildren()
null
}

if (value != null) {
fieldConfig.put(propName, value.asInstanceOf[AnyRef])
}
}
}

if (!hasNestedProperties && !fieldConfig.isEmpty) {
fieldConfigs.put(fieldName, fieldConfig)
}
}

fieldConfigs
}
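
  // Illustration (editor's sketch, not part of the PR): nested properties are
  // flattened to their leaf names, so the index_mappings fragment above yields
  //   { "count" -> { "index" -> false } }
  // Only leaf objects without a nested "properties" key contribute a config.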

/**
* Extracts the leaf field name from a potentially nested field path. For example:
* "aws.vpc.count" -> "count"
*/
private def extractLeafFieldName(fullFieldPath: String): String = {
val lastDotIndex = fullFieldPath.lastIndexOf('.')
if (lastDotIndex >= 0) {
fullFieldPath.substring(lastDotIndex + 1)
} else {
fullFieldPath
}
}
}
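
Taken together, the following is a minimal sketch (an editor's illustration, not code from this PR) of how the two public helpers combine; it assumes `FlintOpenSearchIndexMetadataService` is on the classpath as shown in the diff:

```scala
import java.util.{HashMap => JHashMap, Map => JMap}

import org.opensearch.flint.core.storage.FlintOpenSearchIndexMetadataService._

// Hypothetical driver, not part of the PR
object IndexMappingsMergeExample extends App {
  // Flattened Flint schema: full field name -> type configuration
  val schema = new JHashMap[String, AnyRef]()
  val countType = new JHashMap[String, AnyRef]()
  countType.put("type", "integer")
  schema.put("aws.vpc.count", countType)

  // Index options carrying the user-supplied index_mappings JSON
  val options = new JHashMap[String, AnyRef]()
  options.put(
    "index_mappings",
    """{ "_source": { "enabled": false },
      |  "properties": { "aws": { "properties": { "vpc": { "properties":
      |    { "count": { "index": false } } } } } } }""".stripMargin)

  // _source flag parsed from the option (defaults to true when absent)
  val sourceEnabled =
    extractSourceEnabled(Option(options.get("index_mappings")).map(_.asInstanceOf[String]))

  // Schema with the "index" parameter merged onto the matching leaf field
  val merged: JMap[String, AnyRef] = mergeSchema(schema, options)

  println(sourceEnabled) // false
  println(merged)        // e.g. {aws.vpc.count={type=integer, index=false}}
}
```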