Skip to content

draft #703

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: develop
Choose a base branch
from
Draft

draft #703

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions monitor/event/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package event

import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)

type EventInterface interface {
Contract() common.Address
Topic() common.Hash
HandleEvent(log types.Log) error
Channel() <-chan interface{}
}
75 changes: 75 additions & 0 deletions monitor/event/tasksettle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package event

import (
"errors"
"strings"

"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"

"github.com/iotexproject/w3bstream/smartcontracts/go/taskmanager"
)

type settleTask struct {
contractAddr common.Address
contract *taskmanager.Taskmanager
contractABI abi.ABI
channel chan interface{}
}

type TaskSettleEvent struct {
ProjectId uint64
TaskId [32]byte
TxHash [32]byte
BlockNumber uint64
}

func NewSettleTask(address common.Address, client *ethclient.Client) EventInterface {
ta, err := taskmanager.NewTaskmanager(address, client)
if err != nil {
panic(err)
}

tokenSOLCashierABI, err := abi.JSON(strings.NewReader(taskmanager.TaskmanagerABI))
if err != nil {
panic(err)
}
return &settleTask{
contract: ta,
contractAddr: address,
contractABI: tokenSOLCashierABI,
channel: make(chan interface{}, 1),
}
}

func (s *settleTask) Contract() common.Address {
return s.contractAddr
}

func (s *settleTask) Topic() common.Hash {
return s.contractABI.Events["TaskSettled"].ID
}

func (s *settleTask) Channel() <-chan interface{} {
return s.channel
}

func (s *settleTask) HandleEvent(log types.Log) error {
ret, err := s.contract.ParseTaskSettled(log)
if err != nil {
return err
}
select {
case s.channel <- TaskSettleEvent{
ProjectId: ret.ProjectId.Uint64(),
TaskId: ret.TaskId,
TxHash: log.TxHash,
BlockNumber: log.BlockNumber,
}:
return nil
default:
return errors.New("channel is full")
}
}
181 changes: 181 additions & 0 deletions monitor/monitor2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
package monitor

import (
"context"
"log/slog"
"math/big"
"sort"
"sync"
"time"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"

"github.com/iotexproject/w3bstream/monitor/event"
)

type (
Monitor struct {
eventMap map[eventID]event.EventInterface
addresses []common.Address
topics []common.Hash
client *ethclient.Client

startHeight int64
cancel context.CancelFunc
wg sync.WaitGroup
}

eventID [52]byte // address + topic
)

const (
//TODO
_scanInterval = 500
)

func NewMonitor(events []event.EventInterface, client *ethclient.Client) *Monitor {
m := &Monitor{
client: client,
startHeight: -1,
}

m.eventMap = make(map[eventID]event.EventInterface)
addressMap := make(map[common.Address]struct{})
topicMap := make(map[common.Hash]struct{})
for _, e := range events {
addressMap[e.Contract()] = struct{}{}
topicMap[e.Topic()] = struct{}{}
m.eventMap[m.eventID(e.Contract(), e.Topic())] = e
}

m.addresses = make([]common.Address, 0, len(addressMap))
for a := range addressMap {
m.addresses = append(m.addresses, a)
}
m.topics = make([]common.Hash, 0, len(topicMap))
for t := range topicMap {
m.topics = append(m.topics, t)
}

return m
}

func (m *Monitor) SetStartHeight(height int64) {
m.startHeight = height
}

func (m *Monitor) Start() error {
header, err := m.client.HeaderByNumber(context.Background(), nil)
if err != nil {
return err
}
if m.startHeight < 0 {
m.startHeight = header.Number.Int64()
}

// Sync events from startHeight to the latest block
if m.startHeight < header.Number.Int64() {
slog.Info("scanning events", "from", m.startHeight, "to", header.Number.Int64())

var (
minHeight = uint64(m.startHeight)
maxHeight = header.Number.Uint64()
length = uint64(_scanInterval)
)
for from := minHeight; from <= maxHeight; from += length {
to := from + length - 1
if to > maxHeight {
to = maxHeight
}

m.scanEvents(from, to)

if to == maxHeight {
break
}
}
slog.Info("contract data synchronization completed", "current_height", maxHeight)
}

ctx, cancel := context.WithCancel(context.Background())
m.cancel = cancel

go m.monitor(ctx)

return nil
}

func (m *Monitor) Stop() error {
m.cancel()
m.wg.Wait()
return nil
}

func (m *Monitor) eventID(contract common.Address, topic common.Hash) eventID {
b := make([]byte, 0, len(contract.Bytes())+len(topic.Bytes()))
b = append(b, contract.Bytes()...)
b = append(b, topic.Bytes()...)
var id eventID
copy(id[:], b)
return id
}

func (m *Monitor) monitor(ctx context.Context) {
ticker := time.NewTicker(5 * time.Second) // TODO:
m.wg.Add(1)
defer m.wg.Done()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
header, err := m.client.HeaderByNumber(context.Background(), nil)
if err != nil {
slog.Error("failed to retrieve latest block header", "err", err)
continue
}
targetHeight := min(header.Number.Uint64(), uint64(m.startHeight)+_scanInterval)

if err := m.scanEvents(uint64(m.startHeight), targetHeight); err != nil {
slog.Error("failed to scan events", "err", err)
continue
}

m.startHeight = int64(targetHeight)
}
}

}

func (m *Monitor) scanEvents(from uint64, to uint64) error {
query := ethereum.FilterQuery{
Addresses: m.addresses,
Topics: [][]common.Hash{m.topics},
FromBlock: new(big.Int).SetUint64(from),
ToBlock: new(big.Int).SetUint64(to),
}
logs, err := m.client.FilterLogs(context.Background(), query)
if err != nil {
return err
}
sort.Slice(logs, func(i, j int) bool {
if logs[i].BlockNumber != logs[j].BlockNumber {
return logs[i].BlockNumber < logs[j].BlockNumber
}
return logs[i].TxIndex < logs[j].TxIndex
})

for _, log := range logs {
e := m.eventMap[m.eventID(log.Address, log.Topics[0])]
if e == nil {
continue
}
if err := e.HandleEvent(log); err != nil {
return err
}
}

return nil
}
Loading