@@ -2454,8 +2454,9 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
     })
   }

-  @Test
-  def testNestedFieldPartition(): Unit = {
+  @ParameterizedTest
+  @CsvSource(Array("COW", "MOR"))
+  def testNestedFieldPartition(tableType: String): Unit = {
     // Define schema with nested_record containing level field
     val nestedSchema = StructType(Seq(
       StructField("nested_int", IntegerType, nullable = false),
@@ -2473,48 +2474,180 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup

     // Create test data where top-level 'level' and 'nested_record.level' have DIFFERENT values
     // This helps verify we're correctly partitioning/filtering on the nested field
-    val records = Seq(
+    val recordsCommit1 = Seq(
       Row("key1", 1L, "L1", 1, "str1", Row(10, "INFO")),
       Row("key2", 2L, "L2", 2, "str2", Row(20, "ERROR")),
       Row("key3", 3L, "L3", 3, "str3", Row(30, "INFO")),
       Row("key4", 4L, "L4", 4, "str4", Row(40, "DEBUG")),
       Row("key5", 5L, "L5", 5, "str5", Row(50, "INFO"))
     )

-    val inputDF = spark.createDataFrame(
-      spark.sparkContext.parallelize(records),
-      schema
+    val tableTypeOptVal = if (tableType == "MOR") {
+      DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL
+    } else {
+      DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL
+    }
+
+    val baseWriteOpts = Map(
+      "hoodie.insert.shuffle.parallelism" -> "4",
+      "hoodie.upsert.shuffle.parallelism" -> "4",
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "key",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "nested_record.level",
+      HoodieTableConfig.ORDERING_FIELDS.key -> "ts",
+      HoodieWriteConfig.TBL_NAME.key -> "test_nested_partition",
+      DataSourceWriteOptions.TABLE_TYPE.key -> tableTypeOptVal
     )
+    val writeOpts = if (tableType == "MOR") {
+      baseWriteOpts + ("hoodie.compact.inline" -> "false")
+    } else {
+      baseWriteOpts
+    }
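+    // For MOR, inline compaction stays off so the upserts below should remain in log files,
+    // letting the snapshot/time-travel/incremental reads exercise the merge-on-read path.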

-    // Write to Hudi partitioned by nested_record.level
-    inputDF.write.format("hudi")
-      .option("hoodie.insert.shuffle.parallelism", "4")
-      .option("hoodie.upsert.shuffle.parallelism", "4")
-      .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "key")
-      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "nested_record.level")
-      .option(HoodieTableConfig.ORDERING_FIELDS.key, "ts")
-      .option(HoodieWriteConfig.TBL_NAME.key, "test_nested_partition")
+    // Commit 1 - Initial insert
+    val inputDF1 = spark.createDataFrame(
+      spark.sparkContext.parallelize(recordsCommit1),
+      schema
+    )
+    inputDF1.write.format("hudi")
+      .options(writeOpts)
       .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
       .mode(SaveMode.Overwrite)
       .save(basePath)
+    val commit1 = DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
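+    // The completion time of each commit is captured after the write and reused below as
+    // the time-travel instant and the incremental start/end bounds.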
+
+    // Commit 2 - Upsert: update key1 (int_field 1 -> 100), insert key6 (INFO)
+    val recordsCommit2 = Seq(
+      Row("key1", 10L, "L1", 100, "str1", Row(10, "INFO")),
+      Row("key6", 6L, "L6", 6, "str6", Row(60, "INFO"))
+    )
+    val inputDF2 = spark.createDataFrame(
+      spark.sparkContext.parallelize(recordsCommit2),
+      schema
+    )
+    inputDF2.write.format("hudi")
+      .options(writeOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    val commit2 = DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
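+    // key1 keeps nested_record.level = "INFO", so the update lands in the same partition;
+    // only ts and int_field change.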
+
+    // Commit 3 - Upsert: update key3 (int_field 3 -> 300), insert key7 (INFO)
+    val recordsCommit3 = Seq(
+      Row("key3", 30L, "L3", 300, "str3", Row(30, "INFO")),
+      Row("key7", 7L, "L7", 7, "str7", Row(70, "INFO"))
+    )
+    val inputDF3 = spark.createDataFrame(
+      spark.sparkContext.parallelize(recordsCommit3),
+      schema
+    )
+    inputDF3.write.format("hudi")
+      .options(writeOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    val commit3 = DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
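+    // INFO partition after commit 3: key1 and key3 (updated), key5 (untouched),
+    // key6 and key7 (inserted).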
+
+    // Snapshot read - filter on nested_record.level = 'INFO' (latest state: 5 records)
+    val snapshotResults = spark.read.format("hudi")
+      .load(basePath)
+      .filter("nested_record.level = 'INFO'")
+      .select("key", "ts", "level", "int_field", "string_field", "nested_record")
+      .orderBy("key")
+      .collect()
+
+    val expectedSnapshot = Array(
+      Row("key1", 10L, "L1", 100, "str1", Row(10, "INFO")),
+      Row("key3", 30L, "L3", 300, "str3", Row(30, "INFO")),
+      Row("key5", 5L, "L5", 5, "str5", Row(50, "INFO")),
+      Row("key6", 6L, "L6", 6, "str6", Row(60, "INFO")),
+      Row("key7", 7L, "L7", 7, "str7", Row(70, "INFO"))
+    )
+    assertEquals(expectedSnapshot.length, snapshotResults.length,
+      s"Snapshot (INFO) count mismatch for $tableType")
+    expectedSnapshot.zip(snapshotResults).foreach { case (expected, actual) =>
+      assertEquals(expected, actual)
+    }
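+    // For MOR this snapshot is expected to merge commit 1's base files with the
+    // still-uncompacted log entries from commits 2 and 3.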

-    // Read and filter on nested_record.level = 'INFO'
-    val results = spark.read.format("hudi")
+    // Time travel - as of commit1 (only initial 5 records; INFO = key1, key3, key5)
+    val timeTravelCommit1 = spark.read.format("hudi")
+      .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, commit1)
       .load(basePath)
       .filter("nested_record.level = 'INFO'")
       .select("key", "ts", "level", "int_field", "string_field", "nested_record")
       .orderBy("key")
       .collect()

-    // Expected results - 3 records with nested_record.level = 'INFO'
-    val expectedResults = Array(
+    val expectedAfterCommit1 = Array(
       Row("key1", 1L, "L1", 1, "str1", Row(10, "INFO")),
       Row("key3", 3L, "L3", 3, "str3", Row(30, "INFO")),
       Row("key5", 5L, "L5", 5, "str5", Row(50, "INFO"))
     )
+    assertEquals(expectedAfterCommit1.length, timeTravelCommit1.length,
+      s"Time travel to commit1 (INFO) count mismatch for $tableType")
+    expectedAfterCommit1.zip(timeTravelCommit1).foreach { case (expected, actual) =>
+      assertEquals(expected, actual)
+    }
+
+    // Time travel - as of commit2 (after 2nd commit; INFO = key1 updated, key3, key5, key6)
+    val timeTravelCommit2 = spark.read.format("hudi")
+      .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, commit2)
+      .load(basePath)
+      .filter("nested_record.level = 'INFO'")
+      .select("key", "ts", "level", "int_field", "string_field", "nested_record")
+      .orderBy("key")
+      .collect()
+
+    val expectedAfterCommit2 = Array(
+      Row("key1", 10L, "L1", 100, "str1", Row(10, "INFO")),
+      Row("key3", 3L, "L3", 3, "str3", Row(30, "INFO")),
+      Row("key5", 5L, "L5", 5, "str5", Row(50, "INFO")),
+      Row("key6", 6L, "L6", 6, "str6", Row(60, "INFO"))
+    )
+    assertEquals(expectedAfterCommit2.length, timeTravelCommit2.length,
+      s"Time travel to commit2 (INFO) count mismatch for $tableType")
+    expectedAfterCommit2.zip(timeTravelCommit2).foreach { case (expected, actual) =>
+      assertEquals(expected, actual)
+    }
+
+    // Incremental query - from commit1 to commit2 (only key1 update and key6 insert; both INFO)
+    val incrementalCommit1To2 = spark.read.format("hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.START_COMMIT.key, commit1)
+      .option(DataSourceReadOptions.END_COMMIT.key, commit2)
+      .load(basePath)
+      .filter("nested_record.level = 'INFO'")
+      .select("key", "ts", "level", "int_field", "string_field", "nested_record")
+      .orderBy("key")
+      .collect()
+
+    val expectedInc1To2 = Array(
+      Row("key1", 10L, "L1", 100, "str1", Row(10, "INFO")),
+      Row("key6", 6L, "L6", 6, "str6", Row(60, "INFO"))
+    )
+    assertEquals(expectedInc1To2.length, incrementalCommit1To2.length,
+      s"Incremental (commit1->commit2, INFO) count mismatch for $tableType")
+    expectedInc1To2.zip(incrementalCommit1To2).foreach { case (expected, actual) =>
+      assertEquals(expected, actual)
+    }
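+    // Bounds here are completion times; the expectations above assume a (start, end] window,
+    // so records written by commit 1 itself do not appear in the commit1->commit2 result.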

-    assertEquals(expectedResults.length, results.length)
-    expectedResults.zip(results).foreach { case (expected, actual) =>
+    // Incremental query - from commit2 to commit3 (only key3 update and key7 insert; both INFO)
+    val incrementalCommit2To3 = spark.read.format("hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.START_COMMIT.key, commit2)
+      .option(DataSourceReadOptions.END_COMMIT.key, commit3)
+      .load(basePath)
+      .filter("nested_record.level = 'INFO'")
+      .select("key", "ts", "level", "int_field", "string_field", "nested_record")
+      .orderBy("key")
+      .collect()
+
+    val expectedInc2To3 = Array(
+      Row("key3", 30L, "L3", 300, "str3", Row(30, "INFO")),
+      Row("key7", 7L, "L7", 7, "str7", Row(70, "INFO"))
+    )
+    assertEquals(expectedInc2To3.length, incrementalCommit2To3.length,
+      s"Incremental (commit2->commit3, INFO) count mismatch for $tableType")
+    expectedInc2To3.zip(incrementalCommit2To3).foreach { case (expected, actual) =>
       assertEquals(expected, actual)
     }
   }