Skip to content

Commit a537a6f

Browse files
Get ClientAssign partially working end-to-end.
Still has a lot of unimplemented corner cases, but it's now possible to get a server assignment under some circumstances.

Also:
* PROTO ABI BREAK: swap Event fields 2 and 3 (location and unique)
* PROTO ABI BREAK: change Event field 8 (weight) from float to double
* PROTO API BREAK: rename [has_]shard_id → [has_]shard_number
* PROTO API BREAK: rename unique → unique_id
* PROTO API BREAK: rename Event weight → assigned_cost_per_second
* GO API BREAK: rename ATCUniqueError → ATCUniqueIDError
* GO API BREAK: rename ValidateATCUnique → ValidateATCUniqueID
* GO API BREAK: rename Resolved Unique → UniqueID
* GO API BREAK: rename Resolved [Has]ShardID → [Has]ShardNumber
* GO API BREAK: rename ATCTarget Unique → UniqueID
* GO API BREAK: rename ATCTarget [Has]ShardID → [Has]ShardNumber
* GO API BREAK: move mainutil SetUniqueFile/UniqueID to atcclient
* GO API BREAK: rename membership ServerSet/Roxy [Has]ShardID → [Has]ShardNumber
* GO API BREAK: delete roxyresolver GetShardID/WithShardID
* GO API BREAK: rename mainutil [Etcd|ZK]AnnounceConfig Unique → HostID
* GO API BREAK: rename mainutil ATCAnnounceConfig Unique → UniqueID
* GO API BREAK: rename atcclient Key [Has]ShardID → [Has]ShardNumber
* JSON BREAK: rename membership gRPC metadata key "ShardID" → "Shard"
* JSON BREAK: rename membership Roxy JSON key "shardID" → "shard"
* DATA BREAK: rename mainutil [Etcd|ZK]AnnounceConfig "unique" → "hostID"
* DATA BREAK: rename mainutil ATCAnnounceConfig "unique" → "uniqueID"
* Fix roxyresolver WatchingResolver and "atc" scheme deadlocks
* Fix bug where ATCClient ServerAnnounce/ClientAssign would forget to CloseSend, leading to hang
* ATCClient now falls back on SetUniqueFile/UniqueID
* InterceptorFactory cost-tallying interceptors now ignore all ATC RPCs and all healthcheck RPCs
* Fix bug in demo-backend processing of HTTP HEAD requests
* ATC does better sanity checking of ServerAnnounce inputs
* ATC no longer hangs until dirty shutdown when there are outstanding ServerAnnounce/ClientAssign RPC streams
* ATC now sends an empty ServerAnnounceResponse after successful connect, resulting in a retry counter reset to 0 in ATCClient
1 parent 97aa9e2 commit a537a6f

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

63 files changed

