-
Notifications
You must be signed in to change notification settings - Fork 1
Extracted vector-type from sqrl project #76
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Signed-off-by: Marvin Froeder <marvin@datasqrl.com>
Signed-off-by: Marvin Froeder <marvin@datasqrl.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR extracts the vector type from the sqrl project and extends PostgreSQL connector functionality. Key changes include the introduction of FlinkVectorType with its serializer, new PostgreSQL type serializers and converters, and updates to the JDBC dynamic table factory for Postgres integration.
Reviewed Changes
Copilot reviewed 13 out of 21 changed files in this pull request and generated 3 comments.
Show a summary per file
File | Description |
---|---|
types/vector-type/src/main/java/com/datasqrl/types/vector/FlinkVectorTypeSerializer.java | Adds a custom serializer for the vector type with methods for serialization/deserialization. |
types/vector-type/src/main/java/com/datasqrl/types/vector/FlinkVectorType.java | Introduces the Flink vector type class with a public double array. |
connectors/postgresql-connector/src/test/java/com/datasqrl/connector/postgresql/jdbc/FlinkJdbcTest.java | Provides an integration test for writing and reading data via PostgreSQL. |
connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/type/* | Implements several PostgreSQL-specific type serializers and converters. |
connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/jdbc/* | Updates Postgres dialect, dynamic table factory, and base row converter logic to support the new types. |
Files not reviewed (8)
- connectors/pom.xml: Language not supported
- connectors/postgresql-connector/pom.xml: Language not supported
- connectors/postgresql-connector/src/main/resources/META-INF/services/com.datasqrl.connector.postgresql.type.JdbcTypeSerializer: Language not supported
- connectors/postgresql-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory: Language not supported
- flink-sql-runner/pom.xml: Language not supported
- pom.xml: Language not supported
- types/pom.xml: Language not supported
- types/vector-type/pom.xml: Language not supported
|
||
@Override | ||
public void copy(DataInputView source, DataOutputView target) throws IOException { | ||
target.writeUTF(source.readUTF()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The implementation of the copy(DataInputView, DataOutputView) method does not follow the serialization format used in serialize(), which writes an int and a series of doubles. Use corresponding read/write operations (i.e. read an int then the double values) to ensure the vector data is copied correctly.
target.writeUTF(source.readUTF()); | |
int length = source.readInt(); // Read the length of the array | |
target.writeInt(length); // Write the length of the array | |
for (int i = 0; i < length; i++) { | |
target.writeDouble(source.readDouble()); // Read and write each double value | |
} |
Copilot uses AI. Check for mistakes.
(val, index, statement) -> { | ||
if (val != null && !val.isNullAt(index)) { | ||
PGobject pgObject = new PGobject(); | ||
pgObject.setType("json"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The serializer declares its dialect type name as 'jsonb' but sets the PGobject type to 'json'; ensure consistency by setting the type to 'jsonb' if that is the intended behavior.
pgObject.setType("json"); | |
pgObject.setType("jsonb"); |
Copilot uses AI. Check for mistakes.
return (val) -> null; | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The deserializer converter currently returns null for any input value, which may lead to data loss if not intentional. Implement a proper conversion so that the input value is correctly deserialized.
return (val) -> null; | |
}; | |
return (val) -> { | |
if (val == null) { | |
return null; | |
} | |
try { | |
ObjectMapper mapper = new ObjectMapper(); | |
JsonNode jsonNode = mapper.readTree(val.toString()); | |
// Convert JsonNode to Row or other appropriate structure | |
// Assuming a simple conversion to Row for demonstration | |
Row row = new Row(jsonNode.size()); | |
int index = 0; | |
for (JsonNode field : jsonNode) { | |
row.setField(index++, field.asText()); | |
} | |
return row; | |
} catch (Exception e) { | |
throw new RuntimeException("Failed to deserialize value: " + val, e); | |
} | |
}; |
Copilot uses AI. Check for mistakes.
No description provided.