Skip to content

Commit fcd1abc

Browse files
committed
parse consistency from headers on kube request
1 parent b6e97eb commit fcd1abc

File tree

5 files changed

+214
-18
lines changed

5 files changed

+214
-18
lines changed

pkg/authz/check.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,7 @@ func runAllMatchingChecks(ctx context.Context, matchingRules []*rules.RunnableRu
2828
return err
2929
}
3030
req := &v1.CheckPermissionRequest{
31-
Consistency: &v1.Consistency{
32-
Requirement: &v1.Consistency_MinimizeLatency{MinimizeLatency: true},
33-
},
31+
Consistency: input.Consistency,
3432
Resource: &v1.ObjectReference{
3533
ObjectType: rel.ResourceType,
3634
ObjectId: rel.ResourceID,

pkg/authz/filter.go

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"io"
99
"slices"
10+
"time"
1011

1112
"github.com/kyverno/go-jmespath"
1213
"k8s.io/klog/v2"
@@ -79,9 +80,7 @@ func filterList(ctx context.Context, client v1.PermissionsServiceClient, filter
7980
defer close(authzData.removedNNC)
8081

8182
req := &v1.LookupResourcesRequest{
82-
Consistency: &v1.Consistency{
83-
Requirement: &v1.Consistency_MinimizeLatency{MinimizeLatency: true},
84-
},
83+
Consistency: input.Consistency,
8584
ResourceObjectType: filter.Rel.ResourceType,
8685
Permission: filter.Rel.ResourceRelation,
8786
Subject: &v1.SubjectReference{
@@ -173,6 +172,10 @@ func filterWatch(ctx context.Context, client v1.PermissionsServiceClient, watchC
173172
defer close(authzData.allowedNNC)
174173
defer close(authzData.removedNNC)
175174

175+
logger := klog.LoggerWithValues(klog.FromContext(ctx), "request", "watch", "filter", filter).WithCallDepth(1)
176+
177+
logger.V(3).Info("started watch")
178+
176179
watchResource, err := watchClient.Watch(ctx, &v1.WatchRequest{
177180
OptionalObjectTypes: []string{filter.Rel.ResourceType},
178181
})
@@ -187,14 +190,18 @@ func filterWatch(ctx context.Context, client v1.PermissionsServiceClient, watchC
187190
}
188191

189192
if err != nil {
190-
fmt.Println(err)
193+
logger.V(2).Error(err, "watch error")
191194
return
192195
}
193196

197+
time.Sleep(input.WatchDelay)
198+
194199
for _, u := range resp.Updates {
195200
cr, err := client.CheckPermission(ctx, &v1.CheckPermissionRequest{
196201
Consistency: &v1.Consistency{
197-
Requirement: &v1.Consistency_FullyConsistent{FullyConsistent: true},
202+
Requirement: &v1.Consistency_AtLeastAsFresh{
203+
AtLeastAsFresh: resp.ChangesThrough,
204+
},
198205
},
199206
Resource: &v1.ObjectReference{
200207
ObjectType: filter.Rel.ResourceType,
@@ -211,46 +218,45 @@ func filterWatch(ctx context.Context, client v1.PermissionsServiceClient, watchC
211218
},
212219
})
213220
if err != nil {
214-
fmt.Println(err)
221+
logger.V(2).Error(err, "check permission error")
215222
return
216223
}
217224

218225
byteIn, err := json.Marshal(wrapper{ResourceID: u.Relationship.Resource.ObjectId, SubjectID: u.Relationship.Subject.Object.ObjectId})
219226
if err != nil {
220-
fmt.Println(err)
227+
logger.V(2).Error(err, "marshal error")
221228
return
222229
}
223230
var data any
224231
if err := json.Unmarshal(byteIn, &data); err != nil {
225-
fmt.Println(err)
232+
logger.V(2).Error(err, "unmarshal error")
226233
return
227234
}
228-
fmt.Println(data)
229-
fmt.Println("RESPONSE", string(byteIn))
235+
236+
logger.V(5).Info("response", "data", data)
230237

231238
name, err := filter.Name.Search(data)
232239
if err != nil {
233-
fmt.Println(err)
240+
klog.V(2).ErrorS(err, "error extracting name")
234241
return
235242
}
236-
fmt.Println("GOT NAME", name)
237243
if name == nil || len(name.(string)) == 0 {
238244
return
239245
}
240246
namespace, err := filter.Namespace.Search(data)
241247
if err != nil {
242-
fmt.Println(err)
248+
logger.V(2).Error(err, "namespace extract error")
243249
return
244250
}
245-
fmt.Println("GOT NAMESPACE", namespace)
246251
if namespace == nil {
247252
namespace = ""
248253
}
249254
nn := types.NamespacedName{Name: name.(string), Namespace: namespace.(string)}
255+
logger.V(4).Info("response object", "namespacedName", nn.String())
250256

251257
// TODO: this should really be over a single channel to prevent
252258
// races on add/remove
253-
fmt.Println(u.Relationship.Resource.ObjectId, cr.Permissionship)
259+
logger.V(4).Info("result", "object", u.Relationship.Resource.ObjectId, "permission", cr.Permissionship)
254260
if cr.Permissionship == v1.CheckPermissionResponse_PERMISSIONSHIP_HAS_PERMISSION {
255261
authzData.allowedNNC <- nn
256262
} else {

pkg/rules/consistency.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package rules
2+
3+
import (
4+
"net/http"
5+
"strings"
6+
"time"
7+
8+
v1 "github.com/authzed/authzed-go/proto/authzed/api/v1"
9+
)
10+
11+
const (
12+
WatchDelayKey = "SpiceDB-Watch-Delay"
13+
ConsistencyHeaderKey = "SpiceDB-Consistency"
14+
FullyConsistentHeaderValue = "Full"
15+
AtExactSnapshotHeaderValuePrefix = "Exact "
16+
AtLeastAsFreshHeaderValuePrefix = "At-Least "
17+
)
18+
19+
var MinimizeLatency = &v1.Consistency{
20+
Requirement: &v1.Consistency_MinimizeLatency{MinimizeLatency: true},
21+
}
22+
23+
var FullyConsistent = &v1.Consistency{
24+
Requirement: &v1.Consistency_FullyConsistent{FullyConsistent: true},
25+
}
26+
27+
// ConsistencyFromHeaders returns a consistency block given a consistency
28+
// header. Defaults to Minimize Latency, otherwise returns the first match in order:
29+
// - Fully Consistent
30+
// - At Exact Snapshot
31+
// - At Least As Fresh
32+
// - Minimize Latency
33+
func ConsistencyFromHeaders(headers http.Header) *v1.Consistency {
34+
consistencyValue := headers.Get(ConsistencyHeaderKey)
35+
if len(consistencyValue) == 0 {
36+
return MinimizeLatency
37+
}
38+
39+
switch consistencyValue[0] {
40+
case 'F':
41+
fallthrough
42+
case 'f':
43+
return FullyConsistent
44+
case 'E':
45+
fallthrough
46+
case 'e':
47+
return &v1.Consistency{
48+
Requirement: &v1.Consistency_AtExactSnapshot{
49+
AtExactSnapshot: &v1.ZedToken{Token: strings.TrimPrefix(consistencyValue, AtExactSnapshotHeaderValuePrefix)},
50+
},
51+
}
52+
case 'A':
53+
fallthrough
54+
case 'a':
55+
return &v1.Consistency{
56+
Requirement: &v1.Consistency_AtLeastAsFresh{
57+
AtLeastAsFresh: &v1.ZedToken{Token: strings.TrimPrefix(consistencyValue, AtLeastAsFreshHeaderValuePrefix)},
58+
},
59+
}
60+
}
61+
62+
return MinimizeLatency
63+
}
64+
65+
// WatchDelayFromHeaders returns a time duration from the delay header.
66+
// This is used to delay sending the check request to SpiceDB on a watch event,
67+
// so that non-fully-consistent modes can be used with watch.
68+
func WatchDelayFromHeaders(headers http.Header) time.Duration {
69+
delay := headers.Get(WatchDelayKey)
70+
if len(delay) == 0 {
71+
return 0
72+
}
73+
74+
duration, err := time.ParseDuration(delay)
75+
if err != nil {
76+
return 0
77+
}
78+
return duration
79+
}

pkg/rules/consistency_test.go

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
package rules
2+
3+
import (
4+
"net/http"
5+
"reflect"
6+
"testing"
7+
"time"
8+
9+
v1 "github.com/authzed/authzed-go/proto/authzed/api/v1"
10+
"github.com/authzed/spicedb/pkg/datastore/revision"
11+
"github.com/authzed/spicedb/pkg/zedtoken"
12+
"github.com/shopspring/decimal"
13+
)
14+
15+
func MustIntZedToken(t int64) string {
16+
return zedtoken.MustNewFromRevision(revision.NewFromDecimal(decimal.NewFromInt(t))).String()
17+
}
18+
19+
func TestConsistencyFromHeader(t *testing.T) {
20+
tests := []struct {
21+
name string
22+
headers http.Header
23+
want *v1.Consistency
24+
}{
25+
{
26+
name: "defaults to minimize latency",
27+
want: MinimizeLatency,
28+
},
29+
{
30+
name: "fully consistent",
31+
headers: map[string][]string{
32+
ConsistencyHeaderKey: {FullyConsistentHeaderValue},
33+
},
34+
want: FullyConsistent,
35+
},
36+
{
37+
name: "at exact snapshot",
38+
headers: map[string][]string{
39+
ConsistencyHeaderKey: {AtExactSnapshotHeaderValuePrefix + MustIntZedToken(5)},
40+
},
41+
want: &v1.Consistency{
42+
Requirement: &v1.Consistency_AtExactSnapshot{
43+
AtExactSnapshot: &v1.ZedToken{Token: MustIntZedToken(5)},
44+
},
45+
},
46+
},
47+
{
48+
name: "at least as fresh",
49+
headers: map[string][]string{
50+
ConsistencyHeaderKey: {AtLeastAsFreshHeaderValuePrefix + MustIntZedToken(3)},
51+
},
52+
want: &v1.Consistency{
53+
Requirement: &v1.Consistency_AtLeastAsFresh{
54+
AtLeastAsFresh: &v1.ZedToken{Token: MustIntZedToken(3)},
55+
},
56+
},
57+
},
58+
}
59+
for _, tt := range tests {
60+
t.Run(tt.name, func(t *testing.T) {
61+
h := http.Header{}
62+
for k, v := range tt.headers {
63+
for _, vv := range v {
64+
h.Set(k, vv)
65+
}
66+
}
67+
if got := ConsistencyFromHeaders(h); !reflect.DeepEqual(got, tt.want) {
68+
t.Errorf("ConsistencyFromHeaders() = %v, want %v", got, tt.want)
69+
}
70+
})
71+
}
72+
}
73+
74+
func TestWatchDelayFromHeaders(t *testing.T) {
75+
tests := []struct {
76+
name string
77+
headers http.Header
78+
want time.Duration
79+
}{
80+
{
81+
name: "no headers",
82+
want: time.Duration(0),
83+
},
84+
{
85+
name: "bad fmt",
86+
headers: map[string][]string{WatchDelayKey: {"123qwewe"}},
87+
},
88+
{
89+
name: "set delay",
90+
headers: map[string][]string{WatchDelayKey: {"6s"}},
91+
want: time.Second * 6,
92+
},
93+
}
94+
for _, tt := range tests {
95+
t.Run(tt.name, func(t *testing.T) {
96+
h := http.Header{}
97+
for k, v := range tt.headers {
98+
for _, vv := range v {
99+
h.Set(k, vv)
100+
}
101+
}
102+
if got := WatchDelayFromHeaders(h); got != tt.want {
103+
t.Errorf("WatchDelayFromHeaders() = %v, want %v", got, tt.want)
104+
}
105+
})
106+
}
107+
}

pkg/rules/rules.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ import (
88
"regexp"
99
"slices"
1010
"strings"
11+
"time"
1112

13+
v1 "github.com/authzed/authzed-go/proto/authzed/api/v1"
1214
"github.com/kyverno/go-jmespath"
1315
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1416
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -137,6 +139,8 @@ type ResolveInput struct {
137139
Object *metav1.PartialObjectMetadata `json:"object"`
138140
Body []byte `json:"body"`
139141
Headers http.Header `json:"headers"`
142+
Consistency *v1.Consistency `json:"consistency"`
143+
WatchDelay time.Duration `json:"watchDelay"`
140144
}
141145

142146
func NewResolveInputFromHttp(req *http.Request) (*ResolveInput, error) {
@@ -209,6 +213,8 @@ func NewResolveInput(req *request.RequestInfo, user *user.DefaultInfo, object *m
209213
Object: object,
210214
Body: body,
211215
Headers: headers,
216+
Consistency: ConsistencyFromHeaders(headers),
217+
WatchDelay: WatchDelayFromHeaders(headers),
212218
}
213219
}
214220

0 commit comments

Comments
 (0)