[ShareChat] Release sharechat-1.14#4

Merged
mradulrathorej merged 132 commits into main-sharechat from release-sharechat-1.14 on Apr 21, 2026
Conversation

@mradulrathorej
Member

@mradulrathorej mradulrathorej commented Mar 25, 2026

Summary

Upgrade ShareChat's fork of Flink Kubernetes Operator to upstream release 1.14.

Changes

ShareChat-specific changes updated for 1.14 compatibility

  • Replaced deprecated TARGET_UTILIZATION_BOUNDARY with UTILIZATION_MIN and UTILIZATION_MAX in uniform parallelism test
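
The replacement above expresses the same no-scaling band with explicit lower and upper bounds instead of a target plus a boundary. A minimal sketch of that mapping, assuming the deprecated boundary was applied symmetrically around the target utilization; `UtilizationBounds` and its methods are hypothetical names for illustration, not the operator's API:

```java
/** Illustrative only: maps a legacy "target +/- boundary" pair to explicit min/max bounds. */
final class UtilizationBounds {
    final double min;
    final double max;

    UtilizationBounds(double min, double max) {
        if (min > max) {
            throw new IllegalArgumentException("min must not exceed max");
        }
        this.min = min;
        this.max = max;
    }

    /** Assumption: the deprecated boundary was symmetric around the target utilization. */
    static UtilizationBounds fromLegacy(double target, double boundary) {
        double min = Math.max(0.0, target - boundary);
        double max = Math.min(1.0, target + boundary);
        return new UtilizationBounds(min, max);
    }

    /** No scaling is needed while the observed utilization stays inside [min, max]. */
    boolean withinBounds(double utilization) {
        return utilization >= min && utilization <= max;
    }
}
```

With explicit bounds, a test can also set an asymmetric band, which a single boundary value could not express.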

Next Steps

  • Trigger TeamCity build on release-sharechat-1.14 to push image to Armory

Testing

Summary by CodeRabbit

  • New Features

    • Introduced Blue/Green deployments with controlled transitions and ingress rotation.
    • Upgraded baseline to Java 17; expanded Flink support (incl. 1.20, 2.0, 2.1).
    • Enhanced autoscaler: utilization min/max, observed scalability, uniform parallelism, reset nonce.
    • Added dynamic namespace watching; improved Helm/Ingress management.
    • New metrics: Blue/Green lifecycle, job exception events, state size, slow Kubernetes requests.
  • Documentation

    • Updated guides/examples to newer Flink versions, Java 17, and Helm tips; added Blue/Green docs.
  • Chores

    • New E2E reusable workflow and tests; composite rsync action; Makefile for image builds.

mateczagany and others added 30 commits October 22, 2024 11:48
…during bundle creation

Signed-off-by: kwall <kwall@apache.org>
…keystore pod volume

Signed-off-by: kwall <kwall@apache.org>
… images.

From Java 16 onwards, the project jigsaw java opts are now mandatory. To
allow Java 17 based job images, we need to include the java opts from the
flink dist base conf.yaml to the per-flink-version env.java.default-opts.all
config in the operator default flink-conf.yaml.

These will be combined with any user supplied opts via env.java.opts.all
and ignored by JDK 8 and 11 as we set -XX:IgnoreUnrecognizedVMOptions.

Signed-off-by: Thomas Cooper <code@tomcooper.dev>
… adopt a more radical strategy when source vertex or upstream shuffle is keyBy (apache#904)
…allow customizating of flink-webhook container envs
The docker container was being run with the uid/gid arguments switched about.
…orical parallelism to reduce the rescale frequency

1. Using the maximum parallelism within the window instead of the latest parallelism when scaling down
2. Never scale down when (currentTime - triggerTime) < scale-down.interval
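
The two rules above can be sketched for a single vertex as follows. This is a simplified illustration, not the operator's `DelayedScaleDown` implementation; the class name, method name, and the choice to reset the window after a scale-down are assumptions:

```java
import java.time.Duration;
import java.time.Instant;

/** Illustrative sketch of the delayed scale-down rules for one vertex. */
final class DelayedScaleDownSketch {
    private final Duration scaleDownInterval;
    private Instant triggerTime; // when the first scale-down request of this window arrived
    private int maxRecommended;  // largest parallelism recommended within the window

    DelayedScaleDownSketch(Duration scaleDownInterval) {
        this.scaleDownInterval = scaleDownInterval;
    }

    /** Records a scale-down recommendation and returns the parallelism to apply now. */
    int onScaleDownRecommendation(Instant now, int currentParallelism, int recommended) {
        if (triggerTime == null) {
            triggerTime = now;
            maxRecommended = recommended;
        } else {
            maxRecommended = Math.max(maxRecommended, recommended);
        }
        // Rule 2: never scale down while (now - triggerTime) < scale-down interval.
        if (Duration.between(triggerTime, now).compareTo(scaleDownInterval) < 0) {
            return currentParallelism;
        }
        // Rule 1: apply the maximum seen in the window, not the latest recommendation.
        int target = Math.min(currentParallelism, maxRecommended);
        triggerTime = null; // assumption: start a fresh window after scaling down
        return target;
    }
}
```

Taking the window maximum means a brief dip in load cannot pull parallelism below what was needed moments earlier, which is what reduces the rescale frequency.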
…e#923)

The observedGeneration field wasn't updated anymore when the spec update results
in a NOOP reconciliation.

This is a regression of a10fb45 /
apache#834
gyfora and others added 16 commits January 5, 2026 14:05
…estartSavepointNonce

Co-authored-by: Daniel Rossos <daniel.rossos@shopify.com>
…c + resolve path conflict on Blue and Green deployment ingresses
…ding README (#2)

Added Makefile and instruction / protocol in the README.
In Tardis, the autoscaler struggled to find the right balance. The
issue is that, with heterogeneous parallelism across vertices, the
autoscaler's decisions can be suboptimal. Vertices are not independent:
the current parallelism of a "parent" vertex influences how much
traffic a "child" vertex receives, and therefore affects the new
parallelism chosen for the "child" vertex. In other words, the relative
parallelism of vertices can change after a scale event, and in the
changed situation the optimal scaling decision can be very
different.

In practice this shows up as a lot of "bouncing": the autoscaler scales
down, then quickly realizes it needs to scale back up. The cycle
never ends, no matter how hard we tune the parameters.

<img width="896" alt="image"
src="https://github.com/user-attachments/assets/05d32637-cc96-4d60-8021-49396e297234">

Introduces the concept of "flat parallelism". To reduce the "cognitive
load" on autoscaler and prevent the situation when relative parallelism
changes over time, we can simply maintain the same parallelism across
all vertices. Pretty much like we do now in Tardis.

With this setting, Tardis job autoscales perfectly, maintaining small
lag without "bouncing" back situations.
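
The "flat parallelism" idea described above can be sketched as follows. The description does not say how the shared value is chosen, so this sketch assumes it is the maximum of the per-vertex recommendations, which keeps every vertex at or above its own recommendation; the class and method names are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative sketch: give every vertex the same parallelism. */
final class UniformParallelismSketch {

    /** Assumption: the shared value is the maximum per-vertex recommendation. */
    static Map<String, Integer> flatten(Map<String, Integer> recommended) {
        int uniform = recommended.values().stream()
                .mapToInt(Integer::intValue)
                .max()
                .orElse(1); // an empty topology falls back to parallelism 1
        Map<String, Integer> result = new HashMap<>();
        for (String vertex : recommended.keySet()) {
            result.put(vertex, uniform);
        }
        return result;
    }
}
```

Because all vertices move together, the ratio between a parent's and a child's parallelism stays fixed across scale events, which is the property the description relies on to avoid bouncing.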

<img width="895" alt="image"
src="https://github.com/user-attachments/assets/ca2f77e8-7ad4-4009-8fdd-3be5971dd8f6">

<img width="893" alt="image"
src="https://github.com/user-attachments/assets/f9060ee1-0503-47b0-ac89-8babfc472862">
Updating deployment to push to Armory. 

# TODO
Need to update TC pipeline.
@coderabbitai

coderabbitai Bot commented Mar 25, 2026

Note

.coderabbit.yaml has unrecognized properties

CodeRabbit is using all valid settings from your configuration. Unrecognized properties (listed below) have been ignored and may indicate typos or deprecated fields that can be removed.

⚠️ Parsing warnings (1)
Validation error: Unrecognized key(s) in object: 'version'
⚙️ Configuration instructions
  • Please see the configuration documentation for more information.
  • You can also validate your configuration using the online YAML validator.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json
📝 Walkthrough

Introduces Blue/Green deployment support, migrates configuration handling to nested YAML via ConfigObjectNode, updates autoscaler algorithms/APIs, refactors JDBC autoscaler to DataSource, expands operator metrics and event handling, modernizes Flink service/controller flows, adds reusable E2E workflows/tests, updates docs/examples to Flink 1.20/Java 17, and adjusts CI/build scripts.
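
To illustrate what flattening a nested YAML configuration into dotted keys means, here is a minimal sketch over plain maps. It is not the `ConfigObjectNode` implementation (which is not shown here); `ConfigFlattenSketch` is a hypothetical name, and converting every leaf with `String.valueOf` is an assumption:

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative sketch: flatten a nested config map into dotted keys. */
final class ConfigFlattenSketch {

    static Map<String, String> flatten(Map<String, Object> nested) {
        Map<String, String> flat = new LinkedHashMap<>();
        flattenInto("", nested, flat);
        return flat;
    }

    private static void flattenInto(String prefix, Map<String, Object> node, Map<String, String> out) {
        for (Map.Entry<String, Object> entry : node.entrySet()) {
            String key = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
            Object value = entry.getValue();
            if (value instanceof Map) {
                // Nested section: recurse with the extended key prefix.
                @SuppressWarnings("unchecked")
                Map<String, Object> child = (Map<String, Object>) value;
                flattenInto(key, child, out);
            } else {
                // Leaf value: store it as a string under the dotted key.
                out.put(key, String.valueOf(value));
            }
        }
    }
}
```

A key that is already written in dotted form passes through unchanged, so flat and nested styles can coexist in the same input.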

Changes

| Cohort | File(s) | Summary |
|---|---|---|
| Blue/Green feature (API, controllers, utils, metrics, tests, docs) | flink-kubernetes-operator-api/.../FlinkBlueGreenDeployment*, .../status/FlinkBlueGreen*, .../spec/FlinkDeploymentTemplateSpec.java, flink-kubernetes-operator/.../controller/bluegreen/*, .../controller/FlinkBlueGreenDeploymentController.java, .../metrics/FlinkBlueGreenDeploymentMetrics.java, .../metrics/lifecycle/*, .../utils/bluegreen/*, docs/content/.../concepts/bluegreen-controller-flow.md, e2e-tests/data/bluegreen-*.yaml, e2e-tests/test_bluegreen_*.sh | Adds Blue/Green CRD, controller, state handlers, services, ingress reconciliation, metrics (state, lifecycle), utilities, tests, and documentation. |
| Config model and versioning | flink-kubernetes-operator-api/.../spec/AbstractFlinkSpec.java, .../spec/ConfigObjectNode*.java, .../spec/FlinkVersion.java, .../utils/SpecUtils.java, .../validation/CrdCompatibilityChecker.java, .../test/.../ConfigObjectNodeTest.java, .../FlinkVersionTest.java | Switches flinkConfiguration to ConfigObjectNode with YAML flattening; enhances enum-based versioning with major/minor and v2.x; adds JSON helpers and CRD compatibility rules. |
| Autoscaler core | flink-autoscaler/src/main/java/... (DelayedScaleDown*, JobVertexScaler*, ScalingExecutor*, ScalingMetric*, AutoScalerOptions, validation/AutoscalerValidator, event/*, exceptions/NotReadyException, state/*, topology/JobTopology, utils/AutoScalerUtils) and flink-autoscaler/src/test/... | Overhauls scale-down windowing, utilization bounds, observed scalability, uniform parallelism, metric handling, validation, and interfaces (cleanup, closeable). Broad test updates and new suites. |
| Autoscaler JDBC plugin + standalone | flink-autoscaler-plugin-jdbc/... (event/state interactors/stores, tests), flink-autoscaler-standalone/... (factories, executor, utils, tests), .../utils/HikariJDBCUtil.java | Migrates from Connection to DataSource, adds close lifecycles, adjusts serialization, updates executor semantics/validation, and adapts tests. |
| Operator core controllers/services | flink-kubernetes-operator/.../FlinkOperator.java, .../controller/FlinkDeployment*, .../controller/FlinkSessionJob*, .../controller/FlinkStateSnapshotController.java, .../reconciler/* (deployment/session job, utils), .../observer/*, .../service/*, .../kubeclient/decorators/FlinkConfMountDecorator.java, .../utils/* (Event*, Ingress*, FlinkUtils, ExceptionUtils, StatusRecorder, EventSourceUtils, K8sAnnotationsSanitizer) | Adds config mounting decorator, updates reconciliation/error/event flows, ingress reconciliation, cluster info API (jobId), exception/event recording, cleanup semantics, and SSA/termination settings. |
| Operator configuration and metrics | .../config/FlinkConfigManager.java, .../config/FlinkConfigBuilder.java, .../config/FlinkOperatorConfiguration.java, .../config/KubernetesOperatorConfigOptions.java, .../metrics/* (client, deployment, josdk, manager, options) | Expands default-config merging (version prefixes, YAML), adds new operator options (timeouts, ingress management, limits), introduces slow-request metric, tracks state size, and Blue/Green metric managers. |
| CI/CD and reusable workflows | .github/workflows/ci.yml, .github/workflows/e2e.yaml, .github/workflows/docs.yaml, .github/workflows/docker_push.yml, .github/actions/rsync-deployments/action.yml, .asf.yaml, LICENSE.rsync-deployments | Adds reusable E2E workflow, updates CI matrices/triggers, replaces external rsync with local composite action (MIT-licensed), pins binfmt, updates docs build on JDK17/Hugo, and adds protected branches. |
| E2E tests and data (non-B/G) | e2e-tests/data/* (version/config reshapes), e2e-tests/test_*.sh, e2e-tests/utils.sh | Updates manifests to Flink 1.20 and nested config, adds batch/dynamic-config tests, adjusts savepoint formats, utilities, and URLs. |
| Docs and examples | docs/**/* (many), examples/**/* (YAMLs, poms, Java) | Bumps versions to Flink 1.20/Beam 2.62.0/Java 17, restructures YAML configs, adds autoscaling README, fixes typos, and updates site config/analytics. |
| Build and repo maintenance | Dockerfile, Makefile, Sharechat-Utils.mk, .gitignore, NOTICE, flink-*-*/pom.xml | Moves base Java to 17, bumps operator version to 1.14-SNAPSHOT, adds make targets and env var checks, ignores .kube/.vscode, updates NOTICEs and Maven deps (incl. Guava additions). |

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor User as User
  participant API as CRD: FlinkBlueGreenDeployment
  participant Ctrl as Controller
  participant Svc as BlueGreenDeploymentService
  participant K8s as Kubernetes API
  participant Flink as Flink Service
  participant Ing as Ingress

  rect rgba(135, 206, 250, 0.5)
    User->>K8s: Apply FlinkBlueGreenDeployment
    K8s-->>Ctrl: Watch event
    Ctrl->>Svc: INIT state handling
    Svc->>K8s: Create child FlinkDeployment (Blue)
    Svc->>Ctrl: Patch status INITIALIZING_BLUE
  end

  rect rgba(144, 238, 144, 0.5)
    K8s-->>Ctrl: Child READY update
    Ctrl->>Svc: Transition to ACTIVE_BLUE
    Svc->>Flink: Fetch job status/checkpoints
    Svc->>Ing: Reconcile active ingress
    Svc->>Ctrl: Patch status ACTIVE_BLUE/RUNNING
  end

  rect rgba(255, 228, 181, 0.5)
    User->>K8s: Update spec (transition)
    K8s-->>Ctrl: Watch update
    Ctrl->>Svc: Diff specs (decide TRANSITION)
    Svc->>Flink: Trigger savepoint (if required)
    Flink-->>Svc: Savepoint result
    Svc->>K8s: Create/Update Green deployment
    Svc->>Ctrl: Patch status TRANSITIONING_TO_GREEN
  end

  rect rgba(255, 182, 193, 0.5)
    K8s-->>Ctrl: Green READY
    Ctrl->>Svc: Finalize transition
    Svc->>Ing: Switch ingress to Green
    Svc->>K8s: Delete/cleanup Blue (after delay)
    Svc->>Ctrl: Patch status ACTIVE_GREEN/RUNNING
  end
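
The status values patched in the diagram above form a small state machine. The sketch below encodes only the Blue-to-Green path shown in the diagram; the enum and class are hypothetical illustrations, and the real controller may define more states and transitions (for example, the reverse Green-to-Blue path):

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

/** Illustrative sketch of the Blue-to-Green status transitions from the diagram. */
final class BlueGreenStateSketch {

    enum State { INITIALIZING_BLUE, ACTIVE_BLUE, TRANSITIONING_TO_GREEN, ACTIVE_GREEN }

    private static final Map<State, Set<State>> ALLOWED = new EnumMap<>(State.class);

    static {
        ALLOWED.put(State.INITIALIZING_BLUE, EnumSet.of(State.ACTIVE_BLUE));
        ALLOWED.put(State.ACTIVE_BLUE, EnumSet.of(State.TRANSITIONING_TO_GREEN));
        ALLOWED.put(State.TRANSITIONING_TO_GREEN, EnumSet.of(State.ACTIVE_GREEN));
        // The diagram stops at ACTIVE_GREEN, so no outgoing transition is modelled here.
        ALLOWED.put(State.ACTIVE_GREEN, EnumSet.noneOf(State.class));
    }

    /** Returns the new state, or throws if the diagram does not show this transition. */
    static State transition(State from, State to) {
        if (!ALLOWED.get(from).contains(to)) {
            throw new IllegalStateException("Unexpected transition " + from + " -> " + to);
        }
        return to;
    }
}
```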

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~180 minutes

Suggested labels

feature, tests, documentation, dependencies, chore

Poem

A hare with helm charts hops in delight,
Blue to green, we flip the light—
Configs nest like burrows neat,
Autoscaler drums a steady beat.
CI winds hum, Java 17 breeze—
Ship the jars, rsync with ease.
Thump-thump: release, with carrots please! 🥕

@coderabbitai coderabbitai Bot added the documentation, feature, tests, dependencies, and chore labels Mar 25, 2026

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 6

Note

Due to the large number of review comments, Critical severity comments were prioritized as inline comments.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (9)
docs/content.zh/docs/operations/rbac.md (2)

110-110: ⚠️ Potential issue | 🔴 Critical

Fix the invalid kubectl command.

The command kubectl -f example/basic.yaml is missing the apply subcommand and will fail when users try to run it. The English version has the correct command.

🐛 Proposed fix
-    kubectl -f example/basic.yaml
+    kubectl apply -f example/basic.yaml
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docs/content.zh/docs/operations/rbac.md` at line 110, Replace the invalid
kubectl invocation string "kubectl -f example/basic.yaml" with the correct
command that includes the apply subcommand; update the occurrence to "kubectl
apply -f example/basic.yaml" so the example command runs successfully (search
for the exact text "kubectl -f example/basic.yaml" in the RBAC docs and replace
it).

108-108: ⚠️ Potential issue | 🟡 Minor

Fix the repository name typo to match the English version.

The English version (docs/content/docs/operations/rbac.md:108) was corrected to flink-kubernetes-operator, but the Chinese version still has the typo flink-kuberntes-operator.

📝 Proposed fix
-3. Optionally create an example Flink job in the namespace. Run the command from the root of the cloned flink-kuberntes-operator repo:
+3. Optionally create an example Flink job in the namespace. Run the command from the root of the cloned flink-kubernetes-operator repo:
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docs/content.zh/docs/operations/rbac.md` at line 108, Update the Chinese docs
text that still contains the misspelled repo name "flink-kuberntes-operator" to
the correct "flink-kubernetes-operator" so it matches the English version;
locate the occurrence in docs/content.zh/docs/operations/rbac.md (the line with
"Optionally create an example Flink job in the namespace. Run the command from
the root of the cloned flink-kuberntes-operator repo:") and replace the repo
name string accordingly.
examples/flink-python-example/python-example.yaml (1)

38-38: ⚠️ Potential issue | 🟡 Minor

Version mismatch in jarURI placeholder.

The flinkVersion was updated to v1_20, but the jarURI still references flink-python_2.12-1.16.1.jar. While the comment indicates this is a placeholder, the version mismatch could cause confusion. Consider updating to reflect a more appropriate version (e.g., flink-python-1.20.x.jar) or use a version-agnostic placeholder.

Suggested fix
-    jarURI: local:///opt/flink/opt/flink-python_2.12-1.16.1.jar # Note, this jarURI is actually a placeholder
+    jarURI: local:///opt/flink/opt/flink-python-1.20.0.jar # Note, this jarURI is actually a placeholder
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@examples/flink-python-example/python-example.yaml` at line 38, The jarURI
placeholder references an older flink-python jar (flink-python_2.12-1.16.1.jar)
which mismatches the updated flinkVersion (v1_20); update the jarURI value used
in the yaml (the jarURI key) to either a matching v1.20 artifact (e.g.,
flink-python-1.20.x.jar) or switch to a version-agnostic placeholder (e.g.,
flink-python-<version>.jar or an env var like ${FLINK_PYTHON_JAR}) so the
placeholder clearly aligns with flinkVersion and avoids confusion.
flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactoryTest.java (1)

64-84: ⚠️ Potential issue | 🟠 Major

Close the event handler to release its DataSource.

The eventHandler created at line 73 holds a reference to a DataSource via JdbcEventInteractor (which implements AutoCloseable). JdbcAutoScalerEventHandler has a close() method that properly cleans up these resources by calling jdbcEventInteractor.close(), but the test never invokes it. Add a try-finally block or use try-with-resources to ensure the event handler is closed after the assertion.
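
A minimal sketch of the try-with-resources pattern suggested above. `FakeEventHandler` is a hypothetical stand-in for the real handler, which is not reproduced here; the sketch only assumes that the handler is `AutoCloseable`:

```java
/** Illustrative sketch: guarantee close() runs even if an assertion fails. */
final class CloseableHandlerSketch {

    /** Hypothetical stand-in for an event handler that owns a pooled DataSource. */
    static final class FakeEventHandler implements AutoCloseable {
        private boolean closed;

        boolean isClosed() {
            return closed;
        }

        @Override
        public void close() {
            closed = true; // the real handler would release its DataSource here
        }
    }

    /** Uses the handler inside try-with-resources and reports whether it was closed. */
    static boolean closedAfterUse() {
        FakeEventHandler handler = new FakeEventHandler();
        try (FakeEventHandler h = handler) {
            // Test assertions on the handler would go here.
            if (h.isClosed()) {
                throw new IllegalStateException("handler must be open while in use");
            }
        }
        return handler.isClosed();
    }
}
```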

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactoryTest.java`
around lines 64 - 84, The test creates an event handler via
AutoscalerEventHandlerFactory.create(conf) but never closes it, leaking the
DataSource held by JdbcAutoScalerEventHandler/JdbcEventInteractor; wrap the
creation and assertion in a try-with-resources or try/finally and call
eventHandler.close() (or ensure the handler is used as an AutoCloseable) so
JdbcEventInteractor.close() is invoked to release the DataSource after the
assertion.
flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLExtension.java (1)

76-78: ⚠️ Potential issue | 🟡 Minor

Use predeclared SQL statements instead of string formatting for cleanup operations.

At line 77, String.format("DELETE from %s", tableName) builds the SQL statement through string formatting. Although the table names are hardcoded constants (not user input), prefer complete predeclared statements for consistency with the SQL safety guidelines.

Suggested change
-    private static final List<String> TABLES =
-            List.of("t_flink_autoscaler_state_store", "t_flink_autoscaler_event_handler");
+    private static final List<String> CLEANUP_SQL =
+            List.of(
+                    "DELETE FROM t_flink_autoscaler_state_store",
+                    "DELETE FROM t_flink_autoscaler_event_handler");
...
-            for (var tableName : TABLES) {
-                st.executeUpdate(String.format("DELETE from %s", tableName));
+            for (var sql : CLEANUP_SQL) {
+                st.executeUpdate(sql);
             }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLExtension.java`
around lines 76 - 78, Replace the dynamic String.format SQL used in
PostgreSQLExtension cleanup loop: instead of calling
st.executeUpdate(String.format("DELETE from %s", tableName)) for each entry in
TABLES, use predeclared SQL statements (e.g., a Map or list of constant DELETE
statements keyed by the table name) and execute those constants via
st.executeUpdate; update the loop that references TABLES and the call site
(st.executeUpdate) so it looks up the matching predeclared DELETE statement for
each tableName and runs that statement instead of formatting SQL at runtime.
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java (1)

93-125: ⚠️ Potential issue | 🔴 Critical

Exit the retry loop after the first successful status write.

Once replaceStatus(...) returns, the update already succeeded. The current loop still performs up to two extra updateStatus() calls, and a conflict on those follow-up iterations can make this method report failure even though the first write already landed.

Proposed fix
         for (int i = 0; i < 3; i++) {
             // We retry the status update 3 times to avoid some intermittent connectivity errors
             try {
                 replaceStatus(resource, prevStatus, client);
                 err = null;
+                break;
             } catch (KubernetesClientException e) {
                 LOG.error("Error while patching status, retrying {}/3...", (i + 1), e);
                 Thread.sleep(1000);
                 err = e;
             }
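
The intent of the proposed fix, stopping at the first success and failing only when every attempt fails, can be shown in isolation. This is a generic sketch rather than the `StatusRecorder` code: the class and method names are hypothetical, and the sleep between attempts is left out to keep the example short:

```java
/** Illustrative sketch: retry an action, stopping at the first success. */
final class RetrySketch {

    /** Returns the 1-based attempt that succeeded; rethrows the last error if all fail. */
    static int runWithRetry(Runnable action, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                action.run();
                return attempt; // success: do not run the remaining attempts
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        throw last;
    }
}
```

Returning (or breaking) immediately after the successful call is what prevents a later, unnecessary attempt from turning a successful write into a reported failure.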
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java`
around lines 93 - 125, The retry loop in patchAndCacheStatus may continue after
a successful replaceStatus call causing later conflicts to surface; modify
patchAndCacheStatus so that once replaceStatus(resource, prevStatus, client)
succeeds you immediately exit the retry loop (e.g., break) and clear any saved
err so subsequent iterations are not executed; ensure the logic still throws the
last exception only if all retries failed, and then proceed to update
statusCache, call statusUpdateListener.accept(resource, prevStatus), and
metricManager.onUpdate(resource).
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java (1)

170-182: ⚠️ Potential issue | 🟠 Major

The plural secondary-resource path doesn't match these fixtures.

createContextWithDeployment(...) injects a synthetic Deployment, but TestingContext.getSecondaryResources() ignores it, only knows how to query FlinkDeployment from client, and returns null otherwise. Any code path that uses this accessor will either miss the supplied secondary or NPE when the helper was built with a null client.

Minimal hardening for the shared helper
         @Override
         public <T1> Set<T1> getSecondaryResources(Class<T1> aClass) {
-            // TODO: improve this, even if we only support FlinkDeployment as a secondary resource
-            if (aClass.getSimpleName().equals(FlinkDeployment.class.getSimpleName())) {
+            if (aClass == FlinkDeployment.class && getClient() != null) {
                 KubernetesClient client = getClient();
                 var hasMetadata =
                         new HashSet<>(
                                 client.resources(FlinkDeployment.class)
                                         .inAnyNamespace()
                                         .list()
                                         .getItems());
                 return (Set<T1>) hasMetadata;
-            } else {
-                return null;
             }
+            return Collections.emptySet();
         }

Also consider overriding getSecondaryResources(...) in createContextWithDeployment(...) so the plural accessor returns the injected deployment fixture instead of querying the client.
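
A null-safe plural accessor along the lines suggested above might look like the sketch below. It is a standalone illustration, not the `TestingContext` class: the class name and the single injected fixture field are assumptions:

```java
import java.util.Collections;
import java.util.Set;

/** Illustrative sketch: return the injected fixture, never null. */
final class SecondaryResourcesSketch {
    private final Object injected; // the fixture supplied by the test helper; may be null

    SecondaryResourcesSketch(Object injected) {
        this.injected = injected;
    }

    <T> Set<T> getSecondaryResources(Class<T> type) {
        if (injected != null && type.isInstance(injected)) {
            // Serve the injected fixture without querying any client.
            return Collections.singleton(type.cast(injected));
        }
        // Unknown type or no fixture: an empty set instead of null avoids NPEs in callers.
        return Collections.emptySet();
    }
}
```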

Also applies to: 499-513

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java`
around lines 170 - 182, The helper createContextWithDeployment(...) currently
only overrides TestingContext.getSecondaryResource(...) but not
getSecondaryResources(...), so callers using the plural accessor will miss the
injected Deployment (or NPE if client is null); update the anonymous
TestingContext returned by createContextWithDeployment to also override
getSecondaryResources(Class expectedType, String eventSourceName) to return a
List containing the supplied deployment when expectedType is Deployment (or
empty list when deployment is null), and avoid querying the client there; apply
the same change to the other similar helper overloads (the other TestingContext
anonymous classes) so the plural accessor consistently returns the injected
fixture and is null-safe.
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java (2)

258-290: ⚠️ Potential issue | 🟠 Major

Register the blue/green controller only when its CRD is available.

run() now always installs this controller, unlike the snapshot controller. That makes rollout order matter: starting the new binary before the FlinkBlueGreenDeployment CRD is applied can take the whole operator down instead of just skipping the optional feature.

One way to gate registration
+import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
@@
     @VisibleForTesting
     void registerBlueGreenController() {
+        if (!KubernetesClientUtils.isCrdInstalled(FlinkBlueGreenDeployment.class)) {
+            LOG.warn(
+                    "Skipping registering blue/green controller as blue/green resources are not installed.");
+            return;
+        }
         var metricManager =
                 MetricManager.createFlinkBlueGreenDeploymentMetricManager(baseConfig, metricGroup);
         var statusRecorder =
                 StatusRecorder.createForFlinkBlueGreenDeployment(client, metricManager, listeners);
         var controller =
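
The gating idea can be shown independently of the operator classes. In this sketch the CRD check is passed in as a `BooleanSupplier`, so no Kubernetes client call is assumed; `OptionalControllerSketch` and its methods are hypothetical names:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

/** Illustrative sketch: register an optional controller only if its CRD exists. */
final class OptionalControllerSketch {
    private final List<String> registered = new ArrayList<>();

    /** Returns true if the controller was registered, false if it was skipped. */
    boolean registerIfCrdInstalled(String controllerName, BooleanSupplier crdInstalled) {
        if (!crdInstalled.getAsBoolean()) {
            // Skip the optional feature instead of failing operator startup.
            System.out.println("Skipping " + controllerName + ": CRD not installed");
            return false;
        }
        registered.add(controllerName);
        return true;
    }

    List<String> registered() {
        return List.copyOf(registered);
    }
}
```

With this shape, starting the new binary before the CRD is applied only disables the optional controller; the other controllers still come up.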
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java`
around lines 258 - 290, The operator must only register the blue/green
controller when the FlinkBlueGreenDeployment CRD is present; modify run() to
probe for the CRD and call registerBlueGreenController() only if the CRD exists
(log the decision), otherwise skip registration; implement the CRD existence
check using the Kubernetes client available in this class and keep
overrideControllerConfigs, FlinkBlueGreenDeploymentController and
registerBlueGreenController unchanged except for the conditional call in run().

94-121: ⚠️ Potential issue | 🟠 Major

Use the initialized client when creating the EventRecorder.

In the normal constructor path this code is called with client == null, so it bypasses the freshly created this.client. That leaves every controller using an EventRecorder built from the wrong instance.

Proposed fix
-        this.eventRecorder = EventRecorder.create(client, listeners);
+        this.eventRecorder = EventRecorder.create(this.client, listeners);
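
The underlying issue is a constructor parameter that shadows the initialized field. A self-contained sketch of the pattern, using strings in place of the real client and recorder types; all names here are hypothetical:

```java
/** Illustrative sketch: use the initialized field, not the possibly-null parameter. */
final class ShadowedFieldSketch {
    private final String client;
    private final String recorderClient;

    ShadowedFieldSketch(String client) {
        // Fall back to a default when the caller passes null, as the constructor under review does.
        this.client = client != null ? client : "default-client";
        // Using the bare parameter `client` here would hand null to the recorder.
        this.recorderClient = this.client;
    }

    String recorderClient() {
        return recorderClient;
    }
}
```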
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java`
around lines 94 - 121, The EventRecorder is created with the constructor
parameter 'client' instead of the initialized field 'this.client', causing
controllers to use the wrong KubernetesClient when the passed-in client is null;
update the EventRecorder creation to use the initialized field (pass this.client
to EventRecorder.create) so the recorder uses the actual client returned by
KubernetesClientUtils.getKubernetesClient, keeping the rest of the
initialization (listeners, ctxFactory) unchanged.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository: ShareChat/coderabbit/.coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 9d7a5a4d-f743-4483-877e-b2edbeb9d3bc

📥 Commits

Reviewing files that changed from the base of the PR and between 7735609 and 28ea550.

⛔ Files ignored due to path filters (8)
  • docs/layouts/shortcodes/generated/auto_scaler_configuration.html is excluded by !**/generated/**
  • docs/layouts/shortcodes/generated/dynamic_section.html is excluded by !**/generated/**
  • docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html is excluded by !**/generated/**
  • docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html is excluded by !**/generated/**
  • docs/layouts/shortcodes/generated/system_advanced_section.html is excluded by !**/generated/**
  • docs/layouts/shortcodes/generated/system_section.html is excluded by !**/generated/**
  • docs/static/img/concepts/BlueGreenConfigurationQuickstart.png is excluded by !**/*.png
  • docs/static/img/concepts/Flink_Blue_Green_Deployment_State_Machine.png is excluded by !**/*.png
📒 Files selected for processing (292)
  • .asf.yaml
  • .github/actions/rsync-deployments/action.yml
  • .github/workflows/ci.yml
  • .github/workflows/docker_push.yml
  • .github/workflows/docs.sh
  • .github/workflows/docs.yaml
  • .github/workflows/e2e.yaml
  • .github/workflows/publish_snapshot.yml
  • .gitignore
  • Dockerfile
  • LICENSE.rsync-deployments
  • Makefile
  • NOTICE
  • README.md
  • Sharechat-Utils.mk
  • docs/README.md
  • docs/config.toml
  • docs/content.zh/docs/concepts/controller-flow.md
  • docs/content.zh/docs/concepts/overview.md
  • docs/content.zh/docs/custom-resource/autoscaler.md
  • docs/content.zh/docs/custom-resource/job-management.md
  • docs/content.zh/docs/custom-resource/overview.md
  • docs/content.zh/docs/custom-resource/pod-template.md
  • docs/content.zh/docs/development/guide.md
  • docs/content.zh/docs/operations/configuration.md
  • docs/content.zh/docs/operations/helm.md
  • docs/content.zh/docs/operations/ingress.md
  • docs/content.zh/docs/operations/metrics-logging.md
  • docs/content.zh/docs/operations/rbac.md
  • docs/content.zh/docs/try-flink-kubernetes-operator/quick-start.md
  • docs/content/docs/concepts/bluegreen-controller-flow.md
  • docs/content/docs/concepts/overview.md
  • docs/content/docs/custom-resource/autoscaler.md
  • docs/content/docs/custom-resource/job-management.md
  • docs/content/docs/custom-resource/overview.md
  • docs/content/docs/custom-resource/pod-template.md
  • docs/content/docs/custom-resource/reference.md
  • docs/content/docs/development/guide.md
  • docs/content/docs/operations/configuration.md
  • docs/content/docs/operations/helm.md
  • docs/content/docs/operations/ingress.md
  • docs/content/docs/operations/metrics-logging.md
  • docs/content/docs/operations/plugins.md
  • docs/content/docs/operations/rbac.md
  • docs/content/docs/operations/upgrade.md
  • docs/content/docs/try-flink-kubernetes-operator/quick-start.md
  • docs/layouts/_default/baseof.html
  • e2e-tests/cert-manager.yaml
  • e2e-tests/data/autoscaler.yaml
  • e2e-tests/data/bluegreen-ingress.yaml
  • e2e-tests/data/bluegreen-laststate.yaml
  • e2e-tests/data/bluegreen-stateless.yaml
  • e2e-tests/data/flinkdep-batch-cr.yaml
  • e2e-tests/data/flinkdep-cr.yaml
  • e2e-tests/data/multi-sessionjob.yaml
  • e2e-tests/data/savepoint.yaml
  • e2e-tests/data/sessionjob-cr.yaml
  • e2e-tests/test_batch_job.sh
  • e2e-tests/test_bluegreen_ingress_rotation.sh
  • e2e-tests/test_bluegreen_laststate.sh
  • e2e-tests/test_bluegreen_stateless.sh
  • e2e-tests/test_dynamic_config.sh
  • e2e-tests/test_dynamic_flink_conf.sh
  • e2e-tests/test_snapshot.sh
  • e2e-tests/utils.sh
  • examples/advanced-ingress.yaml
  • examples/autoscaling/Dockerfile
  • examples/autoscaling/README.md
  • examples/autoscaling/autoscaling-dynamic.yaml
  • examples/autoscaling/autoscaling.yaml
  • examples/autoscaling/pom.xml
  • examples/autoscaling/src/main/java/autoscaling/AutoscalingExample.java
  • examples/autoscaling/src/main/java/autoscaling/LoadSimulationPipeline.java
  • examples/basic-checkpoint-ha.yaml
  • examples/basic-ingress.yaml
  • examples/basic-reactive.yaml
  • examples/basic-session-deployment-and-job.yaml
  • examples/basic-session-deployment-only.yaml
  • examples/basic.yaml
  • examples/custom-logging.yaml
  • examples/flink-beam-example/beam-example.yaml
  • examples/flink-beam-example/pom.xml
  • examples/flink-python-example/python-example.yaml
  • examples/flink-sql-runner-example/pom.xml
  • examples/flink-sql-runner-example/sql-example.yaml
  • examples/flink-tls-example/basic-secure-deployment-only.yaml
  • examples/flink-tls-example/basic-secure-session-job-only.yaml
  • examples/flink-tls-example/basic-secure.yaml
  • examples/kubernetes-client-examples/README.MD
  • examples/kubernetes-client-examples/pom.xml
  • examples/kubernetes-client-examples/src/main/java/org/apache/flink/examples/Basic.java
  • examples/pod-template.yaml
  • examples/snapshot/job-from-savepoint.yaml
  • flink-autoscaler-plugin-jdbc/pom.xml
  • flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java
  • flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java
  • flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java
  • flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateInteractor.java
  • flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateStore.java
  • flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java
  • flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcEventInteractorITCase.java
  • flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/CountableJdbcEventInteractor.java
  • flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJdbcStateInteractorITCase.java
  • flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJdbcStateStoreITCase.java
  • flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/CountableJdbcStateInteractor.java
  • flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStoreTest.java
  • flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/JobStateViewTest.java
  • flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/DatabaseTest.java
  • flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyExtension.java
  • flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyTestBase.java
  • flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL56TestBase.java
  • flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL57TestBase.java
  • flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL8TestBase.java
  • flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQLExtension.java
  • flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLExtension.java
  • flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLTestBase.java
  • flink-autoscaler-standalone/pom.xml
  • flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactory.java
  • flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactory.java
  • flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java
  • flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/utils/HikariJDBCUtil.java
  • flink-autoscaler-standalone/src/main/resources/META-INF/NOTICE
  • flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactoryTest.java
  • flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactoryTest.java
  • flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java
  • flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerValidatorTest.java
  • flink-autoscaler/pom.xml
  • flink-autoscaler/src/main/java/org/apache/flink/autoscaler/DelayedScaleDown.java
  • flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScaler.java
  • flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
  • flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
  • flink-autoscaler/src/main/java/org/apache/flink/autoscaler/NoopJobAutoscaler.java
  • flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java
  • flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
  • flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
  • flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
  • flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
  • flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java
  • flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/LoggingEventHandler.java
  • flink-autoscaler/src/main/java/org/apache/flink/autoscaler/exceptions/NotReadyException.java
  • flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
  • flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
  • flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java
  • flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java
  • flink-autoscaler/src/main/java/org/apache/flink/autoscaler/validation/AutoscalerValidator.java
  • flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
  • flink-autoscaler/src/test/java/org/apache/flink/autoscaler/DelayedScaleDownEndToEndTest.java
  • flink-autoscaler/src/test/java/org/apache/flink/autoscaler/DelayedScaleDownTest.java
  • flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
  • flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
  • flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
  • flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
  • flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
  • flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java
  • flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
  • flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/TestingEventCollector.java
  • flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java
  • flink-kubernetes-docs/pom.xml
  • flink-kubernetes-operator-api/pom.xml
  • flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/FlinkBlueGreenDeployment.java
  • flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/FlinkDeployment.java
  • flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/FlinkSessionJob.java
  • flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/bluegreen/BlueGreenDeploymentType.java
  • flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/lifecycle/ResourceLifecycleState.java
  • flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/AbstractFlinkSpec.java
  • flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/ConfigObjectNode.java
  • flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/ConfigObjectNodeDeserializer.java
  • flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkBlueGreenDeploymentConfigOptions.java
  • flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkBlueGreenDeploymentSpec.java
  • flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkDeploymentTemplateSpec.java
  • flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java
  • flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobSpec.java
  • flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java
  • flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkBlueGreenDeploymentState.java
  • flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkBlueGreenDeploymentStatus.java
  • flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java
  • flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobManagerDeploymentStatus.java
  • flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/RemoveScaleSubResource.java
  • flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/SpecUtils.java
  • flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/validation/CrdCompatibilityChecker.java
  • flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/FlinkConfigurationYamlSupportTest.java
  • flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/spec/ConfigObjectNodeTest.java
  • flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersionTest.java
  • flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java
  • flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/SpecUtilsTest.java
  • flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/validation/CrdCompatibilityCheckerTest.java
  • flink-kubernetes-operator-api/src/test/resources/pre-flink-configuration-yaml-crd.yml
  • flink-kubernetes-operator-api/src/test/resources/test-deployment-key-value-configuration.yaml
  • flink-kubernetes-operator-api/src/test/resources/test-deployment-with-unknown-fields.yaml
  • flink-kubernetes-operator-api/src/test/resources/test-deployment-yaml-configuration.yaml
  • flink-kubernetes-operator-api/src/test/resources/test-deployment.yaml
  • flink-kubernetes-operator-api/src/test/resources/test-snapshot-with-unknown-fields.yaml
  • flink-kubernetes-operator-api/src/test/resources/test-snapshot.yaml
  • flink-kubernetes-operator/pom.xml
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/api/bluegreen/BlueGreenDiffType.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandler.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/ConfigMapStore.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeployments.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentContext.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobContext.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotController.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenContext.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenKubernetesService.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenStateHandlerRegistry.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/handlers/AbstractBlueGreenStateHandler.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/handlers/ActiveStateHandler.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/handlers/BlueGreenStateHandler.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/handlers/InitializingBlueStateHandler.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/handlers/SavepointingStateHandler.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/handlers/TransitioningStateHandler.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/ClusterHealthInfo.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkBlueGreenDeploymentMetrics.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetrics.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/BlueGreenLifecycleMetrics.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/BlueGreenResourceLifecycleMetricTracker.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/mutator/FlinkResourceMutator.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/AbstractFlinkResourceObserver.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthResult.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SnapshotTriggerTimestampStore.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/FlinkBlueGreenDeploymentSpecDiff.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/ReflectiveDiffBuilder.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ExceptionUtils.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/K8sAnnotationsSanitizer.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/FlinkResourceValidator.java
  • flink-kubernetes-operator/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java
  • flink-kubernetes-operator/src/main/resources/META-INF/NOTICE
  • flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecoratorTest.java
  • flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorTest.java
  • flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/OperatorTestBase.java
  • flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
  • flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
  • flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContextTest.java
  • flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java
  • flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingKubernetesAutoscalerUtils.java
  • flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
  • flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java
  • flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java
  • flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java
  • flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
  • flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContextTest.java
  • flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
  • flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java
  • flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
  • flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkBlueGreenDeploymentController.java
  • flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
  • flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java
  • flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedExceptionTest.java
  • flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/health/ClusterHealthInfoTest.java
  • flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/health/HealthProbeTest.java
  • flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkBlueGreenDeploymentMetricsTest.java
💤 Files with no reviewable changes (2)
  • .github/workflows/docs.sh
  • flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java

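It is very important to understand that the observation phase records a point-in-time view of the cluster and resources into the status. In most cases this can change at any future time (a running job may fail at any moment); in some rare cases it is stable (a terminally failed or completed job will stay that way). The reconciler logic must therefore always account for the possibility that the cluster state has already drifted from what is in the status (most of the complications arise from this).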

-{{< img src="/img/concepts/reconciler_classes.svg" alt="Reconciler Class Hierarchy" >}}
+< img src="/img/concepts/reconciler_classes.svg" alt="Reconciler 类层次结构" >

⚠️ Potential issue | 🔴 Critical

Fix the broken Hugo shortcode syntax.

The image shortcode has incorrect syntax with spaces around the angle brackets. It should use Hugo's shortcode delimiters {{< ... >}} to match the pattern used in line 56.

🖼️ Proposed fix for the image shortcode
-< img src="/img/concepts/reconciler_classes.svg" alt="Reconciler 类层次结构" >
+{{< img src="/img/concepts/reconciler_classes.svg" alt="Reconciler 类层次结构" >}}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
< img src="/img/concepts/reconciler_classes.svg" alt="Reconciler 类层次结构" >
{{< img src="/img/concepts/reconciler_classes.svg" alt="Reconciler 类层次结构" >}}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docs/content.zh/docs/concepts/controller-flow.md` at line 94, The image tag
"< img src="/img/concepts/reconciler_classes.svg" alt="Reconciler 类层次结构" >" uses
incorrect spaced angle-bracket syntax; replace it with a Hugo shortcode using
the {{< ... >}} delimiters (e.g., use {{< img
src="/img/concepts/reconciler_classes.svg" alt="Reconciler 类层次结构" >}} or the
project's standard {{< figure ... >}} variant) so it matches the shortcode
pattern used elsewhere (see the working example around the other image
shortcode).
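
One way to catch this class of mistake before merge might be to grep the docs for `img` tags that lack the `{{< ... >}}` delimiters. The sketch below is hypothetical: `find_broken_img_shortcodes` is not part of the repository, and it assumes the broken form always has whitespace between `<` and `img`, as in the line flagged above.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of the repository).
# Reads Markdown on stdin and prints "<line number>:<line>" for every line that
# contains "< img ..." without the Hugo "{{" prefix directly before the "<".
find_broken_img_shortcodes() {
  # (^|[^{]) requires that the "<" is at line start or not preceded by "{",
  # so a well-formed "{{< img ... >}}" shortcode is not reported.
  # "|| true" keeps the function from failing when grep finds nothing.
  grep -nE '(^|[^{])<[[:space:]]+img[[:space:]]' || true
}
```

It could be run as `find_broken_img_shortcodes < docs/content.zh/docs/concepts/controller-flow.md`. Plain HTML `<img ...>` tags are deliberately not reported, since they have no space after `<`.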

Install the certificate manager on your Kubernetes cluster to enable adding the webhook component (only needed once per Kubernetes cluster):
```bash
kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml
kubectl apply -f https://github.com/jetstack/cert-manager/releases/download/v1.18.2/cert-manager.yaml
```

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify that cert-manager v1.18.2 release exists
curl -s -o /dev/null -w "%{http_code}" https://github.com/jetstack/cert-manager/releases/tag/v1.18.2

Repository: ShareChat/flink-kubernetes-operator

Length of output: 82


🏁 Script executed:

#!/bin/bash
# Follow redirects to get the final HTTP status code
curl -s -o /dev/null -w "%{http_code}" -L https://github.com/jetstack/cert-manager/releases/tag/v1.18.2

# Also try the direct download URL
echo "---"
curl -s -o /dev/null -w "%{http_code}" -L https://github.com/jetstack/cert-manager/releases/download/v1.18.2/cert-manager.yaml

Repository: ShareChat/flink-kubernetes-operator

Length of output: 89


🌐 Web query:

cert-manager v1.18.2 release github jetstack

💡 Result:

No evidence found of a cert-manager v1.18.2 release on GitHub or official documentation as of 2026-03-25. The cert-manager project is maintained by Jetstack (jetstack-release-bot publishes releases). The v1.18 series latest patch is v1.18.6 (released Feb 24, 2026), a vulnerability fix for CVE-2025-68121. v1.18 EOL is Mar 10, 2026 per official supported releases page. Current latest stable is v1.20.0 (Mar 10, 2026). Full v1.18 release notes: https://cert-manager.io/docs/releases/release-notes/release-notes-1.18/. GitHub releases page: https://github.com/cert-manager/cert-manager/releases. v1.18.0 released Jun 10, 2025 with features like ACME profiles support and breaking changes (default PrivateKey.RotationPolicy=Always, RevisionHistoryLimit=1).

Update cert-manager version to a valid release; v1.18.2 does not exist.

The documentation references cert-manager v1.18.2, which is not a published release. The latest v1.18 patch is v1.18.6 (released Feb 24, 2026), and the current stable release is v1.20.0 (Mar 10, 2026). The v1.18 series reached end-of-life on Mar 10, 2026. Update the URL to use an available version such as v1.18.6, v1.20.0, or another supported release to ensure the quick-start guide works.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docs/content/docs/try-flink-kubernetes-operator/quick-start.md` at line 71,
The documentation references a non-existent cert-manager release (v1.18.2) in
the line containing "kubectl apply -f
https://github.com/jetstack/cert-manager/releases/download/v1.18.2/cert-manager.yaml";
update that URL to point to a valid release (for example v1.18.6 or v1.20.0) by
replacing "v1.18.2" with the chosen published tag so the quick-start command
downloads an available cert-manager manifest.
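
Since the first check in the analysis chain above did not follow redirects, a re-verification could be scripted roughly as follows. This is only a sketch: `manifest_url` and `manifest_exists` are hypothetical helper names, the repository base URL is passed in by the caller rather than assumed, and the existence check needs network access.

```shell
#!/usr/bin/env bash
# Hypothetical helpers (not part of the repository).

# Build the manifest download URL for a release tag.
# $1 = repository base URL, $2 = release tag such as v1.18.6
manifest_url() {
  local base="$1" version="$2"
  # ${base%/} drops one trailing slash so the result has no "//" in the path.
  printf '%s/releases/download/%s/cert-manager.yaml\n' "${base%/}" "$version"
}

# Return 0 only if the URL can be fetched.
# --location follows redirects; --fail turns HTTP errors into a non-zero exit.
manifest_exists() {
  curl --fail --silent --location --output /dev/null "$1"
}
```

A caller might run `manifest_exists "$(manifest_url "$REPO_URL" v1.18.6)" && echo found`, where `REPO_URL` is whatever base the docs are meant to point at.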

Comment on lines +9 to +34
- com.squareup.okhttp3:okhttp:jar:4.12.0
- com.squareup.okio:okio-jvm:jar:3.6.0
- com.squareup.okio:okio:jar:3.6.0
- com.twitter:chill-java:jar:0.7.6
- com.zaxxer:HikariCP:jar:5.1.0
- commons-cli:commons-cli:jar:1.5.0
- commons-codec:commons-codec:jar:1.17.0
- commons-collections:commons-collections:jar:3.2.2
- commons-io:commons-io:jar:2.15.1
- org.apache.commons:commons-lang3:jar:3.18.0
- org.apache.commons:commons-math3:jar:3.6.1
- org.apache.logging.log4j:log4j-1.2-api:jar:2.23.1
- org.apache.logging.log4j:log4j-api:jar:2.23.1
- org.apache.logging.log4j:log4j-core:jar:2.23.1
- org.apache.logging.log4j:log4j-slf4j-impl:jar:2.23.1
- org.javassist:javassist:jar:3.24.0-GA
- org.jetbrains.kotlin:kotlin-stdlib-common:jar:1.9.10
- org.jetbrains.kotlin:kotlin-stdlib-jdk7:jar:1.8.21
- org.jetbrains.kotlin:kotlin-stdlib-jdk8:jar:1.8.21
- org.jetbrains.kotlin:kotlin-stdlib:jar:1.8.21
- org.jetbrains:annotations:jar:13.0
- org.objenesis:objenesis:jar:2.1
- org.quartz-scheduler:quartz:jar:2.4.0
- org.slf4j:slf4j-api:jar:1.7.36
- org.snakeyaml:snakeyaml-engine:jar:2.6
- tools.profiler:async-profiler:jar:2.9

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Description: Extract dependencies from pom.xml and compare with NOTICE file

# Find the autoscaler-standalone pom.xml and extract dependencies
fd -t f "pom.xml" flink-autoscaler-standalone -x cat {} | \
  gawk '/<dependencies>/,/<\/dependencies>/ {print}'

# Also check for shade/assembly plugin configuration that might bundle dependencies
fd -t f "pom.xml" flink-autoscaler-standalone -x cat {} | \
  gawk '/<plugin>/,/<\/plugin>/ {if (/maven-shade-plugin|maven-assembly-plugin/) print}'

Repository: ShareChat/flink-kubernetes-operator

Length of output: 5113


🏁 Script executed:

#!/bin/bash
# Extract complete maven-shade-plugin configuration
fd -t f "pom.xml" flink-autoscaler-standalone -x cat {} | \
  gawk '/maven-shade-plugin/,/<\/plugin>/ {print NR": "$0}' | head -100

Repository: ShareChat/flink-kubernetes-operator

Length of output: 2484


🏁 Script executed:

#!/bin/bash
# Display the current NOTICE file to see its full content
cat -n flink-autoscaler-standalone/src/main/resources/META-INF/NOTICE

Repository: ShareChat/flink-kubernetes-operator

Length of output: 1984


🏁 Script executed:

#!/bin/bash
# Extract the dependency tree to understand transitive dependencies
fd -t f "pom.xml" flink-autoscaler-standalone -x head -20 {}

Repository: ShareChat/flink-kubernetes-operator

Length of output: 1102


🏁 Script executed:

#!/bin/bash
# Extract properties and dependency versions
fd -t f "pom.xml" flink-autoscaler-standalone -x cat {} | \
  gawk '/^<properties>/,/<\/properties>/ {print NR": "$0}' | head -50

Repository: ShareChat/flink-kubernetes-operator

Length of output: 61


🏁 Script executed:

#!/bin/bash
# Get complete pom.xml content length and structure
wc -l flink-autoscaler-standalone/pom.xml

Repository: ShareChat/flink-kubernetes-operator

Length of output: 118


🏁 Script executed:

#!/bin/bash
# Extract all dependencies section with versions
fd -t f "pom.xml" flink-autoscaler-standalone -x cat {} | \
  gawk '/^    <dependencies>/,/<\/dependencies>/ {print NR": "$0}' | tail -80

Repository: ShareChat/flink-kubernetes-operator

Length of output: 2958


🏁 Script executed:

#!/bin/bash
# Read complete pom.xml to see properties and parent declaration
cat -n flink-autoscaler-standalone/pom.xml

Repository: ShareChat/flink-kubernetes-operator

Length of output: 10882


🏁 Script executed:

#!/bin/bash
# Find and read the parent pom.xml
find . -path "*flink-kubernetes-operator/pom.xml" -type f | head -1 | xargs cat | head -100

Repository: ShareChat/flink-kubernetes-operator

Length of output: 3883


🏁 Script executed:

#!/bin/bash
# List structure to find parent pom location
ls -la flink-autoscaler-standalone/..

Repository: ShareChat/flink-kubernetes-operator

Length of output: 2332


🏁 Script executed:

#!/bin/bash
# Extract properties section from parent pom.xml
cat -n pom.xml | sed -n '/<properties>/,/<\/properties>/p'

Repository: ShareChat/flink-kubernetes-operator

Length of output: 2933


Update commons-io version in NOTICE file from 2.15.1 to 2.17.0.

The NOTICE file at lines 9-34 lists dependency versions that should match the build configuration in pom.xml. Review of the parent pom.xml properties shows that commons-io.version is defined as 2.17.0, but the NOTICE file lists commons-io as version 2.15.1. All other verifiable versions (log4j, slf4j, okhttp, quartz, HikariCP, commons-lang3) match correctly.

Verification details

Version comparison against pom.xml properties:

  • commons-io.version property: 2.17.0 (pom.xml line 85)
  • commons-io in NOTICE: 2.15.1 (line 17)
  • All other verifiable dependency versions match their pom.xml property definitions
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@flink-autoscaler-standalone/src/main/resources/META-INF/NOTICE` around lines
9 - 34, The NOTICE file lists commons-io as "commons-io:commons-io:jar:2.15.1"
which is out of sync with the project's pom property commons-io.version
(2.17.0); update the NOTICE entry "commons-io:commons-io:jar:2.15.1" to
"commons-io:commons-io:jar:2.17.0" so the NOTICE matches the pom.xml.
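
The version comparison described above can be automated. The following is a minimal sketch, assuming NOTICE entries follow the `groupId:artifactId:jar:version` shape quoted in this comment. The class name `NoticeVersionCheck`, the method `findMismatches`, and the `com.example:demo-lib` entry are hypothetical, and the expected versions are passed in by hand rather than parsed from pom.xml.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: compares NOTICE entries against expected dependency versions. */
public class NoticeVersionCheck {

    // Assumed entry shape: "- groupId:artifactId:jar:version" (the leading "- " is optional).
    private static final Pattern ENTRY =
            Pattern.compile("^\\s*-?\\s*([^:\\s]+):([^:\\s]+):jar:([^:\\s]+)\\s*$");

    /**
     * Returns "groupId:artifactId" -> "noticeVersion != expectedVersion" for each entry
     * whose version differs from the expected one. Entries without an expected version
     * are skipped, since they cannot be verified.
     */
    static Map<String, String> findMismatches(
            List<String> noticeLines, Map<String, String> expectedVersions) {
        Map<String, String> mismatches = new LinkedHashMap<>();
        for (String line : noticeLines) {
            Matcher m = ENTRY.matcher(line);
            if (!m.matches()) {
                continue; // not a dependency entry (header, license text, blank line)
            }
            String coordinate = m.group(1) + ":" + m.group(2);
            String noticeVersion = m.group(3);
            String expected = expectedVersions.get(coordinate);
            if (expected != null && !expected.equals(noticeVersion)) {
                mismatches.put(coordinate, noticeVersion + " != " + expected);
            }
        }
        return mismatches;
    }

    public static void main(String[] args) {
        // The commons-io values come from this review; demo-lib is a made-up matching entry.
        List<String> notice = List.of(
                "This project bundles the following dependencies:",
                "- commons-io:commons-io:jar:2.15.1",
                "- com.example:demo-lib:jar:1.0.0");
        Map<String, String> expected = Map.of(
                "commons-io:commons-io", "2.17.0",
                "com.example:demo-lib", "1.0.0");

        // prints {commons-io:commons-io=2.15.1 != 2.17.0}
        System.out.println(findMismatches(notice, expected));
    }
}
```

Keying the result by `groupId:artifactId` keeps the output unambiguous when two groups publish an artifact with the same name.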

Comment on lines +155 to +172
    /** Get properties map for the cluster-side after removal of some keys. */
    private List<String> getClusterSideConfData(Configuration flinkConfig) {
        // For Flink versions that use the standard config we have to set the standardYaml flag in
        // the Configuration object manually instead of simply cloning, otherwise it would simply
        // inherit it from the base config (which would always be false currently).
        Configuration clusterSideConfig = new Configuration(useStandardYamlConfig());
        clusterSideConfig.addAll(flinkConfig);
        // Remove some configuration options that should not be taken to cluster side.
        clusterSideConfig.removeConfig(KubernetesConfigOptions.KUBE_CONFIG_FILE);
        clusterSideConfig.removeConfig(DeploymentOptionsInternal.CONF_DIR);
        clusterSideConfig.removeConfig(RestOptions.BIND_ADDRESS);
        clusterSideConfig.removeConfig(JobManagerOptions.BIND_HOST);
        clusterSideConfig.removeConfig(TaskManagerOptions.BIND_HOST);
        clusterSideConfig.removeConfig(TaskManagerOptions.HOST);

        validateConfigKeysForV2(clusterSideConfig);

        return ConfigurationUtils.convertConfigToWritableLines(clusterSideConfig, false);

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🌐 Web query:

Flink Configuration class constructor boolean standardYaml parameter

💡 Result:

The Apache Flink Configuration class (org.apache.flink.configuration.Configuration) does not have a public constructor that takes a boolean standardYaml parameter. The documented constructors are public Configuration() for an empty configuration and public Configuration(Configuration other) to copy another configuration. No constructor with a boolean parameter named standardYaml or similar is mentioned in the official API docs for Flink 2.2-SNAPSHOT.

However, starting with Flink 1.19 (FLIP-366), Flink supports "standard YAML" configuration via config.yaml files, distinct from the legacy flink-conf.yaml. Configuration objects loaded from standard YAML are treated differently internally, e.g., in ConfigurationUtils.convertConfigToWritableLines(Configuration configuration, boolean flattenYaml), which checks "the standardYaml attribute of the configuration object" to decide between nested YAML or flat output. This suggests Configuration instances have an internal "standardYaml" attribute or method (possibly private or added post-API doc generation) to flag if they originate from standard YAML parsing. The confData field stores key/value pairs without YAML escaping, explicitly noting it excludes "escaping that is required by the standard YAML syntax".

No public constructor exposes a standardYaml boolean directly; such flagging likely occurs during loading via GlobalConfiguration or YamlParserUtils, not via a user-facing constructor. Sources confirm no matching public constructor; any such parameter would be internal or non-existent.


🏁 Script executed:

# Find the file mentioned in the review
find . -name "FlinkConfMountDecorator.java" -type f

Repository: ShareChat/flink-kubernetes-operator

Length of output: 199


🏁 Script executed:

# Search for Configuration constructors in the codebase
rg "class Configuration" -A 10 --type java

Repository: ShareChat/flink-kubernetes-operator

Length of output: 61


🏁 Script executed:

# Search for useStandardYamlConfig method definition
rg "useStandardYamlConfig" --type java -B 2 -A 5

Repository: ShareChat/flink-kubernetes-operator

Length of output: 5305


🏁 Script executed:

# Read the FlinkConfMountDecorator file to see imports
head -50 ./flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java

Repository: ShareChat/flink-kubernetes-operator

Length of output: 2814


🏁 Script executed:

# Search for Configuration class definition in the repository
find . -path "*/flink-core/*" -o -path "*/flink-configuration/*" | head -20

Repository: ShareChat/flink-kubernetes-operator

Length of output: 61


🏁 Script executed:

# Check Flink version in pom.xml
find . -name "pom.xml" -exec grep -l "flink.version\|<version>" {} \; | head -5

Repository: ShareChat/flink-kubernetes-operator

Length of output: 589


🏁 Script executed:

# Find and check main pom.xml for Flink version
find . -maxdepth 2 -name "pom.xml" -type f

Repository: ShareChat/flink-kubernetes-operator

Length of output: 373


🏁 Script executed:

# Check Flink version in the main pom.xml
cat ./pom.xml | grep -A 2 -B 2 "flink" | head -30

Repository: ShareChat/flink-kubernetes-operator

Length of output: 1244


🏁 Script executed:

# Look for any custom Configuration class in this project
find . -path "*/src/*" -name "Configuration.java" -type f

Repository: ShareChat/flink-kubernetes-operator

Length of output: 61


🏁 Script executed:

# Search for Flink version property in pom.xml
grep -n "flink\|version" ./pom.xml | head -40

Repository: ShareChat/flink-kubernetes-operator

Length of output: 2528


🏁 Script executed:

# Look for the Flink version more specifically
rg "flink.version|<flink" ./pom.xml -A 1 -B 1

Repository: ShareChat/flink-kubernetes-operator

Length of output: 399


This code will not compile: Configuration(boolean) constructor does not exist.

The Configuration class in Flink 1.20.1 has no public constructor accepting a boolean parameter. The only available constructors are Configuration() and Configuration(Configuration other). While the Configuration object does have an internal standardYaml attribute used internally (e.g., in ConfigurationUtils.convertConfigToWritableLines), there is no public API to set it via a constructor.

The code at line 160 will fail at compile time.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java`
around lines 155 - 172, The code uses a non-existent Configuration(boolean)
constructor in getClusterSideConfData; replace the constructor call with a
no-arg Configuration and then copy settings from flinkConfig (i.e., change "new
Configuration(useStandardYamlConfig())" to "new Configuration()" followed by
clusterSideConfig.addAll(flinkConfig)); keep the rest of getClusterSideConfData
(removals and validateConfigKeysForV2) unchanged and, if you need to force the
internal standard-yaml behavior, do so via the existing public utilities or
helper methods elsewhere rather than a boolean constructor.

Comment on lines +155 to +157
        } catch (Exception e) {
            LOG.warn("Failed to detect nonTerminal jobs in session cluster", e);
            return Set.of();

⚠️ Potential issue | 🔴 Critical

Fail closed when unmanaged-job discovery fails.

Returning Set.of() here makes cleanupInternal() behave as if the cluster has no live unmanaged jobs. With BLOCK_ON_UNMANAGED_JOBS enabled, a transient REST failure can therefore delete a session cluster that still has running jobs.

🛑 Suggested direction
-    Set<JobID> getNonTerminalJobs(FlinkResourceContext<FlinkDeployment> ctx) {
+    Optional<Set<JobID>> getNonTerminalJobs(FlinkResourceContext<FlinkDeployment> ctx) {
 ...
-            return Set.of();
+            return Optional.empty();

Then, in cleanupInternal(), treat Optional.empty() as a reschedule condition instead of "no jobs found".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java`
around lines 155 - 157, The catch block in SessionReconciler that currently logs
the exception and returns Set.of() should instead return an "unknown" sentinel
(e.g., Optional.empty() or a distinguished result) so callers can reschedule
instead of assuming no unmanaged jobs; change the exception handler in the
method that detects non-terminal/unmanaged jobs (the block that now does
LOG.warn("Failed to detect nonTerminal jobs in session cluster", e); return
Set.of();) to return Optional.empty() (or otherwise propagate an unknown result)
and then update cleanupInternal() to treat Optional.empty() as a reschedule
condition rather than "no jobs found" so transient REST failures won't permit
cluster deletion.
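
The fail-closed direction suggested above can be illustrated in isolation. This is a minimal sketch under stated assumptions, not the operator's actual code: job IDs are plain strings instead of Flink's `JobID`, the REST lookup is modeled as a `Supplier`, and the `CleanupDecision` enum and `decide` method are hypothetical stand-ins for the branching inside `cleanupInternal()`.

```java
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;

/** Hypothetical sketch of fail-closed cleanup; types are simplified stand-ins. */
public class FailClosedCleanupSketch {

    /** Possible outcomes of a cleanup attempt (illustrative names). */
    enum CleanupDecision { DELETE_CLUSTER, BLOCK_ON_JOBS, RESCHEDULE }

    /**
     * Looks up non-terminal jobs. An empty Optional means "could not determine",
     * which is deliberately different from a present but empty set ("no jobs").
     */
    static Optional<Set<String>> getNonTerminalJobs(Supplier<Set<String>> restLookup) {
        try {
            return Optional.of(restLookup.get());
        } catch (RuntimeException e) {
            // Lookup failed: report "unknown" instead of pretending the cluster is idle.
            return Optional.empty();
        }
    }

    /** Deletes only when the lookup succeeded and found nothing running. */
    static CleanupDecision decide(Optional<Set<String>> nonTerminalJobs) {
        return nonTerminalJobs
                .map(jobs -> jobs.isEmpty()
                        ? CleanupDecision.DELETE_CLUSTER
                        : CleanupDecision.BLOCK_ON_JOBS)
                .orElse(CleanupDecision.RESCHEDULE);
    }

    public static void main(String[] args) {
        Supplier<Set<String>> failing = () -> {
            throw new IllegalStateException("simulated transient REST failure");
        };
        System.out.println(decide(getNonTerminalJobs(failing)));            // RESCHEDULE
        System.out.println(decide(getNonTerminalJobs(() -> Set.of())));     // DELETE_CLUSTER
        System.out.println(decide(getNonTerminalJobs(() -> Set.of("a"))));  // BLOCK_ON_JOBS
    }
}
```

Separating "unknown" from "empty" in the return type forces every caller to handle the failure case explicitly, which is the property the review comment asks for.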

Comment on lines 127 to 131
pod.setStatus(
new PodStatusBuilder()
.withContainerStatuses(Collections.singletonList(cs))
.withContainerStatuses(new ContainerStatusBuilder().withReady().build())
.withInitContainerStatuses(cs)
.build());

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🌐 Web query:

Does io.fabric8.kubernetes.api.model.ContainerStatusBuilder expose a no-argument withReady() overload in the Fabric8 Kubernetes Client version used by this project, or should this call be withReady(true)?

💡 Result:

No, io.fabric8.kubernetes.api.model.ContainerStatusBuilder does not expose a no-argument withReady() overload. It requires withReady(boolean ready), typically called as withReady(true) to set the container as ready. The Fabric8 Kubernetes Client (latest ~7.6 as of 2026-03) uses generated builders following Kubernetes API conventions. ContainerStatus.ready is a required Boolean field, so the fluent builder provides withReady(Boolean b) but no no-arg overload that defaults to true, unlike some optional fields with withNew/withXXX patterns.

Evidence from Javadoc result 1 (kubernetes-model-core 6.4.0): it lists "withReady, withReady", indicating two overloads, both taking arguments (likely boolean and Boolean), with no mention of a no-argument overload. The cheat sheet (result 2) shows similar builder patterns requiring explicit values, like withImage("nginx"). The Kubernetes spec confirms ContainerStatus.ready is a non-optional Boolean.


🏁 Script executed:

cat -n flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java | sed -n '120,135p'

Repository: ShareChat/flink-kubernetes-operator

Length of output: 854


withReady() is not a valid ContainerStatusBuilder call and will cause a compilation error.

Fabric8's ContainerStatusBuilder does not expose a no-argument withReady() overload. The correct method signature requires a boolean argument: withReady(true).

Required fix
         pod.setStatus(
                 new PodStatusBuilder()
-                        .withContainerStatuses(new ContainerStatusBuilder().withReady().build())
+                        .withContainerStatuses(
+                                new ContainerStatusBuilder()
+                                        .withName("main")
+                                        .withReady(true)
+                                        .build())
                         .withInitContainerStatuses(cs)
                         .build());
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
pod.setStatus(
new PodStatusBuilder()
.withContainerStatuses(Collections.singletonList(cs))
.withContainerStatuses(new ContainerStatusBuilder().withReady().build())
.withInitContainerStatuses(cs)
.build());
pod.setStatus(
new PodStatusBuilder()
.withContainerStatuses(
new ContainerStatusBuilder()
.withName("main")
.withReady(true)
.build())
.withInitContainerStatuses(cs)
.build());
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java`
around lines 127 - 131, The ContainerStatusBuilder invocation is using a
non-existent no-arg withReady(); update the call to use the correct signature by
replacing ContainerStatusBuilder().withReady() with
ContainerStatusBuilder().withReady(true) so the PodStatusBuilder(...)
construction compiles; look for the ContainerStatusBuilder usage inside the
PodStatusBuilder chain in TestUtils.java and change the withReady call to pass
true.

Member

@david-sharechat david-sharechat left a comment

Change looks sound against flink-k8s-operator branch 👍

@mradulrathorej mradulrathorej merged commit 571ffbd into main-sharechat Apr 21, 2026
236 checks passed
@mradulrathorej mradulrathorej deleted the release-sharechat-1.14 branch April 21, 2026 07:18
@mradulrathorej mradulrathorej restored the release-sharechat-1.14 branch April 24, 2026 16:12
mradulrathorej added a commit that referenced this pull request May 13, 2026
- Add missing static import of TRUE_PROCESSING_RATE in ScalingExecutor
  (1.14 had it for unrelated upstream code that 1.15 removed, so the
  cherry-pick did not bring it in)
- Update testUniformParallelism to use the 1.15 option names:
  TARGET_UTILIZATION -> UTILIZATION_TARGET, and replace the
  TARGET_UTILIZATION_BOUNDARY value with explicit UTILIZATION_MIN
  and UTILIZATION_MAX bounds (same fix applied for 1.14 in PR #4)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Labels

chore dependencies documentation Improvements or additions to documentation feature tests
