Skip to content

Commit d1474d1

Browse files
author
Tural Devrishev
committed
cli: change audit behaviour
Close #3941. Signed-off-by: Tural Devrishev <tural@nspcc.ru>
1 parent 296b4ef commit d1474d1

File tree

2 files changed

+113
-158
lines changed

2 files changed

+113
-158
lines changed

cli/util/audit-bin.go

Lines changed: 106 additions & 139 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@ package util
33
import (
44
"errors"
55
"fmt"
6-
"io"
76
"strconv"
7+
"time"
88

99
"github.com/nspcc-dev/neo-go/cli/cmdargs"
1010
"github.com/nspcc-dev/neo-go/cli/options"
11-
"github.com/nspcc-dev/neo-go/pkg/services/helpers/neofs"
11+
"github.com/nspcc-dev/neo-go/pkg/core/block"
12+
"github.com/nspcc-dev/neo-go/pkg/io"
13+
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
1214
"github.com/nspcc-dev/neo-go/pkg/wallet"
1315
"github.com/nspcc-dev/neofs-sdk-go/client"
1416
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
@@ -23,8 +25,6 @@ func auditBin(ctx *cli.Context) error {
2325
if err := cmdargs.EnsureNone(ctx); err != nil {
2426
return err
2527
}
26-
indexAttrKey := ctx.String("index-attribute")
27-
indexFileSize := ctx.Uint("index-file-size")
2828
retries := ctx.Uint("retries")
2929
cnrID := ctx.String("container")
3030
debug := ctx.Bool("debug")
@@ -58,179 +58,146 @@ func auditBin(ctx *cli.Context) error {
5858
}
5959

6060
if skip > 0 {
61-
fmt.Fprintf(ctx.App.Writer, "Skipping %d index files\n", skip)
61+
fmt.Fprintf(ctx.App.Writer, "Skipping first %d blocks\n", skip)
6262
}
63-
filters := object.NewSearchFilters()
64-
filters.AddFilter(indexAttrKey, fmt.Sprintf("%d", skip), object.MatchNumGE)
65-
results, errs := neofs.ObjectSearch(ctx.Context, neoFSPool, acc.PrivateKey(), containerID, filters, []string{indexAttrKey})
6663

67-
var (
68-
originalID uint64
69-
originalOID oid.ID
70-
)
71-
loop:
72-
for {
73-
select {
74-
case <-ctx.Done():
75-
return cli.Exit("context cancelled", 1)
76-
case err, ok := <-errs:
77-
if !ok {
78-
break loop
79-
}
80-
if err != nil {
81-
return cli.Exit(fmt.Sprintf("search index files: %v", err), 1)
82-
}
83-
case itm, ok := <-results:
84-
if !ok {
85-
break loop
86-
}
87-
duplicateID, err := strconv.ParseUint(itm.Attributes[0], 10, 32)
88-
if err != nil {
89-
return cli.Exit(fmt.Errorf("failed to parse index file ID (%s): %w", itm.ID, err), 1)
90-
}
91-
92-
if !originalOID.IsZero() && duplicateID == originalID {
93-
if dryRun {
94-
fmt.Fprintf(ctx.App.Writer, "[dry-run] index file duplicate %s / %s (%d)\n", itm.ID, originalOID, originalID)
95-
} else {
96-
_, err := neoFSPool.ObjectDelete(ctx.Context, containerID, itm.ID, signer, client.PrmObjectDelete{})
97-
if err != nil {
98-
return cli.Exit(fmt.Errorf("failed to remove index file duplicate %s / %s (%d): %w", itm.ID, originalOID, originalID, err), 1)
99-
}
100-
fmt.Fprintf(ctx.App.Writer, "Index file duplicate %s / %s (%d) is removed\n", itm.ID, originalOID, originalID)
101-
}
102-
continue
103-
}
104-
originalID = duplicateID
105-
originalOID = itm.ID
106-
fmt.Fprintf(ctx.App.Writer, "Processing index file %d (%s)\n", originalID, originalOID)
107-
108-
originalOIDs, err := getBlockIDs(ctx, neoFSPool, containerID, originalOID, indexFileSize, signer, retries, debug)
109-
if err != nil {
110-
return cli.Exit(fmt.Errorf("failed to retrieve block OIDs for index file %d (%s): %w", originalID, originalOID, err), 1)
111-
}
112-
113-
startHeight := uint32(duplicateID) * uint32(indexFileSize)
114-
endHeight := startHeight + uint32(indexFileSize)
115-
err = deleteOrphans(ctx, neoFSPool, signer, containerID, blockAttr, originalOIDs,
116-
int(startHeight), int(endHeight), int(retries), debug, dryRun)
117-
if err != nil {
118-
return cli.Exit(fmt.Errorf("failed to remove block duplicates: %w", err), 1)
119-
}
120-
}
121-
}
122-
fmt.Fprintln(ctx.App.Writer, "Audit is completed.")
123-
return nil
124-
}
125-
126-
func getBlockIDs(ctx *cli.Context, p *pool.Pool, containerID cid.ID, indexFileID oid.ID, indexFileSize uint, signer user.Signer, maxRetries uint, debug bool) ([]oid.ID, error) {
127-
var rc io.ReadCloser
128-
129-
err := retry(func() error {
130-
var e error
131-
_, rc, e = p.ObjectGetInit(ctx.Context, containerID, indexFileID, signer, client.PrmObjectGet{})
132-
return e
133-
}, maxRetries, debug)
64+
gctx, cancel := options.GetTimeoutContext(ctx)
65+
defer cancel()
66+
rpc, err := options.GetRPCClient(gctx, ctx)
13467
if err != nil {
135-
return nil, fmt.Errorf("failed to get index file %s: %w", indexFileID, err)
136-
}
137-
defer rc.Close()
138-
139-
raw, err := io.ReadAll(rc)
140-
if err != nil {
141-
return nil, err
142-
}
143-
if len(raw) != int(indexFileSize)*oid.Size {
144-
return nil, fmt.Errorf("index file %s: size mismatch: expected %d bytes, got %d", indexFileID, int(indexFileSize)*oid.Size, len(raw))
68+
return cli.Exit(fmt.Errorf("failed to create RPC client: %w", err), 1)
14569
}
14670

147-
out := make([]oid.ID, 0, indexFileSize)
148-
for i := range indexFileSize {
149-
out = append(out, oid.ID(raw[i*oid.Size:(i+1)*oid.Size]))
150-
}
151-
return out, nil
152-
}
153-
154-
// deleteOrphans removes every block object those OID differs from the one
155-
// specified in the index file for the given height. It prints a WARN if the
156-
// expected object is missing. If dryRun is enabled, it prints duplicate OIDs
157-
// instead of removing them.
158-
func deleteOrphans(ctx *cli.Context, p *pool.Pool, signer user.Signer, containerID cid.ID, blockAttr string, originalOIDs []oid.ID, start, end, maxRetries int, debug, dryRun bool) error {
15971
var (
160-
cursor string
161-
oidIndex int
72+
cursor string
73+
origHeight uint64
74+
originalOID oid.ID
75+
expectedHeight = uint64(skip)
76+
f = object.NewSearchFilters()
16277
)
163-
164-
// Search for block objects with height matching the expected one.
165-
f := object.NewSearchFilters()
166-
f.AddFilter(blockAttr, strconv.Itoa(start), object.MatchNumGE)
167-
f.AddFilter(blockAttr, strconv.Itoa(end), object.MatchNumLT)
78+
f.AddFilter(blockAttr, strconv.FormatUint(expectedHeight, 10), object.MatchNumGE)
16879

16980
for {
17081
var (
171-
err error
172-
nextCursor string
17382
page []client.SearchResultItem
83+
nextCursor string
17484
)
17585

17686
err = retry(func() error {
177-
page, nextCursor, err = p.SearchObjects(ctx.Context, containerID, f, []string{blockAttr}, cursor, signer, client.SearchObjectsOptions{})
178-
if err != nil {
179-
return fmt.Errorf("failed to search objects: %w", err)
87+
var e error
88+
page, nextCursor, e = neoFSPool.SearchObjects(ctx.Context, containerID, f, []string{blockAttr}, cursor, signer, client.SearchObjectsOptions{})
89+
if e != nil {
90+
return fmt.Errorf("failed to search objects: %w", e)
18091
}
18192
return nil
182-
}, uint(maxRetries), debug)
93+
}, retries, debug)
18394
if err != nil {
184-
return err
95+
return cli.Exit(fmt.Errorf("search block objects: %w", err), 1)
18596
}
97+
18698
for _, itm := range page {
18799
select {
188100
case <-ctx.Done():
189-
return nil
101+
return cli.Exit("context cancelled", 1)
190102
default:
191103
}
192-
foundID := itm.ID
193-
foundIndex, err := strconv.Atoi(itm.Attributes[0])
104+
105+
blockHeight, err := strconv.ParseUint(itm.Attributes[0], 10, 64)
194106
if err != nil {
195-
return fmt.Errorf("incorrect index in result: %q", itm.Attributes[0])
196-
}
197-
if debug {
198-
fmt.Fprintf(ctx.App.Writer, "found block %d (%s), expected %d (%s)\n", foundIndex, foundID, oidIndex, originalOIDs[oidIndex])
199-
}
200-
if foundIndex == start+oidIndex && foundID == originalOIDs[oidIndex] {
201-
oidIndex++
202-
continue
107+
return cli.Exit(fmt.Errorf("failed to parse block ID (%s): %w", itm.ID, err), 1)
203108
}
204109

205-
if foundIndex > start+oidIndex {
206-
for start+oidIndex < foundIndex {
207-
fmt.Fprintf(ctx.App.Writer, "WARN: block %d (%s) is listed in the index file but missing from the storage\n", start+oidIndex, originalOIDs[oidIndex])
208-
oidIndex++
209-
}
210-
if foundID == originalOIDs[oidIndex] {
211-
continue
110+
if !originalOID.IsZero() && origHeight == blockHeight {
111+
if dryRun {
112+
fmt.Fprintf(ctx.App.Writer, "[dry-run] block duplicate %s / %s (%d)\n", itm.ID, originalOID, origHeight)
113+
} else {
114+
err = retry(func() error {
115+
_, e := neoFSPool.ObjectDelete(ctx.Context, containerID, itm.ID, signer, client.PrmObjectDelete{})
116+
return e
117+
}, retries, debug)
118+
if err != nil {
119+
return cli.Exit(fmt.Errorf("failed to remove block duplicate %s / %s (%d): %w", itm.ID, originalOID, origHeight, err), 1)
120+
}
121+
if debug {
122+
fmt.Fprintf(ctx.App.Writer, "block duplicate %s / %s (%d) is removed\n", itm.ID, originalOID, origHeight)
123+
}
212124
}
125+
continue
213126
}
214-
if dryRun {
215-
fmt.Fprintf(ctx.App.Writer, "[dry-run] block duplicate %s / %s (%d)\n", foundID, originalOIDs[oidIndex], foundIndex)
216-
} else {
217-
err := retry(func() error {
218-
_, errDelete := p.ObjectDelete(ctx.Context, containerID, foundID, signer, client.PrmObjectDelete{})
219-
return errDelete
220-
}, uint(maxRetries), debug)
221127

128+
for ; expectedHeight < blockHeight; expectedHeight++ {
129+
err = restoreMissingBlock(ctx, rpc, neoFSPool, signer, containerID, blockAttr, retries, expectedHeight, dryRun, debug)
222130
if err != nil {
223-
fmt.Fprintf(ctx.App.Writer, "WARN: failed to remove block %s / %s (%d): %s\n", foundID, originalOIDs[foundIndex-start], foundIndex, err)
224-
} else if debug {
225-
fmt.Fprintf(ctx.App.Writer, "Block duplicate %s / %s (%d) is removed\n", foundID, originalOIDs[foundIndex-start], foundIndex)
131+
return fmt.Errorf("can't restore missing block %d: %w", expectedHeight, err)
226132
}
227133
}
134+
135+
origHeight = blockHeight
136+
originalOID = itm.ID
137+
expectedHeight++
228138
}
139+
229140
if nextCursor == "" {
230141
break
231142
}
232143
cursor = nextCursor
233144
}
234145

146+
currentBlockHeight, err := rpc.GetBlockCount()
147+
if err != nil {
148+
return cli.Exit(fmt.Errorf("failed to get current block height from RPC: %w", err), 1)
149+
}
150+
151+
for ; expectedHeight < uint64(currentBlockHeight); expectedHeight++ {
152+
err = restoreMissingBlock(ctx, rpc, neoFSPool, signer, containerID, blockAttr, retries, expectedHeight, dryRun, debug)
153+
if err != nil {
154+
return fmt.Errorf("can't restore missing block %d: %w", expectedHeight, err)
155+
}
156+
}
157+
158+
fmt.Fprintln(ctx.App.Writer, "Audit is completed.")
159+
return nil
160+
}
161+
162+
func restoreMissingBlock(ctx *cli.Context, rpc *rpcclient.Client, p *pool.Pool, signer user.Signer, containerID cid.ID,
163+
blockAttr string, retries uint, blockHeight uint64, dryRun, debug bool) error {
164+
if dryRun {
165+
fmt.Fprintf(ctx.App.Writer, "[dry-run] block with height %d is missing\n", blockHeight)
166+
return nil
167+
}
168+
var blk *block.Block
169+
err := retry(func() error {
170+
var e error
171+
blk, e = rpc.GetBlockByIndex(uint32(blockHeight))
172+
return e
173+
}, retries, debug)
174+
if err != nil {
175+
return fmt.Errorf("failed to fetch block %d: %w", blockHeight, err)
176+
}
177+
178+
bw := io.NewBufBinWriter()
179+
blk.EncodeBinary(bw.BinWriter)
180+
181+
attrs := []object.Attribute{
182+
object.NewAttribute(blockAttr, strconv.FormatUint(uint64(blk.Index), 10)),
183+
object.NewAttribute("Primary", strconv.FormatUint(uint64(blk.PrimaryIndex), 10)),
184+
object.NewAttribute("Hash", blk.Hash().StringLE()),
185+
object.NewAttribute("PrevHash", blk.PrevHash.StringLE()),
186+
object.NewAttribute("BlockTime", strconv.FormatUint(blk.Timestamp, 10)),
187+
object.NewAttribute("Timestamp", strconv.FormatInt(time.Now().Unix(), 10)),
188+
}
189+
190+
var objBytes = bw.Bytes()
191+
err = retry(func() error {
192+
var e error
193+
_, e = uploadObj(ctx.Context, p, signer, containerID, objBytes, attrs)
194+
return e
195+
}, retries, debug)
196+
if err != nil {
197+
return fmt.Errorf("failed to upload block %d: %w", blockHeight, err)
198+
}
199+
if debug {
200+
fmt.Fprintf(ctx.App.Writer, "missing block %d: added object with block height=%d\n", blockHeight, blockHeight)
201+
}
235202
return nil
236203
}

cli/util/convert.go

Lines changed: 7 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -105,25 +105,14 @@ func NewCommands() []*cli.Command {
105105
Value: neofs.DefaultBlockAttribute,
106106
Action: cmdargs.EnsureNotEmpty("block-attribute"),
107107
},
108-
&cli.StringFlag{
109-
Name: "index-attribute",
110-
Usage: "Attribute key of the index object",
111-
Value: "Index",
112-
Action: cmdargs.EnsureNotEmpty("index-attribute"),
113-
},
114-
&cli.UintFlag{
115-
Name: "index-file-size",
116-
Usage: "Number of blocks OIDs in the index file",
117-
Value: neofs.DefaultBatchSize,
118-
},
119108
&cli.BoolFlag{
120109
Name: "dry-run",
121110
Usage: "If set, the command will not delete any objects, but will print the list of objects to be deleted",
122111
Value: false,
123112
},
124113
&cli.IntFlag{
125114
Name: "skip",
126-
Usage: "Number of index files to skip audit for",
115+
Usage: "Number of blocks to skip audit for",
127116
Value: 0,
128117
Action: func(context *cli.Context, i int) error {
129118
if i < 0 {
@@ -136,6 +125,7 @@ func NewCommands() []*cli.Command {
136125
options.ForceTimestampLogs,
137126
}, neoFSFlags...)
138127
auditBinFlags = append(auditBinFlags, options.Wallet...)
128+
auditBinFlags = append(auditBinFlags, options.RPC...)
139129
return []*cli.Command{
140130
{
141131
Name: "util",
@@ -222,15 +212,13 @@ func NewCommands() []*cli.Command {
222212
},
223213
{
224214
Name: "audit-bin",
225-
Usage: "Audit NeoFS container for duplicating block or index file objects",
226-
UsageText: "neo-go util audit-bin -fs-rpc-endpoint <address1>[,<address2>[...]] --container <cid> [--block-attribute Block] [--index-attribute Index] [--wallet <wallet>] [--wallet-config <config>] [--address <address>] [--searchers <num>] [--retries <num>] [--debug] [--dry-run]",
215+
Usage: "Audit NeoFS container for duplicate and missing block objects",
216+
UsageText: "neo-go util audit-bin -fs-rpc-endpoint <address1>[,<address2>[...]] --container <cid> [--block-attribute Block] [--wallet <wallet>] [--wallet-config <config>] [--address <address>] [--retries <num>] [--debug] [--dry-run]",
227217
Action: auditBin,
228218
Flags: auditBinFlags,
229-
Description: `Searches for the duplicating index files or blocks. Logs duplicating
230-
index files OIDs without removal if --dry-run is enabled. If --debug is enabled, then duplicating
231-
block OIDs are also printed. Preserves index files consistency: the first index file returned by
232-
SEARCH for the given height considered to be the original one, all duplicating blocks that are not
233-
mentioned in this file will be removed.
219+
Description: `Searches for the duplicating blocks. Logs duplicating and missing
220+
block OIDs without removal if --dry-run is enabled. If --debug is enabled, then duplicating and missing
221+
block OIDs are also printed.
234222
`,
235223
},
236224
{

0 commit comments

Comments
 (0)