@@ -83,6 +83,8 @@ type Monitor struct {
8383 lock sync.RWMutex
8484
8585 callback chan any
86+
87+ srv atomic.Int32
8688}
8789
8890// NewMonitor creates a new instance of monitor.
@@ -116,6 +118,7 @@ func New(flag *params.Config, cache, compress, listen bool, callback chan any) (
116118 //start: mclock.Now(),
117119 }
118120
121+ // TODO https://github.com/ucwong/golang-kv
119122 if fs_ , err := backend .NewChainDB (flag ); err != nil {
120123 log .Error ("file storage failed" , "err" , err )
121124 return nil , err
@@ -137,6 +140,8 @@ func New(flag *params.Config, cache, compress, listen bool, callback chan any) (
137140
138141 m .mode = flag .Mode
139142
143+ m .srv .Store (SRV_MODEL )
144+
140145 /*torrents, _ := fs.initTorrents()
141146 if m.mode != params.LAZY {
142147 for k, v := range torrents {
@@ -222,7 +227,7 @@ func (m *Monitor) indexCheck() error {
222227 // return err
223228 //}
224229 }
225- log .Warn ("Fs storage is reloading ..." , "name" , m .ckp .Name , "number" , checkpoint .TfsCheckPoint , "version" , common .BytesToHash (version ), "checkpoint" , checkpoint .TfsRoot , "blocks" , len (m .fs .Blocks ()), "files" , len (m .fs .Files ()), "txs" , m .fs .Txs (), "lastNumber" , m .lastNumber .Load (), "last in db " , m .fs .LastListenBlockNumber ())
230+ log .Warn ("Fs storage is reloading ..." , "name" , m .ckp .Name , "number" , checkpoint .TfsCheckPoint , "version" , common .BytesToHash (version ), "checkpoint" , checkpoint .TfsRoot , "blocks" , len (m .fs .Blocks ()), "files" , len (m .fs .Files ()), "txs" , m .fs .Txs (), "lastNumber" , m .lastNumber .Load (), "last" , m .fs .LastListenBlockNumber ())
226231 } else {
227232 log .Info ("Fs storage version check passed" , "name" , m .ckp .Name , "number" , checkpoint .TfsCheckPoint , "version" , common .BytesToHash (version ), "blocks" , len (m .fs .Blocks ()), "files" , len (m .fs .Files ()), "txs" , m .fs .Txs ())
228233 }
@@ -646,7 +651,7 @@ func (m *Monitor) syncLatestBlock() {
646651 elapsed := time .Duration (mclock .Now ()) - time .Duration (m .start )
647652 log .Info ("Finish sync, listener will be paused" , "current" , m .currentNumber .Load (), "elapsed" , common .PrettyDuration (elapsed ), "progress" , progress , "end" , end , "last" , m .lastNumber .Load ())
648653 //return
649- timer .Reset (time .Millisecond * 1000 * 180 )
654+ timer .Reset (time .Millisecond * 1000 * 60 )
650655 end = false
651656 continue
652657 }
@@ -682,6 +687,10 @@ func (m *Monitor) currentBlock() (uint64, error) {
682687}
683688
684689func (m * Monitor ) skip (i uint64 ) bool {
690+ if m .srv .Load () != SRV_MODEL {
691+ return false
692+ }
693+
685694 if len (m .ckp .Skips ) == 0 || i > m .ckp .Skips [len (m .ckp .Skips )- 1 ].To || i < m .ckp .Skips [0 ].From {
686695 return false
687696 }
@@ -753,6 +762,8 @@ func (m *Monitor) syncLastBlock() uint64 {
753762 m .lastNumber .Store (i - 1 )
754763 return 0
755764 }
765+
766+ // batch blocks operation according service category
756767 for _ , rpcBlock := range blocks {
757768 if err := m .solve (rpcBlock ); err != nil {
758769 m .lastNumber .Store (i - 1 )
@@ -808,7 +819,48 @@ func (m *Monitor) syncLastBlock() uint64 {
808819 return uint64 (maxNumber - minNumber )
809820}
810821
822+ // solve block from node
811823func (m * Monitor ) solve (block * types.Block ) error {
824+ switch m .srv .Load () {
825+ case SRV_MODEL :
826+ return m .forModelService (block )
827+ //case 1:
828+ // return m.forExplorerService(block) // others service, explorer, exchange, zkp, nft, etc.
829+ //case 2:
830+ // return m.forExchangeService(block)
831+ case SRV_PRINT :
832+ return m .forPrintService (block )
833+ default :
834+ return errors .New ("no block operation service found" )
835+ }
836+ }
837+
838+ func (m * Monitor ) SwitchService (srv int ) error {
839+ m .srv .Store (int32 (srv ))
840+ return nil
841+ }
842+
843+ // only for examples
844+ func (m * Monitor ) forExplorerService (block * types.Block ) error {
845+ return errors .New ("not support" )
846+ }
847+
848+ func (m * Monitor ) forExchangeService (block * types.Block ) error {
849+ return errors .New ("not support" )
850+ }
851+
852+ func (m * Monitor ) forPrintService (block * types.Block ) error {
853+ log .Info ("Block print" , "num" , block .Number , "hash" , block .Hash , "txs" , len (block .Txs ))
854+ if len (block .Txs ) > 0 {
855+ for _ , t := range block .Txs {
856+ log .Info ("Tx print" , "hash" , t .Hash , "amount" , t .Amount , "gas" , t .GasLimit , "receipt" , t .Recipient , "payload" , t .Payload )
857+ }
858+ }
859+ m .fs .Anchor (block .Number )
860+ return nil
861+ }
862+
863+ func (m * Monitor ) forModelService (block * types.Block ) error {
812864 i := block .Number
813865 if i % 65536 == 0 {
814866 defer func () {
@@ -836,7 +888,6 @@ func (m *Monitor) solve(block *types.Block) error {
836888 log .Debug ("Seal fs record" , "number" , i , "record" , record , "root" , m .fs .Root ().Hex (), "blocks" , len (m .fs .Blocks ()), "txs" , m .fs .Txs (), "files" , len (m .fs .Files ()), "ckp" , m .fs .CheckPoint ())
837889 } else {
838890 if m .fs .LastListenBlockNumber () < i {
839- //m.fs.LastListenBlockNumber = i
840891 m .fs .Anchor (i )
841892 }
842893
0 commit comments