Skip to content

Commit ba4a571

Browse files
authored
chore: add more info logging to semaphore (#17831)
1 parent 1d9c97b commit ba4a571

File tree

4 files changed

+62
-1
lines changed

4 files changed

+62
-1
lines changed

src/meta/semaphore/src/acquirer/acquirer.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,11 @@ impl Acquirer {
150150
// Step 4: Wait for the semaphore to be acquired or removed.
151151

152152
while let Some(sem_event) = self.permit_event_rx.recv().await {
153-
debug!("semaphore event: {:?}", sem_event);
153+
info!(
154+
"Acquirer({}): received semaphore event: {:?}",
155+
self.ctx, sem_event
156+
);
157+
154158
match sem_event {
155159
PermitEvent::Acquired((seq, _)) => {
156160
if seq == permit_key.seq {

src/meta/semaphore/src/meta_event_subscriber/processor.rs

+10
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use codeq::Decode;
1616
use databend_common_meta_types::protobuf::WatchResponse;
1717
use databend_common_meta_types::SeqV;
18+
use log::info;
1819
use log::warn;
1920
use tokio::sync::mpsc;
2021

@@ -74,6 +75,11 @@ impl Processor {
7475
prev: Option<SeqV<PermitEntry>>,
7576
current: Option<SeqV<PermitEntry>>,
7677
) -> Result<(), ConnectionClosed> {
78+
info!(
79+
"{} processing kv change: {}: {:?} -> {:?}",
80+
self.ctx, sem_key, prev, current
81+
);
82+
7783
// Update local queue to update the acquired/released state.
7884
let state_changes = match (prev, current) {
7985
(None, Some(entry)) => self.queue.insert(sem_key.seq, entry.data),
@@ -89,7 +95,11 @@ impl Processor {
8995
}
9096
};
9197

98+
info!("{} queue state: {}", self.ctx, self.queue);
99+
92100
for event in state_changes {
101+
info!("{} sending event: {}", self.ctx, event);
102+
93103
self.tx.send(event).await.map_err(|e| {
94104
ConnectionClosed::new_str(format!("Semaphore-Watcher fail to send {}", e.0))
95105
.context(&self.ctx)

src/meta/semaphore/src/meta_event_subscriber/subscriber.rs

+10
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,11 @@ impl MetaEventSubscriber {
7474
.context(&self.ctx)
7575
})?;
7676

77+
info!(
78+
"{} watch stream created: [{}, {})",
79+
self.ctx, self.left, self.right
80+
);
81+
7782
Ok(strm)
7883
}
7984

@@ -116,6 +121,11 @@ impl MetaEventSubscriber {
116121
}
117122
};
118123

124+
info!(
125+
"{} received event from watch-stream: {:?}",
126+
self.ctx, watch_result
127+
);
128+
119129
let Some(watch_response) = watch_result? else {
120130
// TODO: add retry connecting.
121131
error!("watch-stream closed: {}", self.ctx);

src/meta/semaphore/src/queue/semaphore_queue.rs

+37
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
use std::collections::BTreeMap;
16+
use std::fmt;
1617

1718
use crate::queue::semaphore_event::PermitEvent;
1819
use crate::PermitEntry;
@@ -32,6 +33,27 @@ pub struct SemaphoreQueue {
3233
waiting: BTreeMap<PermitSeq, PermitEntry>,
3334
}
3435

36+
impl fmt::Display for SemaphoreQueue {
37+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
38+
write!(f, "SemaphoreQueue{{ {}/{}", self.size, self.capacity,)?;
39+
40+
write!(f, ", acquired: [")?;
41+
for (seq, entry) in &self.acquired {
42+
write!(f, "{}:{} ", seq, entry)?;
43+
}
44+
write!(f, "]")?;
45+
46+
write!(f, ", waiting: [")?;
47+
for (seq, entry) in &self.waiting {
48+
write!(f, "{}:{} ", seq, entry)?;
49+
}
50+
write!(f, "]")?;
51+
52+
write!(f, "}}")?;
53+
Ok(())
54+
}
55+
}
56+
3557
impl SemaphoreQueue {
3658
pub fn new(capacity: u64) -> Self {
3759
SemaphoreQueue {
@@ -152,6 +174,21 @@ mod tests {
152174
use crate::queue::*;
153175
use crate::PermitEntry;
154176

177+
#[test]
178+
fn test_display() {
179+
let queue = SemaphoreQueue {
180+
size: 10,
181+
capacity: 20,
182+
acquired: BTreeMap::from([(1, ent("t1", 3)), (2, ent("t2", 4))]),
183+
waiting: BTreeMap::from([(3, ent("t3", 5)), (4, ent("t4", 6))]),
184+
};
185+
186+
assert_eq!(
187+
format!("{}", queue),
188+
"SemaphoreQueue{ 10/20, acquired: [1:PermitEntry(id:t1, n:3) 2:PermitEntry(id:t2, n:4) ], waiting: [3:PermitEntry(id:t3, n:5) 4:PermitEntry(id:t4, n:6) ]}"
189+
);
190+
}
191+
155192
#[test]
156193
fn test_insert() {
157194
// Test case 1: Insert when there is enough capacity

0 commit comments

Comments
 (0)