
Conversation

watchgou
Contributor

@watchgou watchgou commented Sep 12, 2025

Which Issue(s) This PR Fixes (Closes)

Fixes #issue_id

Brief Description

How Did You Test This Change?

Summary by CodeRabbit

  • New Features

    • Added admin API to exchange broker HA info, including HA address, master flush offset, and broker address.
    • Brokers now auto-update master/HA addresses and can sync master flush offsets on startup.
    • Improved handling of master online/offline events to adjust broker services accordingly.
    • Introduced concurrency-safe tracking of the in-cluster minimum broker, with getters for current min broker ID and address.
  • Tests

    • Added serialization tests for the HA info response header to ensure correct camelCase encoding.

Contributor

coderabbitai bot commented Sep 12, 2025

Walkthrough

Adds HA info exchange support: new handler wired into admin processor, new response header type, and runtime accessors. Implements in-group min-broker tracking with async updates and lifecycle hooks. Exposes protocol module for the new response header. Integrates request handling for ExchangeBrokerHaInfo.

Changes

Cohort / File(s) / Summary

  • Broker runtime accessors (rocketmq-broker/src/broker_runtime.rs)
    Adds a slave_synchronize(&self) accessor and update_slave_master_addr(&mut self, ...) to mutate the slave-sync master address.
  • Admin processor wiring for HA (rocketmq-broker/src/processor/admin_broker_processor.rs)
    Introduces UpdateBrokerHaHandler; stores it in AdminBrokerProcessor; initializes it in new; routes RequestCode::ExchangeBrokerHaInfo to the handler.
  • Min-broker in-group tracking (rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs)
    Adds internal RwLock state, async getters, and an async update flow: reacts to min-broker changes, toggles special services, handles master online/offline, and updates runtime/store addresses.
  • HA update handler (rocketmq-broker/src/processor/admin_broker_processor/update_broker_ha_handler.rs)
    New UpdateBrokerHaHandler: parses the request, updates the HA master address and offsets, or responds with master HA info when applicable. Returns a success response.
  • Remoting header exports (rocketmq-remoting/src/protocol/header.rs)
    Exposes the exchange_ha_info_response_header module.
  • HA response header type (rocketmq-remoting/src/protocol/header/exchange_ha_info_response_header.rs)
    Adds ExchangeHaInfoResponseHeader with camelCase serde and derived codecs; includes a unit test validating JSON keys/values.
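As a side note on the camelCase encoding the new header's unit test asserts, the snake_case-to-camelCase key mapping can be sketched with a tiny stdlib-only helper (to_camel_case is a hypothetical illustration, not part of the crate):

```rust
// Sketch of the key renaming that #[serde(rename_all = "camelCase")]
// applies to the ExchangeHaInfoResponseHeader fields.
fn to_camel_case(snake: &str) -> String {
    let mut out = String::with_capacity(snake.len());
    let mut upper_next = false;
    for ch in snake.chars() {
        if ch == '_' {
            upper_next = true; // drop the underscore, uppercase what follows
        } else if upper_next {
            out.extend(ch.to_uppercase());
            upper_next = false;
        } else {
            out.push(ch);
        }
    }
    out
}

fn main() {
    // The three header fields and the JSON keys the PR's unit test expects.
    assert_eq!(to_camel_case("master_ha_address"), "masterHaAddress");
    assert_eq!(to_camel_case("master_flush_offset"), "masterFlushOffset");
    assert_eq!(to_camel_case("master_address"), "masterAddress");
}
```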

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor Client
  participant AdminBrokerProcessor as AdminBrokerProcessor
  participant UpdateBrokerHaHandler as UpdateBrokerHaHandler
  participant BrokerRuntime as BrokerRuntimeInner
  participant MessageStore as MessageStore

  Client->>AdminBrokerProcessor: RemotingCommand(RequestCode::ExchangeBrokerHaInfo)
  AdminBrokerProcessor->>UpdateBrokerHaHandler: update_broker_ha_info(...)
  UpdateBrokerHaHandler->>UpdateBrokerHaHandler: Decode ExchangeHAInfoRequestHeader

  alt header.masterHaAddress present and non-empty
    UpdateBrokerHaHandler->>BrokerRuntime: set HA master address
    opt message_store available
      UpdateBrokerHaHandler->>MessageStore: update master address
      alt sync-on-startup enabled and current masterFlushOffset == 0
        UpdateBrokerHaHandler->>MessageStore: set master flushed offset
      end
    end
    UpdateBrokerHaHandler-->>AdminBrokerProcessor: Success response
  else header.masterHaAddress present but empty and broker is MASTER
    UpdateBrokerHaHandler->>BrokerRuntime: read HA server addr and offsets
    UpdateBrokerHaHandler-->>AdminBrokerProcessor: Success + ExchangeHaInfoResponseHeader
  else
    UpdateBrokerHaHandler-->>AdminBrokerProcessor: Success response
  end

  AdminBrokerProcessor-->>Client: RemotingCommand(Response)
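The alt branches in the diagram above reduce to a small decision function; the following stdlib-only sketch models them with plain types (HaAction and decide are illustrative names, not the crate's API):

```rust
// Simplified model of UpdateBrokerHaHandler's branching, using Option<&str>
// for the header field and a bool for the local broker's role.
#[derive(Debug, PartialEq)]
enum HaAction {
    UpdateMasterHaAddr(String), // non-empty address: slave updates its view
    ReplyWithMasterInfo,        // empty address from a peer, and we are MASTER
    NoOp,                       // nothing to do; still answers with Success
}

fn decide(master_ha_address: Option<&str>, is_master: bool) -> HaAction {
    match master_ha_address {
        Some(addr) if !addr.is_empty() => HaAction::UpdateMasterHaAddr(addr.to_string()),
        Some(_) if is_master => HaAction::ReplyWithMasterInfo,
        _ => HaAction::NoOp,
    }
}

fn main() {
    assert_eq!(
        decide(Some("10.0.0.1:10912"), false),
        HaAction::UpdateMasterHaAddr("10.0.0.1:10912".into())
    );
    assert_eq!(decide(Some(""), true), HaAction::ReplyWithMasterInfo);
    assert_eq!(decide(Some(""), false), HaAction::NoOp);
    assert_eq!(decide(None, true), HaAction::NoOp);
}
```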
sequenceDiagram
  autonumber
  participant Controller as NotifyMinBrokerChangeIdHandler
  participant Lock as RWLock<MinBrokerIngroup>
  participant Runtime as BrokerRuntimeInner
  participant Store as MessageStore

  Controller->>Controller: update_min_broker(header)
  alt enabled && broker_id != MASTER_ID
    Controller->>Lock: try_read/write (timeout 3s)
    alt minBrokerId changed
      Controller->>Controller: on_min_broker_change(...)
      Controller->>Lock: update state
      Controller->>Controller: change_special_service_status(should_start)
      Controller->>Runtime: start/stop special services
      alt offline_broker_addr == known master
        Controller->>Controller: on_master_offline()
        Controller->>Runtime: clear master addresses
        Controller->>Store: clear master addresses
      end
      alt min_broker_id == MASTER_ID or min_broker_addr non-empty
        Controller->>Controller: on_master_on_line(addr, ha_addr)
        note right of Controller: May sync master flush offset (placeholders for unimplemented branches)
      end
    end
  end
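The guarded update in the second diagram can be approximated with std::sync::RwLock standing in for the handler's async lock (all names here are illustrative, not the crate's types):

```rust
use std::sync::RwLock;

// Toy model of the min-broker-in-group tracking behind a lock.
const MASTER_ID: u64 = 0;

struct MinBrokerInGroup {
    min_broker_id: Option<u64>,
    min_broker_addr: String,
}

struct Handler {
    state: RwLock<MinBrokerInGroup>,
    local_broker_id: u64,
}

impl Handler {
    // Returns true when the notification actually changed the tracked minimum.
    fn update_min_broker(&self, new_id: u64, new_addr: &str) -> bool {
        // Mirrors the guard in the diagram: masters skip the update.
        if self.local_broker_id == MASTER_ID {
            return false;
        }
        let mut guard = self.state.write().unwrap();
        if guard.min_broker_id == Some(new_id) {
            return false; // unchanged: nothing to do
        }
        guard.min_broker_id = Some(new_id);
        guard.min_broker_addr = new_addr.to_string();
        true
    }
}

fn main() {
    let h = Handler {
        state: RwLock::new(MinBrokerInGroup {
            min_broker_id: None,
            min_broker_addr: String::new(),
        }),
        local_broker_id: 2,
    };
    assert!(h.update_min_broker(1, "10.0.0.2:10911"));
    assert!(!h.update_min_broker(1, "10.0.0.2:10911")); // same id: no-op
}
```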

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Pre-merge checks (2 passed, 1 inconclusive)

❌ Failed checks (1 inconclusive)
Check name: Title Check
Status: ❓ Inconclusive
Explanation: The title "Broker id change" is related to part of the changeset (min-broker id handling) but is too brief and vague to convey the primary scope of this multi-file PR, which also adds broker-HA exchange handling (UpdateBrokerHaHandler), new remoting headers, and runtime slave-sync APIs; it does not clearly summarize the main change for a reviewer scanning history.
Resolution: Please rename the PR to a concise title that reflects the main scope, for example "Introduce UpdateBrokerHaHandler and min-broker ID handling" or "Add broker HA exchange and min-broker change logic", and update the description to list the key files/behaviors changed (HA exchange handler, notify-min-broker logic, broker runtime updates) so reviewers can immediately understand the intent.
✅ Passed checks (2 passed)
Description Check: ✅ Passed. Check skipped - CodeRabbit's high-level summary is enabled.
Docstring Coverage: ✅ Passed. No functions found in the changes; docstring coverage check skipped.

Poem

Thump-thump, my paws sync HA delight,
New headers hop across the wire at night.
Locks keep the warren’s min-broker tight,
Masters go offline—we set it right.
With swift byte-carrots and offsets bright,
This bunny ships the cluster light. 🥕✨



@rocketmq-rust-bot
Collaborator

🔊@watchgou 🚀Thanks for your contribution🎉!

💡CodeRabbit(AI) will review your code first🔥!

Note

🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥.

Collaborator

@rocketmq-rust-bot rocketmq-rust-bot left a comment


LGTM

@watchgou watchgou closed this Sep 12, 2025
@rocketmq-rust-bot rocketmq-rust-bot added the approved label and removed the ready to review and waiting-review labels Sep 12, 2025
@watchgou watchgou reopened this Sep 12, 2025
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 6

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs (1)

75-84: Avoid panics on header decode and fix error message

unwrap() will crash the broker on malformed requests. Handle decode failures and avoid expect(...) on optional fields.

-        let change_header = request
-            .decode_command_custom_header::<NotifyMinBrokerIdChangeRequestHeader>()
-            .unwrap();
-
-        let latest_broker_id = change_header
-            .min_broker_id
-            .expect("min broker id not must be present");
+        let change_header = match request
+            .decode_command_custom_header::<NotifyMinBrokerIdChangeRequestHeader>()
+        {
+            Ok(h) => h,
+            Err(e) => {
+                error!("Failed to decode NotifyMinBrokerIdChangeRequestHeader: {}", e);
+                let mut response = RemotingCommand::default();
+                response.set_code_ref(ResponseCode::SystemError);
+                response.set_remark(format!("Invalid request header: {}", e));
+                return Some(response);
+            }
+        };
🧹 Nitpick comments (6)
rocketmq-broker/src/broker_runtime.rs (1)

2060-2070: Expose a more ergonomic API and avoid silent no-op.

  • Returning a status helps callers know whether an update occurred.
  • Accepting Into<CheetahString> avoids an unnecessary allocation at call sites.
@@
     #[inline]
-    pub fn slave_synchronize(&self) -> &Option<SlaveSynchronize<MS>> {
-        &self.slave_synchronize
+    pub fn slave_synchronize(&self) -> &Option<SlaveSynchronize<MS>> {
+        &self.slave_synchronize
     }
 
     #[inline]
-    pub fn update_slave_master_addr(&mut self, master_addr: CheetahString) {
-        if let Some(ref mut slave) = self.slave_synchronize {
-            slave.set_master_addr(master_addr);
-        };
-    }
+    pub fn update_slave_master_addr<I: Into<CheetahString>>(&mut self, master_addr: I) -> bool {
+        if let Some(ref mut slave) = self.slave_synchronize {
+            slave.set_master_addr(master_addr.into());
+            true
+        } else {
+            false
+        }
+    }
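A call-site sketch of the suggested signature, with SlaveSynchronize reduced to an Option<String> field for illustration (Runtime and its field are stand-ins, not the crate's types):

```rust
// Why the suggested API helps: generic Into + a bool return value.
struct Runtime {
    slave_master_addr: Option<String>,
}

impl Runtime {
    // Accepting I: Into<String> lets callers pass &str, String, etc.;
    // the bool tells them whether a slave was present to update.
    fn update_slave_master_addr<I: Into<String>>(&mut self, master_addr: I) -> bool {
        match self.slave_master_addr.as_mut() {
            Some(slot) => {
                *slot = master_addr.into();
                true
            }
            None => false,
        }
    }
}

fn main() {
    let mut rt = Runtime { slave_master_addr: Some(String::new()) };
    assert!(rt.update_slave_master_addr("10.0.0.1:10911")); // &str works directly
    assert_eq!(rt.slave_master_addr.as_deref(), Some("10.0.0.1:10911"));

    let mut none = Runtime { slave_master_addr: None };
    assert!(!none.update_slave_master_addr("ignored")); // silent no-op made visible
}
```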
rocketmq-remoting/src/protocol/header/exchange_ha_info_response_header.rs (2)

21-27: Keep acronym casing consistent with the request header.

Request uses ExchangeHAInfoRequestHeader while response is ExchangeHaInfoResponseHeader. Consider renaming to ExchangeHAInfoResponseHeader for consistency across the API. Low impact now while adoption is limited.

Would you like me to prepare a follow-up rename patch across the crate?


24-26: Skip None fields during serialization.

Keeps payloads compact and avoids emitting nulls in JSON-based tests/logs.

 pub struct ExchangeHaInfoResponseHeader {
-    pub master_ha_address: Option<CheetahString>,
-    pub master_flush_offset: Option<i64>,
-    pub master_address: Option<CheetahString>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub master_ha_address: Option<CheetahString>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub master_flush_offset: Option<i64>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub master_address: Option<CheetahString>,
 }
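For illustration, a hand-rolled sketch of what skip_serializing_if = "Option::is_none" buys (string values only here; the real header also carries an Option<i64> offset, and to_json is hypothetical, not serde):

```rust
// None fields are omitted from the JSON object instead of being emitted as null.
fn to_json(fields: &[(&str, Option<&str>)]) -> String {
    let body: Vec<String> = fields
        .iter()
        .filter_map(|(k, v)| v.map(|v| format!("\"{k}\":\"{v}\"")))
        .collect();
    format!("{{{}}}", body.join(","))
}

fn main() {
    let json = to_json(&[
        ("masterHaAddress", Some("10.0.0.1:10912")),
        ("masterFlushOffset", None), // skipped entirely
        ("masterAddress", Some("10.0.0.1:10911")),
    ]);
    assert!(!json.contains("masterFlushOffset"));
    assert_eq!(
        json,
        "{\"masterHaAddress\":\"10.0.0.1:10912\",\"masterAddress\":\"10.0.0.1:10911\"}"
    );
}
```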
rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs (3)

165-168: Tighten "master online" condition

Current condition fires if either id==MASTER_ID or addr is non-empty. Consider requiring both to reduce false positives.

-        if min_broker_id == MASTER_ID || !min_broker_addr.is_empty() {
+        if min_broker_id == MASTER_ID && !min_broker_addr.is_empty() {

51-57: Confirm MASTER_ID type vs u64

min_broker_id_in_group is Option<u64>, but MASTER_ID's type may differ. If it is not u64, cast explicitly.

-            min_broker_id_in_group: Some(MASTER_ID),
+            min_broker_id_in_group: Some(MASTER_ID as u64),

223-229: API surface: avoid exposing Arc in getter

Returning Arc<CheetahString> leaks internal synchronization details. Prefer returning CheetahString (clone) or &CheetahString with a guard pattern.

Want me to sketch a small accessor wrapper that avoids cloning on the hot path?
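One possible shape for such a getter, modeled with std::sync::RwLock and String in place of the crate's tokio lock and CheetahString:

```rust
use std::sync::{Arc, RwLock};

// Clone the value out so callers never see the Arc or the lock.
struct State {
    min_broker_addr: Arc<String>,
}

struct Tracker {
    state: RwLock<State>,
}

impl Tracker {
    fn min_broker_addr(&self) -> String {
        // The read guard is dropped at the end of this expression;
        // only an owned String escapes the accessor.
        self.state.read().unwrap().min_broker_addr.as_ref().clone()
    }
}

fn main() {
    let t = Tracker {
        state: RwLock::new(State {
            min_broker_addr: Arc::new("10.0.0.1:10911".to_string()),
        }),
    };
    assert_eq!(t.min_broker_addr(), "10.0.0.1:10911");
}
```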

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7200334 and ae12384.

📒 Files selected for processing (6)
  • rocketmq-broker/src/broker_runtime.rs (1 hunks)
  • rocketmq-broker/src/processor/admin_broker_processor.rs (6 hunks)
  • rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs (2 hunks)
  • rocketmq-broker/src/processor/admin_broker_processor/update_broker_ha_handler.rs (1 hunks)
  • rocketmq-remoting/src/protocol/header.rs (1 hunks)
  • rocketmq-remoting/src/protocol/header/exchange_ha_info_response_header.rs (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (5)
rocketmq-broker/src/processor/admin_broker_processor/update_broker_ha_handler.rs (2)
rocketmq-broker/src/broker_runtime.rs (1)
  • message_store (1908-1910)
rocketmq-broker/src/processor/admin_broker_processor.rs (1)
  • new (79-105)
rocketmq-broker/src/broker_runtime.rs (1)
rocketmq-broker/src/slave/slave_synchronize.rs (1)
  • master_addr (46-48)
rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs (2)
rocketmq-broker/src/broker_runtime.rs (8)
  • message_store (1908-1910)
  • broker_config (271-273)
  • broker_config (1838-1840)
  • on_min_broker_change (2562-2570)
  • get_min_broker_id_in_group (2237-2239)
  • get_broker_addr (2443-2445)
  • change_special_service_status (2497-2509)
  • slave_synchronize (2061-2063)
rocketmq-broker/src/slave/slave_synchronize.rs (1)
  • master_addr (46-48)
rocketmq-remoting/src/protocol/header/exchange_ha_info_response_header.rs (1)
rocketmq-common/src/utils/parse_config_file.rs (1)
  • cfg (36-36)
rocketmq-broker/src/processor/admin_broker_processor.rs (6)
rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs (3)
  • new (52-57)
  • new (61-66)
  • request (75-76)
rocketmq-broker/src/broker_runtime.rs (1)
  • new (140-269)
rocketmq-broker/src/processor/admin_broker_processor/offset_request_handler.rs (3)
  • new (44-48)
  • request (59-60)
  • request (94-95)
rocketmq-broker/src/processor/admin_broker_processor/batch_mq_handler.rs (1)
  • new (46-50)
rocketmq-broker/src/processor/admin_broker_processor/consumer_request_handler.rs (3)
  • new (46-50)
  • request (62-63)
  • request (115-116)
rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs (7)
  • new (62-66)
  • request (78-79)
  • request (314-315)
  • request (448-449)
  • request (527-528)
  • request (574-575)
  • request (600-601)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: auto-approve
🔇 Additional comments (6)
rocketmq-remoting/src/protocol/header.rs (1)

32-32: LGTM: response header export added.

rocketmq-broker/src/processor/admin_broker_processor.rs (2)

36-46: Wiring for HA handler looks correct.

Module import, construction, and field storage are consistent with other handlers.

Also applies to: 58-59, 92-93, 103-104


235-239: Routing ExchangeBrokerHaInfo to the new handler is fine; ensure error codes propagate.

If the handler needs to return non-Success responses, have it set appropriate ResponseCode and remarks; the match arm already forwards its result.

Do we have integration tests covering a malformed ExchangeHAInfoRequestHeader path?

rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs (3)

171-176: Service status propagation looks good

Async delegation to BrokerRuntimeInner::change_special_service_status is appropriate.


100-102: Scope of updates when acting as master

You skip updates unless enable_slave_acting_master && broker_id != MASTER_ID. Confirm masters don’t need to react to NotifyMinBrokerIdChange.


110-111: Compare against tracked in-group min, not local broker_id

Read the tracked cluster min from the same lock and compare the incoming min_broker_id to that — do not call self.broker_runtime_inner.get_min_broker_id_in_group() here.
File: rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs (lines 110–111).

Comment on lines +103 to +126
if self
.lock
.try_write_timeout(Duration::from_millis(3000))
.await
.is_some()
{
if let Some(min_broker_id) = change_header.min_broker_id {
if min_broker_id != self.broker_runtime_inner.get_min_broker_id_in_group() {
// on min broker change
let min_broker_addr = change_header.min_broker_addr.as_deref().unwrap();

self.on_min_broker_change(
min_broker_id,
min_broker_addr,
&change_header.offline_broker_addr,
&change_header.ha_broker_addr,
)
.await;
}
}
} else {
error!("Update min broker failed");
}
}

🛠️ Refactor suggestion

⚠️ Potential issue

Fix lock misuse: ignoring write-guard and read-under-write causes race/deadlock

  • The result of try_write_timeout(...) is discarded; you immediately reacquire a fresh write-lock later. This is TOCTOU and defeats the timeout.
  • In on_min_broker_change you hold a write lock and then await a read lock from the same RwLock (Lines 148-150) — this can deadlock.

Refactor to (a) compare against the tracked in-group value under a read lock, (b) avoid the unused try_write_timeout, and (c) compute should_start without re-locking:

@@
-            if self
-                .lock
-                .try_write_timeout(Duration::from_millis(3000))
-                .await
-                .is_some()
-            {
-                if let Some(min_broker_id) = change_header.min_broker_id {
-                    if min_broker_id != self.broker_runtime_inner.get_min_broker_id_in_group() {
-                        // on min broker change
-                        let min_broker_addr = change_header.min_broker_addr.as_deref().unwrap();
-
-                        self.on_min_broker_change(
-                            min_broker_id,
-                            min_broker_addr,
-                            &change_header.offline_broker_addr,
-                            &change_header.ha_broker_addr,
-                        )
-                        .await;
-                    }
-                }
-            } else {
-                error!("Update min broker failed");
-            }
+            if let Some(min_broker_id) = change_header.min_broker_id {
+                let current = self.lock.read().await.min_broker_id_in_group;
+                if current != Some(min_broker_id) {
+                    let min_broker_addr = change_header
+                        .min_broker_addr
+                        .as_deref()
+                        .unwrap_or_default();
+                    self.on_min_broker_change(
+                        min_broker_id,
+                        min_broker_addr,
+                        &change_header.offline_broker_addr,
+                        &change_header.ha_broker_addr,
+                    )
+                    .await;
+                }
+            } else {
+                warn!("min_broker_id missing; skip update_min_broker");
+            }
@@
-        info!(
-            "Min broker changed, old: {}-{}, new {}-{}",
-            self.broker_runtime_inner.get_min_broker_id_in_group(),
-            self.broker_runtime_inner.get_broker_addr(),
-            min_broker_id,
-            min_broker_addr
-        );
-
-        let mut lock_guard = self.lock.write().await;
-        lock_guard.min_broker_id_in_group = Some(min_broker_id);
-        lock_guard.min_broker_addr_in_group = Arc::new(CheetahString::from_slice(min_broker_addr));
-
-        let should_start = self.broker_runtime_inner.get_min_broker_id_in_group()
-            == self.lock.read().await.min_broker_id_in_group.unwrap();
+        let mut guard = self.lock.write().await;
+        let prev_id = guard.min_broker_id_in_group;
+        let prev_addr = guard.min_broker_addr_in_group.clone();
+        info!(
+            "Min broker changed, old: {:?}-{}, new {}-{}",
+            prev_id, prev_addr, min_broker_id, min_broker_addr
+        );
+        guard.min_broker_id_in_group = Some(min_broker_id);
+        guard.min_broker_addr_in_group = Arc::new(CheetahString::from_slice(min_broker_addr));
+        drop(guard);
+
+        let should_start =
+            self.broker_runtime_inner.get_min_broker_id_in_group() == min_broker_id;

Also applies to: 136-151
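The locking discipline the refactor aims for (read the previous value and mutate under one write guard, drop it, then compute should_start without re-locking) can be shown with std::sync::RwLock in place of the async lock; Group and the function below are illustrative stand-ins:

```rust
use std::sync::RwLock;

struct Group {
    min_broker_id: Option<u64>,
}

fn on_min_broker_change(lock: &RwLock<Group>, new_id: u64, runtime_min_id: u64) -> bool {
    let mut guard = lock.write().unwrap();
    let _prev = guard.min_broker_id; // would feed the "old: ..." log line
    guard.min_broker_id = Some(new_id);
    drop(guard); // no read-lock acquisition while the write guard is live
    runtime_min_id == new_id // should_start, computed from locals, lock-free
}

fn main() {
    let lock = RwLock::new(Group { min_broker_id: None });
    assert!(on_min_broker_change(&lock, 0, 0));
    assert_eq!(lock.read().unwrap().min_broker_id, Some(0));
    assert!(!on_min_broker_change(&lock, 1, 0));
}
```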

Comment on lines +112 to +113
let min_broker_addr = change_header.min_broker_addr.as_deref().unwrap();


🛠️ Refactor suggestion

Don't unwrap min_broker_addr

min_broker_addr may be absent; unwrap() panics. Use a safe default or branch.

Covered in the larger diff above by using as_deref().unwrap_or_default().

🤖 Prompt for AI Agents
In
rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs
around lines 112 to 113, the code calls
change_header.min_broker_addr.as_deref().unwrap() which can panic if
min_broker_addr is None; replace this unwrap with a safe fallback or branch —
for example use as_deref().unwrap_or_default() to get an empty string when
absent or explicitly match/if-let on min_broker_addr to handle the None case and
proceed accordingly, ensuring no panics.

Comment on lines +198 to +221
let need_sync_master_flush_offset =
if let Some(message_store) = self.broker_runtime_inner.message_store() {
message_store.get_master_flushed_offset() == 0x0000
&& self
.broker_runtime_inner
.message_store_config()
.sync_master_flush_offset_when_startup
} else {
false
};

if master_ha_addr.is_none() || need_sync_master_flush_offset {
if need_sync_master_flush_offset {
unimplemented!();
}

if master_ha_addr.is_none() {
let broker_runtime_inner = self.broker_runtime_inner.mut_from_ref();
if let Some(_message_store) = broker_runtime_inner.message_store() {
unimplemented!("");
}
}
}
}

🛠️ Refactor suggestion

⚠️ Potential issue

Replace unimplemented!() with non-fatal behavior

unimplemented!() will panic the broker at runtime. Log and return until the implementation is ready.

-        if master_ha_addr.is_none() || need_sync_master_flush_offset {
-            if need_sync_master_flush_offset {
-                unimplemented!();
-            }
-
-            if master_ha_addr.is_none() {
-                let broker_runtime_inner = self.broker_runtime_inner.mut_from_ref();
-                if let Some(_message_store) = broker_runtime_inner.message_store() {
-                    unimplemented!("");
-                }
-            }
-        }
+        if master_ha_addr.is_none() || need_sync_master_flush_offset {
+            if need_sync_master_flush_offset {
+                warn!("Syncing master flush offset is not implemented yet");
+            }
+            if master_ha_addr.is_none() {
+                let broker_runtime_inner = self.broker_runtime_inner.mut_from_ref();
+                if let Some(_message_store) = broker_runtime_inner.message_store() {
+                    warn!("Updating message store with master HA addr is not implemented yet");
+                }
+            }
+        }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let need_sync_master_flush_offset =
if let Some(message_store) = self.broker_runtime_inner.message_store() {
message_store.get_master_flushed_offset() == 0x0000
&& self
.broker_runtime_inner
.message_store_config()
.sync_master_flush_offset_when_startup
} else {
false
};
if master_ha_addr.is_none() || need_sync_master_flush_offset {
if need_sync_master_flush_offset {
unimplemented!();
}
if master_ha_addr.is_none() {
let broker_runtime_inner = self.broker_runtime_inner.mut_from_ref();
if let Some(_message_store) = broker_runtime_inner.message_store() {
unimplemented!("");
}
}
}
}
let need_sync_master_flush_offset =
if let Some(message_store) = self.broker_runtime_inner.message_store() {
message_store.get_master_flushed_offset() == 0x0000
&& self
.broker_runtime_inner
.message_store_config()
.sync_master_flush_offset_when_startup
} else {
false
};
if master_ha_addr.is_none() || need_sync_master_flush_offset {
if need_sync_master_flush_offset {
warn!("Syncing master flush offset is not implemented yet");
}
if master_ha_addr.is_none() {
let broker_runtime_inner = self.broker_runtime_inner.mut_from_ref();
if let Some(_message_store) = broker_runtime_inner.message_store() {
warn!("Updating message store with master HA addr is not implemented yet");
}
}
}
}
🤖 Prompt for AI Agents
In
rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs
around lines 198 to 221, replace the unimplemented!() calls with non-fatal
behavior: log a warning or info including context (e.g., why master flush sync
or message_store branch is not implemented) and then return early from the
handler (or otherwise exit this code path) instead of panicking; ensure you use
the existing logger instance available on self or broker_runtime_inner, include
relevant variables in the message (like need_sync_master_flush_offset or whether
message_store is present), and avoid changing control flow beyond
returning/short-circuiting so the broker continues running.

Comment on lines +50 to +53
let exchange_request_header = request
.decode_command_custom_header::<ExchangeHAInfoRequestHeader>()
.unwrap();


⚠️ Potential issue

Don’t unwrap on header decode; return a proper error response.

Unwrap will panic on malformed/missing headers. Decode errors should map to a non-Success ResponseCode.

-        let exchange_request_header = request
-            .decode_command_custom_header::<ExchangeHAInfoRequestHeader>()
-            .unwrap();
+        let exchange_request_header = match request
+            .decode_command_custom_header::<ExchangeHAInfoRequestHeader>()
+        {
+            Ok(h) => h,
+            Err(e) => {
+                let mut resp = RemotingCommand::default();
+                resp.set_code_ref(ResponseCode::SystemError);
+                resp.set_remark(format!("Failed to decode ExchangeHAInfoRequestHeader: {e}"));
+                return Some(resp);
+            }
+        };
📝 Committable suggestion


Suggested change
let exchange_request_header = request
.decode_command_custom_header::<ExchangeHAInfoRequestHeader>()
.unwrap();
let exchange_request_header = match request
.decode_command_custom_header::<ExchangeHAInfoRequestHeader>()
{
Ok(h) => h,
Err(e) => {
let mut resp = RemotingCommand::default();
resp.set_code_ref(ResponseCode::SystemError);
resp.set_remark(format!("Failed to decode ExchangeHAInfoRequestHeader: {e}"));
return Some(resp);
}
};
🤖 Prompt for AI Agents
In
rocketmq-broker/src/processor/admin_broker_processor/update_broker_ha_handler.rs
around lines 50-53, replace the unwrap on
request.decode_command_custom_header::<ExchangeHAInfoRequestHeader>() with
proper error handling: attempt to decode the header and if it fails or is
missing, construct and return a RemotingCommand response with a non-success
ResponseCode (e.g., REQUEST_HEADER_NOT_WELL_FORMED or an equivalent bad-request
code), set the response remark to include the decode error or "missing header",
and return early instead of panicking; otherwise continue with the decoded
header.
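A stdlib-only sketch of that decode-then-respond pattern; RemotingCommand and ResponseCode are modeled as simple local types, and decode_header is a stand-in for decode_command_custom_header::<ExchangeHAInfoRequestHeader>():

```rust
#[derive(Debug, PartialEq)]
enum ResponseCode {
    Success,
    SystemError,
}

struct Response {
    code: ResponseCode,
    remark: String,
}

// Stand-in decode: parses a u64 "header" so failure paths are easy to exercise.
fn decode_header(body: &str) -> Result<u64, String> {
    body.trim().parse::<u64>().map_err(|e| e.to_string())
}

fn handle(body: &str) -> Response {
    let _header = match decode_header(body) {
        Ok(h) => h,
        Err(e) => {
            // Map the decode failure to a non-success response, no panic.
            return Response {
                code: ResponseCode::SystemError,
                remark: format!("Invalid request header: {e}"),
            };
        }
    };
    Response { code: ResponseCode::Success, remark: String::new() }
}

fn main() {
    assert_eq!(handle("42").code, ResponseCode::Success);
    assert_eq!(handle("not-a-header").code, ResponseCode::SystemError);
    assert!(handle("not-a-header").remark.contains("Invalid request header"));
}
```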

Comment on lines +56 to +82
if let Some(master_ha_addr) = exchange_request_header.master_ha_address {
if !master_ha_addr.is_empty() {
if let Some(message_store) = self.broker_runtime_inner.message_store() {
message_store
.update_ha_master_address(master_ha_addr.as_str())
.await;

let master_address = exchange_request_header.master_address.unwrap_or_default();
message_store.update_master_address(&master_address);

let is_sync_master_flush_offset_when_startip = self
.broker_runtime_inner
.message_store_config()
.sync_master_flush_offset_when_startup;
if message_store.get_master_flushed_offset() == 0x000
&& is_sync_master_flush_offset_when_startip
{
let master_flush_offset =
exchange_request_header.master_flush_offset.unwrap();

info!(
"Set master flush offset in slave to {}",
master_flush_offset
);
message_store.set_master_flushed_offset(master_flush_offset);
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix partial-move bug, avoid unwraps, and update SlaveSynchronize too.

  • Using if let Some(master_ha_addr) = exchange_request_header.master_ha_address moves the field, preventing subsequent access to other fields—this won’t compile. Borrow with as_ref() or pattern-match on references.
  • Avoid unwrap() on master_flush_offset.
  • Keep SlaveSynchronize’s master address in sync via update_slave_master_addr.
-        if let Some(master_ha_addr) = exchange_request_header.master_ha_address {
-            if !master_ha_addr.is_empty() {
+        if let Some(master_ha_addr) = exchange_request_header.master_ha_address.as_ref() {
+            if !master_ha_addr.is_empty() {
                 if let Some(message_store) = self.broker_runtime_inner.message_store() {
                     message_store
                         .update_ha_master_address(master_ha_addr.as_str())
                         .await;
-
-                    let master_address = exchange_request_header.master_address.unwrap_or_default();
-                    message_store.update_master_address(&master_address);
-
-                    let is_sync_master_flush_offset_when_startip = self
+                    let master_address =
+                        exchange_request_header.master_address.clone().unwrap_or_default();
+                    message_store.update_master_address(&master_address);
+                    // keep SlaveSynchronize in sync
+                    let _ = self
+                        .broker_runtime_inner
+                        .update_slave_master_addr(master_address);
+
+                    let sync_master_flush_on_startup = self
                         .broker_runtime_inner
                         .message_store_config()
                         .sync_master_flush_offset_when_startup;
-                    if message_store.get_master_flushed_offset() == 0x000
-                        && is_sync_master_flush_offset_when_startip
+                    if message_store.get_master_flushed_offset() == 0
+                        && sync_master_flush_on_startup
                     {
-                        let master_flush_offset =
-                            exchange_request_header.master_flush_offset.unwrap();
-
-                        info!(
-                            "Set master flush offset in slave to {}",
-                            master_flush_offset
-                        );
-                        message_store.set_master_flushed_offset(master_flush_offset);
+                        if let Some(master_flush_offset) = exchange_request_header.master_flush_offset
+                        {
+                            info!("Set master flush offset in slave to {}", master_flush_offset);
+                            message_store.set_master_flushed_offset(master_flush_offset);
+                        }
                     }
                 }
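The borrow-instead-of-move point in the diff can be shown on its own. This is a minimal sketch with a hypothetical `RequestHeader` standing in for ExchangeBrokerHaInfoRequestHeader; `as_ref()` borrows the inner String, while `header.master_ha_address` alone would try to move it out.

```rust
// Hypothetical stand-in for the request header; field names mirror the PR.
#[derive(Default)]
struct RequestHeader {
    master_ha_address: Option<String>,
    master_address: Option<String>,
    master_flush_offset: Option<i64>,
}

// Borrows the header instead of consuming it; returns the resolved pair if present.
fn resolve_master(header: &RequestHeader) -> Option<(String, String)> {
    // `as_ref()` yields an Option<&String>; moving the field out of
    // `&RequestHeader` would not compile.
    let ha = header.master_ha_address.as_ref()?;
    if ha.is_empty() {
        return None;
    }
    // `clone()` avoids moving master_address out of the borrowed struct.
    let master = header.master_address.clone().unwrap_or_default();
    Some((ha.clone(), master))
}

fn main() {
    let header = RequestHeader {
        master_ha_address: Some("127.0.0.1:10912".into()),
        master_address: Some("127.0.0.1:10911".into()),
        master_flush_offset: Some(0),
    };
    let (ha, master) = resolve_master(&header).expect("addresses present");
    assert_eq!(ha, "127.0.0.1:10912");
    assert_eq!(master, "127.0.0.1:10911");
    // `header` is still fully usable afterwards: nothing was moved out.
    assert!(header.master_flush_offset.is_some());
}
```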
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
        if let Some(master_ha_addr) = exchange_request_header.master_ha_address.as_ref() {
            if !master_ha_addr.is_empty() {
                if let Some(message_store) = self.broker_runtime_inner.message_store() {
                    message_store
                        .update_ha_master_address(master_ha_addr.as_str())
                        .await;
                    let master_address =
                        exchange_request_header.master_address.clone().unwrap_or_default();
                    message_store.update_master_address(&master_address);
                    // keep SlaveSynchronize in sync
                    let _ = self
                        .broker_runtime_inner
                        .update_slave_master_addr(master_address);
                    let sync_master_flush_on_startup = self
                        .broker_runtime_inner
                        .message_store_config()
                        .sync_master_flush_offset_when_startup;
                    if message_store.get_master_flushed_offset() == 0
                        && sync_master_flush_on_startup
                    {
                        if let Some(master_flush_offset) = exchange_request_header.master_flush_offset {
                            info!("Set master flush offset in slave to {}", master_flush_offset);
                            message_store.set_master_flushed_offset(master_flush_offset);
                        }
                    }
                }

Comment on lines +90 to +103
let response_header = response
.read_custom_header_mut::<ExchangeHaInfoResponseHeader>()
.unwrap();

response_header.master_ha_address =
Some(self.broker_runtime_inner.get_ha_server_addr());

if let Some(message_store) = self.broker_runtime_inner.message_store() {
response_header.master_flush_offset =
Some(message_store.get_broker_init_max_offset());
}
response_header.master_address =
Some(self.broker_runtime_inner.get_broker_addr().clone());
}
💡 Verification agent

🧩 Analysis chain

Response header construction likely incorrect; avoid read_custom_header_mut on a fresh response.

Calling read_custom_header_mut() on a freshly created response returns None, so the unwrap() will panic. Prefer the established pattern used elsewhere (e.g., build a header instance and attach it to the response via the RemotingCommand API).

Please confirm the canonical way to attach a response header in this codebase and update accordingly. Script to locate existing patterns:

Proposed shape (adjust to the actual API):

-                let response_header = response
-                    .read_custom_header_mut::<ExchangeHaInfoResponseHeader>()
-                    .unwrap();
-
-                response_header.master_ha_address =
-                    Some(self.broker_runtime_inner.get_ha_server_addr());
+                let mut header = ExchangeHaInfoResponseHeader {
+                    master_ha_address: Some(self.broker_runtime_inner.get_ha_server_addr()),
+                    master_flush_offset: None,
+                    master_address: Some(self.broker_runtime_inner.get_broker_addr().clone()),
+                };
 
-                if let Some(message_store) = self.broker_runtime_inner.message_store() {
-                    response_header.master_flush_offset =
-                        Some(message_store.get_broker_init_max_offset());
-                }
-                response_header.master_address =
-                    Some(self.broker_runtime_inner.get_broker_addr().clone());
+                if let Some(message_store) = self.broker_runtime_inner.message_store() {
+                    header.master_flush_offset = Some(message_store.get_broker_init_max_offset());
+                }
+                // Attach header to response (replace with the correct API)
+                // response.write_custom_header(&header);

🏁 Script executed:

#!/bin/bash
# Find how response headers are attached to RemotingCommand across the repo
rg -nP -C3 --type=rust '\b(RemotingCommand::create_response_command|write_custom_header|set_custom_header|make_custom_header|read_custom_header_mut<)' 

Length of output: 125690


Don't call read_custom_header_mut() on a fresh RemotingCommand — attach a header instead.

read_custom_header_mut() returns None for a newly-created RemotingCommand (unwrap will panic). Construct an ExchangeHaInfoResponseHeader, set master_ha_address / master_flush_offset / master_address, then attach it with RemotingCommand::create_response_command_with_header(header) or response.set_command_custom_header(header).
Location: rocketmq-broker/src/processor/admin_broker_processor/update_broker_ha_handler.rs (≈ lines 90–103).

🤖 Prompt for AI Agents
In
rocketmq-broker/src/processor/admin_broker_processor/update_broker_ha_handler.rs
around lines 90 to 103, the code calls
response.read_custom_header_mut::<ExchangeHaInfoResponseHeader>().unwrap() on a
fresh RemotingCommand which returns None and will panic; instead instantiate an
ExchangeHaInfoResponseHeader, populate master_ha_address, master_flush_offset
(if message_store exists), and master_address from broker_runtime_inner, then
attach it to the response using
RemotingCommand::create_response_command_with_header(header) or
response.set_command_custom_header(header) (do not call read_custom_header_mut
or unwrap).
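The build-then-attach pattern the review recommends can be illustrated with a toy model. This is not the real rocketmq-rust RemotingCommand API: `Response` and its methods here are stand-ins that only mimic the shape of read_custom_header_mut and set_command_custom_header.

```rust
// Toy sketch (not the real API): build the header first, then attach it,
// rather than mutating a header the fresh response does not yet carry.
#[derive(Debug, Default, PartialEq)]
struct ExchangeHaInfoResponseHeader {
    master_ha_address: Option<String>,
    master_flush_offset: Option<i64>,
    master_address: Option<String>,
}

#[derive(Default)]
struct Response {
    header: Option<ExchangeHaInfoResponseHeader>,
}

impl Response {
    // Mirrors the "attach" style the review recommends.
    fn set_custom_header(&mut self, h: ExchangeHaInfoResponseHeader) {
        self.header = Some(h);
    }
    // Mirrors read_custom_header_mut: None on a fresh response, so unwrap panics.
    fn read_custom_header_mut(&mut self) -> Option<&mut ExchangeHaInfoResponseHeader> {
        self.header.as_mut()
    }
}

fn main() {
    let mut response = Response::default();
    // Read-then-mutate on a fresh response yields None — this is the panic path.
    assert!(response.read_custom_header_mut().is_none());

    // Safe pattern: populate a header, then attach it.
    let header = ExchangeHaInfoResponseHeader {
        master_ha_address: Some("127.0.0.1:10912".into()),
        master_flush_offset: Some(0),
        master_address: Some("127.0.0.1:10911".into()),
    };
    response.set_custom_header(header);
    assert!(response.read_custom_header_mut().is_some());
}
```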

@watchgou watchgou closed this Sep 12, 2025
@watchgou watchgou deleted the broker_id_change branch September 15, 2025 13:30

Labels

AI review first (AI review PR first) · approved (PR has approved) · auto merge

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants