Skip to content

Commit b6dfcf2

Browse files
authored
Merge pull request #48 from anyproto/GO-2836-coordinator-acl
GO-2836 coordinator acl
2 parents f1c7b29 + 7cc0a41 commit b6dfcf2

File tree

9 files changed

+434
-45
lines changed

9 files changed

+434
-45
lines changed

acl/acl.go

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package acl
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"github.com/anyproto/any-sync/app"
8+
"github.com/anyproto/any-sync/app/logger"
9+
"github.com/anyproto/any-sync/app/ocache"
10+
"github.com/anyproto/any-sync/commonspace/object/acl/list"
11+
"github.com/anyproto/any-sync/consensus/consensusclient"
12+
"github.com/anyproto/any-sync/consensus/consensusproto"
13+
"github.com/anyproto/any-sync/metric"
14+
"github.com/prometheus/client_golang/prometheus"
15+
)
16+
17+
const CName = "coordinator.acl"
18+
19+
var log = logger.NewNamed(CName)
20+
21+
func New() Acl {
22+
return &aclService{}
23+
}
24+
25+
type Acl interface {
26+
AddRecord(ctx context.Context, spaceId string, rec *consensusproto.RawRecord) (result *consensusproto.RawRecordWithId, err error)
27+
RecordsAfter(ctx context.Context, spaceId, aclHead string) (result []*consensusproto.RawRecordWithId, err error)
28+
app.ComponentRunnable
29+
}
30+
31+
type aclService struct {
32+
consService consensusclient.Service
33+
cache ocache.OCache
34+
}
35+
36+
func (as *aclService) Init(a *app.App) (err error) {
37+
as.consService = app.MustComponent[consensusclient.Service](a)
38+
39+
var metricReg *prometheus.Registry
40+
if m := a.Component(metric.CName); m != nil {
41+
metricReg = m.(metric.Metric).Registry()
42+
}
43+
as.cache = ocache.New(as.loadObject,
44+
ocache.WithTTL(5*time.Minute),
45+
ocache.WithLogger(log.Sugar()),
46+
ocache.WithPrometheus(metricReg, "coordinator", "acl"),
47+
)
48+
return
49+
}
50+
51+
func (as *aclService) Name() (name string) {
52+
return CName
53+
}
54+
55+
func (as *aclService) loadObject(ctx context.Context, id string) (ocache.Object, error) {
56+
return newAclObject(ctx, as.consService, id)
57+
}
58+
59+
func (as *aclService) get(ctx context.Context, spaceId string) (list.AclList, error) {
60+
obj, err := as.cache.Get(ctx, spaceId)
61+
if err != nil {
62+
return nil, err
63+
}
64+
aObj := obj.(*aclObject)
65+
aObj.lastUsage.Store(time.Now())
66+
return aObj.AclList, nil
67+
}
68+
69+
func (as *aclService) AddRecord(ctx context.Context, spaceId string, rec *consensusproto.RawRecord) (result *consensusproto.RawRecordWithId, err error) {
70+
acl, err := as.get(ctx, spaceId)
71+
if err != nil {
72+
return nil, err
73+
}
74+
acl.RLock()
75+
defer acl.RUnlock()
76+
err = acl.ValidateRawRecord(rec)
77+
if err != nil {
78+
return
79+
}
80+
81+
return as.consService.AddRecord(ctx, spaceId, rec)
82+
}
83+
84+
func (as *aclService) RecordsAfter(ctx context.Context, spaceId, aclHead string) (result []*consensusproto.RawRecordWithId, err error) {
85+
acl, err := as.get(ctx, spaceId)
86+
if err != nil {
87+
return nil, err
88+
}
89+
acl.RLock()
90+
defer acl.RUnlock()
91+
return acl.RecordsAfter(ctx, aclHead)
92+
}
93+
94+
func (as *aclService) Run(ctx context.Context) (err error) {
95+
return
96+
}
97+
98+
func (as *aclService) Close(ctx context.Context) (err error) {
99+
return as.cache.Close()
100+
}

