|
18 | 18 |
|
19 | 19 | import au.com.bytecode.opencsv.CSVReader;
|
20 | 20 | import com.google.common.base.Strings;
|
| 21 | +import io.cdap.cdap.api.data.schema.Schema; |
21 | 22 | import io.cdap.plugin.snowflake.common.SnowflakeErrorType;
|
22 | 23 | import io.cdap.plugin.snowflake.common.client.SnowflakeAccessor;
|
23 | 24 | import io.cdap.plugin.snowflake.common.util.DocumentUrlUtil;
|
24 | 25 | import io.cdap.plugin.snowflake.common.util.QueryUtil;
|
| 26 | +import io.cdap.plugin.snowflake.common.util.SchemaHelper; |
25 | 27 | import net.snowflake.client.jdbc.SnowflakeConnection;
|
26 | 28 | import org.slf4j.Logger;
|
27 | 29 | import org.slf4j.LoggerFactory;
|
|
35 | 37 | import java.util.ArrayList;
|
36 | 38 | import java.util.List;
|
37 | 39 | import java.util.UUID;
|
| 40 | +import java.util.stream.Collectors; |
38 | 41 |
|
39 | 42 | /**
|
40 | 43 | * A class which accesses Snowflake API to do actions used by batch source.
|
@@ -81,7 +84,15 @@ public List<String> prepareStageSplits() {
|
81 | 84 | String importQuery = config.getImportQuery();
|
82 | 85 | if (Strings.isNullOrEmpty(importQuery)) {
|
83 | 86 | String tableName = config.getTableName();
|
84 |
| - importQuery = String.format("SELECT * FROM %s", tableName); |
| 87 | + Schema schema = SchemaHelper.getParsedSchema(config.getSchema()); |
| 88 | + if (schema != null && schema.getFields() != null && !schema.getFields().isEmpty()) { |
| 89 | + String columns = schema.getFields().stream() |
| 90 | + .map(Schema.Field::getName) |
| 91 | + .collect(Collectors.joining(",")); |
| 92 | + importQuery = String.format("SELECT %s FROM %s", columns, tableName); |
| 93 | + } else { |
| 94 | + importQuery = String.format("SELECT * FROM %s", tableName); |
| 95 | + } |
85 | 96 | }
|
86 | 97 | String copy = String.format(COMAND_COPY_INTO, QueryUtil.removeSemicolon(importQuery));
|
87 | 98 | if (config.getMaxSplitSize() > 0) {
|
|
0 commit comments