Skip to content

Commit 59b8948

Browse files
authored
Merge pull request #74 from DataSQRL/json
Extract json and csv formats from sqrl
2 parents 2725937 + a4fb883 commit 59b8948

File tree

16 files changed

+913
-0
lines changed

16 files changed

+913
-0
lines changed

formats/flexible-csv-format/pom.xml

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
4+
Copyright © 2024 DataSQRL (contact@datasqrl.com)
5+
6+
Licensed under the Apache License, Version 2.0 (the "License");
7+
you may not use this file except in compliance with the License.
8+
You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
18+
-->
19+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
20+
21+
<modelVersion>4.0.0</modelVersion>
22+
<parent>
23+
<groupId>com.datasqrl.flinkrunner</groupId>
24+
<artifactId>formats</artifactId>
25+
<version>1.0.0-SNAPSHOT</version>
26+
</parent>
27+
28+
<artifactId>flexible-csv-format</artifactId>
29+
30+
<description>Flexible csv format</description>
31+
32+
<dependencies>
33+
<dependency>
34+
<groupId>org.apache.flink</groupId>
35+
<artifactId>flink-json</artifactId>
36+
<version>${flink.version}</version>
37+
<scope>provided</scope>
38+
</dependency>
39+
<dependency>
40+
<groupId>org.apache.flink</groupId>
41+
<artifactId>flink-csv</artifactId>
42+
<version>${flink.version}</version>
43+
<scope>provided</scope>
44+
</dependency>
45+
<dependency>
46+
<groupId>org.apache.flink</groupId>
47+
<artifactId>flink-table-common</artifactId>
48+
<version>${flink.version}</version>
49+
<scope>provided</scope>
50+
</dependency>
51+
</dependencies>
52+
53+
</project>
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Copyright © 2024 DataSQRL (contact@datasqrl.com)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datasqrl.flink.format.csv;
17+
18+
import java.io.IOException;
19+
import java.util.Set;
20+
import lombok.SneakyThrows;
21+
import org.apache.flink.api.common.serialization.DeserializationSchema;
22+
import org.apache.flink.api.common.typeinfo.TypeInformation;
23+
import org.apache.flink.configuration.ConfigOption;
24+
import org.apache.flink.configuration.ConfigOptions;
25+
import org.apache.flink.configuration.ReadableConfig;
26+
import org.apache.flink.formats.csv.CsvFormatFactory;
27+
import org.apache.flink.table.connector.ChangelogMode;
28+
import org.apache.flink.table.connector.format.DecodingFormat;
29+
import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
30+
import org.apache.flink.table.connector.source.DynamicTableSource;
31+
import org.apache.flink.table.data.RowData;
32+
import org.apache.flink.table.factories.DeserializationFormatFactory;
33+
import org.apache.flink.table.factories.DynamicTableFactory;
34+
import org.apache.flink.table.factories.FactoryUtil;
35+
import org.apache.flink.table.types.DataType;
36+
37+
public class FlexibleCsv implements DeserializationFormatFactory {
38+
final CsvFormatFactory csvJson;
39+
40+
public FlexibleCsv() {
41+
csvJson = new CsvFormatFactory();
42+
}
43+
44+
ConfigOption<Boolean> skipHeader =
45+
ConfigOptions.key("skip-header").booleanType().defaultValue(true);
46+
47+
@Override
48+
public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
49+
DynamicTableFactory.Context factoryContext, ReadableConfig formatOptions) {
50+
FactoryUtil.validateFactoryOptions(this, formatOptions);
51+
ProjectableDecodingFormat<DeserializationSchema<RowData>> decodingFormat =
52+
(ProjectableDecodingFormat) csvJson.createDecodingFormat(factoryContext, formatOptions);
53+
54+
return new ProjectableDecodingFormat<DeserializationSchema<RowData>>() {
55+
@SneakyThrows
56+
@Override
57+
public DeserializationSchema<RowData> createRuntimeDecoder(
58+
DynamicTableSource.Context context, DataType physicalDataType, int[][] projections) {
59+
DeserializationSchema<RowData> runtimeDecoder =
60+
decodingFormat.createRuntimeDecoder(context, physicalDataType, projections);
61+
boolean skipHeaderBool = formatOptions.get(skipHeader);
62+
RuntimeDecoderDelegate decoderDelegate =
63+
new RuntimeDecoderDelegate(runtimeDecoder, skipHeaderBool);
64+
return decoderDelegate;
65+
}
66+
67+
@Override
68+
public ChangelogMode getChangelogMode() {
69+
return ChangelogMode.insertOnly();
70+
}
71+
};
72+
}
73+
74+
public static class RuntimeDecoderDelegate implements DeserializationSchema<RowData> {
75+
76+
private final DeserializationSchema<RowData> runtimeDecoder;
77+
private final boolean skipHeader;
78+
private boolean hasSkipped = false;
79+
80+
public RuntimeDecoderDelegate(
81+
DeserializationSchema<RowData> runtimeDecoder, boolean skipHeader) {
82+
this.runtimeDecoder = runtimeDecoder;
83+
this.skipHeader = skipHeader;
84+
}
85+
86+
@Override
87+
public RowData deserialize(byte[] message) throws IOException {
88+
if (skipHeader && !hasSkipped) {
89+
this.hasSkipped = true;
90+
return null;
91+
}
92+
return runtimeDecoder.deserialize(message);
93+
}
94+
95+
@Override
96+
public boolean isEndOfStream(RowData nextElement) {
97+
return runtimeDecoder.isEndOfStream(nextElement);
98+
}
99+
100+
@Override
101+
public TypeInformation getProducedType() {
102+
return runtimeDecoder.getProducedType();
103+
}
104+
105+
@Override
106+
public void open(InitializationContext context) throws Exception {
107+
runtimeDecoder.open(context);
108+
}
109+
}
110+
111+
@Override
112+
public String factoryIdentifier() {
113+
return "flexible-csv";
114+
}
115+
116+
@Override
117+
public Set<ConfigOption<?>> requiredOptions() {
118+
return Set.of();
119+
}
120+
121+
@Override
122+
public Set<ConfigOption<?>> optionalOptions() {
123+
return Set.of(skipHeader);
124+
}
125+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
com.datasqrl.csv.FlexibleCsv
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
JDBC is provided scope.

