Skip to content

Commit 598cb25

Browse files
committed
WIP
1 parent 7b4ef2a commit 598cb25

3 files changed

Lines changed: 4 additions & 112 deletions

File tree

document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java

Lines changed: 1 addition & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,10 @@
11
package org.hypertrace.core.documentstore.postgres;
22

3-
import com.fasterxml.jackson.databind.JsonNode;
4-
import com.fasterxml.jackson.databind.ObjectMapper;
53
import java.io.IOException;
6-
import java.sql.PreparedStatement;
7-
import java.sql.ResultSet;
8-
import java.sql.SQLException;
9-
import java.util.ArrayList;
10-
import java.util.Iterator;
114
import java.util.List;
125
import java.util.Map;
136
import java.util.Optional;
147
import java.util.Set;
15-
import java.util.stream.Collectors;
168
import org.hypertrace.core.documentstore.BulkArrayValueUpdateRequest;
179
import org.hypertrace.core.documentstore.BulkDeleteResult;
1810
import org.hypertrace.core.documentstore.BulkUpdateRequest;
@@ -26,7 +18,6 @@
2618
import org.hypertrace.core.documentstore.model.options.QueryOptions;
2719
import org.hypertrace.core.documentstore.model.options.UpdateOptions;
2820
import org.hypertrace.core.documentstore.model.subdoc.SubDocumentUpdate;
29-
import org.hypertrace.core.documentstore.postgres.model.PostgresColumnMetadata;
3021
import org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser;
3122
import org.hypertrace.core.documentstore.postgres.query.v1.transformer.FlatPostgresFieldTransformer;
3223
import org.hypertrace.core.documentstore.query.Query;
@@ -43,7 +34,6 @@
4334
public class FlatPostgresCollection extends PostgresCollection {
4435

4536
private static final Logger LOGGER = LoggerFactory.getLogger(FlatPostgresCollection.class);
46-
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
4737
private static final String WRITE_NOT_SUPPORTED =
4838
"Write operations are not supported for flat collections yet!";
4939

@@ -97,105 +87,7 @@ public boolean upsert(Key key, Document document) throws IOException {
9787

9888
@Override
9989
public Document upsertAndReturn(Key key, Document document) throws IOException {
100-
String tableName = tableIdentifier.getTableName();
101-
Map<String, PostgresColumnMetadata> schema = schemaRegistry.getSchema(tableName);
102-
103-
if (schema.isEmpty()) {
104-
throw new IOException("No schema found for table: " + tableName);
105-
}
106-
107-
try {
108-
JsonNode docJson = OBJECT_MAPPER.readTree(document.toJson());
109-
List<String> columns = new ArrayList<>();
110-
List<Object> values = new ArrayList<>();
111-
112-
// Extract fields from document that exist in schema
113-
Iterator<Map.Entry<String, JsonNode>> fields = docJson.fields();
114-
while (fields.hasNext()) {
115-
Map.Entry<String, JsonNode> field = fields.next();
116-
String colName = field.getKey();
117-
if (schemaRegistry.getColumnOrRefresh(tableName, colName).isPresent()) {
118-
columns.add(colName);
119-
values.add(extractValue(field.getValue()));
120-
}
121-
}
122-
123-
if (columns.isEmpty()) {
124-
throw new IOException("No matching columns found in schema for document");
125-
}
126-
127-
// Build UPSERT SQL: INSERT ... ON CONFLICT DO UPDATE
128-
String columnList = String.join(", ", columns);
129-
String placeholders = columns.stream().map(c -> "?").collect(Collectors.joining(", "));
130-
String updateSet =
131-
columns.stream().map(c -> c + " = EXCLUDED." + c).collect(Collectors.joining(", "));
132-
133-
// Determine primary key column (assume first column or 'id')
134-
String pkColumn = schema.containsKey("id") ? "id" : columns.get(0);
135-
136-
String sql =
137-
String.format(
138-
"INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO UPDATE SET %s RETURNING *",
139-
tableIdentifier, columnList, placeholders, pkColumn, updateSet);
140-
141-
try (PreparedStatement ps = client.getConnection().prepareStatement(sql)) {
142-
for (int i = 0; i < values.size(); i++) {
143-
ps.setObject(i + 1, values.get(i));
144-
}
145-
146-
try (ResultSet rs = ps.executeQuery()) {
147-
if (rs.next()) {
148-
return resultSetToDocument(rs, columns);
149-
}
150-
}
151-
}
152-
return document;
153-
} catch (SQLException e) {
154-
LOGGER.error("SQLException in upsertAndReturn. key: {} document: {}", key, document, e);
155-
throw new IOException(e);
156-
}
157-
}
158-
159-
private Object extractValue(JsonNode node) {
160-
if (node.isNull()) {
161-
return null;
162-
} else if (node.isBoolean()) {
163-
return node.booleanValue();
164-
} else if (node.isInt()) {
165-
return node.intValue();
166-
} else if (node.isLong()) {
167-
return node.longValue();
168-
} else if (node.isDouble() || node.isFloat()) {
169-
return node.doubleValue();
170-
} else if (node.isTextual()) {
171-
return node.textValue();
172-
} else {
173-
return node.toString();
174-
}
175-
}
176-
177-
private Document resultSetToDocument(ResultSet rs, List<String> columns)
178-
throws SQLException, IOException {
179-
StringBuilder json = new StringBuilder("{");
180-
for (int i = 0; i < columns.size(); i++) {
181-
if (i > 0) {
182-
json.append(",");
183-
}
184-
String col = columns.get(i);
185-
Object value = rs.getObject(col);
186-
json.append("\"").append(col).append("\":");
187-
if (value == null) {
188-
json.append("null");
189-
} else if (value instanceof String) {
190-
json.append("\"").append(value).append("\"");
191-
} else if (value instanceof Boolean) {
192-
json.append(value);
193-
} else {
194-
json.append(value);
195-
}
196-
}
197-
json.append("}");
198-
return new org.hypertrace.core.documentstore.JSONDocument(json.toString());
90+
throw new UnsupportedOperationException(WRITE_NOT_SUPPORTED);
19991
}
20092

20193
@Override

document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresDatastore.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public PostgresDatastore(@NonNull final DatastoreConfig datastoreConfig) {
6060
client = new PostgresClient(postgresConnectionConfig);
6161
database = connectionConfig.database();
6262
docStoreMetricProvider = new PostgresDocStoreMetricProvider(this, postgresConnectionConfig);
63-
schemaRegistry = new PostgresSchemaRegistry(new PostgresMetadataFetcher(this));
63+
schemaRegistry = new PostgresSchemaRegistry(new PostgresMetadataFetcher(client));
6464
} catch (final IllegalArgumentException e) {
6565
throw new IllegalArgumentException(
6666
String.format("Unable to instantiate PostgresClient with config:%s", connectionConfig),

document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresMetadataFetcher.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
@AllArgsConstructor
1919
public class PostgresMetadataFetcher {
2020

21-
private final PostgresDatastore datastore;
21+
private final PostgresClient client;
2222

2323
private static final String DISCOVERY_SQL =
2424
"SELECT column_name, udt_name, is_nullable "
@@ -28,7 +28,7 @@ public class PostgresMetadataFetcher {
2828
public Map<String, PostgresColumnMetadata> fetch(String tableName) {
2929
Map<String, PostgresColumnMetadata> metadataMap = new HashMap<>();
3030

31-
try (Connection conn = datastore.getPostgresClient();
31+
try (Connection conn = client.getPooledConnection();
3232
PreparedStatement ps = conn.prepareStatement(DISCOVERY_SQL)) {
3333

3434
ps.setString(1, tableName);

0 commit comments

Comments
 (0)