Skip to content

Commit 2c44759

Browse files
authored
Merge branch 'main' into pulsar-subscription-mode
2 parents e718afd + a9dafe6 commit 2c44759

File tree

7 files changed

+397
-3
lines changed

7 files changed

+397
-3
lines changed

bindings/gcp/bucket/bucket.go

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ const (
5454
metadataKeyBC = "name"
5555
signOperation = "sign"
5656
bulkGetOperation = "bulkGet"
57+
copyOperation = "copy"
58+
renameOperation = "rename"
59+
moveOperation = "move"
5760
)
5861

5962
// GCPStorage allows saving data to GCP bucket storage.
@@ -87,6 +90,7 @@ type listPayload struct {
8790
MaxResults int32 `json:"maxResults"`
8891
Delimiter string `json:"delimiter"`
8992
}
93+
9094
type signResponse struct {
9195
SignURL string `json:"signURL"`
9296
}
@@ -142,6 +146,9 @@ func (g *GCPStorage) Operations() []bindings.OperationKind {
142146
bindings.ListOperation,
143147
signOperation,
144148
bulkGetOperation,
149+
copyOperation,
150+
renameOperation,
151+
moveOperation,
145152
}
146153
}
147154

@@ -161,6 +168,12 @@ func (g *GCPStorage) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*
161168
return g.sign(ctx, req)
162169
case bulkGetOperation:
163170
return g.bulkGet(ctx, req)
171+
case copyOperation:
172+
return g.copy(ctx, req)
173+
case renameOperation:
174+
return g.rename(ctx, req)
175+
case moveOperation:
176+
return g.move(ctx, req)
164177
default:
165178
return nil, fmt.Errorf("unsupported operation %s", req.Operation)
166179
}
@@ -498,3 +511,110 @@ func (g *GCPStorage) bulkGet(ctx context.Context, req *bindings.InvokeRequest) (
498511
Data: jsonResponse,
499512
}, nil
500513
}
514+
515+
type movePayload struct {
516+
DestinationBucket string `json:"destinationBucket"`
517+
}
518+
519+
func (g *GCPStorage) move(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
520+
var key string
521+
if val, ok := req.Metadata[metadataKey]; ok && val != "" {
522+
key = val
523+
} else {
524+
return nil, errors.New("gcp bucket binding error: can't read key value")
525+
}
526+
527+
var payload movePayload
528+
err := json.Unmarshal(req.Data, &payload)
529+
if err != nil {
530+
return nil, errors.New("gcp bucket binding error: invalid move payload")
531+
}
532+
533+
if payload.DestinationBucket == "" {
534+
return nil, errors.New("gcp bucket binding error: required 'destinationBucket' missing")
535+
}
536+
537+
src := g.client.Bucket(g.metadata.Bucket).Object(key)
538+
dst := g.client.Bucket(payload.DestinationBucket).Object(key)
539+
if _, err := dst.CopierFrom(src).Run(ctx); err != nil {
540+
return nil, fmt.Errorf("gcp bucket binding error while copying object: %w", err)
541+
}
542+
543+
if err := src.Delete(ctx); err != nil {
544+
return nil, fmt.Errorf("gcp bucket binding error while deleting object: %w", err)
545+
}
546+
547+
return &bindings.InvokeResponse{
548+
Data: []byte(fmt.Sprintf("object %s moved to %s", key, payload.DestinationBucket)),
549+
}, nil
550+
}
551+
552+
type renamePayload struct {
553+
NewName string `json:"newName"`
554+
}
555+
556+
func (g *GCPStorage) rename(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
557+
var key string
558+
if val, ok := req.Metadata[metadataKey]; ok && val != "" {
559+
key = val
560+
} else {
561+
return nil, errors.New("gcp bucket binding error: can't read key value")
562+
}
563+
564+
var payload renamePayload
565+
err := json.Unmarshal(req.Data, &payload)
566+
if err != nil {
567+
return nil, errors.New("gcp bucket binding error: invalid rename payload")
568+
}
569+
570+
if payload.NewName == "" {
571+
return nil, errors.New("gcp bucket binding error: required 'newName' missing")
572+
}
573+
574+
src := g.client.Bucket(g.metadata.Bucket).Object(key)
575+
dst := g.client.Bucket(g.metadata.Bucket).Object(payload.NewName)
576+
if _, err := dst.CopierFrom(src).Run(ctx); err != nil {
577+
return nil, fmt.Errorf("gcp bucket binding error while copying object: %w", err)
578+
}
579+
580+
if err := src.Delete(ctx); err != nil {
581+
return nil, fmt.Errorf("gcp bucket binding error while deleting object: %w", err)
582+
}
583+
584+
return &bindings.InvokeResponse{
585+
Data: []byte(fmt.Sprintf("object %s renamed to %s", key, payload.NewName)),
586+
}, nil
587+
}
588+
589+
type copyPayload struct {
590+
DestinationBucket string `json:"destinationBucket"`
591+
}
592+
593+
func (g *GCPStorage) copy(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
594+
var key string
595+
if val, ok := req.Metadata[metadataKey]; ok && val != "" {
596+
key = val
597+
} else {
598+
return nil, errors.New("gcp bucket binding error: can't read key value")
599+
}
600+
601+
var payload copyPayload
602+
err := json.Unmarshal(req.Data, &payload)
603+
if err != nil {
604+
return nil, errors.New("gcp bucket binding error: invalid copy payload")
605+
}
606+
607+
if payload.DestinationBucket == "" {
608+
return nil, errors.New("gcp bucket binding error: required 'destinationBucket' missing")
609+
}
610+
611+
src := g.client.Bucket(g.metadata.Bucket).Object(key)
612+
dst := g.client.Bucket(payload.DestinationBucket).Object(key)
613+
if _, err := dst.CopierFrom(src).Run(ctx); err != nil {
614+
return nil, fmt.Errorf("gcp bucket binding error while copying object: %w", err)
615+
}
616+
617+
return &bindings.InvokeResponse{
618+
Data: []byte(fmt.Sprintf("object %s copied to %s", key, payload.DestinationBucket)),
619+
}, nil
620+
}

