Skip to content

Commit baf008f

Browse files
committed
KSQL-13911 | Auto-migrate exactly_once to exactly_once_v2
1 parent 73e25a3 commit baf008f

File tree

3 files changed

+72
-2
lines changed

3 files changed

+72
-2
lines changed

ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/Command.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import io.confluent.ksql.rest.server.resources.IncompatibleKsqlCommandVersionException;
3131
import io.confluent.ksql.statement.ConfiguredStatement;
3232
import java.util.Collections;
33+
import java.util.HashMap;
3334
import java.util.Map;
3435
import java.util.Objects;
3536
import java.util.Optional;
@@ -112,6 +113,10 @@ public String getStatement() {
112113
return statement;
113114
}
114115

116+
/**
117+
* Returns the overwrite properties for JSON serialization to command topic.
118+
* This preserves the original values as they were stored.
119+
*/
115120
@JsonProperty("streamsProperties")
116121
@SuppressFBWarnings(
117122
value = "EI_EXPOSE_REP",
@@ -121,6 +126,28 @@ public Map<String, Object> getOverwriteProperties() {
121126
return PropertiesUtil.coerceTypes(overwriteProperties, true);
122127
}
123128

129+
/**
130+
* Returns the overwrite properties for runtime use, with legacy values migrated.
131+
* This is what should be used when executing queries.
132+
*/
133+
public Map<String, Object> getOverwritePropertiesForExecution() {
134+
// Create mutable copy and migrate BEFORE coerceTypes validation
135+
final Map<String, Object> mutableProps = new HashMap<>(overwriteProperties);
136+
migrateLegacyProcessingGuarantee(mutableProps);
137+
// Now validate and coerce the migrated values
138+
return PropertiesUtil.coerceTypes(mutableProps, true);
139+
}
140+
141+
private static void migrateLegacyProcessingGuarantee(final Map<String, Object> properties) {
142+
final Object guarantee = properties.get("processing.guarantee");
143+
if (guarantee != null) {
144+
final String guaranteeStr = guarantee.toString();
145+
if ("exactly_once".equals(guaranteeStr)) {
146+
properties.put("processing.guarantee", "exactly_once_v2");
147+
}
148+
}
149+
}
150+
124151
@SuppressFBWarnings(
125152
value = "EI_EXPOSE_REP",
126153
justification = "originalProperties is unmodifiableMap()"

ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ private void executePlan(
232232
final KsqlConfig mergedConfig = buildMergedConfig(command);
233233
final ConfiguredKsqlPlan configured = ConfiguredKsqlPlan.of(
234234
plan,
235-
SessionConfig.of(mergedConfig, command.getOverwriteProperties())
235+
SessionConfig.of(mergedConfig, command.getOverwritePropertiesForExecution())
236236
);
237237
putStatus(
238238
commandId,
@@ -321,7 +321,7 @@ private void executeStatement(
321321

322322
private KsqlConfig buildMergedConfig(final Command command) {
323323
return ksqlEngine.getKsqlConfig()
324-
.overrideBreakingConfigsWithOriginalValues(command.getOriginalProperties());
324+
.overrideBreakingConfigsWithOriginalValues(command.getOverwritePropertiesForExecution());
325325
}
326326

327327
private void pauseQuery(final PreparedStatement<PauseQuery> pauseQuery) {

ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandTest.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,4 +135,47 @@ public void shouldCoerceProperties() {
135135
Matchers.equalTo((short) 3)
136136
);
137137
}
138+
139+
@Test
140+
public void shouldNotMigrateModernProcessingGuaranteeValues() {
141+
// Given: Command with modern processing guarantee values
142+
final Command command1 = new Command(
143+
"CREATE STREAM test AS SELECT * FROM source;",
144+
ImmutableMap.of("processing.guarantee", "exactly_once_v2"),
145+
Collections.emptyMap(),
146+
Optional.empty()
147+
);
148+
final Command command2 = new Command(
149+
"CREATE STREAM test AS SELECT * FROM source;",
150+
ImmutableMap.of("processing.guarantee", "at_least_once"),
151+
Collections.emptyMap(),
152+
Optional.empty()
153+
);
154+
155+
// When: Getting overwrite properties
156+
final Map<String, Object> properties1 = command1.getOverwriteProperties();
157+
final Map<String, Object> properties2 = command2.getOverwriteProperties();
158+
159+
// Then: Should remain unchanged
160+
assertThat(properties1.get("processing.guarantee"), equalTo("exactly_once_v2"));
161+
assertThat(properties2.get("processing.guarantee"), equalTo("at_least_once"));
162+
}
163+
164+
@Test
165+
public void shouldHandleCommandWithNoProcessingGuarantee() {
166+
// Given: Command without processing.guarantee property
167+
final Command command = new Command(
168+
"CREATE STREAM test AS SELECT * FROM source;",
169+
ImmutableMap.of("num.stream.threads", 4),
170+
Collections.emptyMap(),
171+
Optional.empty()
172+
);
173+
174+
// When: Getting overwrite properties
175+
final Map<String, Object> properties = command.getOverwriteProperties();
176+
177+
// Then: Should not add processing.guarantee
178+
assertThat(properties.containsKey("processing.guarantee"), is(false));
179+
assertThat(properties.get("num.stream.threads"), equalTo(4));
180+
}
138181
}

0 commit comments

Comments
 (0)