Skip to content

Commit 6ba1837

Browse files
authored
acc: support pipeline update (#2934)
## Changes
- Support editing pipelines in testserver.
- Rewrite pipeline code to match best practice in testserver (separate file in libs/testserver and separate block in handlers.go).

## Why
Enables new acceptance tests; I'm using this in the terraform-removal branch #2926.

## Tests
New edit functionality is tested via #2926.
1 parent f4edb94 commit 6ba1837

File tree

3 files changed

+101
-63
lines changed

3 files changed

+101
-63
lines changed

acceptance/internal/handlers.go

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88

99
"github.com/databricks/databricks-sdk-go/service/catalog"
1010
"github.com/databricks/databricks-sdk-go/service/iam"
11-
"github.com/databricks/databricks-sdk-go/service/pipelines"
1211

1312
"github.com/databricks/databricks-sdk-go/service/compute"
1413
"github.com/databricks/databricks-sdk-go/service/jobs"
@@ -188,32 +187,11 @@ func addDefaultHandlers(server *testserver.Server) {
188187
return req.Workspace.JobsReset(request)
189188
})
190189

191-
server.Handle("POST", "/api/2.0/pipelines", func(req testserver.Request) any {
192-
var request pipelines.PipelineSpec
193-
if err := json.Unmarshal(req.Body, &request); err != nil {
194-
return testserver.Response{
195-
Body: fmt.Sprintf("internal error: %s", err),
196-
StatusCode: 400,
197-
}
198-
}
199-
200-
return req.Workspace.PipelinesCreate(request)
201-
})
202-
203-
server.Handle("DELETE", "/api/2.0/pipelines/{pipeline_id}", func(req testserver.Request) any {
204-
return testserver.MapDelete(req.Workspace, req.Workspace.Pipelines, req.Vars["pipeline_id"])
205-
})
206-
207190
server.Handle("GET", "/api/2.2/jobs/get", func(req testserver.Request) any {
208191
jobId := req.URL.Query().Get("job_id")
209192
return req.Workspace.JobsGet(jobId)
210193
})
211194

212-
server.Handle("GET", "/api/2.0/pipelines/{pipeline_id}", func(req testserver.Request) any {
213-
pipelineId := req.Vars["pipeline_id"]
214-
return req.Workspace.PipelinesGet(pipelineId)
215-
})
216-
217195
server.Handle("GET", "/api/2.2/jobs/list", func(req testserver.Request) any {
218196
return req.Workspace.JobsList()
219197
})
@@ -274,6 +252,24 @@ func addDefaultHandlers(server *testserver.Server) {
274252
}
275253
})
276254

255+
// Pipelines:
256+
257+
server.Handle("GET", "/api/2.0/pipelines/{pipeline_id}", func(req testserver.Request) any {
258+
return req.Workspace.PipelineGet(req.Vars["pipeline_id"])
259+
})
260+
261+
server.Handle("POST", "/api/2.0/pipelines", func(req testserver.Request) any {
262+
return req.Workspace.PipelineCreate(req)
263+
})
264+
265+
server.Handle("PUT", "/api/2.0/pipelines/{pipeline_id}", func(req testserver.Request) any {
266+
return req.Workspace.PipelineUpdate(req, req.Vars["pipeline_id"])
267+
})
268+
269+
server.Handle("DELETE", "/api/2.0/pipelines/{pipeline_id}", func(req testserver.Request) any {
270+
return testserver.MapDelete(req.Workspace, req.Workspace.Pipelines, req.Vars["pipeline_id"])
271+
})
272+
277273
// Quality monitors:
278274

279275
server.Handle("GET", "/api/2.1/unity-catalog/tables/{table_name}/monitor", func(req testserver.Request) any {

libs/testserver/fake_workspace.go

Lines changed: 0 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515
"github.com/databricks/databricks-sdk-go/service/jobs"
1616
"github.com/databricks/databricks-sdk-go/service/pipelines"
1717
"github.com/databricks/databricks-sdk-go/service/workspace"
18-
"github.com/google/uuid"
1918
)
2019

2120
// FakeWorkspace holds a state of a workspace for acceptance tests.
@@ -232,28 +231,6 @@ func (s *FakeWorkspace) JobsReset(request jobs.ResetJob) Response {
232231
}
233232
}
234233

