Browse Source

pass volume version when creating a volume

pull/6897/head
chrislu 4 months ago
parent
commit
2f1b3d68d7
  1. 7
      weed/command/backup.go
  2. 2
      weed/command/compact.go
  3. 4
      weed/server/volume_grpc_admin.go
  4. 2
      weed/storage/disk_location.go
  5. 2
      weed/storage/idx_binary_search_test.go
  6. 8
      weed/storage/store.go
  7. 4
      weed/storage/volume.go
  8. 8
      weed/storage/volume_loading.go
  9. 5
      weed/storage/volume_read.go
  10. 4
      weed/storage/volume_read_test.go
  11. 4
      weed/storage/volume_super_block.go
  12. 2
      weed/storage/volume_vacuum.go
  13. 4
      weed/storage/volume_vacuum_test.go
  14. 13
      weed/storage/volume_write_test.go

7
weed/command/backup.go

@ -115,7 +115,10 @@ func runBackup(cmd *Command, args []string) bool {
return true return true
} }
} }
v, err := storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0, 0)
ver := needle.Version(stats.Version)
v, err := storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, ver, 0, 0)
if err != nil { if err != nil {
fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err) fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err)
return true return true
@ -142,7 +145,7 @@ func runBackup(cmd *Command, args []string) bool {
fmt.Printf("Error destroying volume: %v\n", err) fmt.Printf("Error destroying volume: %v\n", err)
} }
// recreate an empty volume // recreate an empty volume
v, err = storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0, 0)
v, err = storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, ver, 0, 0)
if err != nil { if err != nil {
fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err) fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err)
return true return true

2
weed/command/compact.go

