Skip to content

Commit bf54c00

Browse files
committed
PLUGIN- 1883: Rework
1 parent f1e33c0 commit bf54c00

File tree

4 files changed

+50
-21
lines changed

4 files changed

+50
-21
lines changed

src/main/java/io/cdap/plugin/snowflake/common/util/SchemaHelper.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public static Schema getSchema(SnowflakeSourceAccessor snowflakeAccessor, String
100100
return getParsedSchema(schema);
101101
}
102102
return getSchema(snowflakeAccessor, tableName, importQuery);
103-
} catch (SchemaParseException | IllegalArgumentException e) {
103+
} catch (SchemaParseException e) {
104104
collector.addFailure(String.format("Unable to retrieve output schema. Reason: '%s'", e.getMessage()),
105105
null)
106106
.withStacktrace(e.getStackTrace())
@@ -115,7 +115,7 @@ private static Schema getParsedSchema(String schema) {
115115
}
116116
try {
117117
return Schema.parseJson(schema);
118-
} catch (IOException | IllegalStateException e) {
118+
} catch (IOException e) {
119119
throw new SchemaParseException(e);
120120
}
121121
}
@@ -126,8 +126,10 @@ private static Schema getSchema(SnowflakeAccessor snowflakeAccessor, @Nullable S
126126
List<SnowflakeFieldDescriptor> result;
127127
if (!Strings.isNullOrEmpty(tableName)) {
128128
result = snowflakeAccessor.describeTable(snowflakeAccessor.getSchema(), tableName);
129-
} else {
129+
} else if (!Strings.isNullOrEmpty(importQuery)) {
130130
result = snowflakeAccessor.describeQuery(importQuery);
131+
} else {
132+
throw new SchemaParseException("Either 'Import Query' or 'Table Name' must be provided");
131133
}
132134
List<Schema.Field> fields = result.stream()
133135
.map(fieldDescriptor -> Schema.Field.of(fieldDescriptor.getName(), getSchema(fieldDescriptor)))
@@ -136,7 +138,7 @@ private static Schema getSchema(SnowflakeAccessor snowflakeAccessor, @Nullable S
136138
} catch (SQLException e) {
137139
throw new SchemaParseException(e);
138140
} catch (IOException e) {
139-
throw new RuntimeException(e);
141+
throw new SchemaParseException(e);
140142
}
141143
}
142144

