Skip to content

Commit 3e41e72

Browse files
authored
[FLINK-38365][table] Make invalid operations with MATERIALIZED TABLEs returning more user friendly message
1 parent 2e7cac1 commit 3e41e72

File tree

6 files changed

+86
-16
lines changed

6 files changed

+86
-16
lines changed

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/TruncateTableOperation.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.flink.annotation.VisibleForTesting;
2323
import org.apache.flink.table.api.TableException;
2424
import org.apache.flink.table.api.internal.TableResultInternal;
25-
import org.apache.flink.table.catalog.CatalogBaseTable;
2625
import org.apache.flink.table.catalog.CatalogManager;
2726
import org.apache.flink.table.catalog.ContextResolvedTable;
2827
import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -54,10 +53,6 @@ public TableResultInternal execute(Context ctx) {
5453

5554
CatalogManager catalogManager = ctx.getCatalogManager();
5655
ContextResolvedTable contextResolvedTable = catalogManager.getTableOrError(tableIdentifier);
57-
CatalogBaseTable catalogBaseTable = contextResolvedTable.getTable();
58-
if (catalogBaseTable.getTableKind() == CatalogBaseTable.TableKind.VIEW) {
59-
throw new TableException("TRUNCATE TABLE statement is not supported for view.");
60-
}
6156

6257
ResolvedCatalogTable resolvedTable = contextResolvedTable.getResolvedTable();
6358
ObjectIdentifier objectIdentifier = contextResolvedTable.getIdentifier();
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.operations.utils;
20+
21+
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.table.api.ValidationException;
23+
import org.apache.flink.table.catalog.CatalogBaseTable;
24+
25+
import java.util.Locale;
26+
27+
/** Utility class for validation of operations. */
28+
@Internal
29+
public final class ValidationUtils {
30+
31+
private ValidationUtils() {}
32+
33+
public static void validateTableKind(
34+
CatalogBaseTable baseTable, CatalogBaseTable.TableKind expected, String operationName) {
35+
final CatalogBaseTable.TableKind kind = baseTable.getTableKind();
36+
if (kind == expected) {
37+
return;
38+
}
39+
40+
throw new ValidationException(
41+
String.format(
42+
"%s for a %s is not allowed",
43+
operationName.toUpperCase(Locale.ROOT),
44+
kind.name().toLowerCase(Locale.ROOT).replace('_', ' ')));
45+
}
46+
}

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
import org.apache.flink.table.api.ValidationException;
8080
import org.apache.flink.table.catalog.Catalog;
8181
import org.apache.flink.table.catalog.CatalogBaseTable;
82+
import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
8283
import org.apache.flink.table.catalog.CatalogDatabase;
8384
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
8485
import org.apache.flink.table.catalog.CatalogFunction;
@@ -88,7 +89,6 @@
8889
import org.apache.flink.table.catalog.CatalogPartitionImpl;
8990
import org.apache.flink.table.catalog.CatalogPartitionSpec;
9091
import org.apache.flink.table.catalog.CatalogTable;
91-
import org.apache.flink.table.catalog.CatalogView;
9292
import org.apache.flink.table.catalog.Column;
9393
import org.apache.flink.table.catalog.ContextResolvedTable;
9494
import org.apache.flink.table.catalog.FunctionLanguage;
@@ -161,6 +161,7 @@
161161
import org.apache.flink.table.operations.ddl.DropViewOperation;
162162
import org.apache.flink.table.operations.utils.LikeType;
163163
import org.apache.flink.table.operations.utils.ShowLikeOperator;
164+
import org.apache.flink.table.operations.utils.ValidationUtils;
164165
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
165166
import org.apache.flink.table.planner.hint.FlinkHints;
166167
import org.apache.flink.table.planner.operations.converters.SqlNodeConverters;
@@ -419,9 +420,8 @@ private Operation convertAlterTable(SqlAlterTable sqlAlterTable) {
419420
"Table %s doesn't exist or is a temporary table.", tableIdentifier));
420421
}
421422
CatalogBaseTable baseTable = optionalCatalogTable.get().getResolvedTable();
422-
if (baseTable instanceof CatalogView) {
423-
throw new ValidationException("ALTER TABLE for a view is not allowed");
424-
}
423+
ValidationUtils.validateTableKind(baseTable, TableKind.TABLE, "alter table");
424+
425425
ResolvedCatalogTable resolvedCatalogTable = (ResolvedCatalogTable) baseTable;
426426
if (sqlAlterTable instanceof SqlAlterTableRename) {
427427
UnresolvedIdentifier newUnresolvedIdentifier =
@@ -1059,9 +1059,8 @@ private Operation convertAnalyzeTable(SqlAnalyzeTable analyzeTable) {
10591059
"Table %s doesn't exist or is a temporary table.", tableIdentifier));
10601060
}
10611061
CatalogBaseTable baseTable = optionalCatalogTable.get().getResolvedTable();
1062-
if (baseTable instanceof CatalogView) {
1063-
throw new ValidationException("ANALYZE TABLE for a view is not allowed.");
1064-
}
1062+
ValidationUtils.validateTableKind(baseTable, TableKind.TABLE, "analyze table");
1063+
10651064
CatalogTable table = (CatalogTable) baseTable;
10661065
ResolvedSchema schema =
10671066
baseTable.getUnresolvedSchema().resolve(catalogManager.getSchemaResolver());

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConvertUtils.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,14 @@
2323
import org.apache.flink.table.api.Schema;
2424
import org.apache.flink.table.api.ValidationException;
2525
import org.apache.flink.table.catalog.CatalogBaseTable;
26-
import org.apache.flink.table.catalog.CatalogTable;
26+
import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
2727
import org.apache.flink.table.catalog.CatalogView;
2828
import org.apache.flink.table.catalog.ContextResolvedTable;
2929
import org.apache.flink.table.catalog.ObjectIdentifier;
3030
import org.apache.flink.table.catalog.ResolvedCatalogView;
3131
import org.apache.flink.table.catalog.ResolvedSchema;
3232
import org.apache.flink.table.catalog.UnresolvedIdentifier;
33+
import org.apache.flink.table.operations.utils.ValidationUtils;
3334
import org.apache.flink.table.planner.operations.PlannerQueryOperation;
3435
import org.apache.flink.table.planner.operations.converters.SqlNodeConverter.ConvertContext;
3536

