Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ LEFT JOIN (
FROM spans final
WHERE workspace_id = :workspace_id
AND trace_id IN (SELECT trace_id FROM experiment_items_scope)
- GROUP BY workspace_id, project_id, trace_id
+ GROUP BY workspace_id, trace_id
) s ON t.id = s.trace_id
GROUP BY
t.id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6595,4 +6595,201 @@ void getExperimentItemsStats__withFeedbackScoresIsNotEmptyFilter() {
TraceAssertions.assertStats(stats.stats(), expectedStats);
}
}

@Nested
@DisplayName("OPIK-2469: Cross-Project Traces Duplicate Test")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class CrossProjectTracesDuplicateTest {

    /**
     * Regression test for OPIK-2469.
     *
     * <p>Builds a dataset with one item, two traces in the same project, and one experiment item per trace.
     * The first trace additionally gets a span stored under a second project (inserted directly into
     * ClickHouse). The endpoint must still return each experiment item exactly once.
     */
    @Test
    @DisplayName("Should return unique experiment items when trace has spans in multiple projects")
    void findDatasetItemsWithExperimentItems__whenTraceHasSpansInMultipleProjects__thenReturnUniqueItems() {

        var workspaceName = UUID.randomUUID().toString();
        var apiKey = UUID.randomUUID().toString();
        var workspaceId = UUID.randomUUID().toString();

        mockTargetWorkspace(apiKey, workspaceName, workspaceId);

        // Create dataset
        var dataset = factory.manufacturePojo(Dataset.class);
        var datasetId = createAndAssert(dataset, apiKey, workspaceName);

        // Create dataset item
        var datasetItem = factory.manufacturePojo(DatasetItem.class);
        var datasetItemBatch = DatasetItemBatch.builder()
                .datasetId(datasetId)
                .items(List.of(datasetItem))
                .build();
        putAndAssert(datasetItemBatch, workspaceName, apiKey);

        // Project A is identified by name only; it is referenced by the traces and span created below
        var projectA = UUID.randomUUID().toString();

        // Create trace in Project A with spans
        var trace1 = factory.manufacturePojo(Trace.class).toBuilder()
                .projectName(projectA)
                .build();
        createAndAssert(trace1, workspaceName, apiKey);

        // Create span in Project A for trace1
        var span1InProjectA = factory.manufacturePojo(Span.class).toBuilder()
                .projectName(projectA)
                .traceId(trace1.id())
                .build();
        createSpan(span1InProjectA, apiKey, workspaceName);

        // ROOT CAUSE SIMULATION: insert a span directly into ClickHouse for the SAME trace in Project B.
        // This creates a cross-project trace scenario.
        insertSpansForTraceInDifferentProject(workspaceId, trace1.id(), workspaceName, apiKey);

        // Wait for ClickHouse to process the manually inserted span
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the test runner can observe the interruption
            Thread.currentThread().interrupt();
        }

        // Create another trace in Project A (no cross-project issue)
        var trace2 = factory.manufacturePojo(Trace.class).toBuilder()
                .projectName(projectA)
                .build();
        createAndAssert(trace2, workspaceName, apiKey);

        // Create experiment items for both traces, both pointing at the same dataset item
        var experimentId = GENERATOR.generate();
        var experimentItem1 = factory.manufacturePojo(ExperimentItem.class).toBuilder()
                .experimentId(experimentId)
                .datasetItemId(datasetItem.id())
                .traceId(trace1.id())
                .input(trace1.input())
                .output(trace1.output())
                .build();

        var experimentItem2 = factory.manufacturePojo(ExperimentItem.class).toBuilder()
                .experimentId(experimentId)
                .datasetItemId(datasetItem.id())
                .traceId(trace2.id())
                .input(trace2.input())
                .output(trace2.output())
                .build();

        var experimentItemsBatch = ExperimentItemsBatch.builder()
                .experimentItems(Set.of(experimentItem1, experimentItem2))
                .build();
        createAndAssert(experimentItemsBatch, apiKey, workspaceName);

        // Query the endpoint
        var result = datasetResourceClient.getDatasetItemsWithExperimentItems(
                datasetId,
                List.of(experimentId),
                apiKey,
                workspaceName);

        // Assert results
        assertThat(result).isNotNull();
        assertThat(result.content()).hasSize(1);

        var datasetItemResult = result.content().get(0);
        assertThat(datasetItemResult.id()).isEqualTo(datasetItem.id());

        // CRITICAL ASSERTION: should have exactly 2 unique experiment items (no duplicates).
        // Without the fix, trace1 appears twice because it has spans in 2 projects.
        var experimentItems = datasetItemResult.experimentItems();
        assertThat(experimentItems).isNotNull();

        // Keep the IDs in a list (not a set) so that duplicates remain observable
        var experimentItemIds = experimentItems.stream()
                .map(ExperimentItem::id)
                .collect(Collectors.toList());

        // THIS IS THE KEY ASSERTION - verifies the fix for OPIK-2469
        assertThat(experimentItemIds)
                .as("Should not contain duplicate experiment item IDs - trace1 has spans in 2 projects but should appear once")
                .doesNotHaveDuplicates()
                .as("Should have exactly 2 unique experiment items")
                .hasSize(2)
                .as("Should contain exactly the two created experiment items")
                .containsExactlyInAnyOrder(experimentItem1.id(), experimentItem2.id());
    }

    /**
     * Simulates the production scenario where a trace has spans in multiple projects.
     * This is the root cause of OPIK-2469: when GROUP BY includes project_id,
     * the query returns multiple rows for the same trace_id, causing duplicates.
     *
     * <p>A second project ("Project B") is created through the API by posting a throwaway trace; its
     * project ID is then read back from the {@code traces} table and used to insert one span row for
     * {@code traceId} directly into the {@code spans} table.
     *
     * @param workspaceId   workspace ID written into the inserted span row and used to look up Project B
     * @param traceId       ID of the existing trace that receives the extra span
     * @param workspaceName workspace name used for the API call that creates Project B
     * @param apiKey        API key used for the API call that creates Project B
     * @throws RuntimeException if Project B cannot be resolved or any database operation fails
     */
    private void insertSpansForTraceInDifferentProject(String workspaceId, UUID traceId,
            String workspaceName, String apiKey) {
        try {
            // Create Project B through the API (this ensures all related tables are properly populated)
            var projectBName = UUID.randomUUID().toString();
            var dummyTrace = factory.manufacturePojo(Trace.class).toBuilder()
                    .projectName(projectBName)
                    .build();
            createAndAssert(dummyTrace, workspaceName, apiKey);

            // try-with-resources closes the connection and the statement even when a query fails
            try (var connection = CLICKHOUSE.createConnection("?database=" + DATABASE_NAME);
                    var statement = connection.createStatement()) {

                // Get the project ID for Project B by using the dummy trace we just created
                String getProjectIdSql = String.format(
                        "SELECT project_id FROM traces WHERE workspace_id = '%s' AND id = '%s' LIMIT 1",
                        workspaceId, dummyTrace.id());

                String projectBId = null;
                try (var resultSet = statement.executeQuery(getProjectIdSql)) {
                    if (resultSet.next()) {
                        projectBId = resultSet.getString(1);
                    }
                }

                if (projectBId == null) {
                    throw new IllegalStateException("Could not find Project B ID");
                }

                // Insert a span into ClickHouse for the SAME trace (the original trace) but in Project B.
                // The span ID and parent span ID are freshly generated; timestamps are epoch seconds.
                var spanId = GENERATOR.generate();
                var now = Instant.now().getEpochSecond();

                String insertSpanSql = String.format(
                        """
                        INSERT INTO spans (
                        id, workspace_id, project_id, trace_id, parent_span_id,
                        type, name, start_time, end_time,
                        input, output, metadata, tags,
                        usage, total_estimated_cost,
                        created_at, last_updated_at, created_by, last_updated_by
                        ) VALUES (
                        '%s', '%s', '%s', '%s', '%s',
                        'general', 'test-span-project-b', toDateTime64(%d, 9), toDateTime64(%d, 9),
                        map('key', 'value'), map('result', 'success'), map(), [],
                        map('tokens', 100), 0.05,
                        %d, %d, 'test-user', 'test-user'
                        )
                        """,
                        spanId, workspaceId, projectBId, traceId, GENERATOR.generate(),
                        now, now + 1,
                        now, now);

                statement.execute(insertSpanSql);
            }
        } catch (Exception exception) {
            throw new RuntimeException("Failed to insert cross-project spans", exception);
        }
    }
}
}