formats/flexible-json-format/pom.xml

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
4+
Copyright © 2024 DataSQRL (contact@datasqrl.com)
5+
6+
Licensed under the Apache License, Version 2.0 (the "License");
7+
you may not use this file except in compliance with the License.
8+
You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
18+
-->
19+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
20+
21+
<modelVersion>4.0.0</modelVersion>
22+
<parent>
23+
<groupId>com.datasqrl.flinkrunner</groupId>
24+
<artifactId>formats</artifactId>
25+
<version>1.0.0-SNAPSHOT</version>
26+
</parent>
27+
28+
<artifactId>flexible-json-format</artifactId>
29+
<description>Flexible json format</description>
30+
31+
<dependencies>
32+
<dependency>
33+
<groupId>${project.groupId}</groupId>
34+
<artifactId>json-type</artifactId>
35+
<version>${project.version}</version>
36+
</dependency>
37+
<dependency>
38+
<groupId>org.apache.flink</groupId>
39+
<artifactId>flink-table-api-java-bridge</artifactId>
40+
<version>${flink.version}</version>
41+
<scope>provided</scope>
42+
</dependency>
43+
<dependency>
44+
<groupId>org.apache.flink</groupId>
45+
<artifactId>flink-table-common</artifactId>
46+
<version>${flink.version}</version>
47+
<scope>provided</scope>
48+
</dependency>
49+
<dependency>
50+
<groupId>org.apache.flink</groupId>
51+
<artifactId>flink-json</artifactId>
52+
<version>${flink.version}</version>
53+
<scope>provided</scope>
54+
</dependency>
55+
</dependencies>
56+
57+
</project>
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/*
2+
* Copyright © 2024 DataSQRL (contact@datasqrl.com)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datasqrl.flink.format.json;
17+
18+
import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER;
19+
import static org.apache.flink.formats.json.JsonFormatOptions.MAP_NULL_KEY_LITERAL;
20+
21+
import java.util.Set;
22+
import lombok.SneakyThrows;
23+
import org.apache.flink.api.common.serialization.DeserializationSchema;
24+
import org.apache.flink.api.common.serialization.SerializationSchema;
25+
import org.apache.flink.api.common.typeinfo.TypeInformation;
26+
import org.apache.flink.configuration.ConfigOption;
27+
import org.apache.flink.configuration.ReadableConfig;
28+
import org.apache.flink.formats.common.TimestampFormat;
29+
import org.apache.flink.formats.json.JsonFormatFactory;
30+
import org.apache.flink.formats.json.JsonFormatOptions;
31+
import org.apache.flink.formats.json.JsonFormatOptionsUtil;
32+
import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
33+
import org.apache.flink.table.connector.ChangelogMode;
34+
import org.apache.flink.table.connector.Projection;
35+
import org.apache.flink.table.connector.format.DecodingFormat;
36+
import org.apache.flink.table.connector.format.EncodingFormat;
37+
import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
38+
import org.apache.flink.table.connector.sink.DynamicTableSink;
39+
import org.apache.flink.table.connector.source.DynamicTableSource;
40+
import org.apache.flink.table.data.RowData;
41+
import org.apache.flink.table.factories.DeserializationFormatFactory;
42+
import org.apache.flink.table.factories.DynamicTableFactory.Context;
43+
import org.apache.flink.table.factories.FactoryUtil;
44+
import org.apache.flink.table.factories.SerializationFormatFactory;
45+
import org.apache.flink.table.types.DataType;
46+
import org.apache.flink.table.types.logical.RowType;
47+
48+
public class FlexibleJsonFormat
49+
implements DeserializationFormatFactory, SerializationFormatFactory {
50+
51+
public static final String FORMAT_NAME = "flexible-json";
52+
53+
/**
54+
* This just delegates to the "standard" json format in Flink
55+
*
56+
* @param context
57+
* @param formatOptions
58+
* @return
59+
*/
60+
@Override
61+
public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
62+
Context context, ReadableConfig formatOptions) {
63+
FactoryUtil.validateFactoryOptions(this, formatOptions);
64+
65+
return new ProjectableDecodingFormat<DeserializationSchema<RowData>>() {
66+
@SneakyThrows
67+
@Override
68+
public DeserializationSchema<RowData> createRuntimeDecoder(
69+
DynamicTableSource.Context context, DataType physicalDataType, int[][] projections) {
70+
final DataType producedDataType = Projection.of(projections).project(physicalDataType);
71+
final RowType rowType = (RowType) producedDataType.getLogicalType();
72+
final TypeInformation<RowData> rowDataTypeInfo =
73+
context.createTypeInformation(producedDataType);
74+
JsonRowDataDeserializationSchema jsonRowDataDeserializationSchema =
75+
new JsonRowDataDeserializationSchema(
76+
rowType, rowDataTypeInfo, false, false, TimestampFormat.ISO_8601);
77+
return jsonRowDataDeserializationSchema;
78+
}
79+
80+
@Override
81+
public ChangelogMode getChangelogMode() {
82+
return ChangelogMode.insertOnly();
83+
}
84+
};
85+
}
86+
87+
/**
88+
* This uses a SQRL specific encoding format so that we can add support for SQRL types
89+
*
90+
* @param context
91+
* @param formatOptions
92+
* @return
93+
*/
94+
@Override
95+
public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
96+
Context context, ReadableConfig formatOptions) {
97+
FactoryUtil.validateFactoryOptions(this, formatOptions);
98+
JsonFormatOptionsUtil.validateEncodingFormatOptions(formatOptions);
99+
100+
TimestampFormat timestampOption = JsonFormatOptionsUtil.getTimestampFormat(formatOptions);
101+
JsonFormatOptions.MapNullKeyMode mapNullKeyMode =
102+
JsonFormatOptionsUtil.getMapNullKeyMode(formatOptions);
103+
String mapNullKeyLiteral = formatOptions.get(MAP_NULL_KEY_LITERAL);
104+
105+
final boolean encodeDecimalAsPlainNumber = formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
106+
107+
return new EncodingFormat<SerializationSchema<RowData>>() {
108+
@Override
109+
public SerializationSchema<RowData> createRuntimeEncoder(
110+
DynamicTableSink.Context context, DataType consumedDataType) {
111+
final RowType rowType = (RowType) consumedDataType.getLogicalType();
112+
return new SqrlJsonRowDataSerializationSchema(
113+
rowType,
114+
timestampOption,
115+
mapNullKeyMode,
116+
mapNullKeyLiteral,
117+
encodeDecimalAsPlainNumber);
118+
}
119+
120+
@Override
121+
public ChangelogMode getChangelogMode() {
122+
return ChangelogMode.insertOnly();
123+
}
124+
};
125+
}
126+
127+
@Override
128+
public String factoryIdentifier() {
129+
return FORMAT_NAME;
130+
}
131+
132+
@Override
133+
public Set<ConfigOption<?>> requiredOptions() {
134+
return new JsonFormatFactory().requiredOptions();
135+
}
136+
137+
@Override
138+
public Set<ConfigOption<?>> optionalOptions() {
139+
return new JsonFormatFactory().optionalOptions();
140+
}
141+
}

0 commit comments

Comments
 (0)