Skip to content

Commit fa1d3a8

Browse files
committed
PLUGIN-1883: Applied patch for Adding Named Table option to fetch schema
1 parent 2dc3fe5 commit fa1d3a8

File tree

14 files changed

+322
-25
lines changed

14 files changed

+322
-25
lines changed

docs/Snowflake-batchsource.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ log in to Snowflake, minus the "snowflakecomputing.com"). E.g. "myaccount.us-cen
2525

2626
**Role:** Role to use (e.g. `ACCOUNTADMIN`).
2727

28-
**Import Query:** Query for data import.
28+
**Import Query Type:** Method used to retrieve schema from the source.
29+
* **Table Name**: The name of the table to retrieve the schema.
30+
* **Import Query**: Query for data import.
2931

3032
### Credentials
3133

pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@
2929
<surefire.redirectTestOutputToFile>true</surefire.redirectTestOutputToFile>
3030
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
3131
<!-- version properties -->
32-
<cdap.version>6.11.0-SNAPSHOT</cdap.version>
33-
<hydrator.version>2.13.0-SNAPSHOT</hydrator.version>
32+
<cdap.version>6.11.0</cdap.version>
33+
<hydrator.version>2.13.0</hydrator.version>
3434
<commons.csv.version>1.6</commons.csv.version>
3535
<hadoop.version>3.3.6</hadoop.version>
3636
<spark3.version>3.3.2</spark3.version>

src/main/java/io/cdap/plugin/snowflake/common/client/SnowflakeAccessor.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.io.IOException;
3838
import java.lang.reflect.Field;
3939
import java.sql.Connection;
40+
import java.sql.DatabaseMetaData;
4041
import java.sql.PreparedStatement;
4142
import java.sql.ResultSet;
4243
import java.sql.ResultSetMetaData;
@@ -74,6 +75,38 @@ public void runSQL(String query) {
7475
}
7576
}
7677

78+
/**
79+
* Returns field descriptors for specified table name
80+
* @param schemaName The name of schema containing the table
81+
* @param tableName The name of table whose metadata needs to be retrieved
82+
* @return list of field descriptors
83+
* @throws SQLException If an error occurs while retrieving metadata from the database
84+
*/
85+
public List<SnowflakeFieldDescriptor> describeTable(String schemaName, String tableName) throws SQLException {
86+
List<SnowflakeFieldDescriptor> fieldDescriptors = new ArrayList<>();
87+
try (Connection connection = dataSource.getConnection()) {
88+
DatabaseMetaData dbMetaData = connection.getMetaData();
89+
try (ResultSet columns = dbMetaData.getColumns(null, schemaName, tableName, null)) {
90+
while (columns.next()) {
91+
String columnName = columns.getString("COLUMN_NAME");
92+
int columnType = columns.getInt("DATA_TYPE");
93+
boolean nullable = columns.getInt("NULLABLE") == DatabaseMetaData.columnNullable;
94+
fieldDescriptors.add(new SnowflakeFieldDescriptor(columnName, columnType, nullable));
95+
}
96+
}
97+
} catch (SQLException e) {
98+
String errorMessage = String.format(
99+
"Failed to retrieve table metadata with SQL State %s and error code %s with message: %s.",
100+
e.getSQLState(), e.getErrorCode(), e.getMessage()
101+
);
102+
String errorReason = String.format("Failed to retrieve table metadata with SQL State %s and error " +
103+
"code %s. For more details, see %s.", e.getSQLState(), e.getErrorCode(),
104+
DocumentUrlUtil.getSupportedDocumentUrl());
105+
throw SnowflakeErrorType.fetchProgramFailureException(e, errorReason, errorMessage);
106+
}
107+
return fieldDescriptors;
108+
}
109+
77110
/**
78111
* Returns field descriptors for specified import query.
79112
*
@@ -193,4 +226,13 @@ private static String writeTextToTmpFile(String text) {
193226
throw new RuntimeException("Cannot write key to temporary file", e);
194227
}
195228
}
229+
230+
/**
231+
* Retrieves schema name from the configuration
232+
*
233+
* @return The schema name
234+
*/
235+
public String getSchema() {
236+
return config.getSchemaName();
237+
}
196238
}

src/main/java/io/cdap/plugin/snowflake/common/util/QueryUtil.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package io.cdap.plugin.snowflake.common.util;
1818

19+
import com.google.common.base.Strings;
20+
1921
/**
2022
* Transforms import query.
2123
*/
@@ -29,6 +31,9 @@ private QueryUtil() {
2931
}
3032