bindings/gcp/bucket/bucket_test.go

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,3 +265,112 @@ func TestBulkGetOption(t *testing.T) {
265265
require.Error(t, err)
266266
})
267267
}
268+
269+
func TestCopyOption(t *testing.T) {
270+
gs := GCPStorage{logger: logger.NewLogger("test")}
271+
gs.metadata = &gcpMetadata{}
272+
273+
t.Run("return error if key is missing", func(t *testing.T) {
274+
r := bindings.InvokeRequest{}
275+
_, err := gs.copy(t.Context(), &r)
276+
require.Error(t, err)
277+
assert.Equal(t, "gcp bucket binding error: can't read key value", err.Error())
278+
})
279+
280+
t.Run("return error if data is not valid json", func(t *testing.T) {
281+
r := bindings.InvokeRequest{
282+
Metadata: map[string]string{
283+
"key": "my_key",
284+
},
285+
}
286+
_, err := gs.copy(t.Context(), &r)
287+
require.Error(t, err)
288+
assert.Equal(t, "gcp bucket binding error: invalid copy payload", err.Error())
289+
})
290+
291+
t.Run("return error if destinationBucket is missing", func(t *testing.T) {
292+
r := bindings.InvokeRequest{
293+
Data: []byte(`{}`),
294+
Metadata: map[string]string{
295+
"key": "my_key",
296+
},
297+
}
298+
_, err := gs.copy(t.Context(), &r)
299+
require.Error(t, err)
300+
assert.Equal(t, "gcp bucket binding error: required 'destinationBucket' missing", err.Error())
301+
})
302+
}
303+
304+
func TestRenameOption(t *testing.T) {
305+
gs := GCPStorage{logger: logger.NewLogger("test")}
306+
gs.metadata = &gcpMetadata{}
307+
308+
t.Run("return error if key is missing", func(t *testing.T) {
309+
r := bindings.InvokeRequest{
310+
Data: []byte(`{"newName": "my_new_name"}`),
311+
}
312+
_, err := gs.rename(t.Context(), &r)
313+
require.Error(t, err)
314+
assert.Equal(t, "gcp bucket binding error: can't read key value", err.Error())
315+
})
316+
317+
t.Run("return error if data is not valid json", func(t *testing.T) {
318+
r := bindings.InvokeRequest{
319+
Metadata: map[string]string{
320+
"key": "my_key",
321+
},
322+
}
323+
_, err := gs.rename(t.Context(), &r)
324+
require.Error(t, err)
325+
assert.Equal(t, "gcp bucket binding error: invalid rename payload", err.Error())
326+
})
327+
328+
t.Run("return error if newName is missing", func(t *testing.T) {
329+
r := bindings.InvokeRequest{
330+
Data: []byte(`{}`),
331+
Metadata: map[string]string{
332+
"key": "my_key",
333+
},
334+
}
335+
_, err := gs.rename(t.Context(), &r)
336+
require.Error(t, err)
337+
assert.Equal(t, "gcp bucket binding error: required 'newName' missing", err.Error())
338+
})
339+
}
340+
341+
func TestMoveOption(t *testing.T) {
342+
gs := GCPStorage{logger: logger.NewLogger("test")}
343+
gs.metadata = &gcpMetadata{}
344+
345+
t.Run("return error if key is missing", func(t *testing.T) {
346+
r := bindings.InvokeRequest{
347+
Data: []byte(`{"destinationBucket": "my_bucket"}`),
348+
}
349+
_, err := gs.move(t.Context(), &r)
350+
require.Error(t, err)
351+
assert.Equal(t, "gcp bucket binding error: can't read key value", err.Error())
352+
})
353+
354+
t.Run("return error if data is not valid json", func(t *testing.T) {
355+
r := bindings.InvokeRequest{
356+
Metadata: map[string]string{
357+
"key": "my_key",
358+
},
359+
}
360+
_, err := gs.move(t.Context(), &r)
361+
require.Error(t, err)
362+
assert.Equal(t, "gcp bucket binding error: invalid move payload", err.Error())
363+
})
364+
365+
t.Run("return error if destinationBucket is missing", func(t *testing.T) {
366+
r := bindings.InvokeRequest{
367+
Data: []byte(`{}`),
368+
Metadata: map[string]string{
369+
"key": "my_key",
370+
},
371+
}
372+
_, err := gs.move(t.Context(), &r)
373+
require.Error(t, err)
374+
assert.Equal(t, "gcp bucket binding error: required 'destinationBucket' missing", err.Error())
375+
})
376+
}

