|
20 | 20 | import io.cdap.cdap.api.common.Bytes;
|
21 | 21 | import io.cdap.cdap.api.data.format.StructuredRecord;
|
22 | 22 | import io.cdap.cdap.api.data.schema.Schema;
|
| 23 | +import org.slf4j.Logger; |
| 24 | +import org.slf4j.LoggerFactory; |
23 | 25 |
|
24 | 26 | import java.io.IOException;
|
25 | 27 | import java.math.BigDecimal;
|
|
39 | 41 | import java.util.Objects;
|
40 | 42 | import java.util.Set;
|
41 | 43 | import java.util.concurrent.TimeUnit;
|
| 44 | +import java.util.regex.Matcher; |
| 45 | +import java.util.regex.Pattern; |
42 | 46 | import javax.annotation.Nullable;
|
43 | 47 |
|
44 | 48 | /**
|
45 | 49 | * Util class to convert structured record into json.
|
46 | 50 | */
|
47 | 51 | public final class StructuredRecordToJson {
|
| 52 | + private static final Logger LOG = LoggerFactory.getLogger(StructuredRecordToJson.class); |
| 53 | + |
48 | 54 | private static final DateTimeFormatter DATETIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
|
49 | 55 | private static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSS");
|
50 | 56 | // array of arrays and map of arrays are not supported by big query
|
51 | 57 | private static final Set<Schema.Type> UNSUPPORTED_ARRAY_TYPES = ImmutableSet.of(Schema.Type.ARRAY, Schema.Type.MAP);
|
52 | 58 |
|
| 59 | + private static final int MAX_LOGICAL_DATE_TIME_FRACTION_PRECISION = 6; |
| 60 | + /* BigQuery format for DateTime: YYYY-[M]M-[D]D[( |T)[H]H:[M]M:[S]S[.F]] |
| 61 | + * [.F]: Up to six fractional digits (microsecond precision) |
| 62 | + */ |
| 63 | + private static final Pattern LOGICAL_DATE_PATTERN = |
| 64 | + Pattern.compile("\\d{4}-\\d{1,2}-\\d{1,2}([ T]\\d{1,2}:\\d{1,2}:\\d{1,2}(\\.(\\d+))?)?"); |
| 65 | + private static final int TIME_FRACTION_GROUP = 3; |
| 66 | + |
53 | 67 | /**
|
54 | 68 | * Writes object and writes to json writer.
|
55 |
| - * @param writer json writer to write the object to |
56 |
| - * @param name name of the field to be written |
57 |
| - * @param object object to be written |
| 69 | + * |
| 70 | + * @param writer json writer to write the object to |
| 71 | + * @param name name of the field to be written |
| 72 | + * @param object object to be written |
58 | 73 | * @param fieldSchema field schema to be written
|
59 | 74 | */
|
60 | 75 | public static void write(JsonWriter writer, String name, Object object, Schema fieldSchema) throws IOException {
|
@@ -143,8 +158,14 @@ private static void writeSimpleTypes(JsonWriter writer, String name, boolean isA
|
143 | 158 | writer.value(Objects.requireNonNull(getDecimal((byte[]) object, schema)).toPlainString());
|
144 | 159 | break;
|
145 | 160 | case DATETIME:
|
146 |
| - //datetime should be already an ISO-8601 string |
147 |
| - writer.value(Objects.requireNonNull(object.toString())); |
| 161 | + String strValue = object.toString(); |
| 162 | + // Datetime should be already an ISO-8601 string |
| 163 | + // But BigQuery format is stricter than ISO-8601 and does not support Zone and Offset |
| 164 | + // Hence it is closer to DateTimeFormatter.ISO_LOCAL_DATE_TIME but with microsecond precision |
| 165 | + // Check if the value matches expected format for DateTime and trim time fraction to |
| 166 | + // MAX_LOGICAL_DATE_TIME_FRACTION_PRECISION if it exceeds it |
| 167 | + strValue = checkAndTrimToMaxSupportedPrecision(strValue); |
| 168 | + writer.value(Objects.requireNonNull(strValue)); |
148 | 169 | break;
|
149 | 170 | default:
|
150 | 171 | throw new IllegalStateException(
|
@@ -185,6 +206,27 @@ private static void writeSimpleTypes(JsonWriter writer, String name, boolean isA
|
185 | 206 | }
|
186 | 207 | }
|
187 | 208 |
|
| 209 | + private static String checkAndTrimToMaxSupportedPrecision(String strValue) { |
| 210 | + Matcher matcher = LOGICAL_DATE_PATTERN.matcher(strValue); |
| 211 | + if (matcher.matches()) { |
| 212 | + String timeFraction = matcher.group(TIME_FRACTION_GROUP); |
| 213 | + //matcher.group returns null for a group if an optional group did not exist in the string |
| 214 | + if (timeFraction != null && timeFraction.length() > MAX_LOGICAL_DATE_TIME_FRACTION_PRECISION) { |
| 215 | + //Trim the time fraction to max supported precision |
| 216 | + String trimmedTimeFraction = timeFraction.substring(0, MAX_LOGICAL_DATE_TIME_FRACTION_PRECISION); |
| 217 | + strValue = new StringBuilder(strValue) |
| 218 | + .replace(matcher.start(TIME_FRACTION_GROUP), matcher.end(TIME_FRACTION_GROUP), trimmedTimeFraction) |
| 219 | + .toString(); |
| 220 | + } |
| 221 | + } else { |
| 222 | + //Don't throw exception for now as we might be missing some scenario in the format |
| 223 | + //Let it fail during BigQuery insert in case of wrong format |
| 224 | + LOG.warn("Invalid value {} for DATETIME type, it should match the " + |
| 225 | + "format YYYY-[M]M-[D]D[( |T)[H]H:[M]M:[S]S[.F]]", strValue); |
| 226 | + } |
| 227 | + return strValue; |
| 228 | + } |
| 229 | + |
188 | 230 | private static void writeArray(JsonWriter writer,
|
189 | 231 | String name,
|
190 | 232 | @Nullable Object value,
|
|
0 commit comments