|
74 | 74 | import java.util.function.BiConsumer; |
75 | 75 | import java.util.function.Consumer; |
76 | 76 | import java.util.stream.LongStream; |
| 77 | +import java.util.stream.Stream; |
77 | 78 |
|
78 | 79 | import static com.facebook.airlift.json.JsonCodec.jsonCodec; |
79 | 80 | import static com.facebook.presto.SystemSessionProperties.COLOCATED_JOIN; |
|
106 | 107 | import static com.facebook.presto.hive.HiveQueryRunner.TPCH_SCHEMA; |
107 | 108 | import static com.facebook.presto.hive.HiveQueryRunner.createBucketedSession; |
108 | 109 | import static com.facebook.presto.hive.HiveQueryRunner.createMaterializeExchangesSession; |
| 110 | +import static com.facebook.presto.hive.HiveSessionProperties.COMPRESSION_CODEC; |
109 | 111 | import static com.facebook.presto.hive.HiveSessionProperties.FILE_RENAMING_ENABLED; |
110 | 112 | import static com.facebook.presto.hive.HiveSessionProperties.MANIFEST_VERIFICATION_ENABLED; |
111 | 113 | import static com.facebook.presto.hive.HiveSessionProperties.OPTIMIZED_PARTITION_UPDATE_SERIALIZATION_ENABLED; |
|
159 | 161 | import static io.airlift.tpch.TpchTable.PART_SUPPLIER; |
160 | 162 | import static java.lang.String.format; |
161 | 163 | import static java.nio.charset.StandardCharsets.UTF_8; |
| 164 | +import static java.util.Locale.ROOT; |
162 | 165 | import static java.util.Objects.requireNonNull; |
163 | 166 | import static java.util.stream.Collectors.joining; |
164 | 167 | import static org.assertj.core.api.Assertions.assertThat; |
@@ -5267,17 +5270,6 @@ public void testPageFileFormatSmallSplitSize() |
5267 | 5270 | assertUpdate("DROP TABLE test_pagefile_small_split"); |
5268 | 5271 | } |
5269 | 5272 |
|
5270 | | - @Test |
5271 | | - public void testPageFileCompression() |
5272 | | - { |
5273 | | - for (HiveCompressionCodec compression : HiveCompressionCodec.values()) { |
5274 | | - if (!compression.isSupportedStorageFormat(PAGEFILE)) { |
5275 | | - continue; |
5276 | | - } |
5277 | | - testPageFileCompression(compression.name()); |
5278 | | - } |
5279 | | - } |
5280 | | - |
5281 | 5273 | @Test |
5282 | 5274 | public void testPartialAggregatePushdownORC() |
5283 | 5275 | { |
@@ -5703,31 +5695,35 @@ public void testParquetSelectivePageSourceFails() |
5703 | 5695 | assertQueryFails(parquetFilterPushdownSession, "SELECT a FROM test_parquet_filter_pushdoown WHERE b = false", "Parquet reader doesn't support filter pushdown yet"); |
5704 | 5696 | } |
5705 | 5697 |
|
5706 | | - private void testPageFileCompression(String compression) |
| 5698 | + @DataProvider(name = "testFormatAndCompressionCodecs") |
| 5699 | + public Object[][] compressionCodecs() |
5707 | 5700 | { |
5708 | | - Session testSession = Session.builder(getQueryRunner().getDefaultSession()) |
5709 | | - .setCatalogSessionProperty(catalog, "compression_codec", compression) |
5710 | | - .setCatalogSessionProperty(catalog, "pagefile_writer_max_stripe_size", "100B") |
5711 | | - .setCatalogSessionProperty(catalog, "max_split_size", "1kB") |
5712 | | - .setCatalogSessionProperty(catalog, "max_initial_split_size", "1kB") |
5713 | | - .build(); |
5714 | | - |
5715 | | - assertUpdate( |
5716 | | - testSession, |
5717 | | - "CREATE TABLE test_pagefile_compression\n" + |
5718 | | - "WITH (\n" + |
5719 | | - "format = 'PAGEFILE'\n" + |
5720 | | - ") AS\n" + |
5721 | | - "SELECT\n" + |
5722 | | - "*\n" + |
5723 | | - "FROM tpch.orders", |
5724 | | - "SELECT count(*) FROM orders"); |
5725 | | - |
5726 | | - assertQuery(testSession, "SELECT count(*) FROM test_pagefile_compression", "SELECT count(*) FROM orders"); |
5727 | | - |
5728 | | - assertQuery(testSession, "SELECT sum(custkey) FROM test_pagefile_compression", "SELECT sum(custkey) FROM orders"); |
| 5701 | + return Stream.of(PARQUET, ORC, PAGEFILE) |
|  | 5702 | +                .flatMap(format -> Stream.of(HiveCompressionCodec.values()) |
| 5703 | + .map(codec -> new Object[] {codec, format})) |
| 5704 | + .toArray(Object[][]::new); |
| 5705 | + } |
5729 | 5706 |
|
5730 | | - assertUpdate("DROP TABLE test_pagefile_compression"); |
| 5707 | + @Test(dataProvider = "testFormatAndCompressionCodecs") |
| 5708 | + public void testFormatAndCompressionCodecs(HiveCompressionCodec codec, HiveStorageFormat format) |
| 5709 | + { |
| 5710 | + String tableName = "test_" + format.name().toLowerCase(ROOT) + "_compression_codec_" + codec.name().toLowerCase(ROOT); |
| 5711 | + Session session = Session.builder(getSession()) |
|  | 5712 | +                .setCatalogSessionProperty(catalog, COMPRESSION_CODEC, codec.name()).build(); |
|  | 5713 | +        if (codec.isSupportedStorageFormat(format)) { |
| 5714 | + assertUpdate(session, |
| 5715 | + format("CREATE TABLE %s WITH (format = '%s') AS SELECT * FROM orders", |
| 5716 | + tableName, format.name()), |
| 5717 | + "SELECT count(*) FROM orders"); |
| 5718 | + assertQuery(format("SELECT count(*) FROM %s", tableName), "SELECT count(*) FROM orders"); |
| 5719 | + assertQuery(format("SELECT sum(custkey) FROM %s", tableName), "SELECT sum(custkey) FROM orders"); |
| 5720 | + assertQuerySucceeds(format("DROP TABLE %s", tableName)); |
| 5721 | + } |
| 5722 | + else { |
| 5723 | + assertQueryFails(session, format("CREATE TABLE %s WITH (format = '%s') AS SELECT * FROM orders", |
| 5724 | + tableName, format.name()), |
| 5725 | + format("%s compression is not supported with %s", codec, format)); |
| 5726 | + } |
5731 | 5727 | } |
5732 | 5728 |
|
5733 | 5729 | private static Consumer<Plan> assertTableWriterMergeNodeIsPresent() |
|
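Note on the pattern above: TestNG's @DataProvider returns an Object[][] whose rows fan a single @Test method out into one invocation per (codec, format) pair. Below is a minimal, self-contained sketch of that mechanism, using illustrative stand-in enums rather than Presto's real HiveStorageFormat/HiveCompressionCodec; it assumes only TestNG on the classpath, and all names are hypothetical.

import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.util.stream.Stream;

public class CrossProductDataProviderExample
{
    // Illustrative stand-ins for HiveStorageFormat and HiveCompressionCodec
    enum Format { ORC, PARQUET, PAGEFILE }
    enum Codec { NONE, SNAPPY, GZIP }

    @DataProvider(name = "formatAndCodec")
    public Object[][] formatAndCodec()
    {
        // Cross product: one Object[] row per (codec, format) pair
        return Stream.of(Format.values())
                .flatMap(format -> Stream.of(Codec.values())
                        .map(codec -> new Object[] {codec, format}))
                .toArray(Object[][]::new);
    }

    @Test(dataProvider = "formatAndCodec")
    public void testPair(Codec codec, Format format)
    {
        // TestNG invokes this once per row; the real test in the diff branches
        // on whether the codec supports the format and asserts success or failure
        System.out.printf("codec=%s format=%s%n", codec, format);
    }
}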