acl/acl_test.go

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
package acl
2+
3+
import (
4+
"context"
5+
"errors"
6+
"testing"
7+
8+
"github.com/anyproto/any-sync/app"
9+
"github.com/anyproto/any-sync/commonspace/object/accountdata"
10+
"github.com/anyproto/any-sync/commonspace/object/acl/list"
11+
"github.com/anyproto/any-sync/consensus/consensusclient"
12+
"github.com/anyproto/any-sync/consensus/consensusclient/mock_consensusclient"
13+
"github.com/anyproto/any-sync/consensus/consensusproto"
14+
"github.com/stretchr/testify/assert"
15+
"github.com/stretchr/testify/require"
16+
"go.uber.org/mock/gomock"
17+
)
18+
19+
var ctx = context.Background()
20+
21+
func TestAclService_AddRecord(t *testing.T) {
22+
ownerKeys, err := accountdata.NewRandom()
23+
require.NoError(t, err)
24+
spaceId := "spaceId"
25+
ownerAcl, err := list.NewTestDerivedAcl(spaceId, ownerKeys)
26+
require.NoError(t, err)
27+
inv, err := ownerAcl.RecordBuilder().BuildInvite()
28+
require.NoError(t, err)
29+
30+
t.Run("success", func(t *testing.T) {
31+
fx := newFixture(t)
32+
defer fx.finish(t)
33+
34+
expRes := list.WrapAclRecord(inv.InviteRec)
35+
var watcherCh = make(chan consensusclient.Watcher)
36+
fx.consCl.EXPECT().Watch(spaceId, gomock.Any()).DoAndReturn(func(spaceId string, w consensusclient.Watcher) error {
37+
go func() {
38+
w.AddConsensusRecords([]*consensusproto.RawRecordWithId{
39+
ownerAcl.Root(),
40+
})
41+
watcherCh <- w
42+
}()
43+
return nil
44+
})
45+
46+
fx.consCl.EXPECT().AddRecord(ctx, spaceId, inv.InviteRec).Return(expRes, nil)
47+
fx.consCl.EXPECT().UnWatch(spaceId)
48+
49+
res, err := fx.AddRecord(ctx, spaceId, inv.InviteRec)
50+
assert.Equal(t, expRes, res)
51+
assert.NoError(t, err)
52+
53+
w := <-watcherCh
54+
w.AddConsensusRecords([]*consensusproto.RawRecordWithId{
55+
expRes,
56+
})
57+
})
58+
t.Run("error", func(t *testing.T) {
59+
fx := newFixture(t)
60+
defer fx.finish(t)
61+
62+
var testErr = errors.New("test")
63+
64+
fx.consCl.EXPECT().Watch(spaceId, gomock.Any()).DoAndReturn(func(spaceId string, w consensusclient.Watcher) error {
65+
go func() {
66+
w.AddConsensusError(testErr)
67+
}()
68+
return nil
69+
})
70+
fx.consCl.EXPECT().UnWatch(spaceId)
71+
72+
res, err := fx.AddRecord(ctx, spaceId, inv.InviteRec)
73+
assert.Nil(t, res)
74+
assert.EqualError(t, err, testErr.Error())
75+
})
76+
77+
}
78+
79+
func TestAclService_RecordsAfter(t *testing.T) {
80+
ownerKeys, err := accountdata.NewRandom()
81+
require.NoError(t, err)
82+
spaceId := "spaceId"
83+
ownerAcl, err := list.NewTestDerivedAcl(spaceId, ownerKeys)
84+
require.NoError(t, err)
85+
86+
fx := newFixture(t)
87+
defer fx.finish(t)
88+
89+
fx.consCl.EXPECT().Watch(spaceId, gomock.Any()).DoAndReturn(func(spaceId string, w consensusclient.Watcher) error {
90+
go func() {
91+
w.AddConsensusRecords([]*consensusproto.RawRecordWithId{
92+
ownerAcl.Root(),
93+
})
94+
}()
95+
return nil
96+
})
97+
fx.consCl.EXPECT().UnWatch(spaceId)
98+
99+
res, err := fx.RecordsAfter(ctx, spaceId, "")
100+
require.NoError(t, err)
101+
assert.Len(t, res, 1)
102+
}
103+
104+
func newFixture(t *testing.T) *fixture {
105+
ctrl := gomock.NewController(t)
106+
fx := &fixture{
107+
a: new(app.App),
108+
ctrl: ctrl,
109+
consCl: mock_consensusclient.NewMockService(ctrl),
110+
Acl: New(),
111+
}
112+
113+
fx.consCl.EXPECT().Name().Return(consensusclient.CName).AnyTimes()
114+
fx.consCl.EXPECT().Init(gomock.Any()).AnyTimes()
115+
fx.consCl.EXPECT().Run(gomock.Any()).AnyTimes()
116+
fx.consCl.EXPECT().Close(gomock.Any()).AnyTimes()
117+
118+
fx.a.Register(fx.consCl).Register(fx.Acl)
119+
120+
require.NoError(t, fx.a.Start(ctx))
121+
return fx
122+
}
123+
124+
type fixture struct {
125+
a *app.App
126+
ctrl *gomock.Controller
127+
consCl *mock_consensusclient.MockService
128+
Acl
129+
}
130+
131+
func (fx *fixture) finish(t *testing.T) {
132+
require.NoError(t, fx.a.Close(ctx))
133+
fx.ctrl.Finish()
134+
}

