Extracted vector-type from sqrl project #76


Merged: 2 commits into main on Apr 18, 2025

Conversation

@velo (Collaborator) commented Apr 18, 2025

No description provided.

velo added 2 commits April 17, 2025 21:29
Signed-off-by: Marvin Froeder <marvin@datasqrl.com>
Signed-off-by: Marvin Froeder <marvin@datasqrl.com>
@velo velo requested a review from Copilot April 18, 2025 01:35
Contributor

@Copilot Copilot AI left a comment

Pull Request Overview

This PR extracts the vector type from the sqrl project and extends PostgreSQL connector functionality. Key changes include the introduction of FlinkVectorType with its serializer, new PostgreSQL type serializers and converters, and updates to the JDBC dynamic table factory for Postgres integration.
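The overview describes FlinkVectorType as a vector type class holding a public double array. A minimal sketch of such a class (the constructor and field name here are assumed for illustration; the actual source lives in types/vector-type/src/main/java/com/datasqrl/types/vector/FlinkVectorType.java and may differ):

```java
// Hypothetical sketch of a vector type carrying a public double array,
// as described in the PR overview. Not the actual project source.
public class FlinkVectorType {
  public double[] value; // raw vector payload, exposed publicly per the PR description

  public FlinkVectorType(double[] value) {
    this.value = value;
  }
}
```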

Reviewed Changes

Copilot reviewed 13 out of 21 changed files in this pull request and generated 3 comments.

Summary per file:
  • types/vector-type/src/main/java/com/datasqrl/types/vector/FlinkVectorTypeSerializer.java — adds a custom serializer for the vector type with serialization/deserialization methods.
  • types/vector-type/src/main/java/com/datasqrl/types/vector/FlinkVectorType.java — introduces the Flink vector type class with a public double array.
  • connectors/postgresql-connector/src/test/java/com/datasqrl/connector/postgresql/jdbc/FlinkJdbcTest.java — provides an integration test for writing and reading data via PostgreSQL.
  • connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/type/* — implements several PostgreSQL-specific type serializers and converters.
  • connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/jdbc/* — updates the Postgres dialect, dynamic table factory, and base row converter logic to support the new types.
Files not reviewed (8)
  • connectors/pom.xml: Language not supported
  • connectors/postgresql-connector/pom.xml: Language not supported
  • connectors/postgresql-connector/src/main/resources/META-INF/services/com.datasqrl.connector.postgresql.type.JdbcTypeSerializer: Language not supported
  • connectors/postgresql-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory: Language not supported
  • flink-sql-runner/pom.xml: Language not supported
  • pom.xml: Language not supported
  • types/pom.xml: Language not supported
  • types/vector-type/pom.xml: Language not supported


```java
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
  target.writeUTF(source.readUTF());
```

Copilot AI commented Apr 18, 2025:

The implementation of the copy(DataInputView, DataOutputView) method does not follow the serialization format used in serialize(), which writes an int and a series of doubles. Use the corresponding read/write operations (i.e. read an int, then the double values) to ensure the vector data is copied correctly.

Suggested change (replacing `target.writeUTF(source.readUTF());`):

```java
int length = source.readInt(); // read the length of the array
target.writeInt(length);       // write the length of the array
for (int i = 0; i < length; i++) {
  target.writeDouble(source.readDouble()); // read and write each double value
}
```
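The point of the review is that copy() must mirror the wire format produced by serialize(): an int length followed by that many doubles. A self-contained sketch using plain java.io streams (standing in here for Flink's DataInputView/DataOutputView, which expose the same readInt/readDouble-style API) shows the symmetric round trip:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class VectorCopyDemo {
  // Write the format serialize() is described as using: int length, then the doubles.
  static byte[] write(double[] vec) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bos);
    out.writeInt(vec.length);
    for (double d : vec) out.writeDouble(d);
    return bos.toByteArray();
  }

  // copy() must consume and emit that same format; readUTF/writeUTF would
  // misinterpret the leading int as a UTF-8 string header and corrupt the data.
  static byte[] copy(byte[] src) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(src));
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bos);
    int length = in.readInt();
    out.writeInt(length);
    for (int i = 0; i < length; i++) out.writeDouble(in.readDouble());
    return bos.toByteArray();
  }
}
```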

```java
(val, index, statement) -> {
  if (val != null && !val.isNullAt(index)) {
    PGobject pgObject = new PGobject();
    pgObject.setType("json");
```

Copilot AI commented Apr 18, 2025:

The serializer declares its dialect type name as 'jsonb' but sets the PGobject type to 'json'; ensure consistency by setting the type to 'jsonb' if that is the intended behavior.

Suggested change:

```java
pgObject.setType("jsonb");
```

Comment on lines +52 to +53

```java
return (val) -> null;
};
```

Copilot AI commented Apr 18, 2025:

The deserializer converter currently returns null for any input value, which may lead to data loss if not intentional. Implement a proper conversion so that the input value is correctly deserialized.

Suggested change:

```java
return (val) -> {
  if (val == null) {
    return null;
  }
  try {
    ObjectMapper mapper = new ObjectMapper();
    JsonNode jsonNode = mapper.readTree(val.toString());
    // Convert JsonNode to Row or another appropriate structure;
    // a simple conversion to Row is assumed for demonstration.
    Row row = new Row(jsonNode.size());
    int index = 0;
    for (JsonNode field : jsonNode) {
      row.setField(index++, field.asText());
    }
    return row;
  } catch (Exception e) {
    throw new RuntimeException("Failed to deserialize value: " + val, e);
  }
};
```
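The underlying pattern of the fix is replacing a converter that discards every input with one that null-checks and actually parses. A stripped-down, dependency-free sketch of that pattern (the real converter uses Jackson and Flink's Row type; the bracketed number-list input format here is assumed purely for illustration):

```java
import java.util.function.Function;

public class ConverterDemo {
  // Broken shape: every value maps to null, silently dropping data.
  static Function<String, double[]> broken() {
    return (val) -> null;
  }

  // Fixed shape: propagate null, otherwise parse e.g. "[1.0, 2.5]" into a double[].
  static Function<String, double[]> fixed() {
    return (val) -> {
      if (val == null) {
        return null;
      }
      String body = val.replaceAll("[\\[\\]\\s]", ""); // strip brackets and whitespace
      if (body.isEmpty()) {
        return new double[0];
      }
      String[] parts = body.split(",");
      double[] out = new double[parts.length];
      for (int i = 0; i < parts.length; i++) {
        out[i] = Double.parseDouble(parts[i]);
      }
      return out;
    };
  }
}
```

Either way, a converter that returns null for non-null input is indistinguishable from a read failure downstream, which is why the review flags it as probable data loss.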

@velo velo merged commit 112a459 into main Apr 18, 2025
3 checks passed
@velo velo deleted the vector branch April 18, 2025 01:38