+2240
-1141
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
/atcctl
55
/colorize-logs
66
/demo-backend
7+
/demo-client
78
/zkcp
89
/*.deb
910
/release-notes.txt

cmd/atc/server_clientassign.go

Lines changed: 179 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
package main
22

33
import (
4+
"context"
5+
"errors"
6+
"io"
7+
8+
"github.com/rs/zerolog"
49
"google.golang.org/grpc/codes"
510
"google.golang.org/grpc/status"
611

@@ -39,12 +44,17 @@ func (s *ATCServer) ClientAssign(
3944
return status.Error(codes.InvalidArgument, "first is absent")
4045
}
4146

42-
err = roxyutil.ValidateATCLocation(first.Location)
47+
err = roxyutil.ValidateATCServiceName(first.ServiceName)
48+
if err != nil {
49+
return status.Error(codes.InvalidArgument, err.Error())
50+
}
51+
52+
err = roxyutil.ValidateATCUniqueID(first.UniqueId)
4353
if err != nil {
4454
return status.Error(codes.InvalidArgument, err.Error())
4555
}
4656

47-
err = roxyutil.ValidateATCUnique(first.Unique)
57+
err = roxyutil.ValidateATCLocation(first.Location)
4858
if err != nil {
4959
return status.Error(codes.InvalidArgument, err.Error())
5060
}
@@ -57,7 +67,7 @@ func (s *ATCServer) ClientAssign(
5767
}
5868
}()
5969

60-
key, svc, err := impl.ServiceMap.CheckInput(first.ServiceName, first.ShardId, first.HasShardId, false)
70+
key, svc, err := impl.ServiceMap.CheckInput(first.ServiceName, first.ShardNumber, first.HasShardNumber, false)
6171

6272
logger.Debug().
6373
Stringer("key", key).
@@ -72,5 +82,170 @@ func (s *ATCServer) ClientAssign(
7282
return err
7383
}
7484

75-
return status.Errorf(codes.Unimplemented, "method ClientAssign not implemented")
85+
shardData := s.ref.GetOrInsertShard(key, svc, impl.CostMap)
86+
87+
shardData.Mutex.Lock()
88+
clientData := shardData.LockedGetOrInsertClient(first)
89+
clientData.CostHistory.UpdateAbsolute(req.CostCounter)
90+
clientData.LockedUpdate(true, req.IsServing)
91+
clientData.LockedOnConnect()
92+
goAwayCh := (<-chan *roxy_v0.GoAway)(clientData.GoAwayCh)
93+
shardData.Mutex.Unlock()
94+
95+
needImplRelease = false
96+
s.ref.ReleaseSharedImpl()
97+
98+
active := &activeClientAssign{
99+
logger: logger,
100+
s: s,
101+
cas: cas,
102+
shardData: shardData,
103+
clientData: clientData,
104+
flushCh: clientData.FlushCh,
105+
errCh: make(chan error),
106+
}
107+
108+
go active.recvThread(req.IsServing)
109+
110+
err = active.doSend(&roxy_v0.ClientAssignResponse{})
111+
if err != nil {
112+
return err
113+
}
114+
115+
for {
116+
select {
117+
case <-gMultiServer.Done():
118+
err := status.Error(codes.Unavailable, "ATC server is shutting down")
119+
logger.Trace().
120+
Err(err).
121+
Msg("Return")
122+
return err
123+
124+
case err := <-active.errCh:
125+
return err
126+
127+
case goAway, ok := <-goAwayCh:
128+
if !ok {
129+
return <-active.errCh
130+
}
131+
132+
if goAway == nil {
133+
err := status.Errorf(codes.NotFound, "Key %v no longer exists", shardData.Key)
134+
logger.Trace().
135+
Err(err).
136+
Msg("Return")
137+
return err
138+
}
139+
140+
err := active.doSend(&roxy_v0.ClientAssignResponse{GoAway: goAway})
141+
if err == nil {
142+
err = <-active.errCh
143+
}
144+
return err
145+
146+
case <-active.flushCh:
147+
shardData.Mutex.Lock()
148+
events := clientData.EventQueue
149+
clientData.EventQueue = nil
150+
shardData.Mutex.Unlock()
151+
152+
if len(events) != 0 {
153+
err := active.doSend(&roxy_v0.ClientAssignResponse{Events: events})
154+
if err != nil {
155+
return err
156+
}
157+
}
158+
}
159+
}
160+
}
161+
162+
type activeClientAssign struct {
163+
logger zerolog.Logger
164+
s *ATCServer
165+
cas roxy_v0.AirTrafficControl_ClientAssignServer
166+
shardData *ShardData
167+
clientData *ClientData
168+
flushCh chan struct{}
169+
errCh chan error
170+
}
171+
172+
func (active *activeClientAssign) doSend(resp *roxy_v0.ClientAssignResponse) error {
173+
active.logger.Trace().
174+
Str("func", "ClientAssign.Send").
175+
Interface("resp", resp).
176+
Msg("Call")
177+
178+
err := active.cas.Send(resp)
179+
if err != nil {
180+
active.logger.Error().
181+
Str("func", "ClientAssign.Send").
182+
Err(err).
183+
Msg("Error")
184+
}
185+
return err
186+
}
187+
188+
func (active *activeClientAssign) recvThread(lastIsServing bool) {
189+
var err error
190+
var sendErr bool
191+
defer func() {
192+
active.shardData.Mutex.Lock()
193+
active.clientData.CostHistory.Update()
194+
active.clientData.LockedUpdate(false, lastIsServing)
195+
active.shardData.Mutex.Unlock()
196+
197+
if sendErr {
198+
select {
199+
case active.errCh <- err:
200+
default:
201+
}
202+
}
203+
204+
close(active.errCh)
205+
}()
206+
207+
for {
208+
active.logger.Trace().
209+
Str("func", "ClientAssign.Recv").
210+
Msg("Call")
211+
212+
var req *roxy_v0.ClientAssignRequest
213+
req, err = active.cas.Recv()
214+
s, ok := status.FromError(err)
215+
216+
switch {
217+
case err == nil:
218+
active.logger.Trace().
219+
Interface("req", req).
220+
Msg("Request")
221+
222+
case err == io.EOF:
223+
fallthrough
224+
case errors.Is(err, context.Canceled):
225+
fallthrough
226+
case errors.Is(err, context.DeadlineExceeded):
227+
fallthrough
228+
case ok && s.Code() == codes.Canceled:
229+
active.logger.Trace().
230+
Str("func", "ClientAssign.Recv").
231+
Err(err).
232+
Msg("Hangup")
233+
return
234+
235+
default:
236+
active.logger.Error().
237+
Str("func", "ClientAssign.Recv").
238+
Err(err).
239+
Msg("Error")
240+
sendErr = true
241+
return
242+
}
243+
244+
active.shardData.Mutex.Lock()
245+
active.clientData.CostHistory.UpdateAbsolute(req.CostCounter)
246+
active.clientData.LockedUpdate(true, req.IsServing)
247+
active.shardData.Mutex.Unlock()
248+
249+
lastIsServing = req.IsServing
250+
}
76251
}

cmd/atc/server_find.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ func (s *ATCServer) Find(
2222
impl := s.ref.AcquireSharedImpl()
2323
defer s.ref.ReleaseSharedImpl()
2424

25-
key, _, err := impl.ServiceMap.CheckInput(req.ServiceName, req.ShardId, req.HasShardId, false)
25+
key, _, err := impl.ServiceMap.CheckInput(req.ServiceName, req.ShardNumber, req.HasShardNumber, false)
2626

2727
logger.Debug().
2828
Stringer("key", key).

cmd/atc/server_lookup.go

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ func (s *ATCServer) Lookup(
1919
impl := s.ref.AcquireSharedImpl()
2020
defer s.ref.ReleaseSharedImpl()
2121

22-
key, svc, err := impl.ServiceMap.CheckInput(req.ServiceName, req.ShardId, req.HasShardId, true)
22+
key, svc, err := impl.ServiceMap.CheckInput(req.ServiceName, req.ShardNumber, req.HasShardNumber, true)
2323

2424
logger.Debug().
2525
Stringer("key", key).
@@ -40,10 +40,10 @@ func (s *ATCServer) Lookup(
4040
AvgDemandedCostPerQuery: svc.AvgDemandedCPQ,
4141
}
4242

43-
if svc.IsSharded && !req.HasShardId {
44-
shardLimit := ShardID(svc.NumShards)
43+
if svc.IsSharded && !req.HasShardNumber {
44+
shardLimit := ShardNumber(svc.NumShards)
4545
s.ref.mu.Lock()
46-
for id := ShardID(0); id < shardLimit; id++ {
46+
for id := ShardNumber(0); id < shardLimit; id++ {
4747
key2 := Key{key.ServiceName, id, true}
4848
shardData := s.ref.shardsByKey[key2]
4949
if shardData != nil {
@@ -74,11 +74,11 @@ func (s *ATCServer) LookupClients(
7474
impl := s.ref.AcquireSharedImpl()
7575
defer s.ref.ReleaseSharedImpl()
7676

77-
key, _, err := impl.ServiceMap.CheckInput(req.ServiceName, req.ShardId, req.HasShardId, false)
77+
key, _, err := impl.ServiceMap.CheckInput(req.ServiceName, req.ShardNumber, req.HasShardNumber, false)
7878

7979
logger.Debug().
8080
Stringer("key", key).
81-
Str("uniqueID", req.Unique).
81+
Str("uniqueID", req.UniqueId).
8282
Msg("RPC")
8383

8484
if err != nil {
@@ -90,14 +90,14 @@ func (s *ATCServer) LookupClients(
9090
shardData := s.ref.Shard(key)
9191
if shardData != nil {
9292
shardData.Mutex.Lock()
93-
if req.Unique == "" {
94-
resp.Clients = make([]*roxy_v0.ClientData, 0, len(shardData.ClientsByUnique))
95-
for _, clientData := range shardData.ClientsByUnique {
93+
if req.UniqueId == "" {
94+
resp.Clients = make([]*roxy_v0.ClientData, 0, len(shardData.Clients))
95+
for _, clientData := range shardData.Clients {
9696
resp.Clients = append(resp.Clients, clientData.LockedToProto())
9797
}
9898
} else {
9999
resp.Clients = make([]*roxy_v0.ClientData, 0, 1)
100-
clientData := shardData.ClientsByUnique[req.Unique]
100+
clientData := shardData.Clients[req.UniqueId]
101101
if clientData != nil {
102102
resp.Clients = append(resp.Clients, clientData.LockedToProto())
103103
}
@@ -121,11 +121,11 @@ func (s *ATCServer) LookupServers(
121121
impl := s.ref.AcquireSharedImpl()
122122
defer s.ref.ReleaseSharedImpl()
123123

124-
key, _, err := impl.ServiceMap.CheckInput(req.ServiceName, req.ShardId, req.HasShardId, false)
124+
key, _, err := impl.ServiceMap.CheckInput(req.ServiceName, req.ShardNumber, req.HasShardNumber, false)
125125

126126
logger.Debug().
127127
Stringer("key", key).
128-
Str("uniqueID", req.Unique).
128+
Str("uniqueID", req.UniqueId).
129129
Msg("RPC")
130130

131131
if err != nil {
@@ -137,14 +137,14 @@ func (s *ATCServer) LookupServers(
137137
shardData := s.ref.Shard(key)
138138
if shardData != nil {
139139
shardData.Mutex.Lock()
140-
if req.Unique == "" {
141-
resp.Servers = make([]*roxy_v0.ServerData, 0, len(shardData.ServersByUnique))
142-
for _, serverData := range shardData.ServersByUnique {
140+
if req.UniqueId == "" {
141+
resp.Servers = make([]*roxy_v0.ServerData, 0, len(shardData.Servers))
142+
for _, serverData := range shardData.Servers {
143143
resp.Servers = append(resp.Servers, serverData.LockedToProto())
144144
}
145145
} else {
146146
resp.Servers = make([]*roxy_v0.ServerData, 0, 1)
147-
serverData := shardData.ServersByUnique[req.Unique]
147+
serverData := shardData.Servers[req.UniqueId]
148148
if serverData != nil {
149149
resp.Servers = append(resp.Servers, serverData.LockedToProto())
150150
}

0 commit comments

Comments
 (0)