Skip to content

init messenger #1370

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: development
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ jobs:
# - rmb-sdk-go
- user-contracts-mon
- tfrobot
- messenger

steps:
- name: Checkout the repo
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,4 @@ jobs:
- rmb-sdk-go
- user-contracts-mon
- tfrobot
- messenger
1 change: 1 addition & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,4 @@ jobs:
- rmb-sdk-go
- user-contracts-mon
- tfrobot
- messenger
12 changes: 11 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,14 @@
DIRS := "activation-service" "farmerbot" "grid-cli" "grid-client" "grid-proxy" "gridify" "monitoring-bot" "rmb-sdk-go" "user-contracts-mon" "tfrobot" "node-registrar"
DIRS := activation-service \
farmerbot \
grid-cli \
grid-client \
grid-proxy \
gridify \
monitoring-bot \
rmb-sdk-go \
user-contracts-mon \
tfrobot \
messenger

mainnet-release:
cd grid-client && go get github.com/threefoldtech/tfchain/clients/tfchain-client-go@5d6a2dd
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ This repo contains the go clients for Threefold grid.
- [user contracts mon](./user-contracts-mon/README.md)
- [activation service](./activation-service/README.md)
- [farmerbot](./farmerbot/README.md)
- [messenger](./messenger/README.md)

## Release

Expand Down
27 changes: 0 additions & 27 deletions \

This file was deleted.