conversation/metadata.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,5 @@ type LangchainMetadata struct {
2626
Key string `json:"key"`
2727
Model string `json:"model"`
2828
CacheTTL string `json:"cacheTTL"`
29+
Endpoint string `json:"endpoint"`
2930
}

conversation/metadata_test.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
Copyright 2024 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package conversation
15+
16+
import (
17+
"encoding/json"
18+
"testing"
19+
20+
"github.com/stretchr/testify/assert"
21+
"github.com/stretchr/testify/require"
22+
)
23+
24+
func TestLangchainMetadata(t *testing.T) {
25+
t.Run("json marshaling with endpoint", func(t *testing.T) {
26+
metadata := LangchainMetadata{
27+
Key: "test-key",
28+
Model: "gpt-4",
29+
CacheTTL: "10m",
30+
Endpoint: "https://custom-endpoint.example.com",
31+
}
32+
33+
bytes, err := json.Marshal(metadata)
34+
require.NoError(t, err)
35+
36+
var unmarshaled LangchainMetadata
37+
err = json.Unmarshal(bytes, &unmarshaled)
38+
require.NoError(t, err)
39+
40+
assert.Equal(t, metadata.Key, unmarshaled.Key)
41+
assert.Equal(t, metadata.Model, unmarshaled.Model)
42+
assert.Equal(t, metadata.CacheTTL, unmarshaled.CacheTTL)
43+
assert.Equal(t, metadata.Endpoint, unmarshaled.Endpoint)
44+
})
45+
46+
t.Run("json unmarshaling with endpoint", func(t *testing.T) {
47+
jsonStr := `{"key": "test-key", "endpoint": "https://api.openai.com/v1"}`
48+
49+
var metadata LangchainMetadata
50+
err := json.Unmarshal([]byte(jsonStr), &metadata)
51+
require.NoError(t, err)
52+
53+
assert.Equal(t, "test-key", metadata.Key)
54+
assert.Equal(t, "https://api.openai.com/v1", metadata.Endpoint)
55+
})
56+
}

conversation/openai/metadata.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,12 @@ metadata:
2727
The OpenAI LLM to use. Defaults to gpt-4o
2828
type: string
2929
example: 'gpt-4-turbo'
30+
- name: endpoint
31+
required: false
32+
description: |
33+
Custom API endpoint URL for OpenAI API-compatible services. If not specified, the default OpenAI API endpoint will be used.
34+
type: string
35+
example: 'https://api.openai.com/v1'
3036
- name: cacheTTL
3137
required: false
3238
description: |

conversation/openai/openai.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,18 @@ func (o *OpenAI) Init(ctx context.Context, meta conversation.Metadata) error {
5454
if md.Model != "" {
5555
model = md.Model
5656
}
57-
58-
llm, err := openai.New(
57+
// Create options for OpenAI client
58+
options := []openai.Option{
5959
openai.WithModel(model),
6060
openai.WithToken(md.Key),
61-
)
61+
}
62+
63+
// Add custom endpoint if provided
64+
if md.Endpoint != "" {
65+
options = append(options, openai.WithBaseURL(md.Endpoint))
66+
}
67+
68+
llm, err := openai.New(options...)
6269
if err != nil {
6370
return err
6471
}

0 commit comments

Comments
 (0)