Skip to content

Commit f7788a7

Browse files
committed
routing/http/server: add support for GetClosestPeers
This adds the SERVER-side for GetClosestPeers. Since FindPeers also returns PeerRecords, it is essentially a copy-paste, minus things like addrFilters which don't apply here, plus `count` and `closerThan` parsing from the query URL. The tests as well. We leave all logic regarding count/closerThan to the ContentRouter (the DHT, or the Kubo wrapper around it). Spec: ipfs/specs#476
1 parent 220a690 commit f7788a7

File tree

2 files changed

+467
-28
lines changed

2 files changed

+467
-28
lines changed

routing/http/server/server.go

Lines changed: 121 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"io"
1010
"mime"
1111
"net/http"
12+
"strconv"
1213
"strings"
1314
"sync/atomic"
1415
"time"
@@ -42,15 +43,17 @@ const (
4243
DefaultRecordsLimit = 20
4344
DefaultStreamingRecordsLimit = 0
4445
DefaultRoutingTimeout = 30 * time.Second
46+
DefaultGetClosestPeersCount = 20
4547
)
4648

4749
var logger = logging.Logger("routing/http/server")
4850

4951
const (
50-
providePath = "/routing/v1/providers/"
51-
findProvidersPath = "/routing/v1/providers/{cid}"
52-
findPeersPath = "/routing/v1/peers/{peer-id}"
53-
getIPNSPath = "/routing/v1/ipns/{cid}"
52+
providePath = "/routing/v1/providers/"
53+
findProvidersPath = "/routing/v1/providers/{cid}"
54+
findPeersPath = "/routing/v1/peers/{peer-id}"
55+
getIPNSPath = "/routing/v1/ipns/{cid}"
56+
getClosestPeersPath = "/routing/v1/dht/closest/peers/{peer-id}"
5457
)
5558

5659
type FindProvidersAsyncResponse struct {
@@ -78,6 +81,10 @@ type ContentRouter interface {
7881
// PutIPNS stores the provided [ipns.Record] for the given [ipns.Name].
7982
// It is guaranteed that the record matches the provided name.
8083
PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Record) error
84+
85+
// GetClosestPeers returns the DHT closest peers to the given peer ID.
86+
// If empty, it will use the content router's peer ID (self). `closerThan` (optional) forces resulting records to be closer to `PeerID` than to `closerThan`. `count` specifies how many records to return ([1,100], with 20 as default when set to 0).
87+
GetClosestPeers(ctx context.Context, peerID, closerThan peer.ID, count int) (iter.ResultIter[*types.PeerRecord], error)
8188
}
8289

