From 0e76cd6b2cb56168245b146f9e3aeaf1128d1980 Mon Sep 17 00:00:00 2001 From: Nimrod Lahav Date: Sat, 11 Oct 2025 12:33:47 +0300 Subject: [PATCH 1/6] [OPIK-2469] [BE] Fix duplicate experiment items in dataset comparison Added LIMIT 1 BY id clause to experiment_items_final CTE in the SELECT_DATASET_ITEMS_WITH_EXPERIMENT_ITEMS SQL query. This ensures that only the latest version of each experiment item is returned, preventing duplicates from appearing in dataset comparison results. The root cause was that when multiple versions of an experiment item existed in ClickHouse (due to updates or race conditions), all versions were being returned. The LIMIT 1 BY id clause deduplicates based on experiment item ID, keeping only the most recent version (based on last_updated_at DESC ordering). --- .../src/main/java/com/comet/opik/domain/DatasetItemDAO.java | 1 + 1 file changed, 1 insertion(+) diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/DatasetItemDAO.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/DatasetItemDAO.java index a887783f43e..50827cf020d 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/domain/DatasetItemDAO.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/DatasetItemDAO.java @@ -606,6 +606,7 @@ ORDER BY (workspace_id, project_id, id) DESC, last_updated_at DESC ) ORDER BY id DESC, last_updated_at DESC + LIMIT 1 BY id ) SELECT ei.dataset_item_id AS id, From bcc09b9b7bcfe740307afea69be237864c8403c4 Mon Sep 17 00:00:00 2001 From: Nimrod Lahav Date: Sat, 11 Oct 2025 18:57:58 +0300 Subject: [PATCH 2/6] Revision 2: Add integration test for duplicate experiment items Added DuplicateExperimentItemsTest to verify that the LIMIT 1 BY id fix correctly prevents duplicate experiment items from being returned even when multiple versions exist in ClickHouse. The test: 1. Creates dataset, dataset items, traces, and experiment items 2. Manually inserts a duplicate experiment item directly into ClickHouse 3. Queries the endpoint and verifies no duplicates are returned 4. 
Asserts that exactly 2 unique experiment items are returned (not 3) --- .../v1/priv/DatasetsResourceTest.java | 136 ++++++++++++++++++ 1 file changed, 136 insertions(+) diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/DatasetsResourceTest.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/DatasetsResourceTest.java index a100b69e11a..73c66a0f129 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/DatasetsResourceTest.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/DatasetsResourceTest.java @@ -6595,4 +6595,140 @@ void getExperimentItemsStats__withFeedbackScoresIsNotEmptyFilter() { TraceAssertions.assertStats(stats.stats(), expectedStats); } } + + @Nested + @DisplayName("OPIK-2469: Duplicate Experiment Items Test") + @TestInstance(TestInstance.Lifecycle.PER_CLASS) + class DuplicateExperimentItemsTest { + + @Test + @DisplayName("Should return unique experiment items when duplicates exist in ClickHouse") + void findDatasetItemsWithExperimentItems__whenDuplicatesExistInClickHouse__thenReturnUniqueItems() { + + var workspaceName = UUID.randomUUID().toString(); + var apiKey = UUID.randomUUID().toString(); + var workspaceId = UUID.randomUUID().toString(); + + mockTargetWorkspace(apiKey, workspaceName, workspaceId); + + // Create dataset + var dataset = factory.manufacturePojo(Dataset.class); + var datasetId = createAndAssert(dataset, apiKey, workspaceName); + + // Create dataset item + var datasetItem = factory.manufacturePojo(DatasetItem.class); + var datasetItemBatch = DatasetItemBatch.builder() + .datasetId(datasetId) + .items(List.of(datasetItem)) + .build(); + putAndAssert(datasetItemBatch, workspaceName, apiKey); + + // Create traces + var trace1 = factory.manufacturePojo(Trace.class); + var trace2 = factory.manufacturePojo(Trace.class); + createAndAssert(trace1, workspaceName, apiKey); + createAndAssert(trace2, workspaceName, apiKey); + + // Create experiment and experiment items + var experimentId = GENERATOR.generate(); + var experimentItem1 = factory.manufacturePojo(ExperimentItem.class).toBuilder() + .experimentId(experimentId) + .datasetItemId(datasetItem.id()) + .traceId(trace1.id()) + .input(trace1.input()) + .output(trace1.output()) + .build(); + + var experimentItem2 = factory.manufacturePojo(ExperimentItem.class).toBuilder() + .experimentId(experimentId) + .datasetItemId(datasetItem.id()) + .traceId(trace2.id()) + .input(trace2.input()) + .output(trace2.output()) + .build(); + + var experimentItemsBatch = ExperimentItemsBatch.builder() + .experimentItems(Set.of(experimentItem1, experimentItem2)) + .build(); + createAndAssert(experimentItemsBatch, apiKey, workspaceName); + + // Manually insert a duplicate of experimentItem1 directly into ClickHouse + // This simulates the bug condition where multiple versions exist + insertDuplicateExperimentItem(workspaceId, experimentItem1); + + // Query the endpoint + var result = datasetResourceClient.getDatasetItemsWithExperimentItems( + datasetId, + List.of(experimentId), + apiKey, + workspaceName); + + // Assert results + assertThat(result).isNotNull(); + assertThat(result.content()).hasSize(1); + + var datasetItemResult = result.content().get(0); + assertThat(datasetItemResult.id()).isEqualTo(datasetItem.id()); + + // CRITICAL ASSERTION: Should have exactly 2 unique experiment items (no duplicates) + var experimentItems = datasetItemResult.experimentItems(); + assertThat(experimentItems).isNotNull(); + + // Count 
experiment items by their ID to detect duplicates + var experimentItemIds = experimentItems.stream() + .map(ExperimentItem::id) + .collect(Collectors.toList()); + + var uniqueIds = new HashSet<>(experimentItemIds); + + // THIS IS THE KEY ASSERTION - Verifies fix for OPIK-2469 + assertThat(experimentItemIds) + .as("Should not contain duplicate experiment item IDs") + .hasSameSizeAs(uniqueIds) + .as("Should have exactly 2 unique experiment items") + .hasSize(2); + + // Verify the correct experiment items are present + assertThat(uniqueIds).containsExactlyInAnyOrder(experimentItem1.id(), experimentItem2.id()); + + // Verify each experiment item appears only once + experimentItemIds.forEach(id -> { + long count = experimentItemIds.stream().filter(i -> i.equals(id)).count(); + assertThat(count) + .as("Experiment item '%s' should appear exactly once, but appears '%d' times", id, count) + .isEqualTo(1); + }); + } + + /** + * Helper method to insert a duplicate experiment item directly into ClickHouse. + * This simulates the bug condition where multiple versions of the same item exist. + */ + private void insertDuplicateExperimentItem(String workspaceId, ExperimentItem item) { + try (var connection = CLICKHOUSE.createConnection("?database=" + DATABASE_NAME)) { + var statement = connection.createStatement(); + + // Insert a duplicate row with an older timestamp + // Format as Unix timestamp (seconds since epoch) which ClickHouse accepts + var olderTimestamp = Instant.now().minus(1, ChronoUnit.HOURS).getEpochSecond(); + + String sql = String.format( + """ + INSERT INTO experiment_items ( + id, workspace_id, experiment_id, dataset_item_id, trace_id, + created_at, last_updated_at, created_by, last_updated_by + ) VALUES ( + '%s', '%s', '%s', '%s', '%s', + %d, %d, 'test-user', 'test-user' + ) + """, + item.id(), workspaceId, item.experimentId(), item.datasetItemId(), item.traceId(), + olderTimestamp, olderTimestamp); + + statement.execute(sql); + } catch (Exception exception) { + throw new RuntimeException("Failed to insert duplicate experiment item", exception); + } + } + } } From 96db25179d5fd7bf06ace301b01cc7f2534fece8 Mon Sep 17 00:00:00 2001 From: Nimrod Lahav Date: Sat, 11 Oct 2025 20:15:58 +0300 Subject: [PATCH 3/6] Revision 2: Fix root cause by removing project_id from spans aggregation - Changed spans aggregation GROUP BY from (workspace_id, project_id, trace_id) to (workspace_id, trace_id) - This prevents duplicate rows when traces exist in multiple projects (cross-project traces) - Removed previous workaround (LIMIT 1 BY id on experiment_items_final) - Updated integration test to simulate cross-project trace scenario instead of duplicate experiment items - Test now creates spans in different projects for the same trace to verify fix --- .../com/comet/opik/domain/DatasetItemDAO.java | 3 +- .../v1/priv/DatasetsResourceTest.java | 62 +++++++------------ 2 files changed, 22 insertions(+), 43 deletions(-) diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/DatasetItemDAO.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/DatasetItemDAO.java index 50827cf020d..750a84683a6 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/domain/DatasetItemDAO.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/DatasetItemDAO.java @@ -606,7 +606,6 @@ ORDER BY (workspace_id, project_id, id) DESC, last_updated_at DESC ) ORDER BY id DESC, last_updated_at DESC - LIMIT 1 BY id ) SELECT ei.dataset_item_id AS id, @@ -689,7 +688,7 @@ LEFT JOIN ( FROM spans final WHERE 
workspace_id = :workspace_id AND trace_id IN (SELECT trace_id FROM experiment_items_scope) - GROUP BY workspace_id, project_id, trace_id + GROUP BY workspace_id, trace_id ) s ON t.id = s.trace_id GROUP BY t.id, diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/DatasetsResourceTest.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/DatasetsResourceTest.java index 73c66a0f129..a729d738d19 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/DatasetsResourceTest.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/DatasetsResourceTest.java @@ -6597,13 +6597,13 @@ void getExperimentItemsStats__withFeedbackScoresIsNotEmptyFilter() { } @Nested - @DisplayName("OPIK-2469: Duplicate Experiment Items Test") + @DisplayName("OPIK-2469: Cross-Project Traces Duplicate Test") @TestInstance(TestInstance.Lifecycle.PER_CLASS) - class DuplicateExperimentItemsTest { + class CrossProjectTracesDuplicateTest { @Test - @DisplayName("Should return unique experiment items when duplicates exist in ClickHouse") - void findDatasetItemsWithExperimentItems__whenDuplicatesExistInClickHouse__thenReturnUniqueItems() { + @DisplayName("Should return unique experiment items when traces exist in multiple projects") + void findDatasetItemsWithExperimentItems__whenTraceExistsInMultipleProjects__thenReturnUniqueItems() { var workspaceName = UUID.randomUUID().toString(); var apiKey = UUID.randomUUID().toString(); @@ -6623,9 +6623,14 @@ void findDatasetItemsWithExperimentItems__whenDuplicatesExistInClickHouse__thenR .build(); putAndAssert(datasetItemBatch, workspaceName, apiKey); - // Create traces - var trace1 = factory.manufacturePojo(Trace.class); - var trace2 = factory.manufacturePojo(Trace.class); + // Create two traces in PROJECT A + var projectA = UUID.randomUUID().toString(); + var trace1 = factory.manufacturePojo(Trace.class).toBuilder() + .projectName(projectA) + .build(); + var trace2 = factory.manufacturePojo(Trace.class).toBuilder() + .projectName(projectA) + .build(); createAndAssert(trace1, workspaceName, apiKey); createAndAssert(trace2, workspaceName, apiKey); @@ -6652,9 +6657,14 @@ void findDatasetItemsWithExperimentItems__whenDuplicatesExistInClickHouse__thenR .build(); createAndAssert(experimentItemsBatch, apiKey, workspaceName); - // Manually insert a duplicate of experimentItem1 directly into ClickHouse - // This simulates the bug condition where multiple versions exist - insertDuplicateExperimentItem(workspaceId, experimentItem1); + // ROOT CAUSE: Create spans for trace1 in a DIFFERENT PROJECT (PROJECT B) + // This simulates cross-project traces which cause duplicates in the spans aggregation + var projectB = UUID.randomUUID().toString(); + var span1InProjectB = factory.manufacturePojo(Span.class).toBuilder() + .projectName(projectB) + .traceId(trace1.id()) + .build(); + createSpan(span1InProjectB, apiKey, workspaceName); // Query the endpoint var result = datasetResourceClient.getDatasetItemsWithExperimentItems( @@ -6671,6 +6681,7 @@ void findDatasetItemsWithExperimentItems__whenDuplicatesExistInClickHouse__thenR assertThat(datasetItemResult.id()).isEqualTo(datasetItem.id()); // CRITICAL ASSERTION: Should have exactly 2 unique experiment items (no duplicates) + // Even though trace1 exists in TWO projects (A and B), experimentItem1 should appear ONCE var experimentItems = datasetItemResult.experimentItems(); assertThat(experimentItems).isNotNull(); @@ -6699,36 +6710,5 @@ void 
findDatasetItemsWithExperimentItems__whenDuplicatesExistInClickHouse__thenR .isEqualTo(1); }); } - - /** - * Helper method to insert a duplicate experiment item directly into ClickHouse. - * This simulates the bug condition where multiple versions of the same item exist. - */ - private void insertDuplicateExperimentItem(String workspaceId, ExperimentItem item) { - try (var connection = CLICKHOUSE.createConnection("?database=" + DATABASE_NAME)) { - var statement = connection.createStatement(); - - // Insert a duplicate row with an older timestamp - // Format as Unix timestamp (seconds since epoch) which ClickHouse accepts - var olderTimestamp = Instant.now().minus(1, ChronoUnit.HOURS).getEpochSecond(); - - String sql = String.format( - """ - INSERT INTO experiment_items ( - id, workspace_id, experiment_id, dataset_item_id, trace_id, - created_at, last_updated_at, created_by, last_updated_by - ) VALUES ( - '%s', '%s', '%s', '%s', '%s', - %d, %d, 'test-user', 'test-user' - ) - """, - item.id(), workspaceId, item.experimentId(), item.datasetItemId(), item.traceId(), - olderTimestamp, olderTimestamp); - - statement.execute(sql); - } catch (Exception exception) { - throw new RuntimeException("Failed to insert duplicate experiment item", exception); - } - } } } From b69f0697137f5ad472eac58f9eceb692c79092df Mon Sep 17 00:00:00 2001 From: Nimrod Lahav Date: Sat, 11 Oct 2025 20:22:40 +0300 Subject: [PATCH 4/6] Revision 3: Remove unhelpful integration test Test doesn't reliably reproduce cross-project trace scenario. Production verification via direct SQL queries is more reliable. --- .../v1/priv/DatasetsResourceTest.java | 115 ------------------ 1 file changed, 115 deletions(-) diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/DatasetsResourceTest.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/DatasetsResourceTest.java index a729d738d19..242ed9c82bd 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/DatasetsResourceTest.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/DatasetsResourceTest.java @@ -6596,119 +6596,4 @@ void getExperimentItemsStats__withFeedbackScoresIsNotEmptyFilter() { } } - @Nested - @DisplayName("OPIK-2469: Cross-Project Traces Duplicate Test") - @TestInstance(TestInstance.Lifecycle.PER_CLASS) - class CrossProjectTracesDuplicateTest { - - @Test - @DisplayName("Should return unique experiment items when traces exist in multiple projects") - void findDatasetItemsWithExperimentItems__whenTraceExistsInMultipleProjects__thenReturnUniqueItems() { - - var workspaceName = UUID.randomUUID().toString(); - var apiKey = UUID.randomUUID().toString(); - var workspaceId = UUID.randomUUID().toString(); - - mockTargetWorkspace(apiKey, workspaceName, workspaceId); - - // Create dataset - var dataset = factory.manufacturePojo(Dataset.class); - var datasetId = createAndAssert(dataset, apiKey, workspaceName); - - // Create dataset item - var datasetItem = factory.manufacturePojo(DatasetItem.class); - var datasetItemBatch = DatasetItemBatch.builder() - .datasetId(datasetId) - .items(List.of(datasetItem)) - .build(); - putAndAssert(datasetItemBatch, workspaceName, apiKey); - - // Create two traces in PROJECT A - var projectA = UUID.randomUUID().toString(); - var trace1 = factory.manufacturePojo(Trace.class).toBuilder() - .projectName(projectA) - .build(); - var trace2 = factory.manufacturePojo(Trace.class).toBuilder() - .projectName(projectA) - .build(); - createAndAssert(trace1, 
workspaceName, apiKey); - createAndAssert(trace2, workspaceName, apiKey); - - // Create experiment and experiment items - var experimentId = GENERATOR.generate(); - var experimentItem1 = factory.manufacturePojo(ExperimentItem.class).toBuilder() - .experimentId(experimentId) - .datasetItemId(datasetItem.id()) - .traceId(trace1.id()) - .input(trace1.input()) - .output(trace1.output()) - .build(); - - var experimentItem2 = factory.manufacturePojo(ExperimentItem.class).toBuilder() - .experimentId(experimentId) - .datasetItemId(datasetItem.id()) - .traceId(trace2.id()) - .input(trace2.input()) - .output(trace2.output()) - .build(); - - var experimentItemsBatch = ExperimentItemsBatch.builder() - .experimentItems(Set.of(experimentItem1, experimentItem2)) - .build(); - createAndAssert(experimentItemsBatch, apiKey, workspaceName); - - // ROOT CAUSE: Create spans for trace1 in a DIFFERENT PROJECT (PROJECT B) - // This simulates cross-project traces which cause duplicates in the spans aggregation - var projectB = UUID.randomUUID().toString(); - var span1InProjectB = factory.manufacturePojo(Span.class).toBuilder() - .projectName(projectB) - .traceId(trace1.id()) - .build(); - createSpan(span1InProjectB, apiKey, workspaceName); - - // Query the endpoint - var result = datasetResourceClient.getDatasetItemsWithExperimentItems( - datasetId, - List.of(experimentId), - apiKey, - workspaceName); - - // Assert results - assertThat(result).isNotNull(); - assertThat(result.content()).hasSize(1); - - var datasetItemResult = result.content().get(0); - assertThat(datasetItemResult.id()).isEqualTo(datasetItem.id()); - - // CRITICAL ASSERTION: Should have exactly 2 unique experiment items (no duplicates) - // Even though trace1 exists in TWO projects (A and B), experimentItem1 should appear ONCE - var experimentItems = datasetItemResult.experimentItems(); - assertThat(experimentItems).isNotNull(); - - // Count experiment items by their ID to detect duplicates - var experimentItemIds = experimentItems.stream() - .map(ExperimentItem::id) - .collect(Collectors.toList()); - - var uniqueIds = new HashSet<>(experimentItemIds); - - // THIS IS THE KEY ASSERTION - Verifies fix for OPIK-2469 - assertThat(experimentItemIds) - .as("Should not contain duplicate experiment item IDs") - .hasSameSizeAs(uniqueIds) - .as("Should have exactly 2 unique experiment items") - .hasSize(2); - - // Verify the correct experiment items are present - assertThat(uniqueIds).containsExactlyInAnyOrder(experimentItem1.id(), experimentItem2.id()); - - // Verify each experiment item appears only once - experimentItemIds.forEach(id -> { - long count = experimentItemIds.stream().filter(i -> i.equals(id)).count(); - assertThat(count) - .as("Experiment item '%s' should appear exactly once, but appears '%d' times", id, count) - .isEqualTo(1); - }); - } - } } From f83875a4239aa0434b5a3c5c809ffa4a2739375e Mon Sep 17 00:00:00 2001 From: Nimrod Lahav Date: Sat, 11 Oct 2025 20:36:00 +0300 Subject: [PATCH 5/6] Revision 4: Add comprehensive cross-project trace test Test simulates production scenario where traces have spans across multiple projects. Successfully demonstrates bug: with GROUP BY project_id, returns 3 items (1 duplicate). Test validates fix by inserting cross-project spans directly into ClickHouse. 
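For illustration, a minimal ClickHouse sketch of the fan-out this test targets, using a made-up toy_spans table rather than the real Opik spans schema: when a trace's spans are recorded under two projects, grouping by project_id yields two aggregate rows for the same trace_id, so the later join on trace_id duplicates that trace's experiment item; grouping by trace_id alone collapses it back to one row.

-- toy table standing in for the spans aggregation; names and values are illustrative only
CREATE TABLE toy_spans
(
    trace_id   UUID,
    project_id UUID,
    usage      Map(String, Int64)
) ENGINE = MergeTree ORDER BY trace_id;

-- one trace whose spans were written under two different projects
INSERT INTO toy_spans VALUES
    ('00000000-0000-0000-0000-000000000001', generateUUIDv4(), map('tokens', 10)),
    ('00000000-0000-0000-0000-000000000001', generateUUIDv4(), map('tokens', 20));

-- pre-fix shape: one row per (project_id, trace_id), i.e. two rows for this trace
SELECT trace_id, sumMap(usage) AS usage
FROM toy_spans
GROUP BY project_id, trace_id;

-- post-fix shape: one row per trace_id, usage summed across projects
SELECT trace_id, sumMap(usage) AS usage
FROM toy_spans
GROUP BY trace_id;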
--- .../v1/priv/DatasetsResourceTest.java | 196 ++++++++++++++++++ 1 file changed, 196 insertions(+) diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/DatasetsResourceTest.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/DatasetsResourceTest.java index 242ed9c82bd..69f15d9e89e 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/DatasetsResourceTest.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/DatasetsResourceTest.java @@ -6596,4 +6596,200 @@ void getExperimentItemsStats__withFeedbackScoresIsNotEmptyFilter() { } } + @Nested + @DisplayName("OPIK-2469: Cross-Project Traces Duplicate Test") + @TestInstance(TestInstance.Lifecycle.PER_CLASS) + class CrossProjectTracesDuplicateTest { + + @Test + @DisplayName("Should return unique experiment items when trace has spans in multiple projects") + void findDatasetItemsWithExperimentItems__whenTraceHasSpansInMultipleProjects__thenReturnUniqueItems() { + + var workspaceName = UUID.randomUUID().toString(); + var apiKey = UUID.randomUUID().toString(); + var workspaceId = UUID.randomUUID().toString(); + + mockTargetWorkspace(apiKey, workspaceName, workspaceId); + + // Create dataset + var dataset = factory.manufacturePojo(Dataset.class); + var datasetId = createAndAssert(dataset, apiKey, workspaceName); + + // Create dataset item + var datasetItem = factory.manufacturePojo(DatasetItem.class); + var datasetItemBatch = DatasetItemBatch.builder() + .datasetId(datasetId) + .items(List.of(datasetItem)) + .build(); + putAndAssert(datasetItemBatch, workspaceName, apiKey); + + // Create Project A + var projectA = UUID.randomUUID().toString(); + + // Create trace in Project A with spans + var trace1 = factory.manufacturePojo(Trace.class).toBuilder() + .projectName(projectA) + .build(); + createAndAssert(trace1, workspaceName, apiKey); + + // Create span in Project A for trace1 + var span1InProjectA = factory.manufacturePojo(Span.class).toBuilder() + .projectName(projectA) + .traceId(trace1.id()) + .build(); + createSpan(span1InProjectA, apiKey, workspaceName); + + // ROOT CAUSE SIMULATION: Insert spans directly into ClickHouse for the SAME trace in Project B + // This creates a cross-project trace scenario + insertSpansForTraceInDifferentProject(workspaceId, trace1.id(), workspaceName, apiKey); + + // Wait for ClickHouse to process the manually inserted span + try { + Thread.sleep(500); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + // Create another trace in Project A (no cross-project issue) + var trace2 = factory.manufacturePojo(Trace.class).toBuilder() + .projectName(projectA) + .build(); + createAndAssert(trace2, workspaceName, apiKey); + + // Create experiment items for both traces + var experimentId = GENERATOR.generate(); + var experimentItem1 = factory.manufacturePojo(ExperimentItem.class).toBuilder() + .experimentId(experimentId) + .datasetItemId(datasetItem.id()) + .traceId(trace1.id()) + .input(trace1.input()) + .output(trace1.output()) + .build(); + + var experimentItem2 = factory.manufacturePojo(ExperimentItem.class).toBuilder() + .experimentId(experimentId) + .datasetItemId(datasetItem.id()) + .traceId(trace2.id()) + .input(trace2.input()) + .output(trace2.output()) + .build(); + + var experimentItemsBatch = ExperimentItemsBatch.builder() + .experimentItems(Set.of(experimentItem1, experimentItem2)) + .build(); + createAndAssert(experimentItemsBatch, apiKey, workspaceName); + + // Query the 
endpoint + var result = datasetResourceClient.getDatasetItemsWithExperimentItems( + datasetId, + List.of(experimentId), + apiKey, + workspaceName); + + // Assert results + assertThat(result).isNotNull(); + assertThat(result.content()).hasSize(1); + + var datasetItemResult = result.content().get(0); + assertThat(datasetItemResult.id()).isEqualTo(datasetItem.id()); + + // CRITICAL ASSERTION: Should have exactly 2 unique experiment items (no duplicates) + // Without the fix, trace1 appears twice because it has spans in 2 projects + var experimentItems = datasetItemResult.experimentItems(); + assertThat(experimentItems).isNotNull(); + + // Count experiment items by their ID to detect duplicates + var experimentItemIds = experimentItems.stream() + .map(ExperimentItem::id) + .collect(Collectors.toList()); + + var uniqueIds = new HashSet<>(experimentItemIds); + + // THIS IS THE KEY ASSERTION - Verifies fix for OPIK-2469 + assertThat(experimentItemIds) + .as("Should not contain duplicate experiment item IDs - trace1 has spans in 2 projects but should appear once") + .hasSameSizeAs(uniqueIds) + .as("Should have exactly 2 unique experiment items") + .hasSize(2); + + // Verify the correct experiment items are present + assertThat(uniqueIds).containsExactlyInAnyOrder(experimentItem1.id(), experimentItem2.id()); + + // Verify each experiment item appears only once + experimentItemIds.forEach(id -> { + long count = experimentItemIds.stream().filter(i -> i.equals(id)).count(); + assertThat(count) + .as("Experiment item '%s' should appear exactly once, but appears '%d' times", id, count) + .isEqualTo(1); + }); + } + + /** + * Simulates the production scenario where a trace has spans in multiple projects. + * This is the root cause of OPIK-2469: when GROUP BY includes project_id, + * the query returns multiple rows for the same trace_id, causing duplicates. 
+ */ + private void insertSpansForTraceInDifferentProject(String workspaceId, UUID traceId, + String workspaceName, String apiKey) { + try { + // Create Project B through the API (this ensures all related tables are properly populated) + var projectBName = UUID.randomUUID().toString(); + var dummyTrace = factory.manufacturePojo(Trace.class).toBuilder() + .projectName(projectBName) + .build(); + createAndAssert(dummyTrace, workspaceName, apiKey); + + // Now insert spans directly into ClickHouse for the original trace but in Project B + // This creates the cross-project trace scenario + try (var connection = CLICKHOUSE.createConnection("?database=" + DATABASE_NAME)) { + var statement = connection.createStatement(); + + // Get the project ID for Project B by using the dummy trace we just created + String getProjectIdSql = String.format( + "SELECT project_id FROM traces WHERE workspace_id = '%s' AND id = '%s' LIMIT 1", + workspaceId, dummyTrace.id()); + + var resultSet = statement.executeQuery(getProjectIdSql); + String projectBId = null; + if (resultSet.next()) { + projectBId = resultSet.getString(1); + } + resultSet.close(); + + if (projectBId == null) { + throw new RuntimeException("Could not find Project B ID"); + } + + // Insert spans into ClickHouse for the SAME trace (the original trace) but in Project B + // This creates the cross-project trace scenario + var spanId = GENERATOR.generate(); + var now = Instant.now().getEpochSecond(); + + String insertSpanSql = String.format( + """ + INSERT INTO spans ( + id, workspace_id, project_id, trace_id, parent_span_id, + type, name, start_time, end_time, + input, output, metadata, tags, + usage, total_estimated_cost, + created_at, last_updated_at, created_by, last_updated_by + ) VALUES ( + '%s', '%s', '%s', '%s', '%s', + 'general', 'test-span-project-b', toDateTime64(%d, 9), toDateTime64(%d, 9), + map('key', 'value'), map('result', 'success'), map(), [], + map('tokens', 100), 0.05, + %d, %d, 'test-user', 'test-user' + ) + """, + spanId, workspaceId, projectBId, traceId, GENERATOR.generate(), + now, now + 1, + now, now); + + statement.execute(insertSpanSql); + } + } catch (Exception exception) { + throw new RuntimeException("Failed to insert cross-project spans", exception); + } + } + } } From 55ab470d84c151c5d1cf42003d0928da02cde22e Mon Sep 17 00:00:00 2001 From: Nimrod Lahav Date: Wed, 15 Oct 2025 10:20:12 +0300 Subject: [PATCH 6/6] Revision 2: Rewrite test to use API calls instead of direct ClickHouse access --- .../com/comet/opik/domain/DatasetItemDAO.java | 16 ++-- .../v1/priv/DatasetsResourceTest.java | 90 +++---------------- 2 files changed, 18 insertions(+), 88 deletions(-) diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/DatasetItemDAO.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/DatasetItemDAO.java index 750a84683a6..f5e7e07e717 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/domain/DatasetItemDAO.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/DatasetItemDAO.java @@ -681,14 +681,14 @@ ORDER BY (workspace_id, project_id, id) DESC, last_updated_at DESC LEFT JOIN feedback_scores_final AS fs ON t.id = fs.entity_id LEFT JOIN comments_final AS c ON t.id = c.entity_id LEFT JOIN ( - SELECT - trace_id, - SUM(total_estimated_cost) AS total_estimated_cost, - sumMap(usage) AS usage - FROM spans final - WHERE workspace_id = :workspace_id - AND trace_id IN (SELECT trace_id FROM experiment_items_scope) - GROUP BY workspace_id, trace_id + SELECT + trace_id, + SUM(total_estimated_cost) AS 
total_estimated_cost, + sumMap(usage) AS usage + FROM spans final + WHERE workspace_id = :workspace_id + AND trace_id IN (SELECT trace_id FROM experiment_items_scope) + GROUP BY workspace_id, trace_id ) s ON t.id = s.trace_id GROUP BY t.id, diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/DatasetsResourceTest.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/DatasetsResourceTest.java index 69f15d9e89e..1fe5bdf376c 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/DatasetsResourceTest.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/DatasetsResourceTest.java @@ -6623,10 +6623,11 @@ void findDatasetItemsWithExperimentItems__whenTraceHasSpansInMultipleProjects__t .build(); putAndAssert(datasetItemBatch, workspaceName, apiKey); - // Create Project A + // Create Project A and Project B var projectA = UUID.randomUUID().toString(); + var projectB = UUID.randomUUID().toString(); - // Create trace in Project A with spans + // Create trace in Project A var trace1 = factory.manufacturePojo(Trace.class).toBuilder() .projectName(projectA) .build(); @@ -6639,16 +6640,13 @@ void findDatasetItemsWithExperimentItems__whenTraceHasSpansInMultipleProjects__t .build(); createSpan(span1InProjectA, apiKey, workspaceName); - // ROOT CAUSE SIMULATION: Insert spans directly into ClickHouse for the SAME trace in Project B - // This creates a cross-project trace scenario - insertSpansForTraceInDifferentProject(workspaceId, trace1.id(), workspaceName, apiKey); - - // Wait for ClickHouse to process the manually inserted span - try { - Thread.sleep(500); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + // ROOT CAUSE SIMULATION: Create span in Project B for the SAME trace (trace1) + // This creates a cross-project trace scenario through the API + var span1InProjectB = factory.manufacturePojo(Span.class).toBuilder() + .projectName(projectB) + .traceId(trace1.id()) + .build(); + createSpan(span1InProjectB, apiKey, workspaceName); // Create another trace in Project A (no cross-project issue) var trace2 = factory.manufacturePojo(Trace.class).toBuilder() @@ -6723,73 +6721,5 @@ void findDatasetItemsWithExperimentItems__whenTraceHasSpansInMultipleProjects__t .isEqualTo(1); }); } - - /** - * Simulates the production scenario where a trace has spans in multiple projects. - * This is the root cause of OPIK-2469: when GROUP BY includes project_id, - * the query returns multiple rows for the same trace_id, causing duplicates. 
- */ - private void insertSpansForTraceInDifferentProject(String workspaceId, UUID traceId, - String workspaceName, String apiKey) { - try { - // Create Project B through the API (this ensures all related tables are properly populated) - var projectBName = UUID.randomUUID().toString(); - var dummyTrace = factory.manufacturePojo(Trace.class).toBuilder() - .projectName(projectBName) - .build(); - createAndAssert(dummyTrace, workspaceName, apiKey); - - // Now insert spans directly into ClickHouse for the original trace but in Project B - // This creates the cross-project trace scenario - try (var connection = CLICKHOUSE.createConnection("?database=" + DATABASE_NAME)) { - var statement = connection.createStatement(); - - // Get the project ID for Project B by using the dummy trace we just created - String getProjectIdSql = String.format( - "SELECT project_id FROM traces WHERE workspace_id = '%s' AND id = '%s' LIMIT 1", - workspaceId, dummyTrace.id()); - - var resultSet = statement.executeQuery(getProjectIdSql); - String projectBId = null; - if (resultSet.next()) { - projectBId = resultSet.getString(1); - } - resultSet.close(); - - if (projectBId == null) { - throw new RuntimeException("Could not find Project B ID"); - } - - // Insert spans into ClickHouse for the SAME trace (the original trace) but in Project B - // This creates the cross-project trace scenario - var spanId = GENERATOR.generate(); - var now = Instant.now().getEpochSecond(); - - String insertSpanSql = String.format( - """ - INSERT INTO spans ( - id, workspace_id, project_id, trace_id, parent_span_id, - type, name, start_time, end_time, - input, output, metadata, tags, - usage, total_estimated_cost, - created_at, last_updated_at, created_by, last_updated_by - ) VALUES ( - '%s', '%s', '%s', '%s', '%s', - 'general', 'test-span-project-b', toDateTime64(%d, 9), toDateTime64(%d, 9), - map('key', 'value'), map('result', 'success'), map(), [], - map('tokens', 100), 0.05, - %d, %d, 'test-user', 'test-user' - ) - """, - spanId, workspaceId, projectBId, traceId, GENERATOR.generate(), - now, now + 1, - now, now); - - statement.execute(insertSpanSql); - } - } catch (Exception exception) { - throw new RuntimeException("Failed to insert cross-project spans", exception); - } - } } }
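Earlier in the series ("Production verification via direct SQL queries is more reliable"), that verification can be sketched as a ClickHouse query listing traces whose spans are spread across more than one project, i.e. the cross-project traces that made the old per-project aggregation fan out. Column names follow the spans INSERT used in revision 4; treat this as a sketch to adapt to the deployed schema.

-- traces whose spans are recorded under more than one project in a workspace
SELECT
    workspace_id,
    trace_id,
    uniqExact(project_id) AS project_count
FROM spans
GROUP BY workspace_id, trace_id
HAVING project_count > 1
ORDER BY project_count DESC
LIMIT 100;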