Skip to content

Intercept during routing #1864

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 51 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
fb64aad
pass messages by reference instead of by value
wyfo Mar 14, 2025
932ddd7
fix lints
wyfo Mar 14, 2025
86ce2ac
fix lints2
wyfo Mar 14, 2025
abb41bb
revert unexpected committed changes
wyfo Mar 14, 2025
cec21c6
fix doc
wyfo Mar 14, 2025
10a6b1c
Handle ingress PUSH messages
fuzzypixelz Mar 18, 2025
23609ec
Improve `InterceptorChain` logging
fuzzypixelz Mar 20, 2025
9562624
Handle egress PUSH messages
fuzzypixelz Mar 21, 2025
009e6e0
Fix typo
fuzzypixelz Mar 21, 2025
60caaf0
API changed...
fuzzypixelz Mar 31, 2025
864753e
Missing import
fuzzypixelz Mar 31, 2025
5879f99
Fix tests
fuzzypixelz Mar 31, 2025
384329c
Fix more tests and obscure features
fuzzypixelz Mar 31, 2025
ac21f8c
Fix `client_test`
fuzzypixelz Apr 3, 2025
ce58dee
Clippy error (1)
fuzzypixelz Apr 3, 2025
494144f
Refactoring
fuzzypixelz Apr 3, 2025
68ef1c7
Fix epic blunder
fuzzypixelz Apr 8, 2025
0841115
Fix imports (thanks @wyfo)
fuzzypixelz Apr 8, 2025
42f3f5f
We can't live without Clippy
fuzzypixelz Apr 8, 2025
fbeb06a
Handle `REQUEST`, `RESPONSE` & `RESPONSE_FINAL`
fuzzypixelz Apr 10, 2025
aecb43e
Merge eclipse-zenoh/zenoh@9ad5304e5
fuzzypixelz Apr 11, 2025
6874e91
Update Mux & DeMux
fuzzypixelz Apr 11, 2025
ff66265
Thanks `rustfmt`
fuzzypixelz Apr 11, 2025
a428dba
Fix `client_test`
fuzzypixelz Apr 11, 2025
1dffc57
Fix egress prefix mapping
fuzzypixelz Apr 11, 2025
489c871
Set `RoutingContext:prefix`
fuzzypixelz Apr 11, 2025
4118761
Handle `INTEREST` & `DECLARE`
fuzzypixelz Apr 11, 2025
923e31d
Fix 1.75 build errors
fuzzypixelz Apr 14, 2025
951a41b
Improve ACL logging
fuzzypixelz Apr 14, 2025
1971d9f
Move `FaceState::intercept_request` to `Face::intercept_request`
fuzzypixelz Apr 14, 2025
e92d3a6
Fix `stats` build
fuzzypixelz Apr 14, 2025
5fae354
Fix `client_test`, again...
fuzzypixelz Apr 14, 2025
0bae755
Add comment on `FaceState::intercept_*` methods
fuzzypixelz Apr 14, 2025
82fc9db
Fix `base_test`
fuzzypixelz Apr 14, 2025
bf55852
Fix `match_test`
fuzzypixelz Apr 14, 2025
567375d
Fix `test_acl_(liveliness)_query_ingress_deny`
fuzzypixelz Apr 15, 2025
270ff34
Remove question comment
fuzzypixelz Apr 16, 2025
eaf4216
Move `intercept_response(_final)` to `Face`
fuzzypixelz Apr 16, 2025
9b035e6
Move `intercept_push` to `Face`
fuzzypixelz Apr 16, 2025
714eef0
Move `intercept_declare` to `Face`
fuzzypixelz Apr 16, 2025
2bb67f8
Move `intercept_interest` to `Face`
fuzzypixelz Apr 16, 2025
5ffb3d4
Remove leftover comment
fuzzypixelz Apr 16, 2025
a37324c
Merge eclipse-zenoh/zenoh@83b874e42
fuzzypixelz Apr 16, 2025
f04cafc
Remove `NetworkMessageMut::size`
fuzzypixelz Apr 16, 2025
e889959
Keep `Tables` lock while running iceptors
fuzzypixelz Apr 17, 2025
ad860a1
Deduplicated code
fuzzypixelz Apr 17, 2025
4b53666
Remove interceptors from `McastMux`
fuzzypixelz Apr 17, 2025
fd353fa
Remove `QueryDirection`
fuzzypixelz Apr 17, 2025
cbf8100
Why indeed
fuzzypixelz Apr 17, 2025
5ab5856
Distinguish ingress/egress `Prefix`
fuzzypixelz Apr 17, 2025
34014ff
Set egress interceptors for `McastMux`
fuzzypixelz Apr 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions commons/zenoh-protocol/src/network/declare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,21 @@
body,
}
}