src/main/java/io/cdap/plugin/snowflake/source/batch/ImportQueryType.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,20 +21,29 @@
2121
* Enum to specify the import query type used in Snowflake Batch Source.
2222
*/
2323
public enum ImportQueryType {
24-
IMPORT_QUERY,
25-
TABLE_NAME;
24+
IMPORT_QUERY ("importQuery"),
25+
TABLE_NAME ("tableName");
26+
27+
private String value;
28+
29+
ImportQueryType(String value) {
30+
this.value = value;
31+
}
32+
33+
public String getValue() {
34+
return value;
35+
}
2636

2737
public static ImportQueryType fromString(String value) {
2838
if (value == null) {
2939
return ImportQueryType.IMPORT_QUERY;
3040
}
3141

3242
for (ImportQueryType type : ImportQueryType.values()) {
33-
if (type.name().equalsIgnoreCase(value)) {
43+
if (type.value.equalsIgnoreCase(value)) {
3444
return type;
3545
}
3646
}
37-
3847
return ImportQueryType.IMPORT_QUERY;
3948
}
4049
}

src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeBatchSourceConfig.java

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public class SnowflakeBatchSourceConfig extends BaseSnowflakeConfig {
3636
public static final String PROPERTY_MAX_SPLIT_SIZE = "maxSplitSize";
3737
public static final String PROPERTY_SCHEMA = "schema";
3838
public static final String PROPERTY_TABLE_NAME = "tableName";
39+
public static final String PROPERTY_IMPORT_QUERY_TYPE = "importQueryType";
3940

4041
@Name(PROPERTY_REFERENCE_NAME)
4142
@Description("This will be used to uniquely identify this source/sink for lineage, annotating metadata, etc.")
@@ -65,11 +66,11 @@ public class SnowflakeBatchSourceConfig extends BaseSnowflakeConfig {
6566
@Nullable
6667
private final String tableName;
6768

68-
@Name("importQueryType")
69+
@Name(PROPERTY_IMPORT_QUERY_TYPE)
6970
@Description("Whether to select Table Name or Import Query to extract the data.")
7071
@Macro
7172
@Nullable
72-
private final ImportQueryType importQueryType;
73+
private final String importQueryType;
7374

7475

7576
public SnowflakeBatchSourceConfig(String referenceName, String accountName, String database,
@@ -81,7 +82,7 @@ public SnowflakeBatchSourceConfig(String referenceName, String accountName, Stri
8182
@Nullable String connectionArguments,
8283
@Nullable String schema,
8384
@Nullable String tableName,
84-
@Nullable ImportQueryType importQueryType) {
85+
@Nullable String importQueryType) {
8586
super(
8687
accountName, database, schemaName, username, password, keyPairEnabled, path, passphrase,
8788
oauth2Enabled, clientId, clientSecret, refreshToken, connectionArguments
@@ -91,7 +92,7 @@ public SnowflakeBatchSourceConfig(String referenceName, String accountName, Stri
9192
this.maxSplitSize = maxSplitSize;
9293
this.schema = schema;
9394
this.tableName = tableName;
94-
this.importQueryType = importQueryType;
95+
this.importQueryType = getImportQueryType();
9596
}
9697

9798
public String getImportQuery() {
@@ -117,8 +118,8 @@ public String getTableName() {
117118
}
118119

119120
@Nullable
120-
public ImportQueryType getImportQueryType() {
121-
return importQueryType == null ? ImportQueryType.IMPORT_QUERY : importQueryType;
121+
public String getImportQueryType() {
122+
return importQueryType == null ? ImportQueryType.IMPORT_QUERY.name() : importQueryType;
122123
}
123124

124125
public void validate(FailureCollector collector) {
@@ -130,10 +131,27 @@ public void validate(FailureCollector collector) {
130131
.withConfigProperty(PROPERTY_MAX_SPLIT_SIZE);
131132
}
132133

133-
if (Strings.isNullOrEmpty(importQuery) && Strings.isNullOrEmpty(tableName)) {
134-
collector.addFailure("Either 'Schema' or 'Table Name' must be provided.", null)
135-
.withConfigProperty(PROPERTY_IMPORT_QUERY)
136-
.withConfigProperty(PROPERTY_TABLE_NAME);
134+
if (!containsMacro(PROPERTY_IMPORT_QUERY_TYPE)) {
135+
boolean isImportQuerySelected = ImportQueryType.IMPORT_QUERY.getValue().equals(importQueryType);
136+
boolean isTableNameSelected = ImportQueryType.TABLE_NAME.getValue().equals(importQueryType);
137+
138+
if (isImportQuerySelected && !containsMacro(PROPERTY_IMPORT_QUERY) && Strings.isNullOrEmpty(importQuery)) {
139+
collector.addFailure("Import Query cannot be empty", null)
140+
.withConfigProperty(PROPERTY_IMPORT_QUERY);
141+
142+
} else if (isTableNameSelected && !containsMacro(PROPERTY_TABLE_NAME) && Strings.isNullOrEmpty(tableName)) {
143+
collector.addFailure("Table Name cannot be empty", null)
144+
.withConfigProperty(PROPERTY_TABLE_NAME);
145+
}
146+
} else {
147+
boolean isImportQueryMissing = !containsMacro(PROPERTY_IMPORT_QUERY) && Strings.isNullOrEmpty(importQuery);
148+
boolean isTableNameMissing = !containsMacro(PROPERTY_TABLE_NAME) && Strings.isNullOrEmpty(tableName);
149+
150+
if (isImportQueryMissing && isTableNameMissing) {
151+
collector.addFailure("Either 'Import Query' or 'Table Name' must be provided.", null)
152+
.withConfigProperty(PROPERTY_IMPORT_QUERY)
153+
.withConfigProperty(PROPERTY_TABLE_NAME);
154+
}
137155
}
138156
}
139157
}

src/test/java/io/cdap/plugin/snowflake/source/batch/SnowflakeBatchSourceConfigBuilder.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public class SnowflakeBatchSourceConfigBuilder {
4040
"",
4141
"",
4242
"",
43-
ImportQueryType.IMPORT_QUERY);
43+
ImportQueryType.IMPORT_QUERY.name());
4444

4545
private String referenceName;
4646
private String accountName;
@@ -60,7 +60,7 @@ public class SnowflakeBatchSourceConfigBuilder {
6060
private String connectionArguments;
6161
private String schema;
6262
private String tableName;
63-
private ImportQueryType importQueryType;
63+
private String importQueryType;
6464

6565
public SnowflakeBatchSourceConfigBuilder() {
6666
}
@@ -177,7 +177,7 @@ public SnowflakeBatchSourceConfigBuilder setTableName(String tableName) {
177177
return this;
178178
}
179179

180-
public SnowflakeBatchSourceConfigBuilder setImportQueryType(ImportQueryType importQueryType) {
180+
public SnowflakeBatchSourceConfigBuilder setImportQueryType(String importQueryType) {
181181
this.importQueryType = importQueryType;
182182
return this;
183183
}

0 commit comments

Comments
 (0)