Skip to content

Commit ee9a8c9

Browse files
author
Tural Devrishev
committed
cli: add processing for missing blocks in audit
Close #3941. Signed-off-by: Tural Devrishev <tural@nspcc.ru>
1 parent 296b4ef commit ee9a8c9

File tree

2 files changed

+112
-169
lines changed

2 files changed

+112
-169
lines changed

cli/util/audit-bin.go

Lines changed: 105 additions & 145 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@ package util
33
import (
44
"errors"
55
"fmt"
6-
"io"
76
"strconv"
7+
"time"
88

99
"github.com/nspcc-dev/neo-go/cli/cmdargs"
1010
"github.com/nspcc-dev/neo-go/cli/options"
11-
"github.com/nspcc-dev/neo-go/pkg/services/helpers/neofs"
11+
"github.com/nspcc-dev/neo-go/pkg/core/block"
12+
"github.com/nspcc-dev/neo-go/pkg/io"
13+
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
1214
"github.com/nspcc-dev/neo-go/pkg/wallet"
1315
"github.com/nspcc-dev/neofs-sdk-go/client"
1416
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
@@ -23,14 +25,12 @@ func auditBin(ctx *cli.Context) error {
2325
if err := cmdargs.EnsureNone(ctx); err != nil {
2426
return err
2527
}
26-
indexAttrKey := ctx.String("index-attribute")
27-
indexFileSize := ctx.Uint("index-file-size")
2828
retries := ctx.Uint("retries")
2929
cnrID := ctx.String("container")
3030
debug := ctx.Bool("debug")
3131
dryRun := ctx.Bool("dry-run")
3232
blockAttr := ctx.String("block-attribute")
33-
skip := ctx.Int("skip")
33+
curH := uint64(ctx.Uint("skip"))
3434

3535
acc, _, err := options.GetAccFromContext(ctx)
3636
if err != nil {
@@ -57,180 +57,140 @@ func auditBin(ctx *cli.Context) error {
5757
return cli.Exit(fmt.Errorf("failed to get container %s: %w", containerID, err), 1)
5858
}
5959

60-
if skip > 0 {
61-
fmt.Fprintf(ctx.App.Writer, "Skipping %d index files\n", skip)
60+
if curH != 0 {
61+
fmt.Fprintf(ctx.App.Writer, "Skipping first %d blocks\n", curH)
6262
}
63-
filters := object.NewSearchFilters()
64-
filters.AddFilter(indexAttrKey, fmt.Sprintf("%d", skip), object.MatchNumGE)
65-
results, errs := neofs.ObjectSearch(ctx.Context, neoFSPool, acc.PrivateKey(), containerID, filters, []string{indexAttrKey})
6663

67-
var (
68-
originalID uint64
69-
originalOID oid.ID
70-
)
71-
loop:
72-
for {
73-
select {
74-
case <-ctx.Done():
75-
return cli.Exit("context cancelled", 1)
76-
case err, ok := <-errs:
77-
if !ok {
78-
break loop
79-
}
80-
if err != nil {
81-
return cli.Exit(fmt.Sprintf("search index files: %v", err), 1)
82-
}
83-
case itm, ok := <-results:
84-
if !ok {
85-
break loop
86-
}
87-
duplicateID, err := strconv.ParseUint(itm.Attributes[0], 10, 32)
88-
if err != nil {
89-
return cli.Exit(fmt.Errorf("failed to parse index file ID (%s): %w", itm.ID, err), 1)
90-
}
91-
92-
if !originalOID.IsZero() && duplicateID == originalID {
93-
if dryRun {
94-
fmt.Fprintf(ctx.App.Writer, "[dry-run] index file duplicate %s / %s (%d)\n", itm.ID, originalOID, originalID)
95-
} else {
96-
_, err := neoFSPool.ObjectDelete(ctx.Context, containerID, itm.ID, signer, client.PrmObjectDelete{})
97-
if err != nil {
98-
return cli.Exit(fmt.Errorf("failed to remove index file duplicate %s / %s (%d): %w", itm.ID, originalOID, originalID, err), 1)
99-
}
100-
fmt.Fprintf(ctx.App.Writer, "Index file duplicate %s / %s (%d) is removed\n", itm.ID, originalOID, originalID)
101-
}
102-
continue
103-
}
104-
originalID = duplicateID
105-
originalOID = itm.ID
106-
fmt.Fprintf(ctx.App.Writer, "Processing index file %d (%s)\n", originalID, originalOID)
107-
108-
originalOIDs, err := getBlockIDs(ctx, neoFSPool, containerID, originalOID, indexFileSize, signer, retries, debug)
109-
if err != nil {
110-
return cli.Exit(fmt.Errorf("failed to retrieve block OIDs for index file %d (%s): %w", originalID, originalOID, err), 1)
111-
}
112-
113-
startHeight := uint32(duplicateID) * uint32(indexFileSize)
114-
endHeight := startHeight + uint32(indexFileSize)
115-
err = deleteOrphans(ctx, neoFSPool, signer, containerID, blockAttr, originalOIDs,
116-
int(startHeight), int(endHeight), int(retries), debug, dryRun)
117-
if err != nil {
118-
return cli.Exit(fmt.Errorf("failed to remove block duplicates: %w", err), 1)
119-
}
120-
}
121-
}
122-
fmt.Fprintln(ctx.App.Writer, "Audit is completed.")
123-
return nil
124-
}
125-
126-
func getBlockIDs(ctx *cli.Context, p *pool.Pool, containerID cid.ID, indexFileID oid.ID, indexFileSize uint, signer user.Signer, maxRetries uint, debug bool) ([]oid.ID, error) {
127-
var rc io.ReadCloser
128-
129-
err := retry(func() error {
130-
var e error
131-
_, rc, e = p.ObjectGetInit(ctx.Context, containerID, indexFileID, signer, client.PrmObjectGet{})
132-
return e
133-
}, maxRetries, debug)
64+
gctx, cancel := options.GetTimeoutContext(ctx)
65+
defer cancel()
66+
rpc, err := options.GetRPCClient(gctx, ctx)
13467
if err != nil {
135-
return nil, fmt.Errorf("failed to get index file %s: %w", indexFileID, err)
136-
}
137-
defer rc.Close()
138-
139-
raw, err := io.ReadAll(rc)
140-
if err != nil {
141-
return nil, err
142-
}
143-
if len(raw) != int(indexFileSize)*oid.Size {
144-
return nil, fmt.Errorf("index file %s: size mismatch: expected %d bytes, got %d", indexFileID, int(indexFileSize)*oid.Size, len(raw))
145-
}
146-
147-
out := make([]oid.ID, 0, indexFileSize)
148-
for i := range indexFileSize {
149-
out = append(out, oid.ID(raw[i*oid.Size:(i+1)*oid.Size]))
68+
return cli.Exit(fmt.Errorf("failed to create RPC client: %w", err), 1)
15069
}
151-
return out, nil
152-
}
15370

154-
// deleteOrphans removes every block object those OID differs from the one
155-
// specified in the index file for the given height. It prints a WARN if the
156-
// expected object is missing. If dryRun is enabled, it prints duplicate OIDs
157-
// instead of removing them.
158-
func deleteOrphans(ctx *cli.Context, p *pool.Pool, signer user.Signer, containerID cid.ID, blockAttr string, originalOIDs []oid.ID, start, end, maxRetries int, debug, dryRun bool) error {
15971
var (
160-
cursor string
161-
oidIndex int
72+
prevH uint64
73+
cursor string
74+
curOID oid.ID
75+
f = object.NewSearchFilters()
16276
)
163-
164-
// Search for block objects with height matching the expected one.
165-
f := object.NewSearchFilters()
166-
f.AddFilter(blockAttr, strconv.Itoa(start), object.MatchNumGE)
167-
f.AddFilter(blockAttr, strconv.Itoa(end), object.MatchNumLT)
77+
f.AddFilter(blockAttr, strconv.FormatUint(curH, 10), object.MatchNumGE)
16878

16979
for {
170-
var (
171-
err error
172-
nextCursor string
173-
page []client.SearchResultItem
174-
)
175-
80+
var page []client.SearchResultItem
17681
err = retry(func() error {
177-
page, nextCursor, err = p.SearchObjects(ctx.Context, containerID, f, []string{blockAttr}, cursor, signer, client.SearchObjectsOptions{})
82+
page, cursor, err = neoFSPool.SearchObjects(ctx.Context, containerID, f, []string{blockAttr}, cursor, signer, client.SearchObjectsOptions{})
17883
if err != nil {
17984
return fmt.Errorf("failed to search objects: %w", err)
18085
}
18186
return nil
182-
}, uint(maxRetries), debug)
87+
}, retries, debug)
18388
if err != nil {
184-
return err
89+
return cli.Exit(fmt.Errorf("search block objects: %w", err), 1)
18590
}
91+
18692
for _, itm := range page {
18793
select {
18894
case <-ctx.Done():
189-
return nil
95+
return cli.Exit("context cancelled", 1)
19096
default:
19197
}
192-
foundID := itm.ID
193-
foundIndex, err := strconv.Atoi(itm.Attributes[0])
98+
99+
h, err := strconv.ParseUint(itm.Attributes[0], 10, 64)
194100
if err != nil {
195-
return fmt.Errorf("incorrect index in result: %q", itm.Attributes[0])
196-
}
197-
if debug {
198-
fmt.Fprintf(ctx.App.Writer, "found block %d (%s), expected %d (%s)\n", foundIndex, foundID, oidIndex, originalOIDs[oidIndex])
199-
}
200-
if foundIndex == start+oidIndex && foundID == originalOIDs[oidIndex] {
201-
oidIndex++
202-
continue
101+
return cli.Exit(fmt.Errorf("failed to parse block OID (%s): %w", itm.ID, err), 1)
203102
}
204103

205-
if foundIndex > start+oidIndex {
206-
for start+oidIndex < foundIndex {
207-
fmt.Fprintf(ctx.App.Writer, "WARN: block %d (%s) is listed in the index file but missing from the storage\n", start+oidIndex, originalOIDs[oidIndex])
208-
oidIndex++
209-
}
210-
if foundID == originalOIDs[oidIndex] {
211-
continue
104+
if !curOID.IsZero() && prevH == h {
105+
if dryRun {
106+
fmt.Fprintf(ctx.App.Writer, "[dry-run] block duplicate %s / %s (%d)\n", itm.ID, curOID, prevH)
107+
} else {
108+
err = retry(func() error {
109+
_, e := neoFSPool.ObjectDelete(ctx.Context, containerID, itm.ID, signer, client.PrmObjectDelete{})
110+
return e
111+
}, retries, debug)
112+
if err != nil {
113+
return cli.Exit(fmt.Errorf("failed to remove block duplicate %s / %s (%d): %w", itm.ID, curOID, prevH, err), 1)
114+
}
115+
if debug {
116+
fmt.Fprintf(ctx.App.Writer, "block duplicate %s / %s (%d) is removed\n", itm.ID, curOID, prevH)
117+
}
212118
}
119+
continue
213120
}
214-
if dryRun {
215-
fmt.Fprintf(ctx.App.Writer, "[dry-run] block duplicate %s / %s (%d)\n", foundID, originalOIDs[oidIndex], foundIndex)
216-
} else {
217-
err := retry(func() error {
218-
_, errDelete := p.ObjectDelete(ctx.Context, containerID, foundID, signer, client.PrmObjectDelete{})
219-
return errDelete
220-
}, uint(maxRetries), debug)
221121

122+
for ; curH < h; curH++ {
123+
err = restoreMissingBlock(ctx, rpc, neoFSPool, signer, containerID, blockAttr, retries, curH, dryRun, debug)
222124
if err != nil {
223-
fmt.Fprintf(ctx.App.Writer, "WARN: failed to remove block %s / %s (%d): %s\n", foundID, originalOIDs[foundIndex-start], foundIndex, err)
224-
} else if debug {
225-
fmt.Fprintf(ctx.App.Writer, "Block duplicate %s / %s (%d) is removed\n", foundID, originalOIDs[foundIndex-start], foundIndex)
125+
return fmt.Errorf("can't restore missing block %d: %w", curH, err)
226126
}
227127
}
128+
curOID = itm.ID
129+
prevH = curH
130+
curH++
228131
}
229-
if nextCursor == "" {
132+
if cursor == "" {
230133
break
231134
}
232-
cursor = nextCursor
233135
}
234136

137+
fmt.Fprintln(ctx.App.Writer, "Audit is completed.")
235138
return nil
236139
}
140+
141+
func restoreMissingBlock(ctx *cli.Context, rpc *rpcclient.Client, p *pool.Pool, signer user.Signer, containerID cid.ID,
142+
blockAttr string, retries uint, index uint64, dryRun, debug bool) error {
143+
if dryRun {
144+
fmt.Fprintf(ctx.App.Writer, "[dry-run] block %d is missing\n", index)
145+
return nil
146+
}
147+
var (
148+
b *block.Block
149+
err error
150+
)
151+
err = retry(func() error {
152+
b, err = rpc.GetBlockByIndex(uint32(index))
153+
return err
154+
}, retries, debug)
155+
if err != nil {
156+
return fmt.Errorf("failed to fetch block %d: %w", index, err)
157+
}
158+
159+
bw := io.NewBufBinWriter()
160+
b.EncodeBinary(bw.BinWriter)
161+
if bw.Err != nil {
162+
return fmt.Errorf("failed to encode block %d: %w", index, bw.Err)
163+
}
164+
165+
_, err = createBlockAndUpload(ctx, p, signer, containerID, b, bw, blockAttr, retries, index, debug)
166+
return err
167+
}
168+
169+
func createBlockAndUpload(ctx *cli.Context, p *pool.Pool, signer user.Signer, containerID cid.ID, b *block.Block,
170+
bw *io.BufBinWriter, blockAttr string, retries uint, index uint64, debug bool) (oid.ID, error) {
171+
attrs := []object.Attribute{
172+
object.NewAttribute(blockAttr, strconv.FormatUint(uint64(b.Index), 10)),
173+
object.NewAttribute("Primary", strconv.FormatUint(uint64(b.PrimaryIndex), 10)),
174+
object.NewAttribute("Hash", b.Hash().StringLE()),
175+
object.NewAttribute("PrevHash", b.PrevHash.StringLE()),
176+
object.NewAttribute("BlockTime", strconv.FormatUint(b.Timestamp, 10)),
177+
object.NewAttribute("Timestamp", strconv.FormatInt(time.Now().Unix(), 10)),
178+
}
179+
180+
var (
181+
objBytes = bw.Bytes()
182+
OID oid.ID
183+
)
184+
err := retry(func() error {
185+
var e error
186+
OID, e = uploadObj(ctx.Context, p, signer, containerID, objBytes, attrs)
187+
return e
188+
}, retries, debug)
189+
if err != nil {
190+
return oid.ID{}, fmt.Errorf("failed to upload block %d: %w", index, err)
191+
}
192+
if debug {
193+
fmt.Fprintf(ctx.App.Writer, "block %d is uploaded: %s", index, OID)
194+
}
195+
return OID, nil
196+
}

cli/util/convert.go

Lines changed: 7 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,6 @@ var neoFSFlags = append([]cli.Flag{
3838
}
3939
return nil
4040
},
41-
},
42-
&cli.UintFlag{
43-
Name: "searchers",
44-
Usage: "Number of concurrent searches for objects",
45-
Value: 100,
4641
}}, options.NeoFSRPC...)
4742

4843
// NewCommands returns util commands for neo-go CLI.
@@ -105,25 +100,14 @@ func NewCommands() []*cli.Command {
105100
Value: neofs.DefaultBlockAttribute,
106101
Action: cmdargs.EnsureNotEmpty("block-attribute"),
107102
},
108-
&cli.StringFlag{
109-
Name: "index-attribute",
110-
Usage: "Attribute key of the index object",
111-
Value: "Index",
112-
Action: cmdargs.EnsureNotEmpty("index-attribute"),
113-
},
114-
&cli.UintFlag{
115-
Name: "index-file-size",
116-
Usage: "Number of blocks OIDs in the index file",
117-
Value: neofs.DefaultBatchSize,
118-
},
119103
&cli.BoolFlag{
120104
Name: "dry-run",
121105
Usage: "If set, the command will not delete any objects, but will print the list of objects to be deleted",
122106
Value: false,
123107
},
124108
&cli.IntFlag{
125109
Name: "skip",
126-
Usage: "Number of index files to skip audit for",
110+
Usage: "Number of blocks to skip audit for",
127111
Value: 0,
128112
Action: func(context *cli.Context, i int) error {
129113
if i < 0 {
@@ -136,6 +120,7 @@ func NewCommands() []*cli.Command {
136120
options.ForceTimestampLogs,
137121
}, neoFSFlags...)
138122
auditBinFlags = append(auditBinFlags, options.Wallet...)
123+
auditBinFlags = append(auditBinFlags, options.RPC...)
139124
return []*cli.Command{
140125
{
141126
Name: "util",
@@ -222,15 +207,13 @@ func NewCommands() []*cli.Command {
222207
},
223208
{
224209
Name: "audit-bin",
225-
Usage: "Audit NeoFS container for duplicating block or index file objects",
226-
UsageText: "neo-go util audit-bin -fs-rpc-endpoint <address1>[,<address2>[...]] --container <cid> [--block-attribute Block] [--index-attribute Index] [--wallet <wallet>] [--wallet-config <config>] [--address <address>] [--searchers <num>] [--retries <num>] [--debug] [--dry-run]",
210+
Usage: "Audit NeoFS container for duplicate or missing block objects",
211+
UsageText: "neo-go util audit-bin -fs-rpc-endpoint <address1>[,<address2>[...]] --container <cid> [--block-attribute Block] [--wallet <wallet>] [--wallet-config <config>] [--address <address>] [--retries <num>] [--debug] [--dry-run]",
227212
Action: auditBin,
228213
Flags: auditBinFlags,
229-
Description: `Searches for the duplicating index files or blocks. Logs duplicating
230-
index files OIDs without removal if --dry-run is enabled. If --debug is enabled, then duplicating
231-
block OIDs are also printed. Preserves index files consistency: the first index file returned by
232-
SEARCH for the given height considered to be the original one, all duplicating blocks that are not
233-
mentioned in this file will be removed.
214+
Description: `Searches for the duplicating or missing blocks. Logs duplicating or missing
215+
blocks without removal or uploading them respectively if --dry-run is enabled. If --debug is enabled, then
216+
duplicating or missing block OIDs are also printed.
234217
`,
235218
},
236219
{

0 commit comments

Comments
 (0)