#[inline]
pub fn wire_expr(&self) -> Option<&WireExpr> {
match &self.body {
DeclareBody::DeclareKeyExpr(m) => Some(&m.wire_expr),
DeclareBody::UndeclareKeyExpr(_) => None,

Check warning on line 156 in commons/zenoh-protocol/src/network/declare.rs

View check run for this annotation

Codecov / codecov/patch

commons/zenoh-protocol/src/network/declare.rs#L156

Added line #L156 was not covered by tests
DeclareBody::DeclareSubscriber(m) => Some(&m.wire_expr),
DeclareBody::UndeclareSubscriber(m) => Some(&m.ext_wire_expr.wire_expr),
DeclareBody::DeclareQueryable(m) => Some(&m.wire_expr),
DeclareBody::UndeclareQueryable(m) => Some(&m.ext_wire_expr.wire_expr),
DeclareBody::DeclareToken(m) => Some(&m.wire_expr),
DeclareBody::UndeclareToken(m) => Some(&m.ext_wire_expr.wire_expr),
DeclareBody::DeclareFinal(_) => None,
}
}
}

pub mod common {
Expand Down
92 changes: 9 additions & 83 deletions zenoh/src/net/primitives/demux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,104 +11,30 @@
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//
use std::{any::Any, sync::Arc};
use std::any::Any;

use arc_swap::ArcSwap;
use zenoh_link::Link;
use zenoh_protocol::network::{
ext, response, Declare, DeclareBody, DeclareFinal, NetworkBodyMut, NetworkMessageMut,
ResponseFinal,
};
use zenoh_protocol::network::{NetworkBodyMut, NetworkMessageMut};
use zenoh_result::ZResult;
use zenoh_transport::{unicast::TransportUnicast, TransportPeerEventHandler};

use super::Primitives;
use crate::net::routing::{
dispatcher::face::Face,
interceptor::{InterceptorTrait, InterceptorsChain},
RoutingContext,
};
use crate::net::routing::dispatcher::face::Face;

pub struct DeMux {
face: Face,
pub(crate) transport: Option<TransportUnicast>,
pub(crate) interceptor: Arc<ArcSwap<InterceptorsChain>>,
}

impl DeMux {
pub(crate) fn new(
face: Face,
transport: Option<TransportUnicast>,
interceptor: Arc<ArcSwap<InterceptorsChain>>,
) -> Self {
Self {
face,
transport,
interceptor,
}
pub(crate) fn new(face: Face, transport: Option<TransportUnicast>) -> Self {
Self { face, transport }
}
}

impl TransportPeerEventHandler for DeMux {
#[inline]
fn handle_message(&self, mut msg: NetworkMessageMut) -> ZResult<()> {
let interceptor = self.interceptor.load();
if !interceptor.interceptors.is_empty() {
let mut ctx = RoutingContext::new_in(msg.as_mut(), self.face.clone());
let prefix = ctx
.wire_expr()
.and_then(|we| (!we.has_suffix()).then(|| ctx.prefix()))
.flatten()
.cloned();
let cache_guard = prefix
.as_ref()
.and_then(|p| p.get_ingress_cache(&self.face, &interceptor));
let cache = cache_guard.as_ref().and_then(|c| c.get_ref().as_ref());

match &ctx.msg.body {
NetworkBodyMut::Request(request) => {
let request_id = request.id;
if !interceptor.intercept(&mut ctx, cache) {
// request was blocked by an interceptor, we need to send response final to avoid timeout error
self.face
.state
.primitives
.send_response_final(&mut ResponseFinal {
rid: request_id,
ext_qos: response::ext::QoSType::RESPONSE_FINAL,
ext_tstamp: None,
});
return Ok(());
}
}
NetworkBodyMut::Interest(interest) => {
let interest_id = interest.id;
if !interceptor.intercept(&mut ctx, cache) {
// request was blocked by an interceptor, we need to send declare final to avoid timeout error
self.face
.state
.primitives
.send_declare(RoutingContext::new_in(
&mut Declare {
interest_id: Some(interest_id),
ext_qos: ext::QoSType::DECLARE,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::DEFAULT,
body: DeclareBody::DeclareFinal(DeclareFinal),
},
self.face.clone(),
));
return Ok(());
}
}
_ => {
if !interceptor.intercept(&mut ctx, cache) {
return Ok(());
}
}
};
}

fn handle_message(&self, msg: NetworkMessageMut) -> ZResult<()> {
match msg.body {
NetworkBodyMut::Push(m) => self.face.send_push(m, msg.reliability),
NetworkBodyMut::Declare(m) => self.face.send_declare(m),
Expand All @@ -126,12 +52,12 @@ impl TransportPeerEventHandler for DeMux {
&self.face.tables,
m,
transport,
&mut |p, m| declares.push((p.clone(), m)),
&mut |p, m, r| declares.push((p.clone(), m, r)),
)?;
drop(tables);
drop(ctrl_lock);
for (p, m) in declares {
m.with_mut(|m| p.send_declare(m));
for (p, mut m, r) in declares {
p.intercept_declare(&mut m, r.as_ref());
}
}
}
Expand Down
Loading
Loading