@ -41,7 +41,7 @@ func runCompact(cmd *Command, args []string) bool {
preallocate := *compactVolumePreallocate * (1 << 20) preallocate := *compactVolumePreallocate * (1 << 20)
vid := needle.VolumeId(*compactVolumeId) vid := needle.VolumeId(*compactVolumeId)
v, err := storage.NewVolume(util.ResolvePath(*compactVolumePath), util.ResolvePath(*compactVolumePath), *compactVolumeCollection, vid, storage.NeedleMapInMemory, nil, nil, preallocate, 0, 0)
v, err := storage.NewVolume(util.ResolvePath(*compactVolumePath), util.ResolvePath(*compactVolumePath), *compactVolumeCollection, vid, storage.NeedleMapInMemory, nil, nil, preallocate, needle.GetCurrentVersion(), 0, 0)
if err != nil { if err != nil {
glog.Fatalf("Load Volume [ERROR] %s\n", err) glog.Fatalf("Load Volume [ERROR] %s\n", err)
} }

4
weed/server/volume_grpc_admin.go

@ -3,10 +3,11 @@ package weed_server
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/seaweedfs/seaweedfs/weed/util/version"
"path/filepath" "path/filepath"
"time" "time"
"github.com/seaweedfs/seaweedfs/weed/util/version"
"github.com/seaweedfs/seaweedfs/weed/storage" "github.com/seaweedfs/seaweedfs/weed/storage"
"github.com/seaweedfs/seaweedfs/weed/cluster" "github.com/seaweedfs/seaweedfs/weed/cluster"
@ -48,6 +49,7 @@ func (vs *VolumeServer) AllocateVolume(ctx context.Context, req *volume_server_p
req.Replication, req.Replication,
req.Ttl, req.Ttl,
req.Preallocate, req.Preallocate,
needle.Version(req.Version),
req.MemoryMapMaxSizeMb, req.MemoryMapMaxSizeMb,
types.ToDiskType(req.DiskType), types.ToDiskType(req.DiskType),
vs.ldbTimout, vs.ldbTimout,

2
weed/storage/disk_location.go

@ -178,7 +178,7 @@ func (l *DiskLocation) loadExistingVolume(dirEntry os.DirEntry, needleMapKind Ne
} }
// load the volume // load the volume
v, e := NewVolume(l.Directory, l.IdxDirectory, collection, vid, needleMapKind, nil, nil, 0, 0, ldbTimeout)
v, e := NewVolume(l.Directory, l.IdxDirectory, collection, vid, needleMapKind, nil, nil, 0, needle.GetCurrentVersion(), 0, ldbTimeout)
if e != nil { if e != nil {
glog.V(0).Infof("new volume %s error %s", volumeName, e) glog.V(0).Infof("new volume %s error %s", volumeName, e)
return false return false

2
weed/storage/idx_binary_search_test.go

@ -14,7 +14,7 @@ import (
func TestFirstInvalidIndex(t *testing.T) { func TestFirstInvalidIndex(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0, 0)
v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, needle.GetCurrentVersion(), 0, 0)
if err != nil { if err != nil {
t.Fatalf("volume creation: %v", err) t.Fatalf("volume creation: %v", err)
} }

8
weed/storage/store.go

@ -107,7 +107,7 @@ func NewStore(grpcDialOption grpc.DialOption, ip string, port int, grpcPort int,
return return
} }
func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement string, ttlString string, preallocate int64, MemoryMapMaxSizeMb uint32, diskType DiskType, ldbTimeout int64) error {
func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement string, ttlString string, preallocate int64, ver needle.Version, MemoryMapMaxSizeMb uint32, diskType DiskType, ldbTimeout int64) error {
rt, e := super_block.NewReplicaPlacementFromString(replicaPlacement) rt, e := super_block.NewReplicaPlacementFromString(replicaPlacement)
if e != nil { if e != nil {
return e return e
@ -116,7 +116,7 @@ func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMap
if e != nil { if e != nil {
return e return e
} }
e = s.addVolume(volumeId, collection, needleMapKind, rt, ttl, preallocate, MemoryMapMaxSizeMb, diskType, ldbTimeout)
e = s.addVolume(volumeId, collection, needleMapKind, rt, ttl, preallocate, ver, MemoryMapMaxSizeMb, diskType, ldbTimeout)
return e return e
} }
func (s *Store) DeleteCollection(collection string) (e error) { func (s *Store) DeleteCollection(collection string) (e error) {
@ -159,7 +159,7 @@ func (s *Store) FindFreeLocation(filterFn func(location *DiskLocation) bool) (re
} }
return ret return ret
} }
func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32, diskType DiskType, ldbTimeout int64) error {
func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, ver needle.Version, memoryMapMaxSizeMb uint32, diskType DiskType, ldbTimeout int64) error {
if s.findVolume(vid) != nil { if s.findVolume(vid) != nil {
return fmt.Errorf("Volume Id %d already exists!", vid) return fmt.Errorf("Volume Id %d already exists!", vid)
} }
@ -168,7 +168,7 @@ func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind
}); location != nil { }); location != nil {
glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v", glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
location.Directory, vid, collection, replicaPlacement, ttl) location.Directory, vid, collection, replicaPlacement, ttl)
if volume, err := NewVolume(location.Directory, location.IdxDirectory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate, memoryMapMaxSizeMb, ldbTimeout); err == nil {
if volume, err := NewVolume(location.Directory, location.IdxDirectory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate, ver, memoryMapMaxSizeMb, ldbTimeout); err == nil {
location.SetVolume(vid, volume) location.SetVolume(vid, volume)
glog.V(0).Infof("add volume %d", vid) glog.V(0).Infof("add volume %d", vid)
s.NewVolumesChan <- master_pb.VolumeShortInformationMessage{ s.NewVolumesChan <- master_pb.VolumeShortInformationMessage{

4
weed/storage/volume.go

@ -55,14 +55,14 @@ type Volume struct {
lastIoError error lastIoError error
} }
func NewVolume(dirname string, dirIdx string, collection string, id needle.VolumeId, needleMapKind NeedleMapKind, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32, ldbTimeout int64) (v *Volume, e error) {
func NewVolume(dirname string, dirIdx string, collection string, id needle.VolumeId, needleMapKind NeedleMapKind, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, ver needle.Version, memoryMapMaxSizeMb uint32, ldbTimeout int64) (v *Volume, e error) {
// if replicaPlacement is nil, the superblock will be loaded from disk // if replicaPlacement is nil, the superblock will be loaded from disk
v = &Volume{dir: dirname, dirIdx: dirIdx, Collection: collection, Id: id, MemoryMapMaxSizeMb: memoryMapMaxSizeMb, v = &Volume{dir: dirname, dirIdx: dirIdx, Collection: collection, Id: id, MemoryMapMaxSizeMb: memoryMapMaxSizeMb,
asyncRequestsChan: make(chan *needle.AsyncRequest, 128)} asyncRequestsChan: make(chan *needle.AsyncRequest, 128)}
v.SuperBlock = super_block.SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl} v.SuperBlock = super_block.SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
v.needleMapKind = needleMapKind v.needleMapKind = needleMapKind
v.ldbTimeout = ldbTimeout v.ldbTimeout = ldbTimeout
e = v.load(true, true, needleMapKind, preallocate)
e = v.load(true, true, needleMapKind, preallocate, ver)
v.startWorker() v.startWorker()
return return
} }

8
weed/storage/volume_loading.go

@ -16,15 +16,15 @@ import (
"github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util"
) )
func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapKind) (v *Volume, err error) {
func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapKind, ver needle.Version) (v *Volume, err error) {
v = &Volume{dir: dirname, Collection: collection, Id: id} v = &Volume{dir: dirname, Collection: collection, Id: id}
v.SuperBlock = super_block.SuperBlock{} v.SuperBlock = super_block.SuperBlock{}
v.needleMapKind = needleMapKind v.needleMapKind = needleMapKind
err = v.load(false, false, needleMapKind, 0)
err = v.load(false, false, needleMapKind, 0, ver)
return return
} }
func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapKind, preallocate int64) (err error) {
func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapKind, preallocate int64, ver needle.Version) (err error) {
alreadyHasSuperBlock := false alreadyHasSuperBlock := false
hasLoadedVolume := false hasLoadedVolume := false
@ -105,7 +105,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
if !v.SuperBlock.Initialized() { if !v.SuperBlock.Initialized() {
return fmt.Errorf("volume %s not initialized", v.FileName(".dat")) return fmt.Errorf("volume %s not initialized", v.FileName(".dat"))
} }
err = v.maybeWriteSuperBlock()
err = v.maybeWriteSuperBlock(ver)
} }
if err == nil && alsoLoadIndex { if err == nil && alsoLoadIndex {
// adjust for existing volumes with .idx together with .dat files // adjust for existing volumes with .idx together with .dat files

5
weed/storage/volume_read.go

@ -2,10 +2,11 @@ package storage
import ( import (
"fmt" "fmt"
"github.com/seaweedfs/seaweedfs/weed/util/mem"
"io" "io"
"time" "time"
"github.com/seaweedfs/seaweedfs/weed/util/mem"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/stats" "github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/storage/backend" "github.com/seaweedfs/seaweedfs/weed/storage/backend"
@ -225,7 +226,7 @@ func ScanVolumeFile(dirname string, collection string, id needle.VolumeId,
needleMapKind NeedleMapKind, needleMapKind NeedleMapKind,
volumeFileScanner VolumeFileScanner) (err error) { volumeFileScanner VolumeFileScanner) (err error) {
var v *Volume var v *Volume
if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind); err != nil {
if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind, needle.GetCurrentVersion()); err != nil {
return fmt.Errorf("failed to load volume %d: %v", id, err) return fmt.Errorf("failed to load volume %d: %v", id, err)
} }
if err = volumeFileScanner.VisitSuperBlock(v.SuperBlock); err != nil { if err = volumeFileScanner.VisitSuperBlock(v.SuperBlock); err != nil {

4
weed/storage/volume_read_test.go

@ -12,7 +12,7 @@ import (
func TestReadNeedMetaWithWritesAndUpdates(t *testing.T) { func TestReadNeedMetaWithWritesAndUpdates(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0, 0)
v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, needle.GetCurrentVersion(), 0, 0)
if err != nil { if err != nil {
t.Fatalf("volume creation: %v", err) t.Fatalf("volume creation: %v", err)
} }
@ -51,7 +51,7 @@ func TestReadNeedMetaWithWritesAndUpdates(t *testing.T) {
func TestReadNeedMetaWithDeletesThenWrites(t *testing.T) { func TestReadNeedMetaWithDeletesThenWrites(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0, 0)
v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, needle.GetCurrentVersion(), 0, 0)
if err != nil { if err != nil {
t.Fatalf("volume creation: %v", err) t.Fatalf("volume creation: %v", err)
} }

4
weed/storage/volume_super_block.go

@ -10,7 +10,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage/super_block" "github.com/seaweedfs/seaweedfs/weed/storage/super_block"
) )
func (v *Volume) maybeWriteSuperBlock() error {
func (v *Volume) maybeWriteSuperBlock(ver needle.Version) error {
datSize, _, e := v.DataBackend.GetStat() datSize, _, e := v.DataBackend.GetStat()
if e != nil { if e != nil {
@ -18,7 +18,7 @@ func (v *Volume) maybeWriteSuperBlock() error {
return e return e
} }
if datSize == 0 { if datSize == 0 {
v.SuperBlock.Version = needle.GetCurrentVersion()
v.SuperBlock.Version = ver
_, e = v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0) _, e = v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0)
if e != nil && os.IsPermission(e) { if e != nil && os.IsPermission(e) {
//read-only, but zero length - recreate it! //read-only, but zero length - recreate it!

2
weed/storage/volume_vacuum.go

@ -175,7 +175,7 @@ func (v *Volume) CommitCompact() error {
os.RemoveAll(v.FileName(".ldb")) os.RemoveAll(v.FileName(".ldb"))
glog.V(3).Infof("Loading volume %d commit file...", v.Id) glog.V(3).Infof("Loading volume %d commit file...", v.Id)
if e = v.load(true, false, v.needleMapKind, 0); e != nil {
if e = v.load(true, false, v.needleMapKind, 0, v.Version()); e != nil {
return e return e
} }
glog.V(3).Infof("Finish committing volume %d", v.Id) glog.V(3).Infof("Finish committing volume %d", v.Id)

4
weed/storage/volume_vacuum_test.go

@ -72,7 +72,7 @@ func TestLDBIndexCompaction(t *testing.T) {
func testCompaction(t *testing.T, needleMapKind NeedleMapKind) { func testCompaction(t *testing.T, needleMapKind NeedleMapKind) {
dir := t.TempDir() dir := t.TempDir()
v, err := NewVolume(dir, dir, "", 1, needleMapKind, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0, 0)
v, err := NewVolume(dir, dir, "", 1, needleMapKind, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, needle.GetCurrentVersion(), 0, 0)
if err != nil { if err != nil {
t.Fatalf("volume creation: %v", err) t.Fatalf("volume creation: %v", err)
} }
@ -115,7 +115,7 @@ func testCompaction(t *testing.T, needleMapKind NeedleMapKind) {
v.Close() v.Close()
v, err = NewVolume(dir, dir, "", 1, needleMapKind, nil, nil, 0, 0, 0)
v, err = NewVolume(dir, dir, "", 1, needleMapKind, nil, nil, 0, needle.GetCurrentVersion(), 0, 0)
if err != nil { if err != nil {
t.Fatalf("volume reloading: %v", err) t.Fatalf("volume reloading: %v", err)
} }

13
weed/storage/volume_write_test.go

@ -3,11 +3,12 @@ package storage
import ( import (
"errors" "errors"
"fmt" "fmt"
"github.com/stretchr/testify/assert"
"os" "os"
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/assert"
"github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/super_block" "github.com/seaweedfs/seaweedfs/weed/storage/super_block"
"github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/storage/types"
@ -16,7 +17,7 @@ import (
func TestSearchVolumesWithDeletedNeedles(t *testing.T) { func TestSearchVolumesWithDeletedNeedles(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0, 0)
v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, needle.GetCurrentVersion(), 0, 0)
if err != nil { if err != nil {
t.Fatalf("volume creation: %v", err) t.Fatalf("volume creation: %v", err)
} }
@ -78,7 +79,7 @@ func assertFileExist(t *testing.T, expected bool, path string) {
func TestDestroyEmptyVolumeWithOnlyEmpty(t *testing.T) { func TestDestroyEmptyVolumeWithOnlyEmpty(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0, 0)
v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, needle.GetCurrentVersion(), 0, 0)
if err != nil { if err != nil {
t.Fatalf("volume creation: %v", err) t.Fatalf("volume creation: %v", err)
} }
@ -96,7 +97,7 @@ func TestDestroyEmptyVolumeWithOnlyEmpty(t *testing.T) {
func TestDestroyEmptyVolumeWithoutOnlyEmpty(t *testing.T) { func TestDestroyEmptyVolumeWithoutOnlyEmpty(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0, 0)
v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, needle.GetCurrentVersion(), 0, 0)
if err != nil { if err != nil {
t.Fatalf("volume creation: %v", err) t.Fatalf("volume creation: %v", err)
} }
@ -114,7 +115,7 @@ func TestDestroyEmptyVolumeWithoutOnlyEmpty(t *testing.T) {
func TestDestroyNonemptyVolumeWithOnlyEmpty(t *testing.T) { func TestDestroyNonemptyVolumeWithOnlyEmpty(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0, 0)
v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, needle.GetCurrentVersion(), 0, 0)
if err != nil { if err != nil {
t.Fatalf("volume creation: %v", err) t.Fatalf("volume creation: %v", err)
} }
@ -144,7 +145,7 @@ func TestDestroyNonemptyVolumeWithOnlyEmpty(t *testing.T) {
func TestDestroyNonemptyVolumeWithoutOnlyEmpty(t *testing.T) { func TestDestroyNonemptyVolumeWithoutOnlyEmpty(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0, 0)
v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, needle.GetCurrentVersion(), 0, 0)
if err != nil { if err != nil {
t.Fatalf("volume creation: %v", err) t.Fatalf("volume creation: %v", err)
} }

Loading…
Cancel
Save