Skip to content

Commit

Permalink
meta: do not require metabase resync for v2 to v3 update
Browse files Browse the repository at this point in the history
Migration can be done automatically, let admin's life be a better thing. TS
expiration is taken as the current epoch + 5. It is not critical if TS will live
more. In practice, side effects _may_ be seen only the first 5 hours after an
update, and only returned status code _may_ differ.

Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
  • Loading branch information
carpawell committed Sep 23, 2024
1 parent 064887c commit 3e8cc99
Show file tree
Hide file tree
Showing 4 changed files with 288 additions and 7 deletions.
6 changes: 3 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ Changelog for NeoFS Node
- Go to 1.22 version (#2517, #2738)

### Updating from v0.43.0
It is required to resynchronize metabases due to changed metabase scheme; any
starts with old metabases will fail. See storage node's config documentation
for details.
Metabase version has been increased, auto migrating will be performed once
a v0.44.0 Storage Node is started with a v0.43.0 metabase. This action can
not be undone. No additional work should be done.

## [0.43.0] - 2024-08-20 - Jukdo

Expand Down
2 changes: 1 addition & 1 deletion pkg/local_object_storage/metabase/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (db *DB) init(reset bool) error {
var err error
if !reset {
// Normal open, check version and update if not initialized.
err := checkVersion(tx, db.initialized)
err := db.checkVersion(tx)
if err != nil {
return err
}
Expand Down
46 changes: 43 additions & 3 deletions pkg/local_object_storage/metabase/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package meta

import (
"encoding/binary"
"errors"
"fmt"

objectconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr"
"go.etcd.io/bbolt"
)
Expand All @@ -18,7 +20,7 @@ var versionKey = []byte("version")
// the current code version.
var ErrOutdatedVersion = logicerr.New("invalid version, resynchronization is required")

func checkVersion(tx *bbolt.Tx, initialized bool) error {
func (db *DB) checkVersion(tx *bbolt.Tx) error {
var knownVersion bool

b := tx.Bucket(shardInfoBucket)
Expand All @@ -29,12 +31,20 @@ func checkVersion(tx *bbolt.Tx, initialized bool) error {

stored := binary.LittleEndian.Uint64(data)
if stored != version {
return fmt.Errorf("%w: expected=%d, stored=%d", ErrOutdatedVersion, version, stored)
migrate, ok := migrateFrom[stored]
if !ok {
return fmt.Errorf("%w: expected=%d, stored=%d", ErrOutdatedVersion, version, stored)
}

err := migrate(db, tx)
if err != nil {
return fmt.Errorf("migrating from %d to %d version failed, consider database resync: %w", stored, version, err)

Check warning on line 41 in pkg/local_object_storage/metabase/version.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/metabase/version.go#L39-L41

Added lines #L39 - L41 were not covered by tests
}
}
}
}

if !initialized {
if !db.initialized {
// new database, write version
return updateVersion(tx, version)
} else if !knownVersion {
Expand Down Expand Up @@ -71,3 +81,33 @@ func getVersion(tx *bbolt.Tx) uint64 {

return 0
}

var migrateFrom = map[uint64]func(*DB, *bbolt.Tx) error{
2: migrateFrom2Version,
}

func migrateFrom2Version(db *DB, tx *bbolt.Tx) error {
tsExpiration := db.epochState.CurrentEpoch() + objectconfig.DefaultTombstoneLifetime
bkt := tx.Bucket(graveyardBucketName)
if bkt == nil {
return errors.New("graveyard bucket is nil")

Check warning on line 93 in pkg/local_object_storage/metabase/version.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/metabase/version.go#L93

Added line #L93 was not covered by tests
}
c := bkt.Cursor()

for k, v := c.First(); k != nil; k, v = c.Next() {
if l := len(v); l != addressKeySize {
return fmt.Errorf("graveyard value with unexpected %d length", l)

Check warning on line 99 in pkg/local_object_storage/metabase/version.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/metabase/version.go#L99

Added line #L99 was not covered by tests
}

newVal := make([]byte, addressKeySize, addressKeySize+8)
copy(newVal, v)
newVal = binary.LittleEndian.AppendUint64(newVal, tsExpiration)

err := bkt.Put(k, newVal)
if err != nil {
return err

Check warning on line 108 in pkg/local_object_storage/metabase/version.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/metabase/version.go#L108

Added line #L108 was not covered by tests
}
}

return nil
}
241 changes: 241 additions & 0 deletions pkg/local_object_storage/metabase/version_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
package meta

import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"os"
"path"
"path/filepath"
"testing"

objectconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/object"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
"go.etcd.io/bbolt"
)
Expand Down Expand Up @@ -85,3 +93,236 @@ func TestVersion(t *testing.T) {
})
})
}

type inhumeV2Prm struct {
tomb *oid.Address
target []oid.Address

lockObjectHandling bool

forceRemoval bool
}

func (db *DB) inhumeV2(prm inhumeV2Prm) (res InhumeRes, err error) {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()

if db.mode.NoMetabase() {
return InhumeRes{}, ErrDegradedMode
} else if db.mode.ReadOnly() {
return InhumeRes{}, ErrReadOnlyMode
}

currEpoch := db.epochState.CurrentEpoch()
var inhumed uint64

err = db.boltDB.Update(func(tx *bbolt.Tx) error {
garbageObjectsBKT := tx.Bucket(garbageObjectsBucketName)
garbageContainersBKT := tx.Bucket(garbageContainersBucketName)
graveyardBKT := tx.Bucket(graveyardBucketName)

var (
// target bucket of the operation, one of the:
// 1. Graveyard if Inhume was called with a Tombstone
// 2. Garbage if Inhume was called with a GC mark
bkt *bbolt.Bucket
// value that will be put in the bucket, one of the:
// 1. tombstone address if Inhume was called with
// a Tombstone
// 2. zeroValue if Inhume was called with a GC mark
value []byte
)

if prm.tomb != nil {
bkt = graveyardBKT
tombKey := addressKey(*prm.tomb, make([]byte, addressKeySize))

// it is forbidden to have a tomb-on-tomb in NeoFS,
// so graveyard keys must not be addresses of tombstones
data := bkt.Get(tombKey)
if data != nil {
err := bkt.Delete(tombKey)
if err != nil {
return fmt.Errorf("could not remove grave with tombstone key: %w", err)
}
}

value = tombKey
} else {
bkt = garbageObjectsBKT
value = zeroValue
}

buf := make([]byte, addressKeySize)
for i := range prm.target {
id := prm.target[i].Object()
cnr := prm.target[i].Container()

// prevent locked objects to be inhumed
if !prm.forceRemoval && objectLocked(tx, cnr, id) {
return apistatus.ObjectLocked{}
}

var lockWasChecked bool

// prevent lock objects to be inhumed
// if `Inhume` was called not with the
// `WithForceGCMark` option
if !prm.forceRemoval {
if isLockObject(tx, cnr, id) {
return ErrLockObjectRemoval
}

lockWasChecked = true
}

obj, err := db.get(tx, prm.target[i], buf, false, true, currEpoch)
targetKey := addressKey(prm.target[i], buf)
if err == nil {
if inGraveyardWithKey(targetKey, graveyardBKT, garbageObjectsBKT, garbageContainersBKT) == 0 {
// object is available, decrement the
// logical counter
inhumed++
}

// if object is stored, and it is regular object then update bucket
// with container size estimations
if obj.Type() == object.TypeRegular {
err := changeContainerSize(tx, cnr, obj.PayloadSize(), false)
if err != nil {
return err
}
}
}

if prm.tomb != nil {
targetIsTomb := false

// iterate over graveyard and check if target address
// is the address of tombstone in graveyard.
err = bkt.ForEach(func(k, v []byte) error {
// check if graveyard has record with key corresponding
// to tombstone address (at least one)
targetIsTomb = bytes.Equal(v, targetKey)

if targetIsTomb {
// break bucket iterator
return errBreakBucketForEach
}

return nil
})
if err != nil && !errors.Is(err, errBreakBucketForEach) {
return err
}

// do not add grave if target is a tombstone
if targetIsTomb {
continue
}

// if tombstone appears object must be
// additionally marked with GC
err = garbageObjectsBKT.Put(targetKey, zeroValue)
if err != nil {
return err
}
}

// consider checking if target is already in graveyard?
err = bkt.Put(targetKey, value)
if err != nil {
return err
}

if prm.lockObjectHandling {
// do not perform lock check if
// it was already called
if lockWasChecked {
// inhumed object is not of
// the LOCK type
continue
}

if isLockObject(tx, cnr, id) {
res.deletedLockObj = append(res.deletedLockObj, prm.target[i])
}
}
}

return db.updateCounter(tx, logical, inhumed, false)
})

res.availableImhumed = inhumed

return
}

const testEpoch = 123

type epochState struct{}

func (s epochState) CurrentEpoch() uint64 {
return testEpoch
}

func newDB(t testing.TB, opts ...Option) *DB {
p := path.Join(t.TempDir(), "meta.db")

bdb := New(
append([]Option{
WithPath(p),
WithPermissions(0o600),
WithEpochState(epochState{}),
}, opts...)...,
)

require.NoError(t, bdb.Open(false))
require.NoError(t, bdb.Init())

t.Cleanup(func() {
bdb.Close()
os.Remove(bdb.DumpInfo().Path)
})

return bdb
}

func TestMigrate2to3(t *testing.T) {
expectedEpoch := uint64(testEpoch + objectconfig.DefaultTombstoneLifetime)
expectedEpochRaw := make([]byte, 8)
binary.LittleEndian.PutUint64(expectedEpochRaw, expectedEpoch)

db := newDB(t)

testObjs := oidtest.Addresses(1024)
tomb := oidtest.Address()
tombRaw := addressKey(tomb, make([]byte, addressKeySize))

_, err := db.inhumeV2(inhumeV2Prm{
target: testObjs,
tomb: &tomb,
})
require.NoError(t, err)

err = db.boltDB.Update(func(tx *bbolt.Tx) error {
err = updateVersion(tx, 2)
if err != nil {
return err
}

return migrateFrom2Version(db, tx)
})
require.NoError(t, err)

err = db.boltDB.View(func(tx *bbolt.Tx) error {
return tx.Bucket(graveyardBucketName).ForEach(func(k, v []byte) error {
require.Len(t, v, addressKeySize+8)
require.Equal(t, v[:addressKeySize], tombRaw)
require.Equal(t, v[addressKeySize:], expectedEpochRaw)

return nil
})
})
require.NoError(t, err)
}

0 comments on commit 3e8cc99

Please sign in to comment.