Commit f9c059a

Adding _source and schema merging to index_mappings (#1101)
* Fix antlr4 parser issues (#1094)
  * Fix antlr4 parser issues
  * Case insensitive lexer
  * revert useless change
  * remove tokens file
* adding _source to index_mappings
* syntax fix
* Apply scalafmt
* Added index_mapping as an option in index.md, applied scalafmtAll
* improve readability
* Removed index_mappings from FlintMetaData.scala, Modified index.md
* removed indexMappingsSourceEnabled from FlintMetadata.scala
* Removed indexMappingsSourceEnabled from FlintMetadata.scala and removed unnecessary code
* Added some test cases to test serialize() and fixed some formatting issues
* Added some test cases for FlintOpenSearchIndexMetadataServiceSuite.scala
* Added schema merging to index_mappings, added some test cases
* updated test cases
* Minor format fix
* minor fixes
* added nested schema merging logic, moved mergeSchema to serialize, updated test cases, fixed some minor issues
* updated some comments
* fixed some formatting issues based on the comments
* fixed syntax issue
* fixed the FlintSparkSkippingIndexITSuite
* fixing schema merging limitation
* less scala/java conversion
* style fix
* fix unnecessary casting

Signed-off-by: Lantao Jin <ltjin@amazon.com>
Signed-off-by: Kai Huang <ahkcs@amazon.com>
Co-authored-by: Lantao Jin <ltjin@amazon.com>
(cherry picked from commit 76d35e2)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent a1233a4 commit f9c059a

File tree

11 files changed: +627 −18

docs/index.md (+2)

@@ -394,6 +394,7 @@ User can provide the following options in `WITH` clause of create statement:
 + `watermark_delay`: a string as time expression for how late data can come and still be processed, e.g. 1 minute, 10 seconds. This is required by auto and incremental refresh on materialized view if it has aggregation in the query.
 + `output_mode`: a mode string that describes how data will be written to streaming sink. If unspecified, default append mode will be applied.
 + `index_settings`: a JSON string as index settings for OpenSearch index that will be created. Please follow the format in OpenSearch documentation. If unspecified, default OpenSearch index settings will be applied.
++ `index_mappings`: A JSON string specifying additional OpenSearch index mappings, such as metadata fields (e.g., _source) or mapping parameters (e.g., enabled, index, etc.). This allows customizing certain parts of the index mappings. The base mappings are automatically generated; if unspecified, only the defaults will be applied. Refer to [OpenSearch metadata fields](https://docs.opensearch.org/docs/latest/field-types/metadata-fields/source/) and [mapping parameters](https://docs.opensearch.org/docs/latest/field-types/mapping-parameters/index/) for supported options.
 + `id_expression`: an expression string that generates an ID column to guarantee idempotency when index refresh job restart or any retry attempt during an index refresh. If an empty string is provided, no ID column will be generated.
 + `extra_options`: a JSON string as extra options that can be passed to Spark streaming source and sink API directly. Use qualified source table name (because there could be multiple) and "sink", e.g. '{"sink": "{key: val}", "table1": {key: val}}'

@@ -407,6 +408,7 @@ WITH (
   watermark_delay = '1 Second',
   output_mode = 'complete',
   index_settings = '{"number_of_shards": 2, "number_of_replicas": 3}',
+  index_mappings = '{ "_source": { "enabled": false } }',
   id_expression = "sha1(concat_ws('\0',startTime,status))",
   extra_options = '{"spark_catalog.default.alb_logs": {"maxFilesPerTrigger": "1"}}'
 )
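The net effect of the new `index_mappings` option can be illustrated with a small sketch. This is Python, not Flint code, and `apply_index_mappings` is a hypothetical helper: the base mappings stay auto-generated, and the option only overlays the `_source` metadata field when the user disables it.

```python
import json

def apply_index_mappings(generated_mapping: dict, index_mappings_json: str) -> dict:
    """Overlay user-supplied customizations (e.g. _source) onto an
    auto-generated OpenSearch mapping. Hypothetical illustration."""
    overrides = json.loads(index_mappings_json)
    merged = dict(generated_mapping)
    # _source is a metadata field: only emit it when explicitly disabled
    source = overrides.get("_source", {})
    if source.get("enabled") is False:
        merged["_source"] = {"enabled": False}
    return merged

generated = {"properties": {"status": {"type": "integer"}}}
merged = apply_index_mappings(generated, '{ "_source": { "enabled": false } }')
print(merged["_source"])  # {'enabled': False}
```

Disabling `_source` drops the stored original document, which saves space but prevents reindexing from the index itself; the trade-off is up to the user.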

flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchIndexMetadataService.scala (+181 −5)

@@ -7,11 +7,11 @@ package org.opensearch.flint.core.storage
 
 import java.util
 
-import scala.collection.JavaConverters.mapAsJavaMapConverter
+import scala.collection.JavaConverters._
 
 import org.opensearch.client.RequestOptions
 import org.opensearch.client.indices.{GetIndexRequest, GetIndexResponse, PutMappingRequest}
-import org.opensearch.common.xcontent.XContentType
+import org.opensearch.common.xcontent.{XContentParser, XContentType}
 import org.opensearch.flint.common.FlintVersion
 import org.opensearch.flint.common.metadata.{FlintIndexMetadataService, FlintMetadata}
 import org.opensearch.flint.core.FlintOptions

@@ -98,8 +98,7 @@ class FlintOpenSearchIndexMetadataService(options: FlintOptions)
   override def deleteIndexMetadata(indexName: String): Unit = {}
 }
 
-object FlintOpenSearchIndexMetadataService {
-
+object FlintOpenSearchIndexMetadataService extends Logging {
   def serialize(metadata: FlintMetadata): String = {
     serialize(metadata, true)
   }
@@ -134,9 +133,19 @@ object FlintOpenSearchIndexMetadataService {
             optionalObjectField(builder, "properties", metadata.properties)
           }
         }
+        // Add _source field
+        val indexMappingsOpt =
+          Option(metadata.options.get("index_mappings")).map(_.asInstanceOf[String])
+        val sourceEnabled = extractSourceEnabled(indexMappingsOpt)
+        if (!sourceEnabled) {
+          objectField(builder, "_source") {
+            builder.field("enabled", sourceEnabled)
+          }
+        }
 
         // Add properties (schema) field
-        builder.field("properties", metadata.schema)
+        val schema = mergeSchema(metadata.schema, metadata.options)
+        builder.field("properties", schema)
       })
     } catch {
       case e: Exception =>
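The hunk above changes the serialized mapping layout: `_source` is emitted only when it is explicitly disabled, and the schema is merged with any `index_mappings` overrides before being written under `properties`. A minimal sketch of that layout (Python, with a hypothetical helper name, not the XContentBuilder-based implementation):

```python
import json

def serialize_mappings(meta: dict, schema: dict, source_enabled: bool) -> str:
    """Sketch of the serialized mapping layout: _meta first, then an
    explicit _source block only when disabled, then properties."""
    mappings = {"_meta": meta}
    if not source_enabled:
        mappings["_source"] = {"enabled": False}
    mappings["properties"] = schema
    return json.dumps(mappings)

out = json.loads(serialize_mappings({"kind": "skipping"}, {"year": {"type": "integer"}}, False))
```

Omitting `_source` entirely when it is enabled keeps the serialized mappings identical to what Flint produced before this change.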
@@ -191,6 +200,7 @@ object FlintOpenSearchIndexMetadataService {
               }
             }
           }
+          case "_source" => parser.skipChildren()
           case "properties" =>
             builder.schema(parser.map())
           case _ => // Ignore other fields, for instance, dynamic.
@@ -203,4 +213,170 @@ object FlintOpenSearchIndexMetadataService {
         throw new IllegalStateException("Failed to parse metadata JSON", e)
     }
   }
+
+  def extractSourceEnabled(indexMappingsJsonOpt: Option[String]): Boolean = {
+    var sourceEnabled: Boolean = true
+
+    indexMappingsJsonOpt.foreach { jsonStr =>
+      try {
+        parseJson(jsonStr) { (parser, fieldName) =>
+          fieldName match {
+            case "_source" =>
+              parseObjectField(parser) { (parser, innerFieldName) =>
+                innerFieldName match {
+                  case "enabled" =>
+                    sourceEnabled = parser.booleanValue()
+                    return sourceEnabled
+                  case _ => // Ignore
+                }
+              }
+            case _ => // Ignore
+          }
+        }
+      } catch {
+        case e: Exception =>
+          logError("Error extracting _source", e)
+      }
+    }
+
+    sourceEnabled
+  }
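`extractSourceEnabled` defaults to `true` and keeps that default when the option is absent or the JSON is malformed. The same rule, sketched in Python over `json.loads` (illustrative only; the real code streams through an XContentParser):

```python
import json
from typing import Optional

def extract_source_enabled(index_mappings_json: Optional[str]) -> bool:
    """_source stays enabled unless the option explicitly disables it;
    parse errors fall back to the default instead of failing the index."""
    if index_mappings_json is None:
        return True
    try:
        parsed = json.loads(index_mappings_json)
        return bool(parsed.get("_source", {}).get("enabled", True))
    except (json.JSONDecodeError, AttributeError):
        return True  # keep the default on malformed input

assert extract_source_enabled('{ "_source": { "enabled": false } }') is False
```

Swallowing the parse error (with a log line) rather than raising mirrors the Scala code's choice to never let a bad option string break metadata serialization.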
+
+  /**
+   * Merges the mapping parameters from FlintSparkIndexOptions into the existing schema. If the
+   * options contain mapping parameters that exist in allFieldTypes, those configurations are
+   * merged.
+   *
+   * @param allFieldTypes
+   *   Map of field names to their type/configuration details
+   * @param options
+   *   FlintMetadata containing potential mapping parameters
+   * @return
+   *   Merged map with combined mapping parameters
+   */
+  def mergeSchema(
+      allFieldTypes: java.util.Map[String, AnyRef],
+      options: java.util.Map[String, AnyRef]): java.util.Map[String, AnyRef] = {
+    val indexMappingsOpt = Option(options.get("index_mappings")).flatMap {
+      case s: String => Some(s)
+      case _ => None
+    }
+
+    var result = new java.util.HashMap[String, AnyRef](allFieldTypes)
+
+    // Track mappings from leaf name to configuration properties
+    var fieldConfigs = new java.util.HashMap[String, java.util.Map[String, AnyRef]]()
+
+    indexMappingsOpt.foreach { jsonStr =>
+      try {
+        // Extract nested field configurations - key is the leaf name
+        parseJson(jsonStr) { (parser, fieldName) =>
+          fieldName match {
+            case "_source" =>
+              parser.skipChildren() // Skip _source section
+
+            case "properties" =>
+              // Process properties recursively to extract field configs
+              fieldConfigs = extractNestedProperties(parser)
+
+            case _ =>
+              parser.skipChildren() // Skip other fields
+          }
+        }
+
+        // Apply extracted configurations to schema while preserving structure
+        val newResult = new java.util.HashMap[String, AnyRef]()
+        result.forEach { (fullFieldName, fieldType) =>
+          val leafFieldName = extractLeafFieldName(fullFieldName)
+
+          if (fieldConfigs.containsKey(leafFieldName)) {
+            // We have config for this leaf field name
+            fieldType match {
+              case existingConfig: java.util.Map[String, AnyRef] =>
+                val mergedConfig = new java.util.HashMap[String, AnyRef](existingConfig)
+
+                // Add/overwrite with new config values
+                fieldConfigs.get(leafFieldName).forEach { (k, v) =>
+                  mergedConfig.put(k, v)
+                }
+
+                // Return the updated field with its original key
+                newResult.put(fullFieldName, mergedConfig)
+
+              case _ =>
+                // If field type isn't a map, keep it unchanged
+                newResult.put(fullFieldName, fieldType)
+            }
+          } else {
+            // No config for this field, keep it unchanged
+            newResult.put(fullFieldName, fieldType)
+          }
+        }
+        result = newResult
+      } catch {
+        case ex: Exception =>
+          logError("Error merging schema", ex)
+      }
+    }
+
+    result
+  }
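`mergeSchema` applies configurations keyed by leaf field name to a schema keyed by full dotted paths, so a config for `count` reaches `aws.vpc.count`. A condensed Python sketch of that merge rule (illustrative, operating on plain dicts rather than `java.util.Map`):

```python
def merge_schema(all_field_types: dict, field_configs: dict) -> dict:
    """Merge per-leaf-name mapping parameters into a schema keyed by
    full dotted field paths. Sketch of the rule, not the Flint code."""
    merged = {}
    for full_name, field_type in all_field_types.items():
        leaf = full_name.rsplit(".", 1)[-1]  # extractLeafFieldName
        if leaf in field_configs and isinstance(field_type, dict):
            # user parameters overwrite generated ones on conflict
            merged[full_name] = {**field_type, **field_configs[leaf]}
        else:
            merged[full_name] = field_type
    return merged

schema = {"aws.vpc.count": {"type": "integer"}, "status": {"type": "keyword"}}
merged = merge_schema(schema, {"count": {"index": False}})
```

Matching on the leaf name keeps the user-facing option simple, at the cost that two fields sharing a leaf name (e.g. `a.count` and `b.count`) receive the same parameters.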
+
+  /**
+   * Recursively extracts mapping parameters from nested properties structure. Returns a map of
+   * field name to its configuration.
+   */
+  private def extractNestedProperties(
+      parser: XContentParser): java.util.HashMap[String, java.util.Map[String, AnyRef]] = {
+
+    val fieldConfigs = new java.util.HashMap[String, java.util.Map[String, AnyRef]]()
+
+    parseObjectField(parser) { (parser, fieldName) =>
+      val fieldConfig = new java.util.HashMap[String, AnyRef]()
+      var hasNestedProperties = false
+
+      parseObjectField(parser) { (parser, propName) =>
+        if (propName == "properties") {
+          hasNestedProperties = true
+          val nestedConfigs = extractNestedProperties(parser)
+          nestedConfigs.forEach { (k, v) =>
+            fieldConfigs.put(k, v)
+          }
+        } else {
+          val value = parser.currentToken() match {
+            case XContentParser.Token.VALUE_STRING => parser.text()
+            case XContentParser.Token.VALUE_NUMBER => parser.numberValue()
+            case XContentParser.Token.VALUE_BOOLEAN => parser.booleanValue()
+            case XContentParser.Token.VALUE_NULL => null
+            case _ =>
+              parser.skipChildren()
+              null
+          }
+
+          if (value != null) {
+            fieldConfig.put(propName, value.asInstanceOf[AnyRef])
+          }
+        }
+      }
+
+      if (!hasNestedProperties && !fieldConfig.isEmpty) {
+        fieldConfigs.put(fieldName, fieldConfig)
+      }
+    }
+
+    fieldConfigs
+  }
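`extractNestedProperties` flattens a nested `properties` tree into per-leaf-name configurations, recursing through object fields and keeping scalar mapping parameters. Sketched in Python over plain dicts (the real code walks an XContentParser token stream, so this is an approximation of its behavior):

```python
def extract_nested_properties(properties: dict) -> dict:
    """Collect mapping parameters keyed by leaf field name from a
    (possibly nested) "properties" tree. Sketch of the recursion."""
    configs = {}
    for name, body in properties.items():
        if "properties" in body:
            # object field: descend, keeping only leaf-level parameters
            configs.update(extract_nested_properties(body["properties"]))
        else:
            params = {k: v for k, v in body.items() if v is not None}
            if params:
                configs[name] = params
    return configs

props = {"aws": {"properties": {"vpc": {"properties": {"count": {"index": False}}}}}}
leafs = extract_nested_properties(props)
```

This is what lets users mirror their schema's nesting in `index_mappings` while the merge itself still happens against flat dotted field names.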
+
+  /**
+   * Extracts the leaf field name from a potentially nested field path. For example:
+   * "aws.vpc.count" -> "count"
+   */
+  private def extractLeafFieldName(fullFieldPath: String): String = {
+    val lastDotIndex = fullFieldPath.lastIndexOf('.')
+    if (lastDotIndex >= 0) {
+      fullFieldPath.substring(lastDotIndex + 1)
+    } else {
+      fullFieldPath
+    }
+  }
 }
