Skip to content

Commit eb31eb7

Browse files
committed
feat: implement Destroy and Get endpoints for PersistentCache
Signed-off-by: BruceAko <chongzhi@hust.edu.cn>
1 parent 128f2d9 commit eb31eb7

File tree

8 files changed

+508
-18
lines changed

8 files changed

+508
-18
lines changed

manager/handlers/persistent_cache.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,61 @@ import (
2424
"d7y.io/dragonfly/v2/manager/types"
2525
)
2626

27+
// @Summary Destroy PersistentCache
28+
// @Description Destroy PersistentCache by id
29+
// @Tags PersistentCache
30+
// @Accept json
31+
// @Produce json
32+
// @Param scheduler_cluster_id path string true "scheduler cluster id"
33+
// @Param task_id path string true "task id"
34+
// @Success 200
35+
// @Failure 400
36+
// @Failure 404
37+
// @Failure 500
38+
// @Router /api/v1/persistent-caches/{scheduler_cluster_id}/{task_id} [delete]
39+
func (h *Handlers) DestroyPersistentCache(ctx *gin.Context) {
40+
var params types.PersistentCacheParams
41+
if err := ctx.ShouldBindUri(&params); err != nil {
42+
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
43+
return
44+
}
45+
46+
if err := h.service.DestroyPersistentCache(ctx.Request.Context(), params); err != nil {
47+
ctx.Error(err) // nolint: errcheck
48+
return
49+
}
50+
51+
ctx.Status(http.StatusOK)
52+
}
53+
54+
// @Summary Get PersistentCache
55+
// @Description Get PersistentCache by id
56+
// @Tags PersistentCache
57+
// @Accept json
58+
// @Produce json
59+
// @Param scheduler_cluster_id path string true "scheduler cluster id"
60+
// @Param task_id path string true "task id"
61+
// @Success 200 {object} types.GetPersistentCacheResponse
62+
// @Failure 400
63+
// @Failure 404
64+
// @Failure 500
65+
// @Router /api/v1/persistent-caches/{scheduler_cluster_id}/{task_id} [get]
66+
func (h *Handlers) GetPersistentCache(ctx *gin.Context) {
67+
var params types.PersistentCacheParams
68+
if err := ctx.ShouldBindUri(&params); err != nil {
69+
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
70+
return
71+
}
72+
73+
persistentCache, err := h.service.GetPersistentCache(ctx.Request.Context(), params)
74+
if err != nil {
75+
ctx.Error(err) // nolint: errcheck
76+
return
77+
}
78+
79+
ctx.JSON(http.StatusOK, persistentCache)
80+
}
81+
2782
// @Summary Get PersistentCaches
2883
// @Description Get PersistentCaches
2984
// @Tags PersistentCache
Lines changed: 325 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,325 @@
1+
/*
2+
* Copyright 2025 The Dragonfly Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package handlers
18+
19+
import (
20+
"bytes"
21+
"encoding/json"
22+
"errors"
23+
"net/http"
24+
"net/http/httptest"
25+
"testing"
26+
"time"
27+
28+
"github.com/bits-and-blooms/bitset"
29+
"github.com/gin-gonic/gin"
30+
"github.com/stretchr/testify/assert"
31+
"go.uber.org/mock/gomock"
32+
33+
"d7y.io/dragonfly/v2/manager/service/mocks"
34+
"d7y.io/dragonfly/v2/manager/types"
35+
)
36+
37+
func mockPersistentCacheRouter(h *Handlers) *gin.Engine {
38+
r := gin.Default()
39+
r.DELETE("/api/v1/persistent-caches/:scheduler_cluster_id/:task_id", h.DestroyPersistentCache)
40+
r.GET("/api/v1/persistent-caches/:scheduler_cluster_id/:task_id", h.GetPersistentCache)
41+
r.GET("/api/v1/persistent-caches", h.GetPersistentCaches)
42+
return r
43+
}
44+
45+
func TestHandlers_DestroyPersistentCache(t *testing.T) {
46+
tests := []struct {
47+
name string
48+
mock func(mockService *mocks.MockService)
49+
req *http.Request
50+
expect func(t *testing.T, w *httptest.ResponseRecorder)
51+
payload interface{}
52+
}{
53+
{
54+
name: "destroy persistent cache successfully",
55+
mock: func(mockService *mocks.MockService) {
56+
mockService.EXPECT().DestroyPersistentCache(gomock.Any(), gomock.Any()).
57+
Return(nil)
58+
},
59+
req: httptest.NewRequest(http.MethodDelete, "/api/v1/persistent-caches/1/task-123", nil),
60+
expect: func(t *testing.T, w *httptest.ResponseRecorder) {
61+
assert := assert.New(t)
62+
assert.Equal(http.StatusOK, w.Code)
63+
},
64+
},
65+
{
66+
name: "invalid URI params",
67+
mock: func(mockService *mocks.MockService) {
68+
// Not expecting any service calls
69+
},
70+
req: httptest.NewRequest(http.MethodDelete, "/api/v1/persistent-caches/invalid/task-123", nil),
71+
expect: func(t *testing.T, w *httptest.ResponseRecorder) {
72+
assert := assert.New(t)
73+
assert.Equal(http.StatusUnprocessableEntity, w.Code)
74+
},
75+
},
76+
{
77+
name: "service returns error",
78+
mock: func(mockService *mocks.MockService) {
79+
mockService.EXPECT().DestroyPersistentCache(gomock.Any(), gomock.Any()).
80+
Return(errors.New("service error"))
81+
},
82+
req: httptest.NewRequest(http.MethodDelete, "/api/v1/persistent-caches/1/task-123", nil),
83+
expect: func(t *testing.T, w *httptest.ResponseRecorder) {
84+
assert := assert.New(t)
85+
assert.NotEqual(http.StatusOK, w.Code)
86+
},
87+
},
88+
}
89+
90+
for _, tc := range tests {
91+
t.Run(tc.name, func(t *testing.T) {
92+
ctl := gomock.NewController(t)
93+
defer ctl.Finish()
94+
svc := mocks.NewMockService(ctl)
95+
tc.mock(svc)
96+
w := httptest.NewRecorder()
97+
h := New(svc)
98+
mockRouter := mockPersistentCacheRouter(h)
99+
100+
mockRouter.ServeHTTP(w, tc.req)
101+
tc.expect(t, w)
102+
})
103+
}
104+
}
105+
106+
func TestHandlers_GetPersistentCache(t *testing.T) {
107+
mockResponse := &types.GetPersistentCacheResponse{
108+
TaskID: "task-123",
109+
PersistentReplicaCount: 3,
110+
Tag: "tag1",
111+
Application: "app1",
112+
PieceLength: 1024,
113+
ContentLength: 10240,
114+
TotalPieceCount: 10,
115+
State: "Active",
116+
TTL: time.Hour,
117+
CreatedAt: time.Now(),
118+
UpdatedAt: time.Now(),
119+
PersistentCachePeers: []types.PersistentCachePeer{},
120+
}
121+
122+
tests := []struct {
123+
name string
124+
mock func(mockService *mocks.MockService)
125+
req *http.Request
126+
expect func(t *testing.T, w *httptest.ResponseRecorder)
127+
payload interface{}
128+
}{
129+
{
130+
name: "get persistent cache successfully",
131+
mock: func(mockService *mocks.MockService) {
132+
mockService.EXPECT().GetPersistentCache(gomock.Any(), gomock.Any()).
133+
Return(mockResponse, nil)
134+
},
135+
req: httptest.NewRequest(http.MethodGet, "/api/v1/persistent-caches/1/task-123", nil),
136+
expect: func(t *testing.T, w *httptest.ResponseRecorder) {
137+
assert := assert.New(t)
138+
assert.Equal(http.StatusOK, w.Code)
139+
140+
var resp types.GetPersistentCacheResponse
141+
err := json.Unmarshal(w.Body.Bytes(), &resp)
142+
assert.Nil(err)
143+
assert.Equal("task-123", resp.TaskID)
144+
assert.Equal(uint64(3), resp.PersistentReplicaCount)
145+
assert.Equal("tag1", resp.Tag)
146+
assert.Equal("app1", resp.Application)
147+
},
148+
},
149+
{
150+
name: "invalid URI params",
151+
mock: func(mockService *mocks.MockService) {
152+
// Not expecting any service calls
153+
},
154+
req: httptest.NewRequest(http.MethodGet, "/api/v1/persistent-caches/invalid/task-123", nil),
155+
expect: func(t *testing.T, w *httptest.ResponseRecorder) {
156+
assert := assert.New(t)
157+
assert.Equal(http.StatusUnprocessableEntity, w.Code)
158+
},
159+
},
160+
{
161+
name: "service returns error",
162+
mock: func(mockService *mocks.MockService) {
163+
mockService.EXPECT().GetPersistentCache(gomock.Any(), gomock.Any()).
164+
Return(nil, errors.New("service error"))
165+
},
166+
req: httptest.NewRequest(http.MethodGet, "/api/v1/persistent-caches/1/task-123", nil),
167+
expect: func(t *testing.T, w *httptest.ResponseRecorder) {
168+
assert := assert.New(t)
169+
assert.NotEqual(http.StatusOK, w.Code)
170+
},
171+
},
172+
}
173+
174+
for _, tc := range tests {
175+
t.Run(tc.name, func(t *testing.T) {
176+
ctl := gomock.NewController(t)
177+
defer ctl.Finish()
178+
svc := mocks.NewMockService(ctl)
179+
tc.mock(svc)
180+
w := httptest.NewRecorder()
181+
h := New(svc)
182+
mockRouter := mockPersistentCacheRouter(h)
183+
184+
mockRouter.ServeHTTP(w, tc.req)
185+
tc.expect(t, w)
186+
})
187+
}
188+
}
189+
190+
func TestHandlers_GetPersistentCaches(t *testing.T) {
191+
// Create mock bitset for finished pieces
192+
bs := bitset.New(10)
193+
bs.Set(0)
194+
bs.Set(1)
195+
bs.Set(2)
196+
197+
mockResponses := []types.GetPersistentCacheResponse{
198+
{
199+
TaskID: "task-123",
200+
PersistentReplicaCount: 3,
201+
Tag: "tag1",
202+
Application: "app1",
203+
PieceLength: 1024,
204+
ContentLength: 10240,
205+
TotalPieceCount: 10,
206+
State: "Active",
207+
TTL: time.Hour,
208+
CreatedAt: time.Now(),
209+
UpdatedAt: time.Now(),
210+
PersistentCachePeers: []types.PersistentCachePeer{
211+
{
212+
ID: "peer-1",
213+
Persistent: true,
214+
FinishedPieces: bs,
215+
State: "Running",
216+
BlockParents: []string{},
217+
Cost: time.Second * 5,
218+
CreatedAt: time.Now(),
219+
UpdatedAt: time.Now(),
220+
Host: types.PersistentCachePeerHost{
221+
ID: "host-1",
222+
Type: "peer",
223+
Hostname: "peer-host-1",
224+
IP: "192.168.1.1",
225+
},
226+
},
227+
},
228+
},
229+
}
230+
231+
tests := []struct {
232+
name string
233+
mock func(mockService *mocks.MockService)
234+
req *http.Request
235+
expect func(t *testing.T, w *httptest.ResponseRecorder)
236+
payload interface{}
237+
}{
238+
{
239+
name: "get persistent caches successfully",
240+
mock: func(mockService *mocks.MockService) {
241+
mockService.EXPECT().GetPersistentCaches(gomock.Any(), gomock.Any()).
242+
Return(mockResponses, int64(1), nil)
243+
},
244+
req: httptest.NewRequest(http.MethodGet, "/api/v1/persistent-caches?page=1&per_page=10", nil),
245+
expect: func(t *testing.T, w *httptest.ResponseRecorder) {
246+
assert := assert.New(t)
247+
assert.Equal(http.StatusOK, w.Code)
248+
249+
var resp []types.GetPersistentCacheResponse
250+
err := json.Unmarshal(w.Body.Bytes(), &resp)
251+
assert.Nil(err)
252+
assert.Equal(1, len(resp))
253+
assert.Equal("task-123", resp[0].TaskID)
254+
assert.Equal(uint64(3), resp[0].PersistentReplicaCount)
255+
assert.Equal("tag1", resp[0].Tag)
256+
assert.Equal(1, len(resp[0].PersistentCachePeers))
257+
assert.Equal("peer-1", resp[0].PersistentCachePeers[0].ID)
258+
assert.Equal(true, resp[0].PersistentCachePeers[0].Persistent)
259+
},
260+
},
261+
{
262+
name: "invalid query params",
263+
mock: func(mockService *mocks.MockService) {
264+
// Not expecting any service calls
265+
},
266+
req: httptest.NewRequest(http.MethodGet, "/api/v1/persistent-caches?page=-1", nil),
267+
expect: func(t *testing.T, w *httptest.ResponseRecorder) {
268+
assert := assert.New(t)
269+
assert.Equal(http.StatusUnprocessableEntity, w.Code)
270+
},
271+
},
272+
{
273+
name: "service returns error",
274+
mock: func(mockService *mocks.MockService) {
275+
mockService.EXPECT().GetPersistentCaches(gomock.Any(), gomock.Any()).
276+
Return(nil, int64(0), errors.New("service error"))
277+
},
278+
req: httptest.NewRequest(http.MethodGet, "/api/v1/persistent-caches?page=1&per_page=10", nil),
279+
expect: func(t *testing.T, w *httptest.ResponseRecorder) {
280+
assert := assert.New(t)
281+
assert.NotEqual(http.StatusOK, w.Code)
282+
},
283+
},
284+
{
285+
name: "with scheduler cluster IDs",
286+
mock: func(mockService *mocks.MockService) {
287+
mockService.EXPECT().GetPersistentCaches(gomock.Any(), gomock.Any()).
288+
Return(mockResponses, int64(1), nil)
289+
},
290+
req: func() *http.Request {
291+
body := map[string]interface{}{
292+
"scheduler_cluster_ids": []uint{1, 2, 3},
293+
}
294+
b, _ := json.Marshal(body)
295+
req := httptest.NewRequest(http.MethodGet, "/api/v1/persistent-caches?page=1&per_page=10", bytes.NewReader(b))
296+
req.Header.Set("Content-Type", "application/json")
297+
return req
298+
}(),
299+
expect: func(t *testing.T, w *httptest.ResponseRecorder) {
300+
assert := assert.New(t)
301+
assert.Equal(http.StatusOK, w.Code)
302+
303+
var resp []types.GetPersistentCacheResponse
304+
err := json.Unmarshal(w.Body.Bytes(), &resp)
305+
assert.Nil(err)
306+
assert.Equal(1, len(resp))
307+
},
308+
},
309+
}
310+
311+
for _, tc := range tests {
312+
t.Run(tc.name, func(t *testing.T) {
313+
ctl := gomock.NewController(t)
314+
defer ctl.Finish()
315+
svc := mocks.NewMockService(ctl)
316+
tc.mock(svc)
317+
w := httptest.NewRecorder()
318+
h := New(svc)
319+
mockRouter := mockPersistentCacheRouter(h)
320+
321+
mockRouter.ServeHTTP(w, tc.req)
322+
tc.expect(t, w)
323+
})
324+
}
325+
}

manager/router/router.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -235,8 +235,8 @@ func Init(cfg *config.Config, logDir string, service service.Service, database *
235235

236236
// Persistent Cache.
237237
pc := apiv1.Group("/persistent-caches", jwt.MiddlewareFunc(), rbac)
238-
//pc.DELETE(":id", h.DestroyPersistentCache)
239-
//pc.GET(":id", h.GetPersistentCache)
238+
pc.DELETE(":id", h.DestroyPersistentCache)
239+
pc.GET(":id", h.GetPersistentCache)
240240
pc.GET("", h.GetPersistentCaches)
241241

242242
// Open API router.

0 commit comments

Comments
 (0)