Skip to content

Commit 8188c09

Browse files
committed
add rmb client in go
1 parent 2999358 commit 8188c09

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+4781
-0
lines changed

rmb-sdk-go/Makefile

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
PWD := $(shell pwd)
2+
GOPATH := $(shell go env GOPATH)
3+
4+
all: verifiers test
5+
6+
test:
7+
@echo "Running Tests"
8+
go test -v ./...
9+
10+
coverage: clean
11+
@echo "Installing gopherbadger" && go get -u github.com/jpoles1/gopherbadger && go install github.com/jpoles1/gopherbadger
12+
mkdir coverage
13+
go test -v -vet=off ./... -coverprofile=coverage/coverage.out
14+
go tool cover -html=coverage/coverage.out -o coverage/coverage.html
15+
@${GOPATH}/bin/gopherbadger -png=false -md="README.md"
16+
rm coverage.out
17+
go mod tidy
18+
19+
clean:
20+
rm ./coverage -rf
21+
rm ./bin -rf
22+
23+
getverifiers:
24+
@echo "Installing golangci-lint" && go install github.com/golangci/golangci-lint/cmd/golangci-lint
25+
go mod tidy
26+
27+
lint:
28+
@echo "Running $@"
29+
golangci-lint run -c ../.golangci.yml