@@ -138,9 +139,8 @@ static CatalogView validateAlterView(SqlAlterView alterView, ConvertContext cont
138139
}
139140
// check the view is exactly a view
140141
CatalogBaseTable baseTable = optionalCatalogTable.get().getResolvedTable();
141-
if (baseTable instanceof CatalogTable) {
142-
throw new ValidationException("ALTER VIEW for a table is not allowed");
143-
}
142+
ValidationUtils.validateTableKind(baseTable, TableKind.VIEW, "alter view");
143+
144144
return (CatalogView) baseTable;
145145
}
146146

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlTruncateTableConverter.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,15 @@
1919
package org.apache.flink.table.planner.operations.converters;
2020

2121
import org.apache.flink.sql.parser.dml.SqlTruncateTable;
22+
import org.apache.flink.table.catalog.CatalogBaseTable;
23+
import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
2224
import org.apache.flink.table.catalog.CatalogManager;
25+
import org.apache.flink.table.catalog.ContextResolvedTable;
2326
import org.apache.flink.table.catalog.ObjectIdentifier;
2427
import org.apache.flink.table.catalog.UnresolvedIdentifier;
2528
import org.apache.flink.table.operations.Operation;
2629
import org.apache.flink.table.operations.TruncateTableOperation;
30+
import org.apache.flink.table.operations.utils.ValidationUtils;
2731

2832
/** A converter for {@link SqlTruncateTable}. */
2933
public class SqlTruncateTableConverter implements SqlNodeConverter<SqlTruncateTable> {
@@ -34,6 +38,11 @@ public Operation convertSqlNode(SqlTruncateTable sqlTruncateTable, ConvertContex
3438
UnresolvedIdentifier.of(sqlTruncateTable.fullTableName());
3539
CatalogManager catalogManager = context.getCatalogManager();
3640
ObjectIdentifier tableIdentifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
41+
42+
ContextResolvedTable contextResolvedTable = catalogManager.getTableOrError(tableIdentifier);
43+
CatalogBaseTable catalogBaseTable = contextResolvedTable.getTable();
44+
ValidationUtils.validateTableKind(catalogBaseTable, TableKind.TABLE, "truncate table");
45+
3746
return new TruncateTableOperation(tableIdentifier);
3847
}
3948
}

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2625,6 +2625,27 @@ void testCreateViewWithDuplicateFieldName() {
26252625
"A column with the same name `id` has been defined at line 5, column 8."));
26262626
}
26272627

2628+
@Test
2629+
void testAlterInvalidOperationsForMaterializedTables() throws Exception {
2630+
prepareMaterializedTable("my_materialized_table", false, 1, null, "SELECT 1");
2631+
2632+
assertThatThrownBy(() -> parse("alter table my_materialized_table RENAME to new_name"))
2633+
.isInstanceOf(ValidationException.class)
2634+
.hasMessage("ALTER TABLE for a materialized table is not allowed");
2635+
2636+
assertThatThrownBy(() -> parse("analyze table my_materialized_table compute statistics"))
2637+
.isInstanceOf(ValidationException.class)
2638+
.hasMessage("ANALYZE TABLE for a materialized table is not allowed");
2639+
2640+
assertThatThrownBy(() -> parse("alter view my_materialized_table RENAME to new_name"))
2641+
.isInstanceOf(ValidationException.class)
2642+
.hasMessage("ALTER VIEW for a materialized table is not allowed");
2643+
2644+
assertThatThrownBy(() -> parse("truncate table my_materialized_table"))
2645+
.isInstanceOf(ValidationException.class)
2646+
.hasMessage("TRUNCATE TABLE for a materialized table is not allowed");
2647+
}
2648+
26282649
// ~ Tool Methods ----------------------------------------------------------
26292650

26302651
private static TestItem createTestItem(Object... args) {

0 commit comments

Comments
 (0)