Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
715 changes: 683 additions & 32 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ members = [
"rocketmq-cli",
"rocketmq-client",
"rocketmq-common",
"rocketmq-controller",
"rocketmq-error",
"rocketmq-example",
"rocketmq-filter",
Expand Down
79 changes: 79 additions & 0 deletions rocketmq-controller/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
[package]
name = "rocketmq-controller"
version.workspace = true
authors.workspace = true
edition.workspace = true
homepage.workspace = true
repository.workspace = true
license.workspace = true
readme = "README.md"
description = "RocketMQ Controller Module - High Availability Raft-based Controller"
keywords = ["rocketmq", "controller", "raft", "distributed", "messaging"]
categories = ["network-programming", "asynchronous"]

[dependencies]
# Core async runtime
tokio = { workspace = true }
tokio-util = { workspace = true }

# Raft consensus algorithm
raft = "0.7"
raft-proto = "0.7"
prost = "0.13"
protobuf = "2.28"
slog = { version = "2.7", features = ["max_level_trace", "release_max_level_info"] }

# Serialization
serde = { workspace = true }
serde_json = { workspace = true }
bincode = "1.3"

# Networking and protocols
bytes = { workspace = true }
prost-types = "0.13"
futures = "0.3"

# Concurrent data structures
dashmap = { workspace = true }
parking_lot = { workspace = true }

# Error handling
thiserror = { workspace = true }
anyhow = { workspace = true }

# Logging and tracing
tracing = { workspace = true }
tracing-subscriber = { workspace = true }

# Time utilities
chrono = "0.4"

# Async trait support
async-trait = "0.1"

# Storage backends
rocksdb = { version = "0.22", optional = true }

# Internal dependencies (rocketmq-rust modules)
rocketmq-common = { workspace = true }
rocketmq-remoting = { workspace = true }
rocketmq-runtime = { workspace = true }
rocketmq-store = { workspace = true }
rocketmq-rust = { workspace = true }

[dev-dependencies]
criterion = { version = "0.5", features = ["async_tokio"] }
tempfile = "3.10"
env_logger = "0.11"
proptest = "1.4"

[[bench]]
name = "controller_bench"
harness = false

[features]
default = ["storage-file"]
storage-rocksdb = ["rocksdb"]
storage-file = []
metrics = []
debug = []
148 changes: 148 additions & 0 deletions rocketmq-controller/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
# RocketMQ Controller

RocketMQ Controller Module - High Availability Controller based on Raft

## Introduction

RocketMQ Controller is the core management component of RocketMQ cluster, responsible for:

- **Cluster Metadata Management**: Broker registration, Topic configuration, cluster configuration, etc.
- **High Availability**: Master-slave failover based on Raft consensus algorithm
- **Leader Election**: Automatic leader node election and failover
- **Data Consistency**: Ensures strong data consistency through Raft log replication

## Architecture

```
┌──────────────────────────────────────────┐
│ Controller Manager │
├──────────────────────────────────────────┤
│ │
│ ┌────────────┐ ┌────────────────────┐ │
│ │ Raft │ │ Metadata Store │ │
│ │ Controller │ │ │ │
│ │ │ │ - Broker Manager │ │
│ │ - Election │ │ - Topic Manager │ │
│ │ - Replica │ │ - Config Manager │ │
│ └────────────┘ └────────────────────┘ │
│ │
│ ┌────────────────────────────────────┐ │
│ │ Processor Manager │ │
│ │ │ │
│ │ - Register Broker │ │
│ │ - Heartbeat │ │
│ │ - Create/Update Topic │ │
│ │ - Query Metadata │ │
│ └────────────────────────────────────┘ │
└──────────────────────────────────────────┘
```

## Features

### ✅ Implemented
- Basic project structure
- Configuration management (ControllerConfig)
- Error handling (ControllerError)
- Raft controller framework
- Metadata storage (Broker, Topic, Config)
- Processor manager framework

### 🚧 In Progress
- Complete Raft node implementation
- Network communication layer
- RPC processor implementation

### 📋 Planned
- Persistent storage (RocksDB/custom logging)
- Snapshot management
- Complete integration tests
- Performance benchmarks
- Monitoring metrics

## Quick Start

### Basic Usage

```rust
use rocketmq_controller::*;

#[tokio::main]
async fn main() -> Result<()> {
// Create configuration
let config = ControllerConfig::new(
1, // node_id
"127.0.0.1:9876".parse().unwrap()
)
.with_raft_peers(vec![
RaftPeer { id: 1, addr: "127.0.0.1:9876".parse().unwrap() },
RaftPeer { id: 2, addr: "127.0.0.1:9877".parse().unwrap() },
RaftPeer { id: 3, addr: "127.0.0.1:9878".parse().unwrap() },
])
.with_storage_path("/data/controller".into());

// Create and start Controller
let manager = ControllerManager::new(config).await?;
manager.start().await?;

// Wait...

// Graceful shutdown
manager.shutdown().await?;
Ok(())
}
```

## Dependencies

Main dependencies:

- `raft-rs` - Raft consensus algorithm implementation
- `tokio` - Async runtime
- `dashmap` - Concurrent hash map
- `serde` - Serialization/deserialization
- `tracing` - Logging and tracing

## Development

### Build

```bash
cargo build -p rocketmq-controller
```

### 测试
Copy link

Copilot AI Nov 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Chinese characters '测试' (meaning 'test') should be replaced with English 'Test' to match the rest of the documentation.

Suggested change
### 测试
### Test

Copilot uses AI. Check for mistakes.

```bash
cargo test -p rocketmq-controller
```

### Benchmark

```bash
cargo bench -p rocketmq-controller
```

## Comparison with Java Version

| Feature | Java (DLedger) | Rust (raft-rs) |
|---------|---------------|----------------|
| Consensus Algorithm | DLedger | raft-rs |
| Async Model | Netty | Tokio |
| Concurrency Control | ConcurrentHashMap | DashMap |
| Error Handling | Exceptions | Result<T, E> |
| Type Safety | Runtime | Compile-time |

## Performance Goals

- Leader election latency: < 500ms
- Heartbeat throughput: > 10,000 ops/s
- Metadata write latency: < 10ms (p99)
- Metadata read latency: < 1ms (p99)

## Contributing

Contributions are welcome! Please see [CONTRIBUTING.md](../CONTRIBUTING.md).

## License

Licensed under Apache License 2.0 or MIT license, at your option.
37 changes: 37 additions & 0 deletions rocketmq-controller/benches/controller_bench.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

use criterion::{criterion_group, criterion_main, Criterion};

/// Controller benchmark placeholder
///
/// TODO: Implement actual benchmarks:
/// - Leader election latency
/// - Heartbeat processing throughput
/// - Metadata operation latency
/// - Raft log append performance
fn controller_bench(c: &mut Criterion) {
c.bench_function("placeholder", |b| {
b.iter(|| {
// Placeholder benchmark
1 + 1
})
});
}

criterion_group!(benches, controller_bench);
criterion_main!(benches);
Loading
Loading