Skip to content

Commit b0b1d40

Browse files
committed
feat: add E2E tests for cases that peers going offline
1 parent c9180af commit b0b1d40

File tree

10 files changed

+231
-4
lines changed

10 files changed

+231
-4
lines changed

deploy/docker-compose/template/scheduler.template.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ scheduler:
6161
# then the task will also be reclaimed.
6262
taskGCInterval: 30m
6363
# hostGCInterval is the interval of host gc.
64-
hostGCInterval: 6h
64+
hostGCInterval: 5m
6565
# hostTTL is time to live of host. If host announces message to scheduler,
6666
# then hostTTL will be reset.
6767
hostTTL: 1h

pkg/rpc/scheduler/client/client_v1.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ func GetV1(ctx context.Context, dynconfig config.Dynconfig, opts ...grpc.DialOpt
9090
}, nil
9191
}
9292

93-
// GetV1ByAddr returns v2 version of the scheduler client by address.
93+
// GetV1ByAddr returns v1 version of the scheduler client by address.
9494
func GetV1ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V1, error) {
9595
conn, err := grpc.DialContext(
9696
ctx,

pkg/rpc/scheduler/client/client_v2.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ package client
2121
import (
2222
"context"
2323
"math"
24+
"sync"
2425

2526
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
2627
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
@@ -30,6 +31,7 @@ import (
3031
"golang.org/x/sync/errgroup"
3132
"google.golang.org/grpc"
3233
"google.golang.org/grpc/balancer"
34+
"google.golang.org/protobuf/types/known/emptypb"
3335

3436
commonv2 "d7y.io/api/v2/pkg/apis/common/v2"
3537
schedulerv2 "d7y.io/api/v2/pkg/apis/scheduler/v2"
@@ -145,6 +147,9 @@ type V2 interface {
145147
// AnnounceHost announces host to scheduler.
146148
AnnounceHost(context.Context, *schedulerv2.AnnounceHostRequest, ...grpc.CallOption) error
147149

150+
// ListHosts lists hosts in scheduler.
151+
ListHosts(ctx context.Context, opts ...grpc.CallOption) (*schedulerv2.ListHostsResponse, error)
152+
148153
// DeleteHost releases host in scheduler.
149154
DeleteHost(context.Context, *schedulerv2.DeleteHostRequest, ...grpc.CallOption) error
150155

@@ -253,6 +258,56 @@ func (v *v2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ
253258
return eg.Wait()
254259
}
255260

261+
// ListHosts lists host in all schedulers.
262+
func (v *v2) ListHosts(ctx context.Context, opts ...grpc.CallOption) (*schedulerv2.ListHostsResponse, error) {
263+
ctx, cancel := context.WithTimeout(ctx, contextTimeout)
264+
defer cancel()
265+
266+
circle, err := v.GetCircle()
267+
if err != nil {
268+
return nil, err
269+
}
270+
271+
responses := make([]*schedulerv2.ListHostsResponse, len(circle))
272+
mu := sync.Mutex{}
273+
274+
eg, _ := errgroup.WithContext(ctx)
275+
for _, virtualTaskID := range circle {
276+
virtualTaskID := virtualTaskID
277+
eg.Go(func() error {
278+
response, err := v.SchedulerClient.ListHosts(
279+
context.WithValue(ctx, pkgbalancer.ContextKey, virtualTaskID),
280+
new(emptypb.Empty),
281+
opts...,
282+
)
283+
if err != nil {
284+
return err
285+
}
286+
287+
mu.Lock()
288+
responses = append(responses, response)
289+
mu.Unlock()
290+
291+
return nil
292+
})
293+
}
294+
295+
idSet := map[string]struct{}{}
296+
var hosts []*commonv2.Host
297+
for _, response := range responses {
298+
for _, host := range response.Hosts {
299+
if _, exist := idSet[host.Id]; !exist {
300+
idSet[host.Id] = struct{}{}
301+
hosts = append(hosts, host)
302+
}
303+
}
304+
}
305+
306+
return &schedulerv2.ListHostsResponse{
307+
Hosts: hosts,
308+
}, eg.Wait()
309+
}
310+
256311
// DeleteHost releases host in all schedulers.
257312
func (v *v2) DeleteHost(ctx context.Context, req *schedulerv2.DeleteHostRequest, opts ...grpc.CallOption) error {
258313
ctx, cancel := context.WithTimeout(ctx, contextTimeout)

pkg/rpc/scheduler/client/mocks/client_v2_mock.go

Lines changed: 20 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

scheduler/metrics/metrics.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,20 @@ var (
224224
}, []string{"os", "platform", "platform_family", "platform_version",
225225
"kernel_version", "git_version", "git_commit", "go_version", "build_platform"})
226226

227+
// ListHostsCount is the counter for list hosts calls.
ListHostsCount = promauto.NewCounter(prometheus.CounterOpts{
	Name:      "list_hosts_total",
	Help:      "Counter of the number of the list hosts.",
	Namespace: types.MetricsNamespace,
	Subsystem: types.SchedulerMetricsName,
})
233+
234+
// ListHostsCountFailureCount is the counter for failed list hosts calls.
ListHostsCountFailureCount = promauto.NewCounter(prometheus.CounterOpts{
	Name:      "list_hosts_failure_total",
	Help:      "Counter of the number of failed of the list hosts.",
	Namespace: types.MetricsNamespace,
	Subsystem: types.SchedulerMetricsName,
})
240+
227241
LeaveHostCount = promauto.NewCounter(prometheus.CounterOpts{
228242
Namespace: types.MetricsNamespace,
229243
Subsystem: types.SchedulerMetricsName,

scheduler/rpcserver/scheduler_server_v2.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,10 +138,18 @@ func (s *schedulerServerV2) AnnounceHost(ctx context.Context, req *schedulerv2.A
138138
return new(emptypb.Empty), nil
139139
}
140140

141-
// TODO Implement the following methods.
142141
// ListHosts lists hosts in scheduler.
143142
func (s *schedulerServerV2) ListHosts(ctx context.Context, _ *emptypb.Empty) (*schedulerv2.ListHostsResponse, error) {
144-
return nil, nil
143+
// Collect ListHostsCount metrics.
144+
metrics.ListHostsCount.Inc()
145+
resp, err := s.service.ListHosts(ctx)
146+
if err != nil {
147+
// Collect ListHostsFailureCount metrics.
148+
metrics.ListHostsCountFailureCount.Inc()
149+
return nil, err
150+
}
151+
152+
return resp, nil
145153
}
146154

147155
// DeleteHost releases host in scheduler.

scheduler/service/service_v2.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -693,6 +693,84 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ
693693
return nil
694694
}
695695

696+
// ListHosts lists hosts in scheduler.
697+
func (v *V2) ListHosts(ctx context.Context) (*schedulerv2.ListHostsResponse, error) {
698+
hosts := v.resource.HostManager().LoadAll()
699+
700+
resHosts := make([]*commonv2.Host, len(hosts))
701+
for i, host := range hosts {
702+
resHosts[i] = &commonv2.Host{
703+
Id: host.ID,
704+
Type: uint32(host.Type),
705+
Hostname: host.Hostname,
706+
Ip: host.IP,
707+
Port: host.Port,
708+
DownloadPort: host.DownloadPort,
709+
Os: host.OS,
710+
Platform: host.Platform,
711+
PlatformFamily: host.PlatformFamily,
712+
PlatformVersion: host.PlatformVersion,
713+
KernelVersion: host.KernelVersion,
714+
Cpu: &commonv2.CPU{
715+
LogicalCount: host.CPU.LogicalCount,
716+
PhysicalCount: host.CPU.PhysicalCount,
717+
Percent: host.CPU.Percent,
718+
ProcessPercent: host.CPU.ProcessPercent,
719+
Times: &commonv2.CPUTimes{
720+
User: host.CPU.Times.User,
721+
System: host.CPU.Times.System,
722+
Idle: host.CPU.Times.Idle,
723+
Nice: host.CPU.Times.Nice,
724+
Iowait: host.CPU.Times.Iowait,
725+
Irq: host.CPU.Times.Irq,
726+
Softirq: host.CPU.Times.Softirq,
727+
Steal: host.CPU.Times.Steal,
728+
Guest: host.CPU.Times.Guest,
729+
GuestNice: host.CPU.Times.GuestNice,
730+
},
731+
},
732+
Memory: &commonv2.Memory{
733+
Total: host.Memory.Total,
734+
Available: host.Memory.Available,
735+
Used: host.Memory.Used,
736+
UsedPercent: host.Memory.UsedPercent,
737+
ProcessUsedPercent: host.Memory.ProcessUsedPercent,
738+
Free: host.Memory.Free,
739+
},
740+
Network: &commonv2.Network{
741+
TcpConnectionCount: host.Network.TCPConnectionCount,
742+
UploadTcpConnectionCount: host.Network.UploadTCPConnectionCount,
743+
Location: &host.Network.Location,
744+
Idc: &host.Network.IDC,
745+
},
746+
Disk: &commonv2.Disk{
747+
Total: host.Disk.Total,
748+
Free: host.Disk.Free,
749+
Used: host.Disk.Used,
750+
UsedPercent: host.Disk.UsedPercent,
751+
InodesTotal: host.Disk.InodesTotal,
752+
InodesUsed: host.Disk.InodesUsed,
753+
InodesFree: host.Disk.InodesFree,
754+
InodesUsedPercent: host.Disk.InodesUsedPercent,
755+
},
756+
Build: &commonv2.Build{
757+
GitVersion: host.Build.GitVersion,
758+
GitCommit: &host.Build.GitCommit,
759+
GoVersion: &host.Build.GoVersion,
760+
Platform: &host.Build.Platform,
761+
},
762+
SchedulerClusterId: host.SchedulerClusterID,
763+
DisableShared: host.DisableShared,
764+
}
765+
}
766+
767+
resp := &schedulerv2.ListHostsResponse{
768+
Hosts: resHosts,
769+
}
770+
771+
return resp, nil
772+
}
773+
696774
// DeleteHost releases host in scheduler.
697775
func (v *V2) DeleteHost(ctx context.Context, req *schedulerv2.DeleteHostRequest) error {
698776
log := logger.WithHostID(req.GetHostId())

test/e2e/v2/leave_host_test.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright 2024 The Dragonfly Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package e2e
18+
19+
import (
20+
"context"
21+
"fmt"
22+
23+
. "github.com/onsi/ginkgo/v2" //nolint
24+
. "github.com/onsi/gomega" //nolint
25+
"google.golang.org/grpc"
26+
"google.golang.org/grpc/credentials/insecure"
27+
28+
schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client"
29+
"d7y.io/dragonfly/v2/test/e2e/v2/util"
30+
)
31+
32+
var _ = Describe("Clients go offline normally and abnormally", func() {
33+
Context("scheduler clears peer metadata", func() {
34+
It("number of hosts should be ok", Label("host", "leave"), func() {
35+
hostCount := util.Servers[util.SeedClientServerName].Replicas + util.Servers[util.ClientServerName].Replicas + 2
36+
37+
grpcCredentials := insecure.NewCredentials()
38+
schedulerClient, err := schedulerclient.GetV2ByAddr(context.Background(), ":8002", grpc.WithTransportCredentials(grpcCredentials))
39+
Expect(err).NotTo(HaveOccurred())
40+
41+
response, err := schedulerClient.ListHosts(context.Background())
42+
Expect(err).NotTo(HaveOccurred())
43+
fmt.Println(response)
44+
45+
Expect(response.Hosts).To(Equal(hostCount))
46+
})
47+
})
48+
})

test/testdata/charts/config-v2.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ seedClient:
9696

9797
client:
9898
enable: true
99+
replicas: 3
99100
image:
100101
repository: dragonflyoss/client
101102
tag: latest

test/testdata/kind/config-v2.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ nodes:
1212
- containerPort: 4003
1313
hostPort: 4003
1414
protocol: TCP
15+
- containerPort: 8002
16+
hostPort: 8002
17+
protocol: TCP
1518
extraMounts:
1619
- hostPath: ./test/testdata/containerd/config-v2.toml
1720
containerPath: /etc/containerd/config.toml

0 commit comments

Comments
 (0)