
Commit afb8619

Merge pull request #48 from cloudsufi/feature/PLUGIN-1883
PLUGIN-1883: SnowFlake Plugin - Fetch schema using Named Table
2 parents 2dc3fe5 + 9d8bc2c commit afb8619

File tree: 13 files changed, +368 −25 lines

docs/Snowflake-batchsource.md

Lines changed: 3 additions & 1 deletion
```diff
@@ -25,7 +25,9 @@ log in to Snowflake, minus the "snowflakecomputing.com"). E.g. "myaccount.us-cen
 
 **Role:** Role to use (e.g. `ACCOUNTADMIN`).
 
-**Import Query:** Query for data import.
+**Import Query Type** - Method used to retrieve the schema from the source.
+* **Table Name**: The name of the table from which to retrieve the schema.
+* **Import Query**: Query for data import.
 
 ### Credentials
 
```
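For orientation, here is a minimal sketch of the two valid property combinations this doc change describes. The keys mirror the plugin's property constants (`importQueryType`, `tableName`, `importQuery`); the table and query values are hypothetical, and the surrounding pipeline JSON is omitted.

```java
import java.util.Map;

public class SourceConfigSketch {
  // Import Query Type = Table Name: the schema is fetched from table metadata.
  static final Map<String, String> BY_TABLE = Map.of(
      "importQueryType", "tableName",
      "tableName", "MY_TABLE");                  // hypothetical table

  // Import Query Type = Import Query: the schema is derived by describing the query.
  static final Map<String, String> BY_QUERY = Map.of(
      "importQueryType", "importQuery",
      "importQuery", "SELECT * FROM MY_TABLE");  // hypothetical query
}
```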

pom.xml

Lines changed: 2 additions & 2 deletions
```diff
@@ -29,8 +29,8 @@
     <surefire.redirectTestOutputToFile>true</surefire.redirectTestOutputToFile>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <!-- version properties -->
-    <cdap.version>6.11.0-SNAPSHOT</cdap.version>
-    <hydrator.version>2.13.0-SNAPSHOT</hydrator.version>
+    <cdap.version>6.11.0</cdap.version>
+    <hydrator.version>2.13.0</hydrator.version>
     <commons.csv.version>1.6</commons.csv.version>
     <hadoop.version>3.3.6</hadoop.version>
     <spark3.version>3.3.2</spark3.version>
```

src/main/java/io/cdap/plugin/snowflake/common/client/SnowflakeAccessor.java

Lines changed: 41 additions & 0 deletions
```diff
@@ -37,6 +37,7 @@
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.sql.Connection;
+import java.sql.DatabaseMetaData;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
@@ -74,6 +75,37 @@ public void runSQL(String query) {
     }
   }
 
+  /**
+   * Returns field descriptors for the specified table name.
+   *
+   * @param schemaName the name of the schema containing the table
+   * @param tableName  the name of the table whose metadata is retrieved
+   * @return list of field descriptors
+   */
+  public List<SnowflakeFieldDescriptor> getFieldDescriptors(String schemaName, String tableName) {
+    List<SnowflakeFieldDescriptor> fieldDescriptors = new ArrayList<>();
+    try (Connection connection = dataSource.getConnection()) {
+      DatabaseMetaData dbMetaData = connection.getMetaData();
+      try (ResultSet columns = dbMetaData.getColumns(null, schemaName, tableName, null)) {
+        while (columns.next()) {
+          String columnName = columns.getString("COLUMN_NAME");
+          int columnType = columns.getInt("DATA_TYPE");
+          boolean nullable = columns.getInt("NULLABLE") == DatabaseMetaData.columnNullable;
+          fieldDescriptors.add(new SnowflakeFieldDescriptor(columnName, columnType, nullable));
+        }
+      }
+    } catch (SQLException e) {
+      String errorMessage = String.format(
+        "Failed to retrieve table metadata with SQL State %s and error code %s with message: %s.",
+        e.getSQLState(), e.getErrorCode(), e.getMessage());
+      String errorReason = String.format("Failed to retrieve table metadata with SQL State %s and error " +
+        "code %s. For more details, see %s.", e.getSQLState(), e.getErrorCode(),
+        DocumentUrlUtil.getSupportedDocumentUrl());
+      throw SnowflakeErrorType.fetchProgramFailureException(e, errorReason, errorMessage);
+    }
+    return fieldDescriptors;
+  }
+
   /**
    * Returns field descriptors for specified import query.
    *
@@ -193,4 +225,13 @@ private static String writeTextToTmpFile(String text) {
       throw new RuntimeException("Cannot write key to temporary file", e);
     }
   }
+
+  /**
+   * Retrieves the schema name from the configuration.
+   *
+   * @return the schema name
+   */
+  public String getSchema() {
+    return config.getSchemaName();
+  }
 }
```
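The new method is plain JDBC metadata under the hood. As a self-contained illustration (not part of this commit), the sketch below exercises the same `DatabaseMetaData.getColumns` contract directly; the URL, credentials, schema, and table are placeholders.

```java
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;

public class DescribeTableSketch {
  public static void main(String[] args) throws SQLException {
    // Placeholder connection settings; the plugin builds its DataSource from config instead.
    String url = "jdbc:snowflake://myaccount.snowflakecomputing.com";
    try (Connection connection = DriverManager.getConnection(url, "USER", "PASSWORD")) {
      DatabaseMetaData dbMetaData = connection.getMetaData();
      // null catalog and null column pattern: all columns of MY_TABLE in MY_SCHEMA.
      try (ResultSet columns = dbMetaData.getColumns(null, "MY_SCHEMA", "MY_TABLE", null)) {
        while (columns.next()) {
          String name = columns.getString("COLUMN_NAME");
          int jdbcType = columns.getInt("DATA_TYPE"); // a java.sql.Types constant
          boolean nullable = columns.getInt("NULLABLE") == DatabaseMetaData.columnNullable;
          System.out.printf("%s type=%d nullable=%b%n", name, jdbcType, nullable);
        }
      }
    }
  }
}
```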

src/main/java/io/cdap/plugin/snowflake/common/util/QueryUtil.java

Lines changed: 5 additions & 0 deletions
```diff
@@ -16,6 +16,8 @@
 
 package io.cdap.plugin.snowflake.common.util;
 
+import com.google.common.base.Strings;
+
 /**
  * Transforms import query.
  */
@@ -29,6 +31,9 @@ private QueryUtil() {
   }
 
   public static String removeSemicolon(String importQuery) {
+    if (Strings.isNullOrEmpty(importQuery)) {
+      return null;
+    }
     if (importQuery.endsWith(";")) {
       importQuery = importQuery.substring(0, importQuery.length() - 1);
     }
```
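Assuming the method ends by returning `importQuery` (the context below the hunk suggests it does), the guard changes the behavior for missing input from a `NullPointerException` to a clean `null`. A standalone mirror of the helper:

```java
public class QueryUtilSketch {
  /** Mirrors QueryUtil.removeSemicolon after this change. */
  public static String removeSemicolon(String importQuery) {
    if (importQuery == null || importQuery.isEmpty()) { // stands in for Strings.isNullOrEmpty
      return null;
    }
    if (importQuery.endsWith(";")) {
      importQuery = importQuery.substring(0, importQuery.length() - 1);
    }
    return importQuery;
  }

  public static void main(String[] args) {
    System.out.println(removeSemicolon("SELECT * FROM T;")); // SELECT * FROM T
    System.out.println(removeSemicolon(null));               // null (previously threw NPE)
  }
}
```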

src/main/java/io/cdap/plugin/snowflake/common/util/SchemaHelper.java

Lines changed: 49 additions & 4 deletions
```diff
@@ -27,11 +27,13 @@
 import io.cdap.plugin.snowflake.source.batch.SnowflakeInputFormatProvider;
 import io.cdap.plugin.snowflake.source.batch.SnowflakeSourceAccessor;
 import java.io.IOException;
+import java.sql.SQLException;
 import java.sql.Types;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;
+import javax.annotation.Nullable;
 
 /**
  * Resolves schema.
@@ -58,23 +60,46 @@ public class SchemaHelper {
   private SchemaHelper() {
   }
 
+  /**
+   * Retrieves the schema for the Snowflake batch source based on the given configuration.
+   *
+   * @param config    the configuration for the Snowflake batch source
+   * @param collector the failure collector that captures any schema retrieval errors
+   * @return the resolved schema for the Snowflake source
+   */
   public static Schema getSchema(SnowflakeBatchSourceConfig config, FailureCollector collector) {
     if (!config.canConnect()) {
       return getParsedSchema(config.getSchema());
     }
 
     SnowflakeSourceAccessor snowflakeSourceAccessor =
       new SnowflakeSourceAccessor(config, SnowflakeInputFormatProvider.PROPERTY_DEFAULT_ESCAPE_CHAR);
-    return getSchema(snowflakeSourceAccessor, config.getSchema(), collector, config.getImportQuery());
+    return getSchema(
+      snowflakeSourceAccessor,
+      config.getSchema(),
+      collector,
+      config.getTableName(),
+      config.getImportQuery()
+    );
   }
 
+  /**
+   * Retrieves the schema for a Snowflake source based on the provided parameters.
+   *
+   * @param snowflakeAccessor the {@link SnowflakeSourceAccessor} used to connect to Snowflake
+   * @param schema            a JSON-format schema string
+   * @param collector         the {@link FailureCollector} that collects errors if schema retrieval fails
+   * @param tableName         the name of the table in Snowflake
+   * @param importQuery       the query used to fetch data from Snowflake when {@code tableName} is not provided
+   * @return the parsed {@link Schema} if successful, or {@code null} if an error occurs
+   */
   public static Schema getSchema(SnowflakeSourceAccessor snowflakeAccessor, String schema,
-                                 FailureCollector collector, String importQuery) {
+                                 FailureCollector collector, String tableName, String importQuery) {
     try {
       if (!Strings.isNullOrEmpty(schema)) {
         return getParsedSchema(schema);
       }
-      return Strings.isNullOrEmpty(importQuery) ? null : getSchema(snowflakeAccessor, importQuery);
+      return getSchema(snowflakeAccessor, tableName, importQuery);
     } catch (SchemaParseException e) {
       collector.addFailure(String.format("Unable to retrieve output schema. Reason: '%s'", e.getMessage()),
                            null)
@@ -84,7 +109,7 @@ public static Schema getSchema(SnowflakeSourceAccessor snowflakeAccessor, String
     }
   }
 
-  private static Schema getParsedSchema(String schema) {
+  public static Schema getParsedSchema(String schema) {
     if (Strings.isNullOrEmpty(schema)) {
       return null;
     }
@@ -95,6 +120,26 @@ private static Schema getParsedSchema(String schema) {
     }
   }
 
+  private static Schema getSchema(SnowflakeAccessor snowflakeAccessor, @Nullable String tableName,
+                                  @Nullable String importQuery) {
+    try {
+      List<SnowflakeFieldDescriptor> result;
+      if (!Strings.isNullOrEmpty(tableName)) {
+        result = snowflakeAccessor.getFieldDescriptors(snowflakeAccessor.getSchema(), tableName);
+      } else if (!Strings.isNullOrEmpty(importQuery)) {
+        result = snowflakeAccessor.describeQuery(importQuery);
+      } else {
+        return null;
+      }
+      List<Schema.Field> fields = result.stream()
+        .map(fieldDescriptor -> Schema.Field.of(fieldDescriptor.getName(), getSchema(fieldDescriptor)))
+        .collect(Collectors.toList());
+      return Schema.recordOf("data", fields);
+    } catch (IOException e) {
+      throw new SchemaParseException(e);
+    }
+  }
+
   public static Schema getSchema(SnowflakeAccessor snowflakeAccessor, String importQuery) {
     try {
       List<SnowflakeFieldDescriptor> result = snowflakeAccessor.describeQuery(importQuery);
```
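Read together with the public overloads, schema resolution now follows a fixed precedence: an explicit JSON schema wins, then the named table, then the import query, and finally `null` when nothing is configured. A stubbed sketch of that ordering; the accessor calls are replaced with hypothetical suppliers:

```java
import java.util.function.Supplier;

public class SchemaPrecedenceSketch {
  static String resolve(String manualSchema, String tableName, String importQuery,
                        Supplier<String> fromTable, Supplier<String> fromQuery) {
    if (manualSchema != null && !manualSchema.isEmpty()) {
      return manualSchema;        // user-supplied schema short-circuits everything
    }
    if (tableName != null && !tableName.isEmpty()) {
      return fromTable.get();     // DatabaseMetaData lookup by table name
    }
    if (importQuery != null && !importQuery.isEmpty()) {
      return fromQuery.get();     // describeQuery() over the SQL
    }
    return null;                  // nothing to infer from
  }

  public static void main(String[] args) {
    System.out.println(resolve(null, "MY_TABLE", "SELECT 1",
        () -> "schema-from-table", () -> "schema-from-query")); // schema-from-table
  }
}
```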
src/main/java/io/cdap/plugin/snowflake/source/batch/ImportQueryType.java (new file)

Lines changed: 49 additions & 0 deletions

```diff
@@ -0,0 +1,49 @@
+/*
+ * Copyright © 2020 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.snowflake.source.batch;
+
+/**
+ * Enum to specify the import query type used in the Snowflake batch source.
+ */
+public enum ImportQueryType {
+  IMPORT_QUERY("importQuery"),
+  TABLE_NAME("tableName");
+
+  private final String value;
+
+  ImportQueryType(String value) {
+    this.value = value;
+  }
+
+  public String getValue() {
+    return value;
+  }
+
+  public static ImportQueryType fromString(String value) {
+    if (value == null) {
+      return ImportQueryType.IMPORT_QUERY;
+    }
+    for (ImportQueryType type : ImportQueryType.values()) {
+      if (type.value.equalsIgnoreCase(value)) {
+        return type;
+      }
+    }
+    return ImportQueryType.IMPORT_QUERY;
+  }
+}
```
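`fromString` is deliberately forgiving: matching is case-insensitive on the stored widget value, and anything unrecognized, `null` included, falls back to `IMPORT_QUERY`. A quick check, assuming the enum above is on the classpath:

```java
public class ImportQueryTypeDemo {
  public static void main(String[] args) {
    System.out.println(ImportQueryType.fromString("tableName")); // TABLE_NAME
    System.out.println(ImportQueryType.fromString("TABLENAME")); // TABLE_NAME (case-insensitive)
    System.out.println(ImportQueryType.fromString(null));        // IMPORT_QUERY (default)
    System.out.println(ImportQueryType.fromString("unknown"));   // IMPORT_QUERY (fallback)
  }
}
```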

src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeBatchSourceConfig.java

Lines changed: 62 additions & 4 deletions
```diff
@@ -16,6 +16,7 @@
 
 package io.cdap.plugin.snowflake.source.batch;
 
+import com.google.common.base.Strings;
 import io.cdap.cdap.api.annotation.Description;
 import io.cdap.cdap.api.annotation.Macro;
 import io.cdap.cdap.api.annotation.Name;
@@ -34,6 +35,8 @@ public class SnowflakeBatchSourceConfig extends BaseSnowflakeConfig {
   public static final String PROPERTY_IMPORT_QUERY = "importQuery";
   public static final String PROPERTY_MAX_SPLIT_SIZE = "maxSplitSize";
   public static final String PROPERTY_SCHEMA = "schema";
+  public static final String PROPERTY_TABLE_NAME = "tableName";
+  public static final String PROPERTY_IMPORT_QUERY_TYPE = "importQueryType";
 
   @Name(PROPERTY_REFERENCE_NAME)
   @Description("This will be used to uniquely identify this source/sink for lineage, annotating metadata, etc.")
@@ -42,6 +45,7 @@ public class SnowflakeBatchSourceConfig extends BaseSnowflakeConfig {
   @Name(PROPERTY_IMPORT_QUERY)
   @Description("Query for import data.")
   @Macro
+  @Nullable
   private String importQuery;
 
   @Name(PROPERTY_MAX_SPLIT_SIZE)
@@ -55,19 +59,40 @@ public class SnowflakeBatchSourceConfig extends BaseSnowflakeConfig {
   @Macro
   private String schema;
 
+  @Name(PROPERTY_TABLE_NAME)
+  @Description("The name of the table used to retrieve the schema.")
+  @Macro
+  @Nullable
+  private final String tableName;
+
+  @Name(PROPERTY_IMPORT_QUERY_TYPE)
+  @Description("Whether to select Table Name or Import Query to extract the data.")
+  @Macro
+  @Nullable
+  private final String importQueryType;
+
   public SnowflakeBatchSourceConfig(String referenceName, String accountName, String database,
-                                    String schemaName, String importQuery, String username, String password,
+                                    String schemaName, @Nullable String importQuery, String username, String password,
                                     @Nullable Boolean keyPairEnabled, @Nullable String path,
                                     @Nullable String passphrase, @Nullable Boolean oauth2Enabled,
                                     @Nullable String clientId, @Nullable String clientSecret,
                                     @Nullable String refreshToken, Long maxSplitSize,
-                                    @Nullable String connectionArguments, @Nullable String schema) {
-    super(accountName, database, schemaName, username, password,
-          keyPairEnabled, path, passphrase, oauth2Enabled, clientId, clientSecret, refreshToken, connectionArguments);
+                                    @Nullable String connectionArguments,
+                                    @Nullable String schema,
+                                    @Nullable String tableName,
+                                    @Nullable String importQueryType) {
+    super(
+      accountName, database, schemaName, username, password, keyPairEnabled, path, passphrase,
+      oauth2Enabled, clientId, clientSecret, refreshToken, connectionArguments
+    );
     this.referenceName = referenceName;
    this.importQuery = importQuery;
     this.maxSplitSize = maxSplitSize;
     this.schema = schema;
+    this.tableName = tableName;
+    // Assign the constructor argument directly; calling getImportQueryType() here would
+    // read the still-unset field and silently drop the caller's value.
+    this.importQueryType = importQueryType;
   }
 
   public String getImportQuery() {
@@ -87,6 +112,16 @@ public String getSchema() {
     return schema;
   }
 
+  @Nullable
+  public String getTableName() {
+    return tableName;
+  }
+
+  @Nullable
+  public String getImportQueryType() {
+    return importQueryType == null ? ImportQueryType.IMPORT_QUERY.name() : importQueryType;
+  }
+
   public void validate(FailureCollector collector) {
     super.validate(collector);
 
@@ -95,5 +130,28 @@ public void validate(FailureCollector collector) {
       collector.addFailure("Maximum Split Size cannot be a negative number.", null)
         .withConfigProperty(PROPERTY_MAX_SPLIT_SIZE);
     }
+
+    if (!containsMacro(PROPERTY_IMPORT_QUERY_TYPE)) {
+      boolean isImportQuerySelected = ImportQueryType.IMPORT_QUERY.getValue().equals(importQueryType);
+      boolean isTableNameSelected = ImportQueryType.TABLE_NAME.getValue().equals(importQueryType);
+
+      if (isImportQuerySelected && !containsMacro(PROPERTY_IMPORT_QUERY) && Strings.isNullOrEmpty(importQuery)) {
+        collector.addFailure("Import Query cannot be empty.", null)
+          .withConfigProperty(PROPERTY_IMPORT_QUERY);
+      } else if (isTableNameSelected && !containsMacro(PROPERTY_TABLE_NAME) && Strings.isNullOrEmpty(tableName)) {
+        collector.addFailure("Table Name cannot be empty.", null)
+          .withConfigProperty(PROPERTY_TABLE_NAME);
+      }
+    } else {
+      boolean isImportQueryMissing = !containsMacro(PROPERTY_IMPORT_QUERY) && Strings.isNullOrEmpty(importQuery);
+      boolean isTableNameMissing = !containsMacro(PROPERTY_TABLE_NAME) && Strings.isNullOrEmpty(tableName);
+
+      if (isImportQueryMissing && isTableNameMissing) {
+        collector.addFailure("Either 'Import Query' or 'Table Name' must be provided.", null)
+          .withConfigProperty(PROPERTY_IMPORT_QUERY)
+          .withConfigProperty(PROPERTY_TABLE_NAME);
+      }
+    }
   }
 }
```
