diff --git a/docs/content.zh/docs/connectors/table/formats/json.md b/docs/content.zh/docs/connectors/table/formats/json.md
index 005485a7a0a8e..76f4593f1d8f9 100644
--- a/docs/content.zh/docs/connectors/table/formats/json.md
+++ b/docs/content.zh/docs/connectors/table/formats/json.md
@@ -243,3 +243,34 @@ Format 参数
+
+特性
+--------
+
+### 允许 JSON Array 直接展开成多行数据
+
+通常，我们假设 JSON 的最外层数据是一个 JSON Object，所以一条 JSON 消息会转换成一行结果。
+
+但是在某些情况下，JSON 的最外层数据可能是一个 JSON Array，我们期望把它展开成多条结果。JSON Array 的每个元素都必须是一个 JSON Object，这些 JSON Object 的 schema 需要和 SQL 定义一致，每个 JSON Object 会被转成一行结果。Flink JSON Format 默认支持读取这种数据。
+
+例如，对于如下 DDL:
+```sql
+CREATE TABLE user_behavior (
+  col1 BIGINT,
+  col2 VARCHAR
+) WITH (
+  'format' = 'json',
+  ...
+)
+```
+
+以下两种情况下，Flink JSON Format 都会产生两条数据 `(123, "a")` 和 `(456, "b")`。
+最外层是一个 JSON Array:
+```json lines
+[{"col1": 123, "col2": "a"}, {"col1": 456, "col2": "b"}]
+```
+最外层是一个 JSON Object:
+```json lines
+{"col1": 123, "col2": "a"}
+{"col1": 456, "col2": "b"}
+```
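+
+作为参考，下面再给出一个更完整的（假设性的）建表示例。示例假设数据来自 Kafka 连接器；其中的 topic 名称和连接器参数仅用于演示，并不是该特性所必需的，任何能把完整 JSON 文档交给 Format 的连接器都可以按同样的方式使用：
+```sql
+CREATE TABLE user_behavior (
+  col1 BIGINT,
+  col2 VARCHAR
+) WITH (
+  'connector' = 'kafka',
+  'topic' = 'user_behavior',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'scan.startup.mode' = 'earliest-offset',
+  'format' = 'json'
+)
+```
+使用这样的表时，一条包含上述 JSON Array 的消息会产生与两条单独的 JSON Object 消息相同的两行结果。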
diff --git a/docs/content/docs/connectors/table/formats/json.md b/docs/content/docs/connectors/table/formats/json.md
index 64592ac28bea7..7ead1ab90d825 100644
--- a/docs/content/docs/connectors/table/formats/json.md
+++ b/docs/content/docs/connectors/table/formats/json.md
@@ -257,6 +257,35 @@ The following table lists the type mapping from Flink type to JSON type.
+Features
+--------
+### Allow top-level JSON Arrays
+
+Usually, the top level of a JSON message is assumed to be a single JSON object, which is converted into one SQL row.
+
+In some cases, however, the top level of the JSON message is a JSON array that should be exploded into multiple records. Every element of the array must be a JSON object whose schema matches the one defined in SQL, and each of these objects is converted into one row. Flink JSON Format supports reading such data out of the box.
+
+For example, given the following SQL DDL:
+```sql
+CREATE TABLE user_behavior (
+  col1 BIGINT,
+  col2 VARCHAR
+) WITH (
+  'format' = 'json',
+  ...
+)
+```
+
+Flink JSON Format produces the two rows `(123, "a")` and `(456, "b")` for both of the following JSON inputs.
+The top level is a JSON Array:
+```json lines
+[{"col1": 123, "col2": "a"}, {"col1": 456, "col2": "b"}]
+```
+The top level is a JSON Object:
+```json lines
+{"col1": 123, "col2": "a"}
+{"col1": 456, "col2": "b"}
+```
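+
+For reference, a more complete (hypothetical) table definition is sketched below. It assumes the data arrives through the Kafka connector; the topic name and connector options are purely illustrative and not required by this feature, since any connector that hands complete JSON documents to the format behaves the same way:
+```sql
+CREATE TABLE user_behavior (
+  col1 BIGINT,
+  col2 VARCHAR
+) WITH (
+  'connector' = 'kafka',
+  'topic' = 'user_behavior',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'scan.startup.mode' = 'earliest-offset',
+  'format' = 'json'
+)
+```
+With such a table, a single record carrying the array payload above yields the same two rows as two separate single-object records.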
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/AbstractJsonDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/AbstractJsonDeserializationSchema.java
index aa62e0d5f8711..a89eb042d38ff 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/AbstractJsonDeserializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/AbstractJsonDeserializationSchema.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.formats.json;
 
+import org.apache.flink.api.common.functions.util.ListCollector;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.formats.common.TimestampFormat;
@@ -25,12 +26,19 @@
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.JsonReadFeature;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Objects;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -62,6 +70,10 @@ public abstract class AbstractJsonDeserializationSchema implements Deserializati
 
     private final boolean hasDecimalType;
 
+    private transient Collector<RowData> collector;
+
+    private transient List<RowData> reusableCollectList;
+
     public AbstractJsonDeserializationSchema(
             RowType rowType,
             TypeInformation<RowData> resultTypeInfo,
@@ -89,6 +101,23 @@ public void open(InitializationContext context) throws Exception {
         if (hasDecimalType) {
             objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
         }
+        reusableCollectList = new ArrayList<>();
+        collector = new ListCollector<>(reusableCollectList);
+    }
+
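+    /**
+     * Single-record entry point kept for the {@link DeserializationSchema} contract. The payload
+     * is routed through the collector-based variant; a top-level JSON array that expands into
+     * more than one row cannot be returned from this method, so callers handling such data
+     * should use {@link #deserialize(byte[], Collector)} instead.
+     */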
+    @Override
+    public RowData deserialize(@Nullable byte[] message) throws IOException {
+        reusableCollectList.clear();
+        deserialize(message, collector);
+        if (reusableCollectList.size() > 1) {
+            throw new FlinkRuntimeException(
+                    "Please invoke "
+                            + "DeserializationSchema#deserialize(byte[], Collector) instead.");
+        }
+        if (reusableCollectList.isEmpty()) {
+            return null;
+        }
+        return reusableCollectList.get(0);
+    }
 
     @Override
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserRowDataDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserRowDataDeserializationSchema.java
index 22df48f2ac209..eea1f607f1c58 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserRowDataDeserializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserRowDataDeserializationSchema.java
@@ -23,6 +23,7 @@
 import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Collector;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonToken;
@@ -72,10 +73,10 @@ public JsonParserRowDataDeserializationSchema(
     }
 
     @Override
-    public RowData deserialize(byte[] message) throws IOException {
+    public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
         // return null when there is no token
         if (message == null || message.length == 0) {
-            return null;
+            return;
         }
         try (JsonParser root = objectMapper.getFactory().createParser(message)) {
             /* First: must point to a token; if not pointing to one, advance.
@@ -85,16 +86,30 @@ public RowData deserialize(byte[] message) throws IOException {
             if (root.currentToken() == null) {
                 root.nextToken();
             }
-            if (root.currentToken() != JsonToken.START_OBJECT) {
+            if (root.currentToken() != JsonToken.START_OBJECT
+                    && root.currentToken() != JsonToken.START_ARRAY) {
                 throw JsonMappingException.from(root, "No content to map due to end-of-input");
             }
-            return (RowData) runtimeConverter.convert(root);
+            if (root.currentToken() == JsonToken.START_ARRAY) {
+                processArray(root, out);
+            } else {
+                processObject(root, out);
+            }
         } catch (Throwable t) {
-            if (ignoreParseErrors) {
-                return null;
+            if (!ignoreParseErrors) {
+                throw new IOException(
+                        format("Failed to deserialize JSON '%s'.", new String(message)), t);
             }
-            throw new IOException(
-                    format("Failed to deserialize JSON '%s'.", new String(message)), t);
         }
     }
+
+    private void processArray(JsonParser root, Collector<RowData> out) throws IOException {
+        while (root.nextToken() != JsonToken.END_ARRAY) {
+            out.collect((RowData) runtimeConverter.convert(root));
+        }
+    }
+
+    private void processObject(JsonParser root, Collector<RowData> out) throws IOException {
+        out.collect((RowData) runtimeConverter.convert(root));
+    }
 }
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
index 5a3fe22b308c8..e8f76eca52aba 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
@@ -23,8 +23,10 @@
 import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Collector;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
 
 import javax.annotation.Nullable;
@@ -63,18 +65,38 @@ public JsonRowDataDeserializationSchema(
     }
 
     @Override
-    public RowData deserialize(@Nullable byte[] message) throws IOException {
+    public void deserialize(@Nullable byte[] message, Collector<RowData> out) throws IOException {
         if (message == null) {
-            return null;
+            return;
         }
         try {
-            return convertToRowData(deserializeToJsonNode(message));
+            final JsonNode root = deserializeToJsonNode(message);
+            if (root != null && root.isArray()) {
+                ArrayNode arrayNode = (ArrayNode) root;
+                for (int i = 0; i < arrayNode.size(); i++) {
+                    try {
+                        RowData result = convertToRowData(arrayNode.get(i));
+                        if (result != null) {
+                            out.collect(result);
+                        }
+                    } catch (Throwable t) {
+                        if (!ignoreParseErrors) {
+                            // will be caught by outer try-catch
+                            throw t;
+                        }
+                    }
+                }
+            } else {
+                RowData result = convertToRowData(root);
+                if (result != null) {
+                    out.collect(result);
+                }
+            }
         } catch (Throwable t) {
-            if (ignoreParseErrors) {
-                return null;
+            if (!ignoreParseErrors) {
+                throw new IOException(
+                        format("Failed to deserialize JSON '%s'.", new String(message)), t);
             }
-            throw new IOException(
-                    format("Failed to deserialize JSON '%s'.", new String(message)), t);
         }
     }
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
index 916b04f50f8be..18d01c9c083ae 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
@@ -18,6 +18,7 @@
 package org.apache.flink.formats.json;
 
+import org.apache.flink.api.common.functions.util.ListCollector;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
 import org.apache.flink.core.testutils.FlinkAssertions;
@@ -34,6 +35,7 @@
 import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
 import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
 import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -51,6 +53,7 @@
 import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.time.ZoneOffset;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
@@ -224,6 +227,55 @@ void testSerDe() throws Exception {
         assertThat(serializedJson).containsExactly(actualBytes);
     }
 
+    @Test
+    public void testJsonArrayToMultiRecords() throws Exception {
+        DataType dataType = ROW(FIELD("f1", INT()), FIELD("f2", BOOLEAN()), FIELD("f3", STRING()));
+        RowType rowType = (RowType) dataType.getLogicalType();
+
+        ObjectMapper objectMapper = new ObjectMapper();
+
+        ObjectNode element1 = objectMapper.createObjectNode();
+        element1.put("f1", 1);
+        element1.put("f2", true);
+        element1.put("f3", "str");
+
+        ObjectNode element2 = objectMapper.createObjectNode();
+        element2.put("f1", 10);
+        element2.put("f2", false);
+        element2.put("f3", "newStr");
+
+        ArrayNode arrayNode = objectMapper.createArrayNode();
+        arrayNode.add(element1);
+        arrayNode.add(element2);
+
+        DeserializationSchema<RowData> deserializationSchema =
+                createDeserializationSchema(
+                        isJsonParser, rowType, false, false, TimestampFormat.ISO_8601);
+
+        open(deserializationSchema);
+
+        // test serialization
+        JsonRowDataSerializationSchema serializationSchema =
+                new JsonRowDataSerializationSchema(
+                        rowType,
+                        TimestampFormat.ISO_8601,
+                        JsonFormatOptions.MapNullKeyMode.LITERAL,
+                        "null",
+                        true,
+                        false);
+        open(serializationSchema);
+
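+        // Feed the top-level JSON array through the Collector-based deserialize() and check
+        // that the two array elements are exploded into two separate rows.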
+        List<RowData> result = new ArrayList<>();
+        Collector<RowData> collector = new ListCollector<>(result);
+        deserializationSchema.deserialize(objectMapper.writeValueAsBytes(arrayNode), collector);
+        assertThat(result).hasSize(2);
+
+        byte[] result1 = serializationSchema.serialize(result.get(0));
+        byte[] result2 = serializationSchema.serialize(result.get(1));
+        assertThat(result1).isEqualTo(objectMapper.writeValueAsBytes(element1));
+        assertThat(result2).isEqualTo(objectMapper.writeValueAsBytes(element2));
+    }
+
     /**
      * Tests the deserialization slow path, e.g. convert into string and use {@link
      * Double#parseDouble(String)}.