Skip to content

Commit a15cec2

Browse files
authored
Merge pull request #95 from gfanton/fix/ps-payload-handler
2 parents 748476e + ac252ea commit a15cec2

File tree

11 files changed

+224
-390
lines changed

11 files changed

+224
-390
lines changed

baseorbitdb/events.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package baseorbitdb
2+
3+
import (
4+
"berty.tech/go-orbit-db/iface"
5+
"github.com/libp2p/go-libp2p-core/peer"
6+
)
7+
8+
type EventExchangeHeads struct {
9+
Peer peer.ID
10+
Message *iface.MessageExchangeHeads
11+
}
12+
13+
func NewEventExchangeHeads(p peer.ID, msg *iface.MessageExchangeHeads) EventExchangeHeads {
14+
return EventExchangeHeads{
15+
Peer: p,
16+
Message: msg,
17+
}
18+
}

baseorbitdb/events_handler.go

Lines changed: 24 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -2,47 +2,23 @@ package baseorbitdb
22

33
import (
44
"context"
5-
"encoding/json"
65
"fmt"
76

87
ipfslog "berty.tech/go-ipfs-log"
9-
"berty.tech/go-ipfs-log/enc"
8+
"berty.tech/go-ipfs-log/entry"
109
"berty.tech/go-orbit-db/iface"
1110
"berty.tech/go-orbit-db/stores"
12-
"go.uber.org/zap"
1311
)
1412

15-
func (o *orbitDB) handleEventPubSubPayload(ctx context.Context, e *iface.EventPubSubPayload, sharedKey enc.SharedKey) error {
16-
heads := &exchangedHeads{}
17-
payload := e.Payload
18-
19-
if sharedKey != nil {
20-
var err error
21-
22-
payload, err = sharedKey.Open(payload)
23-
if err != nil {
24-
return fmt.Errorf("unable to decrypt payload: %w", err)
25-
}
26-
}
27-
28-
err := json.Unmarshal(payload, &heads)
29-
if err != nil {
30-
o.logger.Error("unable to unmarshal heads", zap.Error(err))
31-
}
32-
33-
o.logger.Debug(fmt.Sprintf("%s: Received %d heads for '%s':", o.PeerID().String(), len(heads.Heads), heads.Address))
34-
store, ok := o.getStore(heads.Address)
35-
36-
if !ok {
37-
return fmt.Errorf("heads from unknown store, skipping")
13+
func (o *orbitDB) handleEventExchangeHeads(ctx context.Context, e *iface.MessageExchangeHeads, store iface.Store) error {
14+
untypedHeads := make([]ipfslog.Entry, len(e.Heads))
15+
for i, h := range e.Heads {
16+
untypedHeads[i] = h
3817
}
3918

40-
if len(heads.Heads) > 0 {
41-
untypedHeads := make([]ipfslog.Entry, len(heads.Heads))
42-
for i := range heads.Heads {
43-
untypedHeads[i] = heads.Heads[i]
44-
}
19+
o.logger.Debug(fmt.Sprintf("%s: Received %d heads for '%s':", o.PeerID().String(), len(untypedHeads), e.Address))
4520

21+
if len(untypedHeads) > 0 {
4622
if err := store.Sync(ctx, untypedHeads); err != nil {
4723
return fmt.Errorf("unable to sync heads: %w", err)
4824
}
@@ -51,7 +27,7 @@ func (o *orbitDB) handleEventPubSubPayload(ctx context.Context, e *iface.EventPu
5127
return nil
5228
}
5329

54-
func (o *orbitDB) handleEventWrite(ctx context.Context, e *stores.EventWrite, store Store, topic iface.PubSubTopic) error {
30+
func (o *orbitDB) handleEventWrite(ctx context.Context, e *stores.EventWrite, topic iface.PubSubTopic, store Store) error {
5531
o.logger.Debug("received stores.write event")
5632
if len(e.Heads) == 0 {
5733
return fmt.Errorf("'heads' are not defined")
@@ -64,19 +40,26 @@ func (o *orbitDB) handleEventWrite(ctx context.Context, e *stores.EventWrite, st
6440
}
6541

6642
if len(peer) > 0 {
67-
headsBytes, err := json.Marshal(e.Heads)
68-
if err != nil {
69-
return fmt.Errorf("unable to serialize heads %w", err)
43+
entries := make([]*entry.Entry, len(e.Heads))
44+
for i, head := range e.Heads {
45+
if entry, ok := head.(*entry.Entry); ok {
46+
entries[i] = entry
47+
} else {
48+
return fmt.Errorf("unable to unwrap entry")
49+
}
7050
}
7151

72-
if key := store.SharedKey(); key != nil {
73-
headsBytes, err = key.Seal(headsBytes)
74-
if err != nil {
75-
return fmt.Errorf("unable to encrypt heads %w", err)
76-
}
52+
msg := &iface.MessageExchangeHeads{
53+
Address: store.Address().String(),
54+
Heads: entries,
55+
}
56+
57+
payload, err := o.messageMarshaler.Marshal(msg)
58+
if err != nil {
59+
return fmt.Errorf("unable to serialize heads %w", err)
7760
}
7861

79-
err = topic.Publish(ctx, headsBytes)
62+
err = topic.Publish(ctx, payload)
8063
if err != nil {
8164
return fmt.Errorf("unable to publish message on pubsub %w", err)
8265
}

0 commit comments

Comments
 (0)