Skip to content

Commit b168d8e

Browse files
committed
fix subscribe multiple times to the same channel
when subscribing to the same channel on the same client, an error will be returned
1 parent 95f26fd commit b168d8e

File tree

2 files changed

+31
-13
lines changed

2 files changed

+31
-13
lines changed

src/network/network_handler.rs

+24-8
Original file line numberDiff line numberDiff line change
@@ -412,7 +412,11 @@ impl NetworkHandler {
412412
Status::Subscribed => {
413413
if let Some(resp_buf) = self.try_match_pubsub_message(result).await {
414414
self.receive_result(resp_buf);
415-
if self.subscriptions.is_empty() {
415+
if self.subscriptions.is_empty() && self.pending_subscriptions.is_empty() {
416+
debug!(
417+
"[{}] goint out from Pub/Sub state to connected state",
418+
self.tag
419+
);
416420
self.status = Status::Connected;
417421
}
418422
}
@@ -591,24 +595,36 @@ impl NetworkHandler {
591595
| RefPubSubMessage::SSubscribe(channel_or_pattern) => {
592596
if let Some(pending_sub) = self.pending_subscriptions.pop_front() {
593597
if pending_sub.channel_or_pattern == channel_or_pattern {
594-
self.subscriptions.insert(
595-
channel_or_pattern.to_vec(),
596-
(pending_sub.subscription_type, pending_sub.sender),
597-
);
598+
if self
599+
.subscriptions
600+
.insert(
601+
channel_or_pattern.to_vec(),
602+
(pending_sub.subscription_type, pending_sub.sender),
603+
)
604+
.is_some()
605+
{
606+
return Some(Err(Error::Client(
607+
format!(
608+
"There is already a subscription on channel '{}'",
609+
String::from_utf8_lossy(channel_or_pattern)
610+
)
611+
.to_string(),
612+
)));
613+
}
598614

599615
if pending_sub.more_to_come {
600616
return None;
601617
}
602618
} else {
603619
error!(
604-
"[{}] Unexpected subscription confirmation on channel '{:?}'",
620+
"[{}] Unexpected subscription confirmation on channel '{}'",
605621
self.tag,
606622
String::from_utf8_lossy(channel_or_pattern)
607623
);
608624
}
609625
} else {
610626
error!(
611-
"[{}] Cannot find pending subscription for channel '{:?}'",
627+
"[{}] Cannot find pending subscription for channel '{}'",
612628
self.tag,
613629
String::from_utf8_lossy(channel_or_pattern)
614630
);
@@ -666,7 +682,7 @@ impl NetworkHandler {
666682
}
667683
None => {
668684
error!(
669-
"[{}] Unexpected message on channel '{:?}' for pattern '{:?}'",
685+
"[{}] Unexpected message on channel '{}' for pattern '{}'",
670686
self.tag,
671687
String::from_utf8_lossy(channel),
672688
String::from_utf8_lossy(pattern)

src/tests/pub_sub_commands.rs

+7-5
Original file line numberDiff line numberDiff line change
@@ -769,11 +769,10 @@ async fn split() -> Result<()> {
769769
Ok(())
770770
}
771771

772-
773772
#[cfg_attr(feature = "tokio-runtime", tokio::test)]
774773
#[cfg_attr(feature = "async-std-runtime", async_std::test)]
775774
#[serial]
776-
async fn subscribe_twice() -> Result<()> {
775+
async fn subscribe_multiple_times_to_the_same_channel() -> Result<()> {
777776
let pub_sub_client = get_test_client().await?;
778777
let regular_client = get_test_client().await?;
779778

@@ -782,12 +781,15 @@ async fn subscribe_twice() -> Result<()> {
782781

783782
let mut pub_sub_stream = pub_sub_client.subscribe("mychannel").await?;
784783
assert!(pub_sub_stream.subscribe("mychannel").await.is_err());
784+
assert!(pub_sub_client.subscribe("mychannel").await.is_err());
785785

786786
pub_sub_stream.psubscribe("pattern").await?;
787787
assert!(pub_sub_stream.psubscribe("pattern").await.is_err());
788+
assert!(pub_sub_client.psubscribe("pattern").await.is_err());
788789

789-
pub_sub_stream.ssubscribe("mychannel").await?;
790-
assert!(pub_sub_stream.ssubscribe("mychannel").await.is_err());
790+
pub_sub_stream.ssubscribe("myshardchannel").await?;
791+
assert!(pub_sub_stream.ssubscribe("myshardchannel").await.is_err());
792+
assert!(pub_sub_client.ssubscribe("myshardchannel").await.is_err());
791793

792794
Ok(())
793-
}
795+
}

0 commit comments

Comments
 (0)