Skip to content

Commit 9d252a5

Browse files
committed
feat: add E2E tests for cases where peers go offline
Signed-off-by: BruceAko <chongzhi@hust.edu.cn>
1 parent 4d2e929 commit 9d252a5

File tree

12 files changed

+258
-12
lines changed

12 files changed

+258
-12
lines changed

.github/workflows/compatibility-e2e-v2.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ jobs:
4545
image: client
4646
image-tag: v0.1.100
4747
chart-name: seed-client
48+
skip: "Clients go offline normally and abnormally"
4849

4950
steps:
5051
- name: Free Disk Space (Ubuntu)

deploy/docker-compose/template/scheduler.template.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ scheduler:
6464
# then the task will also be reclaimed.
6565
taskGCInterval: 30m
6666
# hostGCInterval is the interval of host gc.
67-
hostGCInterval: 6h
67+
hostGCInterval: 5m
6868
# hostTTL is the time to live of a host. If the host announces a message to the scheduler,
6969
# then hostTTL will be reset.
7070
hostTTL: 1h

pkg/rpc/scheduler/client/client_v1.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ func GetV1(ctx context.Context, dynconfig config.Dynconfig, opts ...grpc.DialOpt
9090
}, nil
9191
}
9292

93-
// GetV1ByAddr returns v2 version of the scheduler client by address.
93+
// GetV1ByAddr returns v1 version of the scheduler client by address.
9494
func GetV1ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V1, error) {
9595
conn, err := grpc.DialContext(
9696
ctx,

pkg/rpc/scheduler/client/client_v2.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"golang.org/x/sync/errgroup"
3131
"google.golang.org/grpc"
3232
"google.golang.org/grpc/balancer"
33+
"google.golang.org/protobuf/types/known/emptypb"
3334

3435
commonv2 "d7y.io/api/v2/pkg/apis/common/v2"
3536
schedulerv2 "d7y.io/api/v2/pkg/apis/scheduler/v2"
@@ -145,6 +146,9 @@ type V2 interface {
145146
// AnnounceHost announces host to scheduler.
146147
AnnounceHost(context.Context, *schedulerv2.AnnounceHostRequest, ...grpc.CallOption) error
147148

149+
// ListHosts lists hosts in scheduler.
150+
ListHosts(ctx context.Context, opts ...grpc.CallOption) (*schedulerv2.ListHostsResponse, error)
151+
148152
// DeleteHost releases host in scheduler.
149153
DeleteHost(context.Context, *schedulerv2.DeleteHostRequest, ...grpc.CallOption) error
150154

@@ -250,6 +254,18 @@ func (v *v2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ
250254
return eg.Wait()
251255
}
252256

257+
// ListHosts lists hosts in scheduler.
258+
func (v *v2) ListHosts(ctx context.Context, opts ...grpc.CallOption) (*schedulerv2.ListHostsResponse, error) {
259+
ctx, cancel := context.WithTimeout(ctx, contextTimeout)
260+
defer cancel()
261+
262+
return v.SchedulerClient.ListHosts(
263+
context.WithValue(ctx, pkgbalancer.ContextKey, ""),
264+
new(emptypb.Empty),
265+
opts...,
266+
)
267+
}
268+
253269
// DeleteHost releases host in all schedulers.
254270
func (v *v2) DeleteHost(ctx context.Context, req *schedulerv2.DeleteHostRequest, opts ...grpc.CallOption) error {
255271
ctx, cancel := context.WithTimeout(ctx, contextTimeout)

pkg/rpc/scheduler/client/mocks/client_v2_mock.go

Lines changed: 20 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

scheduler/metrics/metrics.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,20 @@ var (
224224
}, []string{"os", "platform", "platform_family", "platform_version",
225225
"kernel_version", "git_version", "git_commit", "go_version", "build_platform"})
226226

227+
ListHostsCount = promauto.NewCounter(prometheus.CounterOpts{
228+
Namespace: types.MetricsNamespace,
229+
Subsystem: types.SchedulerMetricsName,
230+
Name: "list_hosts_total",
231+
Help: "Counter of the number of the list hosts.",
232+
})
233+
234+
ListHostsCountFailureCount = promauto.NewCounter(prometheus.CounterOpts{
235+
Namespace: types.MetricsNamespace,
236+
Subsystem: types.SchedulerMetricsName,
237+
Name: "list_hosts_failure_total",
238+
Help: "Counter of the number of failed of the list hosts.",
239+
})
240+
227241
LeaveHostCount = promauto.NewCounter(prometheus.CounterOpts{
228242
Namespace: types.MetricsNamespace,
229243
Subsystem: types.SchedulerMetricsName,

scheduler/rpcserver/scheduler_server_v2.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,10 +136,18 @@ func (s *schedulerServerV2) AnnounceHost(ctx context.Context, req *schedulerv2.A
136136
return new(emptypb.Empty), nil
137137
}
138138

139-
// TODO Implement the following methods.
140139
// ListHosts lists hosts in scheduler.
141140
func (s *schedulerServerV2) ListHosts(ctx context.Context, _ *emptypb.Empty) (*schedulerv2.ListHostsResponse, error) {
142-
return nil, nil
141+
// Collect ListHostsCount metrics.
142+
metrics.ListHostsCount.Inc()
143+
resp, err := s.service.ListHosts(ctx)
144+
if err != nil {
145+
// Collect ListHostsCountFailureCount metrics.
146+
metrics.ListHostsCountFailureCount.Inc()
147+
return nil, err
148+
}
149+
150+
return resp, nil
143151
}
144152

145153
// DeleteHost releases host in scheduler.

scheduler/service/service_v2.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -699,6 +699,84 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ
699699
return nil
700700
}
701701

702+
// ListHosts lists hosts in scheduler.
703+
func (v *V2) ListHosts(ctx context.Context) (*schedulerv2.ListHostsResponse, error) {
704+
hosts := v.resource.HostManager().LoadAll()
705+
706+
resHosts := make([]*commonv2.Host, len(hosts))
707+
for i, host := range hosts {
708+
resHosts[i] = &commonv2.Host{
709+
Id: host.ID,
710+
Type: uint32(host.Type),
711+
Hostname: host.Hostname,
712+
Ip: host.IP,
713+
Port: host.Port,
714+
DownloadPort: host.DownloadPort,
715+
Os: host.OS,
716+
Platform: host.Platform,
717+
PlatformFamily: host.PlatformFamily,
718+
PlatformVersion: host.PlatformVersion,
719+
KernelVersion: host.KernelVersion,
720+
Cpu: &commonv2.CPU{
721+
LogicalCount: host.CPU.LogicalCount,
722+
PhysicalCount: host.CPU.PhysicalCount,
723+
Percent: host.CPU.Percent,
724+
ProcessPercent: host.CPU.ProcessPercent,
725+
Times: &commonv2.CPUTimes{
726+
User: host.CPU.Times.User,
727+
System: host.CPU.Times.System,
728+
Idle: host.CPU.Times.Idle,
729+
Nice: host.CPU.Times.Nice,
730+
Iowait: host.CPU.Times.Iowait,
731+
Irq: host.CPU.Times.Irq,
732+
Softirq: host.CPU.Times.Softirq,
733+
Steal: host.CPU.Times.Steal,
734+
Guest: host.CPU.Times.Guest,
735+
GuestNice: host.CPU.Times.GuestNice,
736+
},
737+
},
738+
Memory: &commonv2.Memory{
739+
Total: host.Memory.Total,
740+
Available: host.Memory.Available,
741+
Used: host.Memory.Used,
742+
UsedPercent: host.Memory.UsedPercent,
743+
ProcessUsedPercent: host.Memory.ProcessUsedPercent,
744+
Free: host.Memory.Free,
745+
},
746+
Network: &commonv2.Network{
747+
TcpConnectionCount: host.Network.TCPConnectionCount,
748+
UploadTcpConnectionCount: host.Network.UploadTCPConnectionCount,
749+
Location: &host.Network.Location,
750+
Idc: &host.Network.IDC,
751+
},
752+
Disk: &commonv2.Disk{
753+
Total: host.Disk.Total,
754+
Free: host.Disk.Free,
755+
Used: host.Disk.Used,
756+
UsedPercent: host.Disk.UsedPercent,
757+
InodesTotal: host.Disk.InodesTotal,
758+
InodesUsed: host.Disk.InodesUsed,
759+
InodesFree: host.Disk.InodesFree,
760+
InodesUsedPercent: host.Disk.InodesUsedPercent,
761+
},
762+
Build: &commonv2.Build{
763+
GitVersion: host.Build.GitVersion,
764+
GitCommit: &host.Build.GitCommit,
765+
GoVersion: &host.Build.GoVersion,
766+
Platform: &host.Build.Platform,
767+
},
768+
SchedulerClusterId: host.SchedulerClusterID,
769+
DisableShared: host.DisableShared,
770+
}
771+
}
772+
773+
resp := &schedulerv2.ListHostsResponse{
774+
Hosts: resHosts,
775+
}
776+
777+
return resp, nil
778+
}
779+
702780
// DeleteHost releases host in scheduler.
703781
func (v *V2) DeleteHost(ctx context.Context, req *schedulerv2.DeleteHostRequest) error {
704782
log := logger.WithHostID(req.GetHostId())

test/e2e/v2/leave_host_test.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Copyright 2024 The Dragonfly Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package e2e
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"strings"
23+
"time"
24+
25+
. "github.com/onsi/ginkgo/v2" //nolint
26+
. "github.com/onsi/gomega" //nolint
27+
"google.golang.org/grpc"
28+
"google.golang.org/grpc/credentials/insecure"
29+
30+
schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client"
31+
"d7y.io/dragonfly/v2/test/e2e/v2/util"
32+
)
33+
34+
var _ = Describe("Clients go offline normally and abnormally", func() {
35+
Context("scheduler clears peer metadata", func() {
36+
It("number of hosts should be ok", Label("host", "leave"), func() {
37+
getHostCountFromScheduler := func(schedulerClient schedulerclient.V2) (hostCount int) {
38+
response, err := schedulerClient.ListHosts(context.Background())
39+
fmt.Println(response, err)
40+
Expect(err).NotTo(HaveOccurred())
41+
42+
hosts := response.Hosts
43+
for _, host := range hosts {
44+
// HostID: "10.244.0.13-dragonfly-seed-client-0-seed"
45+
// PeerHostID: "3dba4916d8271d6b71bb20e95a0b5494c9a941ab7ef3567f805abca8614dc128"
46+
if strings.Contains(host.Id, "-") {
47+
hostCount++
48+
}
49+
}
50+
return
51+
}
52+
53+
grpcCredentials := insecure.NewCredentials()
54+
schedulerClient, err := schedulerclient.GetV2ByAddr(context.Background(), ":8002", grpc.WithTransportCredentials(grpcCredentials))
55+
Expect(err).NotTo(HaveOccurred())
56+
57+
time.Sleep(3 * time.Minute)
58+
hostCount := util.Servers[util.SeedClientServerName].Replicas + util.Servers[util.ClientServerName].Replicas
59+
Expect(getHostCountFromScheduler(schedulerClient)).To(Equal(hostCount))
60+
61+
podName, err := util.GetClientPodName()
62+
Expect(err).NotTo(HaveOccurred())
63+
64+
out, err := util.KubeCtlCommand("-n", util.DragonflyNamespace, "delete", "pod", podName).CombinedOutput()
65+
fmt.Println(string(out))
66+
Expect(err).NotTo(HaveOccurred())
67+
Expect(getHostCountFromScheduler(schedulerClient)).To(Equal(hostCount))
68+
69+
podName, err = util.GetClientPodName()
70+
Expect(err).NotTo(HaveOccurred())
71+
72+
out, err = util.KubeCtlCommand("-n", util.DragonflyNamespace, "delete", "pod", podName, "--force", "--grace-period=0").CombinedOutput()
73+
fmt.Println(string(out))
74+
Expect(err).NotTo(HaveOccurred())
75+
time.Sleep(6 * time.Minute)
76+
Expect(getHostCountFromScheduler(schedulerClient)).To(Equal(hostCount))
77+
})
78+
})
79+
})

test/e2e/v2/util/exec.go

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -103,37 +103,61 @@ func KubeCtlCopyCommand(ns, pod, source, target string) *exec.Cmd {
103103
}
104104

105105
func ClientExec() (*PodExec, error) {
106+
podName, err := GetClientPodName()
107+
if err != nil {
108+
return nil, err
109+
}
110+
return NewPodExec(DragonflyNamespace, podName, "client"), nil
111+
}
112+
113+
func SeedClientExec(n int) (*PodExec, error) {
114+
podName, err := GetSeedClientPodName(n)
115+
if err != nil {
116+
return nil, err
117+
}
118+
return NewPodExec(DragonflyNamespace, podName, "seed-client"), nil
119+
}
120+
121+
func ManagerExec(n int) (*PodExec, error) {
122+
podName, err := GetManagerPodName(n)
123+
if err != nil {
124+
return nil, err
125+
}
126+
return NewPodExec(DragonflyNamespace, podName, "manager"), nil
127+
}
128+
129+
func GetClientPodName() (string, error) {
106130
out, err := KubeCtlCommand("-n", DragonflyNamespace, "get", "pod", "-l", "component=client",
107131
"-o", fmt.Sprintf("jsonpath='{range .items[0]}{.metadata.name}{end}'")).CombinedOutput()
108132
if err != nil {
109-
return nil, err
133+
return "", err
110134
}
111135

112136
podName := strings.Trim(string(out), "'")
113137
fmt.Println(podName)
114-
return NewPodExec(DragonflyNamespace, podName, "client"), nil
138+
return podName, nil
115139
}
116140

117-
func SeedClientExec(n int) (*PodExec, error) {
141+
func GetSeedClientPodName(n int) (string, error) {
118142
out, err := KubeCtlCommand("-n", DragonflyNamespace, "get", "pod", "-l", "component=seed-client",
119143
"-o", fmt.Sprintf("jsonpath='{range .items[%d]}{.metadata.name}{end}'", n)).CombinedOutput()
120144
if err != nil {
121-
return nil, err
145+
return "", err
122146
}
123147

124148
podName := strings.Trim(string(out), "'")
125149
fmt.Println(podName)
126-
return NewPodExec(DragonflyNamespace, podName, "seed-client"), nil
150+
return podName, nil
127151
}
128152

129-
func ManagerExec(n int) (*PodExec, error) {
153+
func GetManagerPodName(n int) (string, error) {
130154
out, err := KubeCtlCommand("-n", DragonflyNamespace, "get", "pod", "-l", "component=manager",
131155
"-o", fmt.Sprintf("jsonpath='{range .items[%d]}{.metadata.name}{end}'", n)).CombinedOutput()
132156
if err != nil {
133-
return nil, err
157+
return "", err
134158
}
135159

136160
podName := strings.Trim(string(out), "'")
137161
fmt.Println(podName)
138-
return NewPodExec(DragonflyNamespace, podName, "manager"), nil
162+
return podName, nil
139163
}

0 commit comments

Comments
 (0)