Skip to content

Commit f73e776

Browse files
authored
Merge pull request #13 from CortexFoundation/dev
task channel for batch
2 parents d848e1c + 074161b commit f73e776

File tree

4 files changed

+53
-79
lines changed

4 files changed

+53
-79
lines changed

cmd/main.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,10 @@ func main() {
3535
//m.SwitchService(robot.SRV_RECORD)
3636
go func() {
3737
for {
38-
mm.SwitchService(robot.SRV_RECORD)
38+
//mm.SwitchService(robot.SRV_RECORD)
39+
//time.Sleep(30 * time.Second)
40+
//mm.SwitchService(robot.SRV_MODEL)
3941
time.Sleep(30 * time.Second)
40-
mm.SwitchService(robot.SRV_MODEL)
41-
time.Sleep(30 * time.Second)
42-
//m.SwitchService(robot.SRV_RECORD)
4342
}
4443
}()
4544

go.mod

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ require (
77
github.com/CortexFoundation/merkletree v0.0.0-20230724124840-b6e80265a137
88
github.com/CortexFoundation/torrentfs v1.0.52-0.20230711131012-594cb55f6538
99
github.com/google/uuid v1.3.0
10-
github.com/hashicorp/golang-lru v0.5.5-0.20221011183528-d4900dc688bf
10+
github.com/hashicorp/golang-lru/v2 v2.0.5-0.20230705200825-f4cd393e7b29
1111
github.com/ucwong/golang-kv v1.0.22-0.20230711090627-455356f3db9c
1212
go.etcd.io/bbolt v1.3.7
1313
)
@@ -37,7 +37,6 @@ require (
3737
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
3838
github.com/google/flatbuffers v23.5.26+incompatible // indirect
3939
github.com/gorilla/websocket v1.5.0 // indirect
40-
github.com/hashicorp/golang-lru/v2 v2.0.4 // indirect
4140
github.com/holiman/uint256 v1.2.3 // indirect
4241
github.com/klauspost/compress v1.16.7 // indirect
4342
github.com/kr/pretty v0.3.1 // indirect

go.sum

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -109,10 +109,8 @@ github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
109109
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
110110
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
111111
github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8=
112-
github.com/hashicorp/golang-lru v0.5.5-0.20221011183528-d4900dc688bf h1:BQyif+/dqmbIGXyGhe5bDx/3grIchislVu5pK7j/bMQ=
113-
github.com/hashicorp/golang-lru v0.5.5-0.20221011183528-d4900dc688bf/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
114-
github.com/hashicorp/golang-lru/v2 v2.0.4 h1:7GHuZcgid37q8o5i3QI9KMT4nCWQQ3Kx3Ov6bb9MfK0=
115-
github.com/hashicorp/golang-lru/v2 v2.0.4/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
112+
github.com/hashicorp/golang-lru/v2 v2.0.5-0.20230705200825-f4cd393e7b29 h1:ms53+xNey/RHASahOxI1AdzSm5snVNH8xQGTDc1xV2I=
113+
github.com/hashicorp/golang-lru/v2 v2.0.5-0.20230705200825-f4cd393e7b29/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
116114
github.com/holiman/uint256 v1.2.3 h1:K8UWO1HUJpRMXBxbmaY1Y8IAMZC/RsKB+ArEnnK4l5o=
117115
github.com/holiman/uint256 v1.2.3/go.mod h1:SC8Ryt4n+UBbPbIBKaG9zbbDlp4jOru9xFZmPzLUTxw=
118116
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=

monitor.go

Lines changed: 47 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import (
3030
"github.com/CortexFoundation/robot/backend"
3131
"github.com/CortexFoundation/torrentfs/params"
3232
"github.com/CortexFoundation/torrentfs/types"
33-
lru "github.com/hashicorp/golang-lru"
33+
lru "github.com/hashicorp/golang-lru/v2"
3434
"github.com/ucwong/golang-kv"
3535
"math"
3636
"math/big"
@@ -74,10 +74,11 @@ type Monitor struct {
7474
wg sync.WaitGroup
7575
rpcWg sync.WaitGroup
7676

77-
//taskCh chan *types.Block
77+
taskCh chan *types.Block
78+
errCh chan error
7879
//newTaskHook func(*types.Block)
79-
blockCache *lru.Cache
80-
sizeCache *lru.Cache
80+
blockCache *lru.Cache[uint64, string]
81+
sizeCache *lru.Cache[string, uint64]
8182
ckp *params.TrustedCheckpoint
8283
start mclock.AbsTime
8384

@@ -124,10 +125,12 @@ func New(flag *params.Config, cache, compress, listen bool, callback chan any) (
124125
srvCh: make(chan int),
125126
//exitSyncCh: make(chan any),
126127
scope: uint64(math.Min(float64(runtime.NumCPU()), float64(8))),
127-
//taskCh: make(chan *types.Block, batch),
128+
//taskCh: make(chan *types.Block, batch),
129+
//taskCh: make(chan *types.Block, 1),
128130
//start: mclock.Now(),
129131
}
130-
132+
m.errCh = make(chan error, m.scope)
133+
m.taskCh = make(chan *types.Block, m.scope)
131134
// TODO https://github.com/ucwong/golang-kv
132135
if fs_, err := backend.NewChainDB(flag); err != nil {
133136
log.Error("file storage failed", "err", err)
@@ -140,8 +143,8 @@ func New(flag *params.Config, cache, compress, listen bool, callback chan any) (
140143
m.startNumber.Store(0)
141144

142145
m.terminated.Store(false)
143-
m.blockCache, _ = lru.New(delay)
144-
m.sizeCache, _ = lru.New(batch)
146+
m.blockCache, _ = lru.New[uint64, string](delay)
147+
m.sizeCache, _ = lru.New[string, uint64](batch)
145148
m.listen = listen
146149
m.callback = callback
147150

@@ -173,10 +176,6 @@ func New(flag *params.Config, cache, compress, listen bool, callback chan any) (
173176
return m, nil
174177
}
175178

176-
//func (m *Monitor) DB() *backend.ChainDB {
177-
// return m.fs
178-
//}
179-
180179
func (m *Monitor) CurrentNumber() uint64 {
181180
return m.currentNumber.Load()
182181
}
@@ -288,27 +287,29 @@ func (m *Monitor) indexInit() error {
288287
return nil
289288
}
290289

291-
/*func (m *Monitor) taskLoop() {
290+
func (m *Monitor) taskLoop() {
291+
log.Info("Task channel started")
292292
defer m.wg.Done()
293293
for {
294294
select {
295295
case task := <-m.taskCh:
296-
if m.newTaskHook != nil {
297-
m.newTaskHook(task)
298-
}
296+
//if m.newTaskHook != nil {
297+
// m.newTaskHook(task)
298+
//}
299299

300-
if err := m.solve(task); err != nil {
301-
log.Warn("Block solved failed, try again", "err", err, "num", task.Number)
302-
}
300+
/*if err := m.solve(task); err != nil {
301+
m.errCh <- err
302+
log.Warn("Block solved failed, try again", "err", err, "num", task.Number, "last", m.lastNumber.Load())
303+
} else {
304+
m.errCh <- nil
305+
}*/
306+
m.errCh <- m.solve(task)
303307
case <-m.exitCh:
304-
if cap(m.taskCh) > 0 {
305-
continue
306-
}
307308
log.Info("Monitor task channel closed")
308309
return
309310
}
310311
}
311-
}*/
312+
}
312313

313314
// SetConnection method builds connection to remote or local communicator.
314315
func (m *Monitor) buildConnection(ipcpath string, rpcuri string) (*rpc.Client, error) {
@@ -379,8 +380,8 @@ func (m *Monitor) rpcBatchBlockByNumber(from, to uint64) (result []*types.Block,
379380
}
380381

381382
func (m *Monitor) getRemainingSize(address string) (uint64, error) {
382-
if size, suc := m.sizeCache.Get(address); suc && size.(uint64) == 0 {
383-
return size.(uint64), nil
383+
if size, suc := m.sizeCache.Get(address); suc && size == 0 {
384+
return size, nil
384385
}
385386
var remainingSize hexutil.Uint64
386387
rpcUploadMeter.Mark(1)
@@ -520,13 +521,6 @@ func (m *Monitor) parseBlockTorrentInfo(b *types.Block) (bool, error) {
520521

521522
func (m *Monitor) exit() {
522523
m.closeOnce.Do(func() {
523-
/*if m.exitSyncCh != nil {
524-
close(m.exitSyncCh)
525-
m.exitSyncCh = nil
526-
} else {
527-
log.Warn("Listener sync has already been stopped")
528-
}*/
529-
530524
if m.exitCh != nil {
531525
close(m.exitCh)
532526
m.wg.Wait()
@@ -540,7 +534,6 @@ func (m *Monitor) exit() {
540534
func (m *Monitor) Stop() error {
541535
m.lock.Lock()
542536
defer m.lock.Unlock()
543-
//m.closeOnce.Do(func() {
544537
if m.terminated.Swap(true) {
545538
return nil
546539
}
@@ -567,7 +560,6 @@ func (m *Monitor) Stop() error {
567560
}
568561
log.Info("Fs listener synchronizing closed")
569562
return nil
570-
//})
571563
}
572564

573565
// Start ... start ListenOn on the rpc port of a blockchain full node
@@ -622,8 +614,8 @@ func (m *Monitor) run() error {
622614
//if err := m.loadHistory(); err != nil {
623615
// return err
624616
//}
625-
//m.wg.Add(1)
626-
//go m.taskLoop()
617+
m.wg.Add(1)
618+
go m.taskLoop()
627619
//m.wg.Add(1)
628620
//go m.listenLatestBlock()
629621
m.wg.Add(1)
@@ -685,7 +677,6 @@ func (m *Monitor) syncLatestBlock() {
685677
continue
686678
}
687679
m.fs.Flush()
688-
//go m.exit()
689680
elapsed := time.Duration(mclock.Now()) - time.Duration(m.start)
690681
log.Debug("Finish sync, listener will be paused", "current", m.currentNumber.Load(), "elapsed", common.PrettyDuration(elapsed), "progress", progress, "end", end, "last", m.lastNumber.Load())
691682
//return
@@ -789,8 +780,8 @@ func (m *Monitor) syncLastBlock() uint64 {
789780
maxNumber = i - 1
790781
break
791782
}
792-
if maxNumber > minNumber && (i-minNumber)%128 == 0 {
793-
log.Debug("Running", "min", minNumber, "max", maxNumber, "cur", currentNumber, "last", m.lastNumber.Load(), "batch", batch, "i", i, "srv", m.srv.Load(), "size", maxNumber-minNumber, "progress", float64(i-minNumber)/float64(maxNumber-minNumber))
783+
if maxNumber > minNumber && i%2048 == 0 {
784+
log.Info("Running", "min", minNumber, "max", maxNumber, "cur", currentNumber, "last", m.lastNumber.Load(), "batch", batch, "i", i, "srv", m.srv.Load(), "size", maxNumber-minNumber, "progress", float64(i)/float64(currentNumber))
794785
}
795786
if m.ckp != nil && m.skip(i) {
796787
//m.lastNumber = i - 1
@@ -808,25 +799,24 @@ func (m *Monitor) syncLastBlock() uint64 {
808799

809800
// batch blocks operation according service category
810801
for _, rpcBlock := range blocks {
811-
if err := m.solve(rpcBlock); err != nil {
812-
log.Error("solve err", "err", err)
802+
m.taskCh <- rpcBlock
803+
}
804+
805+
for n := 0; n < len(blocks); n++ {
806+
select {
807+
case err := <-m.errCh:
808+
if err != nil {
809+
m.lastNumber.Store(i - 1)
810+
log.Error("solve err", "err", err, "last", m.lastNumber.Load(), "i", i, "scope", m.scope, "min", minNumber, "max", maxNumber, "cur", currentNumber)
811+
return 0
812+
}
813+
case <-m.exitCh:
813814
m.lastNumber.Store(i - 1)
815+
log.Info("Task checker quit")
814816
return 0
815817
}
816-
i++
817-
/*if len(m.taskCh) < cap(m.taskCh) {
818-
m.taskCh <- rpcBlock
819-
i++
820-
} else {
821-
m.lastNumber = i - 1
822-
if maxNumber-minNumber > delay/2 {
823-
elapsed := time.Duration(mclock.Now()) - time.Duration(start)
824-
elapsedA := time.Duration(mclock.Now()) - time.Duration(m.start)
825-
log.Warn("Chain segment frozen", "from", minNumber, "to", i, "range", uint64(i-minNumber), "current", uint64(m.currentNumber), "progress", float64(i)/float64(m.currentNumber), "last", m.lastNumber, "elapsed", common.PrettyDuration(elapsed), "bps", float64(i-minNumber)*1000*1000*1000/float64(elapsed), "bps_a", float64(maxNumber)*1000*1000*1000/float64(elapsedA), "cap", len(m.taskCh))
826-
}
827-
return 0
828-
}*/
829818
}
819+
i += uint64(len(blocks))
830820
} else {
831821
rpcBlock, rpcErr := m.rpcBlockByNumber(i)
832822
if rpcErr != nil {
@@ -840,24 +830,10 @@ func (m *Monitor) syncLastBlock() uint64 {
840830
return 0
841831
}
842832
i++
843-
/*if len(m.taskCh) < cap(m.taskCh) {
844-
m.taskCh <- rpcBlock
845-
i++
846-
} else {
847-
m.lastNumber = i - 1
848-
if maxNumber-minNumber > delay/2 {
849-
elapsed := time.Duration(mclock.Now()) - time.Duration(start)
850-
elapsedA := time.Duration(mclock.Now()) - time.Duration(m.start)
851-
log.Warn("Chain segment frozen", "from", minNumber, "to", i, "range", uint64(i-minNumber), "current", uint64(m.currentNumber), "progress", float64(i)/float64(m.currentNumber), "last", m.lastNumber, "elapsed", common.PrettyDuration(elapsed), "bps", float64(i-minNumber)*1000*1000*1000/float64(elapsed), "bps_a", float64(maxNumber)*1000*1000*1000/float64(elapsedA), "cap", len(m.taskCh))
852-
}
853-
return 0
854-
}*/
855833
}
856834
}
857835
log.Debug("Last number changed", "min", minNumber, "max", maxNumber, "cur", currentNumber, "last", m.lastNumber.Load(), "batch", batch)
858836
m.lastNumber.Store(maxNumber)
859-
//m.storeLastNumber(maxNumber)
860-
//if maxNumber-minNumber > batch-1 {
861837
if maxNumber-minNumber > delay {
862838
elapsedA := time.Duration(mclock.Now()) - time.Duration(m.start)
863839
log.Debug("Chain segment frozen", "from", minNumber, "to", maxNumber, "range", uint64(maxNumber-minNumber), "current", uint64(m.CurrentNumber()), "progress", float64(maxNumber)/float64(m.CurrentNumber()), "last", m.lastNumber.Load(), "bps", float64(maxNumber)*1000*1000*1000/float64(elapsedA), "elapsed", common.PrettyDuration(elapsedA))
@@ -951,7 +927,9 @@ func (m *Monitor) forExchangeService(block *types.Block) error {
951927
}
952928

953929
func (m *Monitor) forRecordService(block *types.Block) error {
954-
log.Debug("Block record", "num", block.Number, "hash", block.Hash, "txs", len(block.Txs), "last", m.lastNumber.Load())
930+
if block.Number%4096 == 0 {
931+
log.Info("Block record", "num", block.Number, "hash", block.Hash, "txs", len(block.Txs), "last", m.lastNumber.Load())
932+
}
955933
if len(block.Txs) > 0 {
956934
for _, t := range block.Txs {
957935
x := new(big.Float).Quo(new(big.Float).SetInt(t.Amount), new(big.Float).SetInt(big.NewInt(params1.Cortex)))

0 commit comments

Comments
 (0)