rmb-sdk-go/README.md

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
[![Go Documentation](https://godocs.io/github.com/threefoldtech/tfgrid-sdk-go/rmb-sdk-go?status.svg)](https://godocs.io/github.com/threefoldtech/tfgrid-sdk-go/rmb-sdk-go)
2+
3+
# Introduction
4+
5+
This is a `GO` sdk that can be used to build both **services**, and **clients**
6+
that can talk over the `rmb`.
7+
8+
[RMB](https://github.com/threefoldtech/rmb-rs) is a message bus that enable secure
9+
and reliable `RPC` calls across the globe.
10+
11+
`RMB` itself does not implement an RPC protocol, but just the secure and reliable messaging
12+
hence it's up to server and client to implement their own data format.
13+
14+
## How it works
15+
16+
If two processes needed to communicate over `RMB`, they both need to have some sort of a connection to an `rmb-relay`.\
17+
This connection could be established using a `direct-client`, or an `rmb-peer`.
18+
19+
### Direct client
20+
21+
A process could connect to an `rmb-relay` using a direct client.\
22+
To create a new direct client instance, a process needs to have:
23+
24+
- A valid mnemonics, with an activated account on the TFChain.
25+
- The key type of these mnemonics.
26+
- A relay URL that the direct client will connect to.
27+
- A session id. This could be anything, but a twin must only have a unique session id per connection.
28+
- A substrate connection.
29+
30+
#### **Example**
31+
32+
Creating a new direct client instance:
33+
34+
```Go
35+
subManager := substrate.NewManager("wss://tfchain.dev.grid.tf/ws")
36+
sub, err := subManager.Substrate()
37+
if err != nil {
38+
return fmt.Errorf("failed to connect to substrate: %w", err)
39+
}
40+
41+
defer sub.Close()
42+
client, err := direct.NewRpcClient(direct.KeyTypeSr25519, mnemonics, "wss://relay.dev.grid.tf", "test-client", sub, false)
43+
if err != nil {
44+
return fmt.Errorf("failed to create direct client: %w", err)
45+
}
46+
```
47+
48+
Assuming there is a remote calculator process that could add two integers, an rmb call using the direct client would look like this:
49+
50+
```Go
51+
x := 1
52+
y := 2
53+
var sum int
54+
err := client.Call(ctx, destinationTwinID, "calculator.add", []int{x, y}, &sum)
55+
```

rmb-sdk-go/client.go

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
package rmb
2+
3+
import (
4+
"context"
5+
"encoding/base64"
6+
"encoding/json"
7+
"fmt"
8+
"time"
9+
10+
"github.com/gomodule/redigo/redis"
11+
"github.com/google/uuid"
12+
"github.com/pkg/errors"
13+
)
14+
15+
const (
16+
DefaultSchema = "application/json"
17+
18+
systemLocalBus = "msgbus.system.local"
19+
20+
// DefaultAddress default redis address when no address is passed
21+
DefaultAddress = "tcp://127.0.0.1:6379"
22+
)
23+
24+
type redisClient struct {
25+
pool *redis.Pool
26+
}
27+
28+
// Default return instance of to default (local) rmb
29+
// shortcut for NewClient(DefaultAddress)
30+
func Default() (Client, error) {
31+
return NewRMBClient(DefaultAddress)
32+
}
33+
34+
// NewRMBClient creates a new rmb client that runs behind an rmb-peer. This
35+
// client does not talk to the rmb relay directly, instead talk to an rmb-peer
36+
// instance (like a gateway) that itself maintains a connection to the relay.
37+
// the rmb-peer does all the heavy lifting, including signing, encryption,
38+
// validation of the response, etc...
39+
//
40+
// hence the address in this case, is an address to the local redis that must
41+
// be the same one used with the rmb-peer process.
42+
//
43+
// for more details about rmb-peer please check https://github.com/threefoldtech/rmb-rs
44+
// Since the rmb protocol does not specify a "payload" format this Client and the DefaultRouter
45+
// both uses json to encode and decode the rpc body. Hence this client should be always
46+
// 100% compatible with services built with the DefaultRouter.
47+
func NewRMBClient(address string, poolSize ...uint32) (Client, error) {
48+
49+
if len(address) == 0 {
50+
address = DefaultAddress
51+
}
52+
53+
pool, err := NewRedisPool(address, poolSize...)
54+
if err != nil {
55+
return nil, err
56+
}
57+
58+
return &redisClient{
59+
pool: pool,
60+
}, nil
61+
}
62+
63+
// Close closes the rmb client
64+
func (c *redisClient) Close() error {
65+
return c.pool.Close()
66+
}
67+
68+
// Call calls the twin with given function and message. Can return a RemoteError if error originated by remote peer
69+
// in that case it should also include extra Code
70+
func (c *redisClient) Call(ctx context.Context, twin uint32, fn string, data interface{}, result interface{}) error {
71+
bytes, err := json.Marshal(data)
72+
if err != nil {
73+
return errors.Wrap(err, "failed to serialize request data")
74+
}
75+
76+
var ttl uint64 = 5 * 60
77+
deadline, ok := ctx.Deadline()
78+
if ok {
79+
ttl = uint64(time.Until(deadline).Seconds())
80+
}
81+
82+
queue := uuid.NewString()
83+
msg := Request{
84+
Version: 1,
85+
Expiration: int(ttl),
86+
Command: fn,
87+
TwinDest: []uint32{twin},
88+
Data: base64.StdEncoding.EncodeToString(bytes),
89+
Schema: DefaultSchema,
90+
RetQueue: queue,
91+
}
92+
93+
bytes, err = json.Marshal(msg)
94+
if err != nil {
95+
return errors.Wrap(err, "failed to serialize message")
96+
}
97+
con := c.pool.Get()
98+
defer con.Close()
99+
100+
_, err = con.Do("RPUSH", systemLocalBus, bytes)
101+
if err != nil {
102+
return errors.Wrap(err, "failed to push message to local twin")
103+
}
104+
105+
// now wait for response.
106+
for {
107+
select {
108+
case <-ctx.Done():
109+
return ctx.Err()
110+
default:
111+
}
112+
113+
slice, err := redis.ByteSlices(con.Do("BLPOP", queue, 5))
114+
if err != nil && err != redis.ErrNil {
115+
return errors.Wrap(err, "unexpected error during waiting for the response")
116+
}
117+
118+
if err == redis.ErrNil || slice == nil {
119+
//timeout, just try again immediately
120+
continue
121+
}
122+
123+
// found a response
124+
bytes = slice[1]
125+
break
126+
}
127+
128+
var ret IncomingResponse
129+
130+
// we have a response, so load or fail
131+
if err := json.Unmarshal(bytes, &ret); err != nil {
132+
return errors.Wrap(err, "failed to load response message")
133+
}
134+
// errorred ?
135+
if ret.Error != nil {
136+
return RemoteError{
137+
Code: ret.Error.Code,
138+
Message: ret.Error.Message,
139+
}
140+
}
141+
142+
// not expecting a result
143+
if result == nil {
144+
return nil
145+
}
146+
147+
if ret.Schema != DefaultSchema {
148+
return fmt.Errorf("received invalid schema '%s' was expecting %s", ret.Schema, DefaultSchema)
149+
}
150+
151+
if len(ret.Data) == 0 {
152+
return fmt.Errorf("no response body was returned")
153+
}
154+
155+
bytes, err = base64.StdEncoding.DecodeString(ret.Data)
156+
if err != nil {
157+
return errors.Wrap(err, "invalid data body encoding")
158+
}
159+
160+
if err := json.Unmarshal(bytes, result); err != nil {
161+
return errors.Wrap(err, "failed to decode response body")
162+
}
163+
164+
return nil
165+
}
166+
167+
type RemoteError struct {
168+
Code uint32
169+
Message string
170+
}
171+
172+
func (e RemoteError) Error() string {
173+
return e.Message
174+
}

rmb-sdk-go/examples/client/README.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# Introduction
2+
3+
This is a `Go` example for the `RMB` [default local client](https://github.com/threefoldtech/tfgrid-sdk-go/blob/development/rmb-sdk-go/client.go) that is used send `RMB` messages.
4+
5+
## How it works
6+
7+
To use the example, you needs to:
8+
9+
- A twinId of a remote calculator process that could add two integers
10+
11+
## Usage
12+
13+
Make sure to have a remote process that the client can call with valid twinId.
14+
15+
Run the client and wait for the response.

rmb-sdk-go/examples/client/main.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"os"
7+
"time"
8+
9+
"github.com/threefoldtech/tfgrid-sdk-go/rmb-sdk-go"
10+
)
11+
12+
func app() error {
13+
14+
client, err := rmb.Default()
15+
16+
if err != nil {
17+
return err
18+
}
19+
20+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
21+
defer cancel()
22+
23+
const dst = 7 // <- replace this with the twin id of where the service is running
24+
// it's okay to run both the server and the client behind the same rmb-peer
25+
var output float64
26+
if err := client.Call(ctx, dst, "calculator.add", []float64{10, 20}, &output); err != nil {
27+
return err
28+
}
29+
30+
fmt.Printf("output: %f\n", output)
31+
32+
return nil
33+
}
34+
35+
func main() {
36+
if err := app(); err != nil {
37+
fmt.Fprintln(os.Stderr, err)
38+
os.Exit(1)
39+
}
40+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# Introduction
2+
3+
This is a `Go` example for the `RMB` [rpc client](https://github.com/threefoldtech/tfgrid-sdk-go/blob/development/rmb-sdk-go/peer/rpc.go) that can send `RMB` messages through working with zos peer
4+
5+
## How it works
6+
7+
To use the example, you needs to:
8+
9+
- Set the mnemonics variable to a valid mnemonics, with an activated account on the TFChain.
10+
- A node id to send the call to
11+
12+
## Usage
13+
14+
Make sure to use a valid node id that the client can call with valid twinId.
15+
16+
Run the client and wait for the response.
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log"
7+
"time"
8+
9+
substrate "github.com/threefoldtech/tfchain/clients/tfchain-client-go"
10+
"github.com/threefoldtech/tfgrid-sdk-go/rmb-sdk-go/peer"
11+
)
12+
13+
type version struct {
14+
ZOS string `json:"zos"`
15+
ZInit string `json:"zinit"`
16+
}
17+
18+
func app() error {
19+
mnemonics := "<mnemonics goes here>"
20+
subNodeURL := "wss://tfchain.dev.grid.tf/ws"
21+
relayURL := "wss://relay.dev.grid.tf"
22+
23+
subManager := substrate.NewManager(subNodeURL)
24+
25+
client, err := peer.NewRpcClient(context.Background(), mnemonics, subManager, peer.WithRelay(relayURL), peer.WithSession("test-client"))
26+
if err != nil {
27+
return fmt.Errorf("failed to create direct client: %w", err)
28+
}
29+
30+
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
31+
defer cancel()
32+
33+
const dstTwin uint32 = 11 // <- replace this with any node i
34+
var ver version
35+
if err := client.Call(ctx, dstTwin, "zos.system.version", nil, &ver); err != nil {
36+
return err
37+
}
38+
39+
fmt.Printf("output: %s\n", ver)
40+
return nil
41+
}
42+
43+
func main() {
44+
if err := app(); err != nil {
45+
log.Fatal(err)
46+
}
47+
}

0 commit comments

Comments
 (0)