Skip to content

Commit 5ce0b2f

Browse files
authored
fix: safely completing doClose() (#818)
Fixes #809
1 parent ab277c9 commit 5ce0b2f

File tree

1 file changed

+35
-13
lines changed

1 file changed

+35
-13
lines changed

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java

+35-13
Original file line numberDiff line numberDiff line change
@@ -629,6 +629,25 @@ private void waitForClose(Duration timeout) throws TimeoutException, ExecutionEx
629629
}
630630

631631
private void doClose(Duration timeout) throws TimeoutException, ExecutionException, InterruptedException {
632+
// fixes github issue #809 - ensure doClose() state transition to CLOSED
633+
// by catching unhandled exceptions in subsystems during close
634+
try {
635+
innerDoClose(timeout);
636+
} catch (Exception e) {
637+
log.error("exception during close", e);
638+
throw e;
639+
} finally {
640+
deregisterMeters();
641+
pcMetrics.close();
642+
log.debug("Close complete.");
643+
this.state = CLOSED;
644+
if (this.getFailureCause() != null) {
645+
log.error("PC closed due to error: {}", getFailureCause(), null);
646+
}
647+
}
648+
}
649+
650+
private void innerDoClose(Duration timeout) throws TimeoutException, ExecutionException, InterruptedException {
632651
log.debug("Starting close process (state: {})...", state);
633652

634653
// Drain and pause polling - keeps consumer alive for later commit, but paused
@@ -678,23 +697,26 @@ private void doClose(Duration timeout) throws TimeoutException, ExecutionExcepti
678697
if (Thread.currentThread().isInterrupted()) {
679698
log.warn("control thread interrupted - may lead to issues with transactional commit lock acquisition");
680699
}
681-
commitOffsetsThatAreReady();
682-
700+
try {
701+
commitOffsetsThatAreReady();
702+
} catch (Exception e) {
703+
log.warn("failed to commit during close sequence", e);
704+
}
683705
// only close consumer once producer has committed it's offsets (tx'l)
684706
log.debug("Closing and waiting for broker poll system...");
685-
brokerPollSubsystem.closeAndWait();
707+
try {
708+
brokerPollSubsystem.closeAndWait();
709+
} catch (Exception e) {
710+
log.warn("failed to close brokerPollSubsystem during close sequence", e);
711+
}
686712

687-
maybeCloseConsumer();
713+
try {
714+
maybeCloseConsumer();
715+
} catch (Exception e) {
716+
log.warn("failed to maybeCloseConsumer during close sequence", e);
717+
}
688718

689719
producerManager.ifPresent(x -> x.close(timeout));
690-
deregisterMeters();
691-
pcMetrics.close();
692-
log.debug("Close complete.");
693-
this.state = CLOSED;
694-
695-
if (this.getFailureCause() != null) {
696-
log.error("PC closed due to error: {}", getFailureCause(), null);
697-
}
698720
}
699721

700722
/**
@@ -1482,4 +1504,4 @@ private void clearCommitCommand() {
14821504
}
14831505
}
14841506

1485-
}
1507+
}

0 commit comments

Comments
 (0)