Skip to content

Commit 77d267a

Browse files
committed
forward request messages to connected peers
Context: when a peer requests a document from us we often don't want to just check our own storage, we also want to ask all other peers we are connected to if they have the document. This is the case for e.g. a sync server which is relaying documents between two peers. Problem: the current immplementation just checks local storage Solution: when we receive a request, first send a request out to all the peers we are connected to and once they have all responded only then send a response to the requestor.
1 parent 958c54b commit 77d267a

File tree

7 files changed

+920
-275
lines changed

7 files changed

+920
-275
lines changed

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -65,3 +65,4 @@ autosurgeon = "0.8.0"
6565
bolero = { version = "0.10.0", features = ["arbitrary"] }
6666
arbitrary = { version = "1.3.1", features = ["derive"] }
6767
bolero-generator = { version = "0.10.0", features = ["arbitrary"] }
68+
rand = "0.8.5"

src/network_connect.rs

+5-8
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,17 @@ impl RepoHandle {
2222
Str: Stream<Item = Result<Message, RecvErr>> + Send + 'static + Unpin,
2323
{
2424
let other_id = self.handshake(&mut stream, &mut sink, direction).await?;
25-
tracing::trace!(?other_id, repo_id=?self.get_repo_id(), "Handshake complete");
25+
tracing::trace!(?other_id, repo_id=?self.get_repo_id(), "handshake complete");
2626

2727
let stream = stream.map({
2828
let repo_id = self.get_repo_id().clone();
2929
move |msg| match msg {
3030
Ok(Message::Repo(repo_msg)) => {
31-
tracing::trace!(?repo_msg, repo_id=?repo_id, "Received repo message");
31+
tracing::trace!(?repo_msg, repo_id=?repo_id, "received repo message");
3232
Ok(repo_msg)
3333
}
3434
Ok(m) => {
35-
tracing::warn!(?m, repo_id=?repo_id, "Received non-repo message");
35+
tracing::warn!(?m, repo_id=?repo_id, "received non-repo message");
3636
Err(NetworkError::Error(
3737
"unexpected non-repo message".to_string(),
3838
))
@@ -48,12 +48,9 @@ impl RepoHandle {
4848
});
4949

5050
let sink = sink
51-
.with_flat_map::<RepoMessage, _, _>(|msg| match msg {
52-
RepoMessage::Sync { .. } => futures::stream::iter(vec![Ok(Message::Repo(msg))]),
53-
_ => futures::stream::iter(vec![]),
54-
})
51+
.with::<_, _, _, SendErr>(move |msg| futures::future::ready(Ok(Message::Repo(msg))))
5552
.sink_map_err(|e| {
56-
tracing::error!(?e, "Error sending repo message");
53+
tracing::error!(?e, "error sending repo message");
5754
NetworkError::Error(format!("error sending repo message: {}", e))
5855
});
5956

0 commit comments

Comments
 (0)