acl/object.go

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package acl
2+
3+
import (
4+
"context"
5+
"sync"
6+
"time"
7+
8+
"github.com/anyproto/any-sync/commonspace/object/acl/list"
9+
"github.com/anyproto/any-sync/commonspace/object/acl/liststorage"
10+
"github.com/anyproto/any-sync/consensus/consensusclient"
11+
"github.com/anyproto/any-sync/consensus/consensusproto"
12+
"go.uber.org/atomic"
13+
"go.uber.org/zap"
14+
)
15+
16+
func newAclObject(ctx context.Context, cs consensusclient.Service, id string) (*aclObject, error) {
17+
obj := &aclObject{
18+
id: id,
19+
consService: cs,
20+
ready: make(chan struct{}),
21+
}
22+
if err := cs.Watch(id, obj); err != nil {
23+
return nil, err
24+
}
25+
select {
26+
case <-obj.ready:
27+
if obj.consErr != nil {
28+
_ = cs.UnWatch(id)
29+
return nil, obj.consErr
30+
}
31+
return obj, nil
32+
case <-ctx.Done():
33+
_ = cs.UnWatch(id)
34+
return nil, ctx.Err()
35+
}
36+
}
37+
38+
type aclObject struct {
39+
id string
40+
store liststorage.ListStorage
41+
list.AclList
42+
43+
ready chan struct{}
44+
consErr error
45+
consService consensusclient.Service
46+
47+
lastUsage atomic.Time
48+
49+
mu sync.Mutex
50+
}
51+
52+
func (a *aclObject) AddConsensusRecords(recs []*consensusproto.RawRecordWithId) {
53+
a.mu.Lock()
54+
defer a.mu.Unlock()
55+
if a.store == nil {
56+
defer close(a.ready)
57+
if a.store, a.consErr = liststorage.NewInMemoryAclListStorage(a.id, recs); a.consErr != nil {
58+
return
59+
}
60+
if a.AclList, a.consErr = list.BuildAclList(a.store, list.NoOpAcceptorVerifier{}); a.consErr != nil {
61+
return
62+
}
63+
} else {
64+
a.Lock()
65+
defer a.Unlock()
66+
if err := a.AddRawRecords(recs); err != nil {
67+
log.Warn("unable to add consensus records", zap.Error(err), zap.String("spaceId", a.id))
68+
return
69+
}
70+
}
71+
}
72+
73+
func (a *aclObject) AddConsensusError(err error) {
74+
a.mu.Lock()
75+
defer a.mu.Unlock()
76+
if a.store == nil {
77+
a.consErr = err
78+
close(a.ready)
79+
} else {
80+
log.Warn("got consensus error", zap.Error(err))
81+
}
82+
}
83+
84+
func (a *aclObject) Close() (err error) {
85+
return a.consService.UnWatch(a.id)
86+
}
87+
88+
func (a *aclObject) TryClose(objectTTL time.Duration) (res bool, err error) {
89+
if a.lastUsage.Load().Before(time.Now().Add(-objectTTL)) {
90+
return true, a.Close()
91+
}
92+
return false, nil
93+
}

cmd/coordinator/coordinator.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313

1414
"github.com/anyproto/any-sync/app"
1515
"github.com/anyproto/any-sync/app/logger"
16+
"github.com/anyproto/any-sync/consensus/consensusclient"
1617
"github.com/anyproto/any-sync/metric"
1718
"github.com/anyproto/any-sync/net/peerservice"
1819
"github.com/anyproto/any-sync/net/pool"
@@ -25,6 +26,7 @@ import (
2526
"go.uber.org/zap"
2627

2728
"github.com/anyproto/any-sync-coordinator/account"
29+
"github.com/anyproto/any-sync-coordinator/acl"
2830
"github.com/anyproto/any-sync-coordinator/config"
2931
"github.com/anyproto/any-sync-coordinator/coordinator"
3032
"github.com/anyproto/any-sync-coordinator/coordinatorlog"
@@ -118,6 +120,8 @@ func Bootstrap(a *app.App) {
118120
Register(server.New()).
119121
Register(coordinatorlog.New()).
120122
Register(spacestatus.New()).
123+
Register(consensusclient.New()).
124+
Register(acl.New()).
121125
Register(filelimit.New()).
122126
Register(identityrepo.New()).
123127
Register(coordinator.New()).

coordinator/coordinator.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"go.uber.org/zap"
2121
"storj.io/drpc"
2222

23+
"github.com/anyproto/any-sync-coordinator/acl"
2324
"github.com/anyproto/any-sync-coordinator/config"
2425
"github.com/anyproto/any-sync-coordinator/coordinatorlog"
2526
"github.com/anyproto/any-sync-coordinator/deletionlog"
@@ -60,6 +61,7 @@ type coordinator struct {
6061
metric metric.Metric
6162
fileLimit filelimit.FileLimit
6263
deletionLog deletionlog.DeletionLog
64+
acl acl.Acl
6365
}
6466

6567
func (c *coordinator) Init(a *app.App) (err error) {
@@ -73,6 +75,7 @@ func (c *coordinator) Init(a *app.App) (err error) {
7375
c.metric = a.MustComponent(metric.CName).(metric.Metric)
7476
c.fileLimit = a.MustComponent(filelimit.CName).(filelimit.FileLimit)
7577
c.deletionLog = app.MustComponent[deletionlog.DeletionLog](a)
78+
c.acl = app.MustComponent[acl.Acl](a)
7679
return coordinatorproto.DRPCRegisterCoordinator(a.MustComponent(server.CName).(drpc.Mux), h)
7780
}
7881

0 commit comments

Comments
 (0)