3133
public static String removeSemicolon(String importQuery) {
34+
if (Strings.isNullOrEmpty(importQuery)) {
35+
return null;
36+
}
3237
if (importQuery.endsWith(";")) {
3338
importQuery = importQuery.substring(0, importQuery.length() - 1);
3439
}

src/main/java/io/cdap/plugin/snowflake/common/util/SchemaHelper.java

Lines changed: 49 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,13 @@
2727
import io.cdap.plugin.snowflake.source.batch.SnowflakeInputFormatProvider;
2828
import io.cdap.plugin.snowflake.source.batch.SnowflakeSourceAccessor;
2929
import java.io.IOException;
30+
import java.sql.SQLException;
3031
import java.sql.Types;
3132
import java.util.List;
3233
import java.util.Map;
3334
import java.util.Objects;
3435
import java.util.stream.Collectors;
36+
import javax.annotation.Nullable;
3537

3638
/**
3739
* Resolves schema.
@@ -58,24 +60,47 @@ public class SchemaHelper {
5860
private SchemaHelper() {
5961
}
6062

63+
/**
64+
* Retrieves schema for the Snowflake batch source based on the given configuration.
65+
*
66+
* @param config The configuration for Snowflake batch source
67+
* @param collector The failure collector to capture any schema retrieval errors.
68+
* @return The resolved schema for Snowflake source
69+
*/
6170
public static Schema getSchema(SnowflakeBatchSourceConfig config, FailureCollector collector) {
6271
if (!config.canConnect()) {
6372
return getParsedSchema(config.getSchema());
6473
}
6574

6675
SnowflakeSourceAccessor snowflakeSourceAccessor =
6776
new SnowflakeSourceAccessor(config, SnowflakeInputFormatProvider.PROPERTY_DEFAULT_ESCAPE_CHAR);
68-
return getSchema(snowflakeSourceAccessor, config.getSchema(), collector, config.getImportQuery());
77+
return getSchema(
78+
snowflakeSourceAccessor,
79+
config.getSchema(),
80+
collector,
81+
config.getTableName(),
82+
config.getImportQuery()
83+
);
6984
}
7085

86+
/**
87+
* Retrieves schema for a Snowflake source based on the provided parameters.
88+
*
89+
* @param snowflakeAccessor The {@link SnowflakeSourceAccessor} used to connect to Snowflake.
90+
* @param schema A JSON-format schema string
91+
* @param collector The {@link FailureCollector} to collect errors if schema retrieval fails.
92+
* @param tableName The name of the table in Snowflake.
93+
* @param importQuery The query to fetch data from Snowflake, used when `tableName` is not provided.
94+
* @return The parsed {@link Schema} if successful, or {@code null} if an error occurs.
95+
*/
7196
public static Schema getSchema(SnowflakeSourceAccessor snowflakeAccessor, String schema,
72-
FailureCollector collector, String importQuery) {
97+
FailureCollector collector, String tableName, String importQuery) {
7398
try {
7499
if (!Strings.isNullOrEmpty(schema)) {
75100
return getParsedSchema(schema);
76101
}
77-
return Strings.isNullOrEmpty(importQuery) ? null : getSchema(snowflakeAccessor, importQuery);
78-
} catch (SchemaParseException e) {
102+
return getSchema(snowflakeAccessor, tableName, importQuery);
103+
} catch (SchemaParseException | IllegalArgumentException e) {
79104
collector.addFailure(String.format("Unable to retrieve output schema. Reason: '%s'", e.getMessage()),
80105
null)
81106
.withStacktrace(e.getStackTrace())
@@ -95,6 +120,26 @@ private static Schema getParsedSchema(String schema) {
95120
}
96121
}
97122

123+
private static Schema getSchema(SnowflakeAccessor snowflakeAccessor, @Nullable String tableName,
124+
@Nullable String importQuery) {
125+
try {
126+
List<SnowflakeFieldDescriptor> result;
127+
if (!Strings.isNullOrEmpty(tableName)) {
128+
result = snowflakeAccessor.describeTable(snowflakeAccessor.getSchema(), tableName);
129+
} else {
130+
result = snowflakeAccessor.describeQuery(importQuery);
131+
}
132+
List<Schema.Field> fields = result.stream()
133+
.map(fieldDescriptor -> Schema.Field.of(fieldDescriptor.getName(), getSchema(fieldDescriptor)))
134+
.collect(Collectors.toList());
135+
return Schema.recordOf("data", fields);
136+
} catch (SQLException e) {
137+
throw new SchemaParseException(e);
138+
} catch (IOException e) {
139+
throw new RuntimeException(e);
140+
}
141+
}
142+
98143
public static Schema getSchema(SnowflakeAccessor snowflakeAccessor, String importQuery) {
99144
try {
100145
List<SnowflakeFieldDescriptor> result = snowflakeAccessor.describeQuery(importQuery);

src/main/java/io/cdap/plugin/snowflake/sink/batch/SnowflakeSinkConfig.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import io.cdap.plugin.snowflake.common.BaseSnowflakeConfig;
2424
import io.cdap.plugin.snowflake.common.client.SnowflakeAccessor;
2525
import io.cdap.plugin.snowflake.common.util.SchemaHelper;
26-
2726
import javax.annotation.Nullable;
2827

2928
/**
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright © 2020 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.snowflake.source.batch;
18+
19+
20+
/**
21+
* Enum to specify the import query type used in Snowflake Batch Source.
22+
*/
23+
public enum ImportQueryType {
24+
IMPORT_QUERY,
25+
TABLE_NAME;
26+
27+
public static ImportQueryType fromString(String value) {
28+
for (ImportQueryType type : ImportQueryType.values()) {
29+
if (type.name().replace("_", "").equalsIgnoreCase(value)) {
30+
return type;
31+
}
32+
}
33+
throw new IllegalArgumentException(
34+
String.format("Unsupported importQueryType '%s'. Supported types are: importQuery, tableName.", value));
35+
}
36+
}

src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeBatchSourceConfig.java

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@
1616

1717
package io.cdap.plugin.snowflake.source.batch;
1818

19+
import com.google.common.base.Strings;
1920
import io.cdap.cdap.api.annotation.Description;
2021
import io.cdap.cdap.api.annotation.Macro;
2122
import io.cdap.cdap.api.annotation.Name;
2223
import io.cdap.cdap.etl.api.FailureCollector;
2324
import io.cdap.plugin.snowflake.common.BaseSnowflakeConfig;
24-
2525
import java.util.Objects;
2626
import javax.annotation.Nullable;
2727

@@ -34,6 +34,7 @@ public class SnowflakeBatchSourceConfig extends BaseSnowflakeConfig {
3434
public static final String PROPERTY_IMPORT_QUERY = "importQuery";
3535
public static final String PROPERTY_MAX_SPLIT_SIZE = "maxSplitSize";
3636
public static final String PROPERTY_SCHEMA = "schema";
37+
public static final String PROPERTY_TABLE_NAME = "tableName";
3738

3839
@Name(PROPERTY_REFERENCE_NAME)
3940
@Description("This will be used to uniquely identify this source/sink for lineage, annotating metadata, etc.")
@@ -42,6 +43,7 @@ public class SnowflakeBatchSourceConfig extends BaseSnowflakeConfig {
4243
@Name(PROPERTY_IMPORT_QUERY)
4344
@Description("Query for import data.")
4445
@Macro
46+
@Nullable
4547
private String importQuery;
4648

4749
@Name(PROPERTY_MAX_SPLIT_SIZE)
@@ -55,19 +57,39 @@ public class SnowflakeBatchSourceConfig extends BaseSnowflakeConfig {
5557
@Macro
5658
private String schema;
5759

60+
61+
@Name(PROPERTY_TABLE_NAME)
62+
@Nullable
63+
@Description("The name of the table used to retrieve the schema.")
64+
private final String tableName;
65+
66+
@Name("importQueryType")
67+
@Description("Whether to select Table Name or Import Query to extract the data.")
68+
@Macro
69+
@Nullable
70+
private final ImportQueryType importQueryType;
71+
72+
5873
public SnowflakeBatchSourceConfig(String referenceName, String accountName, String database,
59-
String schemaName, String importQuery, String username, String password,
74+
String schemaName, @Nullable String importQuery, String username, String password,
6075
@Nullable Boolean keyPairEnabled, @Nullable String path,
6176
@Nullable String passphrase, @Nullable Boolean oauth2Enabled,
6277
@Nullable String clientId, @Nullable String clientSecret,
6378
@Nullable String refreshToken, Long maxSplitSize,
64-
@Nullable String connectionArguments, @Nullable String schema) {
65-
super(accountName, database, schemaName, username, password,
66-
keyPairEnabled, path, passphrase, oauth2Enabled, clientId, clientSecret, refreshToken, connectionArguments);
79+
@Nullable String connectionArguments,
80+
@Nullable String schema,
81+
@Nullable String tableName,
82+
@Nullable ImportQueryType importQueryType) {
83+
super(
84+
accountName, database, schemaName, username, password, keyPairEnabled, path, passphrase,
85+
oauth2Enabled, clientId, clientSecret, refreshToken, connectionArguments
86+
);
6787
this.referenceName = referenceName;
6888
this.importQuery = importQuery;
6989
this.maxSplitSize = maxSplitSize;
7090
this.schema = schema;
91+
this.tableName = tableName;
92+
this.importQueryType = importQueryType;
7193
}
7294

7395
public String getImportQuery() {
@@ -87,6 +109,15 @@ public String getSchema() {
87109
return schema;
88110
}
89111

112+
@Nullable
113+
public String getTableName() {
114+
return tableName;
115+
}
116+
117+
public ImportQueryType getImportQueryType() {
118+
return importQueryType;
119+
}
120+
90121
public void validate(FailureCollector collector) {
91122
super.validate(collector);
92123

@@ -95,5 +126,11 @@ public void validate(FailureCollector collector) {
95126
collector.addFailure("Maximum Slit Size cannot be a negative number.", null)
96127
.withConfigProperty(PROPERTY_MAX_SPLIT_SIZE);
97128
}
129+
130+
if (Strings.isNullOrEmpty(importQuery) && Strings.isNullOrEmpty(tableName)) {
131+
collector.addFailure("Either 'Import Query' or 'Table Name' must be provided.", null)
132+
.withConfigProperty(PROPERTY_IMPORT_QUERY)
133+
.withConfigProperty(PROPERTY_TABLE_NAME);
134+
}
98135
}
99136
}

src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeSourceAccessor.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package io.cdap.plugin.snowflake.source.batch;
1818

1919
import au.com.bytecode.opencsv.CSVReader;
20+
import com.google.common.base.Strings;
2021
import io.cdap.plugin.snowflake.common.SnowflakeErrorType;
2122
import io.cdap.plugin.snowflake.common.client.SnowflakeAccessor;
2223
import io.cdap.plugin.snowflake.common.util.DocumentUrlUtil;
@@ -77,7 +78,12 @@ public SnowflakeSourceAccessor(SnowflakeBatchSourceConfig config, String escapeC
7778
*/
7879
public List<String> prepareStageSplits() {
7980
LOG.info("Loading data into stage: '{}'", STAGE_PATH);
80-
String copy = String.format(COMAND_COPY_INTO, QueryUtil.removeSemicolon(config.getImportQuery()));
81+
String importQuery = config.getImportQuery();
82+
if (Strings.isNullOrEmpty(importQuery)) {
83+
String tableName = config.getTableName();
84+
importQuery = String.format("SELECT * FROM %s", tableName);
85+
}
86+
String copy = String.format(COMAND_COPY_INTO, QueryUtil.removeSemicolon(importQuery));
8187
if (config.getMaxSplitSize() > 0) {
8288
copy = copy + String.format(COMMAND_MAX_FILE_SIZE, config.getMaxSplitSize());
8389
}
@@ -94,10 +100,13 @@ public List<String> prepareStageSplits() {
94100
}
95101
} catch (SQLException e) {
96102
String errorReason = String.format("Failed to load data into stage '%s' with sqlState %s and errorCode %s. " +
97-
"For more details, see %s.", STAGE_PATH, e.getErrorCode(), e.getSQLState(),
98-
DocumentUrlUtil.getSupportedDocumentUrl());
99-
String errorMessage = String.format("Failed to load data into stage '%s' with sqlState %s and errorCode %s. " +
100-
"Failed to execute query with message: %s.", STAGE_PATH, e.getSQLState(), e.getErrorCode(), e.getMessage());
103+
"For more details, see %s.", STAGE_PATH, e.getSQLState(), e.getErrorCode(),
104+
DocumentUrlUtil.getSupportedDocumentUrl());
105+
String errorMessage = String.format(
106+
"Failed to load data into stage '%s' with sqlState %s and errorCode %s. "
107+
+ "Failed to execute query with message: %s.",
108+
STAGE_PATH, e.getSQLState(), e.getErrorCode(), e.getMessage()
109+
);
101110
throw SnowflakeErrorType.fetchProgramFailureException(e, errorReason, errorMessage);
102111
}
103112
return stageSplits;

0 commit comments

Comments
 (0)