Skip to content

Commit 01de282

Browse files
committed
Extracted vector-type from sqrl project
Signed-off-by: Marvin Froeder <marvin@datasqrl.com>
1 parent e566543 commit 01de282

File tree

5 files changed

+201
-0
lines changed

5 files changed

+201
-0
lines changed

types/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
<modules>
3333
<module>json-type</module>
34+
<module>vector-type</module>
3435
</modules>
3536

3637
</project>

types/vector-type/pom.xml

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
4+
Copyright © 2024 DataSQRL (contact@datasqrl.com)
5+
6+
Licensed under the Apache License, Version 2.0 (the "License");
7+
you may not use this file except in compliance with the License.
8+
You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
18+
-->
19+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
20+
<modelVersion>4.0.0</modelVersion>
21+
22+
<parent>
23+
<groupId>com.datasqrl.flinkrunner</groupId>
24+
<artifactId>types</artifactId>
25+
<version>1.0.0-SNAPSHOT</version>
26+
</parent>
27+
28+
<artifactId>vector-type</artifactId>
29+
30+
<dependencies>
31+
<dependency>
32+
<groupId>org.apache.flink</groupId>
33+
<artifactId>flink-table-common</artifactId>
34+
<version>${flink.version}</version>
35+
<scope>provided</scope>
36+
</dependency>
37+
</dependencies>
38+
</project>
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package com.datasqrl.types.vector;
2+
3+
import org.apache.flink.table.annotation.DataTypeHint;
4+
5+
@DataTypeHint(
6+
value = "RAW",
7+
bridgedTo = FlinkVectorType.class,
8+
rawSerializer = FlinkVectorTypeSerializer.class)
9+
public class FlinkVectorType {
10+
public double[] value;
11+
12+
public FlinkVectorType(double[] value) {
13+
this.value = value;
14+
}
15+
16+
public double[] getValue() {
17+
return value;
18+
}
19+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package com.datasqrl.types.vector;
2+
3+
import java.io.IOException;
4+
import org.apache.flink.api.common.typeutils.TypeSerializer;
5+
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
6+
import org.apache.flink.core.memory.DataInputView;
7+
import org.apache.flink.core.memory.DataOutputView;
8+
9+
public class FlinkVectorTypeSerializer extends TypeSerializer<FlinkVectorType> {
10+
11+
@Override
12+
public boolean isImmutableType() {
13+
return true;
14+
}
15+
16+
@Override
17+
public FlinkVectorType createInstance() {
18+
return new FlinkVectorType(null);
19+
}
20+
21+
@Override
22+
public FlinkVectorType copy(FlinkVectorType from) {
23+
return new FlinkVectorType(from.getValue());
24+
}
25+
26+
@Override
27+
public FlinkVectorType copy(FlinkVectorType from, FlinkVectorType reuse) {
28+
return copy(from);
29+
}
30+
31+
@Override
32+
public int getLength() {
33+
return -1; // indicates that this serializer does not have a fixed length
34+
}
35+
36+
@Override
37+
public void serialize(FlinkVectorType record, DataOutputView target) throws IOException {
38+
target.writeInt(record.getValue().length); // First write the length of the array
39+
for (double v : record.getValue()) {
40+
target.writeDouble(v); // Write each double value
41+
}
42+
}
43+
44+
@Override
45+
public FlinkVectorType deserialize(DataInputView source) throws IOException {
46+
int length = source.readInt();
47+
double[] array = new double[length];
48+
for (int i = 0; i < length; i++) {
49+
array[i] = source.readDouble();
50+
}
51+
return new FlinkVectorType(array);
52+
}
53+
54+
@Override
55+
public FlinkVectorType deserialize(FlinkVectorType reuse, DataInputView source)
56+
throws IOException {
57+
return deserialize(source);
58+
}
59+
60+
@Override
61+
public void copy(DataInputView source, DataOutputView target) throws IOException {
62+
target.writeUTF(source.readUTF());
63+
}
64+
65+
@Override
66+
public TypeSerializer<FlinkVectorType> duplicate() {
67+
return this;
68+
}
69+
70+
@Override
71+
public boolean equals(Object obj) {
72+
return obj instanceof FlinkVectorTypeSerializer;
73+
}
74+
75+
@Override
76+
public int hashCode() {
77+
return FlinkVectorTypeSerializer.class.hashCode();
78+
}
79+
80+
@Override
81+
public TypeSerializerSnapshot<FlinkVectorType> snapshotConfiguration() {
82+
return new FlinkVectorTypeSerializerSnapshot();
83+
}
84+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package com.datasqrl.types.vector;
2+
3+
import java.io.IOException;
4+
import org.apache.flink.api.common.typeutils.TypeSerializer;
5+
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
6+
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
7+
import org.apache.flink.core.memory.DataInputView;
8+
import org.apache.flink.core.memory.DataOutputView;
9+
10+
public class FlinkVectorTypeSerializerSnapshot implements TypeSerializerSnapshot<FlinkVectorType> {
11+
12+
private Class<FlinkVectorTypeSerializer> serializerClass;
13+
14+
public FlinkVectorTypeSerializerSnapshot() {
15+
this.serializerClass = FlinkVectorTypeSerializer.class;
16+
}
17+
18+
@Override
19+
public int getCurrentVersion() {
20+
return 1;
21+
}
22+
23+
@Override
24+
public void writeSnapshot(DataOutputView out) throws IOException {
25+
out.writeUTF(FlinkVectorTypeSerializer.class.getName());
26+
}
27+
28+
@Override
29+
public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader)
30+
throws IOException {
31+
String className = in.readUTF();
32+
try {
33+
this.serializerClass =
34+
(Class<FlinkVectorTypeSerializer>) Class.forName(className, true, userCodeClassLoader);
35+
} catch (ClassNotFoundException e) {
36+
throw new IOException("Failed to find serializer class: " + className, e);
37+
}
38+
}
39+
40+
@Override
41+
public TypeSerializer restoreSerializer() {
42+
try {
43+
return serializerClass.newInstance();
44+
} catch (InstantiationException | IllegalAccessException e) {
45+
throw new RuntimeException(
46+
"Failed to instantiate serializer class: " + serializerClass.getName(), e);
47+
}
48+
}
49+
50+
@Override
51+
public TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
52+
TypeSerializer newSerializer) {
53+
if (newSerializer.getClass() == this.serializerClass) {
54+
return TypeSerializerSchemaCompatibility.compatibleAsIs();
55+
} else {
56+
return TypeSerializerSchemaCompatibility.incompatible();
57+
}
58+
}
59+
}

0 commit comments

Comments
 (0)