Skip to content

Commit

Permalink
neofs-lens/storage: add a sanity storage check
Browse files Browse the repository at this point in the history
Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
  • Loading branch information
carpawell committed Jul 5, 2024
1 parent e6eeda1 commit 135cef2
Show file tree
Hide file tree
Showing 3 changed files with 226 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ Changelog for NeoFS Node

### Added
- Indexes inspection command to neofs-lens (#2882)
- Add objects sanity checker to neofs-lens (#2506)

### Fixed
- Control service's Drop call does not clean metabase (#2822)
Expand Down
1 change: 1 addition & 0 deletions cmd/neofs-lens/internal/storage/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func init() {
storageGetObjCMD,
storageListObjsCMD,
storageStatusObjCMD,
storageSanityCMD,
)
}

Expand Down
224 changes: 224 additions & 0 deletions cmd/neofs-lens/internal/storage/sanity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
package storage

import (
"bytes"
"context"
"errors"
"fmt"
"time"

"github.com/mr-tron/base58"
common "github.com/nspcc-dev/neofs-node/cmd/neofs-lens/internal"
"github.com/nspcc-dev/neofs-node/cmd/neofs-node/config"
engineconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine"
shardconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard"
fstreeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/fstree"
peapodconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/peapod"
objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
commonb "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/peapod"
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
"github.com/nspcc-dev/neofs-sdk-go/object"
"github.com/spf13/cobra"
"go.etcd.io/bbolt"
"go.uber.org/zap"
)

var storageSanityCMD = &cobra.Command{
Use: "sanity",
Short: "Check stored objects consistency",
Args: cobra.NoArgs,
Run: sanityCheck,
}

func init() {
common.AddConfigFileFlag(storageSanityCMD, &vConfig)
}

type storageShard struct {
m *meta.DB
fsT *fstree.FSTree
p *peapod.Peapod
}

func sanityCheck(cmd *cobra.Command, _ []string) {
var shards []storageShard
appCfg := config.New(config.Prm{}, config.WithConfigFile(vConfig))

err := engineconfig.IterateShards(appCfg, false, func(sc *shardconfig.Config) error {
var sh storageShard

blobStorCfg := sc.BlobStor()
metaCfg := sc.Metabase()

sh.m = meta.New(
meta.WithPath(metaCfg.Path()),
meta.WithPermissions(metaCfg.BoltDB().Perm()),
meta.WithMaxBatchSize(metaCfg.BoltDB().MaxBatchSize()),
meta.WithMaxBatchDelay(metaCfg.BoltDB().MaxBatchDelay()),
meta.WithBoltDBOptions(&bbolt.Options{Timeout: time.Second}),
meta.WithLogger(zap.NewNop()),
meta.WithEpochState(epochState{}),
)

for _, subCfg := range blobStorCfg.Storages() {
switch subCfg.Type() {
default:
return fmt.Errorf("unsupported sub-storage type '%s'", subCfg.Type())
case peapod.Type:
peapodCfg := peapodconfig.From((*config.Config)(subCfg))
sh.p = peapod.New(subCfg.Path(), subCfg.Perm(), peapodCfg.FlushInterval())

var compressCfg compression.Config
err := compressCfg.Init()
common.ExitOnErr(cmd, common.Errf("failed to init compression config: %w", err))

sh.p.SetCompressor(&compressCfg)
case fstree.Type:
fstreeCfg := fstreeconfig.From((*config.Config)(subCfg))
sh.fsT = fstree.New(
fstree.WithPath(subCfg.Path()),
fstree.WithPerm(subCfg.Perm()),
fstree.WithDepth(fstreeCfg.Depth()),
fstree.WithNoSync(fstreeCfg.NoSync()),
)
}
}

common.ExitOnErr(cmd, common.Errf("open metabase: %w", sh.m.Open(true)))
common.ExitOnErr(cmd, common.Errf("open peapod: %w", sh.p.Open(true)))
common.ExitOnErr(cmd, common.Errf("open fstree: %w", sh.fsT.Open(true)))

// metabase.Open(true) does not set it mode to RO somehow
common.ExitOnErr(cmd, common.Errf("moving metabase in readonly mode", sh.m.SetMode(mode.ReadOnly)))

common.ExitOnErr(cmd, common.Errf("init metabase: %w", sh.m.Init()))
common.ExitOnErr(cmd, common.Errf("init peapod: %w", sh.p.Init()))
common.ExitOnErr(cmd, common.Errf("init fstree: %w", sh.fsT.Init()))

shards = append(shards, sh)

return nil
})
common.ExitOnErr(cmd, common.Errf("reading config: %w", err))

defer func() {
for _, sh := range shards {
_ = sh.m.Close()
_ = sh.p.Close()
_ = sh.fsT.Close()
}
}()

for _, sh := range shards {
idRaw, err := sh.m.ReadShardID()
if err != nil {
cmd.Printf("reading shard id: %s; skip this shard\n", err)
continue
}

id := base58.Encode(idRaw)
cmd.Printf("Checking %s shard\n", id)

err = checkShard(cmd, sh)
if err != nil {
if errors.Is(err, context.Canceled) {
return
}

cmd.Printf("%s shard check: %s\n", id, err)
continue
}
}
}

func checkShard(cmd *cobra.Command, sh storageShard) error {
var mPrm meta.ListPrm
mPrm.SetCount(1024)

for {
listRes, err := sh.m.ListWithCursor(mPrm)
if err != nil {
if errors.Is(err, meta.ErrEndOfListing) {
return nil
}

return fmt.Errorf("listing objects in meta: %w", err)
}

for _, obj := range listRes.AddressList() {
select {
case <-cmd.Context().Done():
return cmd.Context().Err()
default:
}

var sIDPrm meta.StorageIDPrm
sIDPrm.SetAddress(obj.Address)

sIDRes, err := sh.m.StorageID(sIDPrm)
if err != nil {
return fmt.Errorf("reading storage ID in meta: %w", err)
}

var mGet meta.GetPrm
mGet.SetAddress(obj.Address)

getRes, err := sh.m.Get(mGet)
if err != nil {
return fmt.Errorf("reading object in meta: %w", err)
}

header := *getRes.Header()

if sIDRes.StorageID() == nil {
err = checkObject(header, sh.fsT)
} else {
err = checkObject(header, sh.p)
}
if err != nil {
cmd.Printf("%s object check: %s\n", obj.Address, err)
continue
}
}

mPrm.SetCursor(listRes.Cursor())
}
}

func checkObject(obj object.Object, storage commonb.Storage) error {
// header len check

raw, err := obj.Marshal()
if err != nil {
return fmt.Errorf("object from metabase cannot be marshaled: %w", err)
}

if len(raw) > object.MaxHeaderLen {
return fmt.Errorf("header cannot be larger than %d bytes", object.MaxHeaderLen)
}

// object real presence

var getPrm commonb.GetPrm
getPrm.Address = objectcore.AddressOf(&obj)

res, err := storage.Get(getPrm)
if err != nil {
return fmt.Errorf("object get from %s storage: %w", storage.Type(), err)
}

storageRaw, err := res.Object.CutPayload().Marshal()
if err != nil {
return fmt.Errorf("object from %s storage cannot be marshaled: %w", storage.Type(), err)
}

if !bytes.Equal(raw, storageRaw) {
return errors.New("object from metabase does not match object from storage")
}

return nil
}

0 comments on commit 135cef2

Please sign in to comment.