8390
// Deprecated: protocol-agnostic provide is being worked on in [IPIP-378]:
@@ -183,6 +190,7 @@ func Handler(svc ContentRouter, opts ...Option) http.Handler {
183190
r.Handle(findPeersPath, middlewarestd.Handler(findPeersPath, mdlw, http.HandlerFunc(server.findPeers))).Methods(http.MethodGet)
184191
r.Handle(getIPNSPath, middlewarestd.Handler(getIPNSPath, mdlw, http.HandlerFunc(server.GetIPNS))).Methods(http.MethodGet)
185192
r.Handle(getIPNSPath, middlewarestd.Handler(getIPNSPath, mdlw, http.HandlerFunc(server.PutIPNS))).Methods(http.MethodPut)
193+
r.Handle(getClosestPeersPath, middlewarestd.Handler(getClosestPeersPath, mdlw, http.HandlerFunc(server.getClosestPeers))).Methods(http.MethodGet)
186194

187195
return r
188196
}
@@ -313,30 +321,7 @@ func (s *server) findProvidersNDJSON(w http.ResponseWriter, provIter iter.Result
313321

314322
func (s *server) findPeers(w http.ResponseWriter, r *http.Request) {
315323
pidStr := mux.Vars(r)["peer-id"]
316-
317-
// While specification states that peer-id is expected to be in CIDv1 format, reality
318-
// is the clients will often learn legacy PeerID string from other sources,
319-
// and try to use it.
320-
// See https://github.com/libp2p/specs/blob/master/peer-ids/peer-ids.md#string-representation
321-
// We are liberal in inputs here, and uplift legacy PeerID to CID if necessary.
322-
// Rationale: it is better to fix this common mistake than to error and break peer routing.
323-
324-
// Attempt to parse PeerID
325-
pid, err := peer.Decode(pidStr)
326-
if err != nil {
327-
// Retry by parsing PeerID as CID, then setting codec to libp2p-key
328-
// and turning that back to PeerID.
329-
// This is necessary to make sure legacy keys like:
330-
// - RSA QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N
331-
// - ED25519 12D3KooWD3eckifWpRn9wQpMG9R9hX3sD158z7EqHWmweQAJU5SA
332-
// are parsed correctly.
333-
pidAsCid, err2 := cid.Decode(pidStr)
334-
if err2 == nil {
335-
pidAsCid = cid.NewCidV1(cid.Libp2pKey, pidAsCid.Hash())
336-
pid, err = peer.FromCid(pidAsCid)
337-
}
338-
}
339-
324+
pid, err := parsePeerID(pidStr)
340325
if err != nil {
341326
writeErr(w, "FindPeers", http.StatusBadRequest, fmt.Errorf("unable to parse PeerID %q: %w", pidStr, err))
342327
return
@@ -608,6 +593,88 @@ func (s *server) PutIPNS(w http.ResponseWriter, r *http.Request) {
608593
w.WriteHeader(http.StatusOK)
609594
}
610595

596+
func (s *server) getClosestPeers(w http.ResponseWriter, r *http.Request) {
597+
pidStr := mux.Vars(r)["peer-id"]
598+
pid, err := parsePeerID(pidStr)
599+
if err != nil {
600+
writeErr(w, "GetClosestPeers", http.StatusBadRequest, fmt.Errorf("unable to parse PeerID %q: %w", pidStr, err))
601+
return
602+
}
603+
604+
query := r.URL.Query()
605+
closerThanStr := query.Get("closerThan")
606+
var closerThanPid peer.ID
607+
if closerThanStr != "" { // it is fine to omit. We will pass an empty peer.ID then.
608+
closerThanPid, err = parsePeerID(closerThanStr)
609+
if err != nil {
610+
writeErr(w, "GetClosestPeers", http.StatusBadRequest, fmt.Errorf("unable to parse closer-than PeerID %q: %w", pidStr, err))
611+
return
612+
}
613+
}
614+
615+
countStr := query.Get("count")
616+
count, err := strconv.Atoi(countStr)
617+
if err != nil {
618+
count = 0
619+
}
620+
if count > 100 {
621+
count = 100
622+
}
623+
// If limit is still 0, set THE default.
624+
if count <= 0 {
625+
count = DefaultGetClosestPeersCount
626+
}
627+
628+
mediaType, err := s.detectResponseType(r)
629+
if err != nil {
630+
writeErr(w, "GetClosestPeers", http.StatusBadRequest, err)
631+
return
632+
}
633+
634+
var (
635+
handlerFunc func(w http.ResponseWriter, provIter iter.ResultIter[*types.PeerRecord])
636+
)
637+
638+
if mediaType == mediaTypeNDJSON {
639+
handlerFunc = s.getClosestPeersNDJSON
640+
} else {
641+
handlerFunc = s.getClosestPeersJSON
642+
}
643+
644+
// Add timeout to the routing operation
645+
ctx, cancel := context.WithTimeout(r.Context(), s.routingTimeout)
646+
defer cancel()
647+
648+
provIter, err := s.svc.GetClosestPeers(ctx, pid, closerThanPid, count)
649+
if err != nil {
650+
if errors.Is(err, routing.ErrNotFound) {
651+
// handlerFunc takes care of setting the 404 and necessary headers
652+
provIter = iter.FromSlice([]iter.Result[*types.PeerRecord]{})
653+
} else {
654+
writeErr(w, "GetClosestPeers", http.StatusInternalServerError, fmt.Errorf("delegate error: %w", err))
655+
return
656+
}
657+
}
658+
handlerFunc(w, provIter)
659+
}
660+
661+
func (s *server) getClosestPeersJSON(w http.ResponseWriter, peersIter iter.ResultIter[*types.PeerRecord]) {
662+
defer peersIter.Close()
663+
peers, err := iter.ReadAllResults(peersIter)
664+
if err != nil {
665+
writeErr(w, "GetClosestPeers", http.StatusInternalServerError, fmt.Errorf("delegate error: %w", err))
666+
return
667+
}
668+
669+
writeJSONResult(w, "FindPeers", jsontypes.PeersResponse{
670+
Peers: peers,
671+
})
672+
}
673+
674+
func (s *server) getClosestPeersNDJSON(w http.ResponseWriter, peersIter iter.ResultIter[*types.PeerRecord]) {
675+
writeResultsIterNDJSON(w, peersIter)
676+
}
677+
611678
var (
612679
// Rule-of-thumb Cache-Control policy is to work well with caching proxies and load balancers.
613680
// If there are any results, cache on the client for longer, and hint any in-between caches to
@@ -618,6 +685,32 @@ var (
618685
maxStale = int((48 * time.Hour).Seconds()) // allow stale results as long within Amino DHT Expiration window
619686
)
620687

688+
func parsePeerID(pidStr string) (peer.ID, error) {
689+
// While specification states that peer-id is expected to be in CIDv1 format, reality
690+
// is the clients will often learn legacy PeerID string from other sources,
691+
// and try to use it.
692+
// See https://github.com/libp2p/specs/blob/master/peer-ids/peer-ids.md#string-representation
693+
// We are liberal in inputs here, and uplift legacy PeerID to CID if necessary.
694+
// Rationale: it is better to fix this common mistake than to error and break peer routing.
695+
696+
// Attempt to parse PeerID
697+
pid, err := peer.Decode(pidStr)
698+
if err != nil {
699+
// Retry by parsing PeerID as CID, then setting codec to libp2p-key
700+
// and turning that back to PeerID.
701+
// This is necessary to make sure legacy keys like:
702+
// - RSA QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N
703+
// - ED25519 12D3KooWD3eckifWpRn9wQpMG9R9hX3sD158z7EqHWmweQAJU5SA
704+
// are parsed correctly.
705+
pidAsCid, err2 := cid.Decode(pidStr)
706+
if err2 == nil {
707+
pidAsCid = cid.NewCidV1(cid.Libp2pKey, pidAsCid.Hash())
708+
pid, err = peer.FromCid(pidAsCid)
709+
}
710+
}
711+
return pid, err
712+
}
713+
621714
func setCacheControl(w http.ResponseWriter, maxAge int, stale int) {
622715
w.Header().Set("Cache-Control", fmt.Sprintf("public, max-age=%d, stale-while-revalidate=%d, stale-if-error=%d", maxAge, stale, stale))
623716
}

0 commit comments

Comments
 (0)