235-
func (s *FakeWorkspace) PipelinesCreate(r pipelines.PipelineSpec) Response {
236-
defer s.LockUnlock()()
237-
238-
pipelineId := uuid.New().String()
239-
240-
r.Id = pipelineId
241-
242-
// If the pipeline definition does not specify a catalog, it switches to Hive metastore mode
243-
// and if the storage location is not specified, API automatically generates a storage location
244-
// (ref: https://docs.databricks.com/gcp/en/dlt/hive-metastore#specify-a-storage-location)
245-
if r.Storage == "" && r.Catalog == "" {
246-
r.Storage = "dbfs:/pipelines/" + pipelineId
247-
}
248-
s.Pipelines[pipelineId] = r
249-
250-
return Response{
251-
Body: pipelines.CreatePipelineResponse{
252-
PipelineId: pipelineId,
253-
},
254-
}
255-
}
256-
257234
func (s *FakeWorkspace) JobsGet(jobId string) Response {
258235
id := jobId
259236

@@ -325,24 +302,6 @@ func (s *FakeWorkspace) JobsGetRun(runId int64) Response {
325302
}
326303
}
327304

328-
func (s *FakeWorkspace) PipelinesGet(pipelineId string) Response {
329-
defer s.LockUnlock()()
330-
331-
spec, ok := s.Pipelines[pipelineId]
332-
if !ok {
333-
return Response{
334-
StatusCode: 404,
335-
}
336-
}
337-
338-
return Response{
339-
Body: pipelines.GetPipelineResponse{
340-
PipelineId: pipelineId,
341-
Spec: &spec,
342-
},
343-
}
344-
}
345-
346305
func (s *FakeWorkspace) JobsList() Response {
347306
defer s.LockUnlock()()
348307

libs/testserver/pipelines.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package testserver
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
7+
"github.com/databricks/databricks-sdk-go/service/pipelines"
8+
"github.com/google/uuid"
9+
)
10+
11+
func (s *FakeWorkspace) PipelineGet(pipelineId string) Response {
12+
defer s.LockUnlock()()
13+
14+
spec, ok := s.Pipelines[pipelineId]
15+
if !ok {
16+
return Response{
17+
StatusCode: 404,
18+
}
19+
}
20+
21+
return Response{
22+
Body: pipelines.GetPipelineResponse{
23+
PipelineId: pipelineId,
24+
Spec: &spec,
25+
},
26+
}
27+
}
28+
29+
func (s *FakeWorkspace) PipelineCreate(req Request) Response {
30+
defer s.LockUnlock()()
31+
32+
var r pipelines.PipelineSpec
33+
err := json.Unmarshal(req.Body, &r)
34+
if err != nil {
35+
return Response{
36+
Body: fmt.Sprintf("cannot unmarshal request body: %s", err),
37+
StatusCode: 400,
38+
}
39+
}
40+
41+
pipelineId := uuid.New().String()
42+
r.Id = pipelineId
43+
44+
// If the pipeline definition does not specify a catalog, it switches to Hive metastore mode
45+
// and if the storage location is not specified, API automatically generates a storage location
46+
// (ref: https://docs.databricks.com/gcp/en/dlt/hive-metastore#specify-a-storage-location)
47+
if r.Storage == "" && r.Catalog == "" {
48+
r.Storage = "dbfs:/pipelines/" + pipelineId
49+
}
50+
s.Pipelines[pipelineId] = r
51+
52+
return Response{
53+
Body: pipelines.CreatePipelineResponse{
54+
PipelineId: pipelineId,
55+
},
56+
}
57+
}
58+
59+
func (s *FakeWorkspace) PipelineUpdate(req Request, pipelineId string) Response {
60+
defer s.LockUnlock()()
61+
62+
var request pipelines.PipelineSpec
63+
err := json.Unmarshal(req.Body, &request)
64+
if err != nil {
65+
return Response{
66+
Body: fmt.Sprintf("internal error: %s", err),
67+
StatusCode: 400,
68+
}
69+
}
70+
71+
_, exists := s.Pipelines[pipelineId]
72+
if !exists {
73+
return Response{
74+
StatusCode: 404,
75+
}
76+
}
77+
78+
s.Pipelines[pipelineId] = request
79+
80+
return Response{
81+
Body: pipelines.EditPipelineResponse{},
82+
}
83+
}

0 commit comments

Comments (0)