@@ -232,21 +232,17 @@ void snapshotState(
232232 try {
233233 if (timeServiceManager .isPresent ()) {
234234 boolean requiresLegacyRawKeyedStateSnapshots ;
235- final InternalTimeServiceManager <?> manager ;
236235 if (useAsyncState ) {
237236 checkState (
238237 asyncKeyedStateBackend != null ,
239- "keyedStateBackend should be available with timeServiceManager" );
240- manager = timeServiceManager .get ();
238+ "asyncKeyedStateBackend should be available with timeServiceManager" );
241239 requiresLegacyRawKeyedStateSnapshots =
242240 asyncKeyedStateBackend .requiresLegacySynchronousTimerSnapshots (
243241 checkpointOptions .getCheckpointType ());
244242 } else {
245243 checkState (
246244 keyedStateBackend != null ,
247245 "keyedStateBackend should be available with timeServiceManager" );
248- manager = timeServiceManager .get ();
249-
250246 requiresLegacyRawKeyedStateSnapshots =
251247 keyedStateBackend instanceof AbstractKeyedStateBackend
252248 && ((AbstractKeyedStateBackend <?>) keyedStateBackend )
@@ -257,6 +253,7 @@ void snapshotState(
257253 checkState (
258254 !isUsingCustomRawKeyedState ,
259255 "Attempting to snapshot timers to raw keyed state, but this operator has custom raw keyed state to write." );
256+ final InternalTimeServiceManager <?> manager = timeServiceManager .get ();
260257 manager .snapshotToRawKeyedState (
261258 snapshotContext .getRawKeyedOperatorStateOutput (), operatorName );
262259 }
0 commit comments