1 change: 1 addition & 0 deletions go.work
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use (
./grid-client
./grid-proxy
./gridify
./messenger
./monitoring-bot
./rmb-sdk-go
./tfrobot
Expand Down
51 changes: 29 additions & 22 deletions go.work.sum

Large diffs are not rendered by default.

158 changes: 158 additions & 0 deletions messenger/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
# Messenger Package

The Messenger package provides a Go SDK for building distributed messaging applications on top of the Mycelium network infrastructure. It offers a topic-based protocol registration system that enables developers to create custom server/client implementations with optional blockchain identity integration.

## Overview

The Messenger package serves as a high-level abstraction over the Mycelium messaging infrastructure, providing:

- **Topic-based message routing**: Register handlers for specific message topics
- **Bidirectional communication**: Send messages and receive replies
- **Optional blockchain identity**: Integrate with ThreeFold Chain for identity management
- **JSON-RPC support**: Built-in JSON-RPC server/client implementation

## Mycelium Infrastructure

The Mycelium daemon provides distinct communication methods:

1. HTTP REST server `:8989`
2. RPC server `:9090`
3. CLI `mycelium message` calling the reset server

For more info check [mycelium docs](https://github.com/threefoldtech/mycelium/tree/master/docs)


### Usage Patterns

- **Sending Messages**: Uses CLI command `mycelium message send <destination> <payload> [--topic <topic>] [--wait] [--timeout <seconds>]`
- **Receiving Messages**: Uses CLI command `mycelium message receive` in a polling loop
- **Sending Replies**: Uses HTTP POST to `/api/v1/messages/reply/{messageId}`
- **Getting Identity**: Uses HTTP GET to `/api/v1/admin`

## Messenger Core Component

The Messenger serves as the main orchestration component with the following key features:

### Topic-Based Protocol Registration

The Messenger implements a topic-based routing system where different message handlers can be registered for specific topics:

```go
// Register a handler for a specific topic
messenger.RegisterHandler("my-topic", func(ctx context.Context, message *Message) ([]byte, error) {
// Handle the message
return response, nil
})
```

### Message Structure

```go
type Message struct {
ID string `json:"id,omitempty"` // Unique message identifier
Topic string `json:"topic,omitempty"` // Message topic for routing
SrcIP string `json:"srcIp,omitempty"` // Source IP address
SrcPK string `json:"srcPk,omitempty"` // Source public key
DstIP string `json:"dstIp,omitempty"` // Destination IP address
DstPK string `json:"dstPk,omitempty"` // Destination public key
Payload string `json:"payload,omitempty"` // Message payload (raw string)
}
```

### Core Operations

- **Send Message**: Send a message to a destination with optional reply waiting
- **Register Handler**: Register topic-specific message handlers
- **Start/Stop Receiver**: Control the message listening loop
- **Send Reply**: Reply to a received message

## Chain Identity Management

The package functions independently without chain identity integration by default. However, it offers optional blockchain identity features:

### Enabling Chain Identity

To enable chain identity management, use the `WithEnableTwinIdentity(true)` configuration option:

```go
messenger, err := messenger.NewMessenger(
messenger.WithSubstrateManager(manager),
messenger.WithMnemonic(mnemonic),
messenger.WithEnableTwinIdentity(true), // Enable chain identity
)
```

### Required Configuration

When chain identity is enabled, the following are required:
- **Substrate Manager**: Connection to ThreeFold Chain
- **Identity or Mnemonic**: Either a substrate identity or mnemonic phrase

### Identity Lifecycle

1. **Startup**: Automatic identity updates on chain during messenger initialization
- Retrieves Mycelium node information via HTTP API
- Updates the MyceliumTwin mapping on the blockchain

2. **Message Processing**: Identity retrieval and context storage
- For each incoming message, retrieves the sender's twin ID from the blockchain
- Stores twin ID in the message context for handler use
- Accessible via `TwinIdContextKey` context key

### MyceliumTwin Storage Mapping

The chain identity feature leverages the MyceliumTwin storage mapping on the ThreeFold blockchain, which:
- Maps Mycelium public keys to Twin IDs
- Enables identity verification and authorization
- Provides a bridge between Mycelium network and blockchain identity

## JSON-RPC Implementation

The package includes a complete JSON-RPC server/client implementation registered under the 'rpc' topic key:

### JSON-RPC Server

```go
server := messenger.NewJSONRPCServer(msgr)
server.RegisterHandler("calculator.add", func(ctx context.Context, params json.RawMessage) (interface{}, error) {
// Handle RPC method
return result, nil
})
server.Start(ctx)
```

### JSON-RPC Client

```go
client := messenger.NewJSONRPCClient(msgr)
var result float64
err := client.Call(ctx, destination, "calculator.add", []float64{10, 20}, &result)
```
## Configuration

### Configuration Options

The Messenger supports various configuration options through functional options:

```go
type MessengerOpt func(*Messenger)

// Available options:
messenger.WithMnemonic(mnemonic) // Set mnemonic phrase
messenger.WithIdentity(identity) // Set substrate identity directly
messenger.WithEnableTwinIdentity(true) // Enable chain identity management
messenger.WithSubstrateManager(manager) // Set substrate manager
messenger.WithBinaryPath("/path/to/mycelium") // Set custom mycelium binary path
messenger.WithAPIAddress("http://localhost:8989") // Set custom API address
```

### Default Values

```go
const (
DefaultMessengerBinary = "mycelium" // Mycelium binary path
DefaultAPIAddress = "http://127.0.0.1:8989" // Mycelium HTTP API address
DefaultTimeout = 60 // Default timeout in seconds
DefaultRetryListenerInterval = 100 * time.Millisecond // Retry interval for message listener
)
```
51 changes: 51 additions & 0 deletions messenger/examples/jsonrpc/client/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package main

import (
"context"
"fmt"
"os"
"time"

"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
substrate "github.com/threefoldtech/tfchain/clients/tfchain-client-go"
"github.com/threefoldtech/tfgrid-sdk-go/messenger"
)

const (
chainUrl = "ws://192.168.1.10:9944"

// destination is mycelium pk or ip
destination = "22b45ca2c6c40650fa4c739942a7c863deeb4a88a6a2cb38b8c9b273f4ad7b0c"
)

func main() {
log.Logger = zerolog.New(zerolog.ConsoleWriter{Out: os.Stdout, TimeFormat: "15:04"}).With().Logger()
mnemonic := os.Getenv("MNEMONIC")

man := substrate.NewManager(chainUrl)

msgr, err := messenger.NewMessenger(
messenger.WithSubstrateManager(man),
messenger.WithMnemonic(mnemonic),
messenger.WithEnableTwinIdentity(true),
)
if err != nil {
fmt.Printf("Failed to create Mycelium client: %v\n", err)
os.Exit(1)
}
defer msgr.Close()

rpcClient := messenger.NewJSONRPCClient(msgr)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

var addResult float64
// should timeout if no response
err = rpcClient.Call(ctx, destination, "calculator.add", []float64{10, 20}, &addResult)
if err != nil {
fmt.Printf("Failed to call calculator.add: %v\n", err)
os.Exit(1)
}
fmt.Printf("10 + 20 = %f\n", addResult)
}
84 changes: 84 additions & 0 deletions messenger/examples/jsonrpc/server/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package main

import (
"context"
"encoding/json"
"fmt"
"os"
"os/signal"
"syscall"

"github.com/rs/zerolog"
"github.com/rs/zerolog/log"

substrate "github.com/threefoldtech/tfchain/clients/tfchain-client-go"
"github.com/threefoldtech/tfgrid-sdk-go/messenger"
)

// API
type Calculator struct{}

func (c *Calculator) Add(a, b float64) float64 {
return a + b
}

// HANDLERS
func addHandler(ctx context.Context, calc *Calculator, params json.RawMessage) (interface{}, error) {
var args []float64
if err := json.Unmarshal(params, &args); err != nil {
return nil, fmt.Errorf("invalid parameters: %w", err)
}

if len(args) != 2 {
return nil, fmt.Errorf("expected 2 parameters, got %d", len(args))
}

result := calc.Add(args[0], args[1])
return result, nil
}

const (
chainUrl = "ws://192.168.1.10:9944"
)

func main() {
log.Logger = zerolog.New(zerolog.ConsoleWriter{Out: os.Stdout, TimeFormat: "15:04"}).With().Timestamp().Logger()
mnemonic := os.Getenv("MNEMONIC")

manager := substrate.NewManager(chainUrl)

msgr, err := messenger.NewMessenger(
messenger.WithSubstrateManager(manager),
messenger.WithMnemonic(mnemonic),
messenger.WithEnableTwinIdentity(true),
)

if err != nil {
fmt.Printf("Failed to create messenger: %v\n", err)
os.Exit(1)
}
defer msgr.Close()

server := messenger.NewJSONRPCServer(msgr)
calc := &Calculator{}

server.RegisterHandler("calculator.add", func(ctx context.Context, params json.RawMessage) (interface{}, error) {
return addHandler(ctx, calc, params)
})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

if err := server.Start(ctx); err != nil {
fmt.Printf("Failed to start server: %v\n", err)
os.Exit(1)
}

fmt.Println("Server started. Press Ctrl+C to stop.")

sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh

server.Stop()
fmt.Println("Server stopped.")
}
Loading