Skip to content

Commit 7d4437d

Browse files
committed
Extracted json-type from sqrl project
Signed-off-by: Marvin Froeder <marvin@datasqrl.com>
1 parent 2725937 commit 7d4437d

File tree

6 files changed

+241
-0
lines changed

6 files changed

+241
-0
lines changed

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151

5252
<modules>
5353
<module>functions</module>
54+
<module>types</module>
5455
<module>testing</module>
5556
<module>flink-sql-runner</module>
5657
</modules>

types/json-type/pom.xml

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
4+
Copyright © 2024 DataSQRL (contact@datasqrl.com)
5+
6+
Licensed under the Apache License, Version 2.0 (the "License");
7+
you may not use this file except in compliance with the License.
8+
You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
18+
-->
19+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
20+
<modelVersion>4.0.0</modelVersion>
21+
22+
<parent>
23+
<groupId>com.datasqrl.flinkrunner</groupId>
24+
<artifactId>types</artifactId>
25+
<version>1.0.0-SNAPSHOT</version>
26+
</parent>
27+
28+
<artifactId>json-type</artifactId>
29+
30+
<dependencies>
31+
<dependency>
32+
<groupId>org.apache.flink</groupId>
33+
<artifactId>flink-table-common</artifactId>
34+
<version>${flink.version}</version>
35+
<scope>provided</scope>
36+
</dependency>
37+
</dependencies>
38+
</project>
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.datasqrl.types.json;
2+
3+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
4+
import org.apache.flink.table.annotation.DataTypeHint;
5+
6+
@DataTypeHint(
7+
value = "RAW",
8+
bridgedTo = FlinkJsonType.class,
9+
rawSerializer = FlinkJsonTypeSerializer.class)
10+
public class FlinkJsonType {
11+
public JsonNode json;
12+
13+
public FlinkJsonType(JsonNode json) {
14+
this.json = json;
15+
}
16+
17+
public JsonNode getJson() {
18+
return json;
19+
}
20+
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package com.datasqrl.types.json;
2+
3+
import java.io.IOException;
4+
import org.apache.flink.api.common.typeutils.TypeSerializer;
5+
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
6+
import org.apache.flink.core.memory.DataInputView;
7+
import org.apache.flink.core.memory.DataOutputView;
8+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
9+
10+
public class FlinkJsonTypeSerializer extends TypeSerializer<FlinkJsonType> {
11+
12+
ObjectMapper mapper = new ObjectMapper();
13+
14+
@Override
15+
public boolean isImmutableType() {
16+
return true;
17+
}
18+
19+
@Override
20+
public FlinkJsonType createInstance() {
21+
return new FlinkJsonType(null);
22+
}
23+
24+
@Override
25+
public FlinkJsonType copy(FlinkJsonType from) {
26+
return new FlinkJsonType(from.getJson());
27+
}
28+
29+
@Override
30+
public FlinkJsonType copy(FlinkJsonType from, FlinkJsonType reuse) {
31+
return copy(from);
32+
}
33+
34+
@Override
35+
public int getLength() {
36+
return -1; // indicates that this serializer does not have a fixed length
37+
}
38+
39+
@Override
40+
public void serialize(FlinkJsonType record, DataOutputView target) throws IOException {
41+
byte[] jsonData = mapper.writeValueAsBytes(record.getJson());
42+
target.writeInt(jsonData.length);
43+
target.write(jsonData);
44+
}
45+
46+
@Override
47+
public FlinkJsonType deserialize(DataInputView source) throws IOException {
48+
int length = source.readInt();
49+
byte[] jsonData = new byte[length];
50+
source.readFully(jsonData);
51+
return new FlinkJsonType(mapper.readTree(jsonData));
52+
}
53+
54+
@Override
55+
public FlinkJsonType deserialize(FlinkJsonType reuse, DataInputView source) throws IOException {
56+
return deserialize(source);
57+
}
58+
59+
@Override
60+
public void copy(DataInputView source, DataOutputView target) throws IOException {
61+
int length = source.readInt();
62+
byte[] jsonData = new byte[length];
63+
source.readFully(jsonData);
64+
target.writeInt(length);
65+
target.write(jsonData);
66+
}
67+
68+
@Override
69+
public TypeSerializer<FlinkJsonType> duplicate() {
70+
return this;
71+
}
72+
73+
@Override
74+
public boolean equals(Object obj) {
75+
return obj instanceof FlinkJsonTypeSerializer;
76+
}
77+
78+
@Override
79+
public int hashCode() {
80+
return FlinkJsonTypeSerializer.class.hashCode();
81+
}
82+
83+
@Override
84+
public TypeSerializerSnapshot<FlinkJsonType> snapshotConfiguration() {
85+
return new FlinkJsonTypeSerializerSnapshot();
86+
}
87+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package com.datasqrl.types.json;
2+
3+
import java.io.IOException;
4+
import org.apache.flink.api.common.typeutils.TypeSerializer;
5+
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
6+
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
7+
import org.apache.flink.core.memory.DataInputView;
8+
import org.apache.flink.core.memory.DataOutputView;
9+
10+
public class FlinkJsonTypeSerializerSnapshot implements TypeSerializerSnapshot<FlinkJsonType> {
11+
12+
private Class<FlinkJsonTypeSerializer> serializerClass;
13+
14+
public FlinkJsonTypeSerializerSnapshot() {
15+
this.serializerClass = FlinkJsonTypeSerializer.class;
16+
}
17+
18+
@Override
19+
public int getCurrentVersion() {
20+
return 1;
21+
}
22+
23+
@Override
24+
public void writeSnapshot(DataOutputView out) throws IOException {
25+
out.writeUTF(FlinkJsonTypeSerializer.class.getName());
26+
}
27+
28+
@Override
29+
public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader)
30+
throws IOException {
31+
String className = in.readUTF();
32+
try {
33+
this.serializerClass =
34+
(Class<FlinkJsonTypeSerializer>) Class.forName(className, true, userCodeClassLoader);
35+
} catch (ClassNotFoundException e) {
36+
throw new IOException("Failed to find serializer class: " + className, e);
37+
}
38+
}
39+
40+
@Override
41+
public TypeSerializer restoreSerializer() {
42+
try {
43+
return serializerClass.newInstance();
44+
} catch (InstantiationException | IllegalAccessException e) {
45+
throw new RuntimeException(
46+
"Failed to instantiate serializer class: " + serializerClass.getName(), e);
47+
}
48+
}
49+
50+
@Override
51+
public TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
52+
TypeSerializer newSerializer) {
53+
if (newSerializer.getClass() == this.serializerClass) {
54+
return TypeSerializerSchemaCompatibility.compatibleAsIs();
55+
} else {
56+
return TypeSerializerSchemaCompatibility.incompatible();
57+
}
58+
}
59+
}

types/pom.xml

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
4+
Copyright © 2024 DataSQRL (contact@datasqrl.com)
5+
6+
Licensed under the Apache License, Version 2.0 (the "License");
7+
you may not use this file except in compliance with the License.
8+
You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
18+
-->
19+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
20+
21+
<modelVersion>4.0.0</modelVersion>
22+
23+
<parent>
24+
<groupId>com.datasqrl.flinkrunner</groupId>
25+
<artifactId>flink-sql-runner-parent</artifactId>
26+
<version>1.0.0-SNAPSHOT</version>
27+
</parent>
28+
29+
<artifactId>types</artifactId>
30+
<packaging>pom</packaging>
31+
32+
<modules>
33+
<module>json-type</module>
34+
</modules>
35+
36+
</project>

0 commit comments

Comments
 (0)