diff --git a/weed/command/backup.go b/weed/command/backup.go index f9b9fba64..d5599372e 100644 --- a/weed/command/backup.go +++ b/weed/command/backup.go @@ -115,7 +115,10 @@ func runBackup(cmd *Command, args []string) bool { return true } } - v, err := storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0, 0) + + ver := needle.Version(stats.Version) + + v, err := storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, ver, 0, 0) if err != nil { fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err) return true @@ -142,7 +145,7 @@ func runBackup(cmd *Command, args []string) bool { fmt.Printf("Error destroying volume: %v\n", err) } // recreate an empty volume - v, err = storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0, 0) + v, err = storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, ver, 0, 0) if err != nil { fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err) return true diff --git a/weed/command/compact.go b/weed/command/compact.go index 6f5f2307a..59e69bc74 100644 --- a/weed/command/compact.go +++ b/weed/command/compact.go @@ -41,7 +41,7 @@ func runCompact(cmd *Command, args []string) bool { preallocate := *compactVolumePreallocate * (1 << 20) vid := needle.VolumeId(*compactVolumeId) - v, err := storage.NewVolume(util.ResolvePath(*compactVolumePath), util.ResolvePath(*compactVolumePath), *compactVolumeCollection, vid, storage.NeedleMapInMemory, nil, nil, preallocate, 0, 0) + v, err := storage.NewVolume(util.ResolvePath(*compactVolumePath), util.ResolvePath(*compactVolumePath), *compactVolumeCollection, vid, storage.NeedleMapInMemory, nil, nil, preallocate, needle.GetCurrentVersion(), 0, 0) if err != nil { glog.Fatalf("Load Volume [ERROR] %s\n", err) } diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go index dd13e3465..52e085684 100644 --- a/weed/server/volume_grpc_admin.go +++ b/weed/server/volume_grpc_admin.go @@ -3,10 +3,11 @@ package weed_server import ( "context" "fmt" - "github.com/seaweedfs/seaweedfs/weed/util/version" "path/filepath" "time" + "github.com/seaweedfs/seaweedfs/weed/util/version" + "github.com/seaweedfs/seaweedfs/weed/storage" "github.com/seaweedfs/seaweedfs/weed/cluster" @@ -48,6 +49,7 @@ func (vs *VolumeServer) AllocateVolume(ctx context.Context, req *volume_server_p req.Replication, req.Ttl, req.Preallocate, + needle.Version(req.Version), req.MemoryMapMaxSizeMb, types.ToDiskType(req.DiskType), vs.ldbTimout, diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index cc89c4ca1..a3f0b6585 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -178,7 +178,7 @@ func (l *DiskLocation) loadExistingVolume(dirEntry os.DirEntry, needleMapKind Ne } // load the volume - v, e := NewVolume(l.Directory, l.IdxDirectory, collection, vid, needleMapKind, nil, nil, 0, 0, ldbTimeout) + v, e := NewVolume(l.Directory, l.IdxDirectory, collection, vid, needleMapKind, nil, nil, 0, needle.GetCurrentVersion(), 0, ldbTimeout) if e != nil { glog.V(0).Infof("new volume %s error %s", volumeName, e) return false diff --git a/weed/storage/idx_binary_search_test.go b/weed/storage/idx_binary_search_test.go index 0f26cdd02..e04185bcd 100644 --- a/weed/storage/idx_binary_search_test.go +++ b/weed/storage/idx_binary_search_test.go @@ -14,7 +14,7 @@ import ( func TestFirstInvalidIndex(t *testing.T) { dir := t.TempDir() - v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0, 0) + v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, needle.GetCurrentVersion(), 0, 0) if err != nil { t.Fatalf("volume creation: %v", err) } diff --git a/weed/storage/store.go b/weed/storage/store.go index 9bfcae7ba..69bb5bc3b 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -107,7 +107,7 @@ func NewStore(grpcDialOption grpc.DialOption, ip string, port int, grpcPort int, return } -func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement string, ttlString string, preallocate int64, MemoryMapMaxSizeMb uint32, diskType DiskType, ldbTimeout int64) error { +func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement string, ttlString string, preallocate int64, ver needle.Version, MemoryMapMaxSizeMb uint32, diskType DiskType, ldbTimeout int64) error { rt, e := super_block.NewReplicaPlacementFromString(replicaPlacement) if e != nil { return e @@ -116,7 +116,7 @@ func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMap if e != nil { return e } - e = s.addVolume(volumeId, collection, needleMapKind, rt, ttl, preallocate, MemoryMapMaxSizeMb, diskType, ldbTimeout) + e = s.addVolume(volumeId, collection, needleMapKind, rt, ttl, preallocate, ver, MemoryMapMaxSizeMb, diskType, ldbTimeout) return e } func (s *Store) DeleteCollection(collection string) (e error) { @@ -159,7 +159,7 @@ func (s *Store) FindFreeLocation(filterFn func(location *DiskLocation) bool) (re } return ret } -func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32, diskType DiskType, ldbTimeout int64) error { +func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, ver needle.Version, memoryMapMaxSizeMb uint32, diskType DiskType, ldbTimeout int64) error { if s.findVolume(vid) != nil { return fmt.Errorf("Volume Id %d already exists!", vid) } @@ -168,7 +168,7 @@ func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind }); location != nil { glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v", location.Directory, vid, collection, replicaPlacement, ttl) - if volume, err := NewVolume(location.Directory, location.IdxDirectory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate, memoryMapMaxSizeMb, ldbTimeout); err == nil { + if volume, err := NewVolume(location.Directory, location.IdxDirectory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate, ver, memoryMapMaxSizeMb, ldbTimeout); err == nil { location.SetVolume(vid, volume) glog.V(0).Infof("add volume %d", vid) s.NewVolumesChan <- master_pb.VolumeShortInformationMessage{ diff --git a/weed/storage/volume.go b/weed/storage/volume.go index e55564652..b495b379d 100644 --- a/weed/storage/volume.go +++ b/weed/storage/volume.go @@ -55,14 +55,14 @@ type Volume struct { lastIoError error } -func NewVolume(dirname string, dirIdx string, collection string, id needle.VolumeId, needleMapKind NeedleMapKind, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32, ldbTimeout int64) (v *Volume, e error) { +func NewVolume(dirname string, dirIdx string, collection string, id needle.VolumeId, needleMapKind NeedleMapKind, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, ver needle.Version, memoryMapMaxSizeMb uint32, ldbTimeout int64) (v *Volume, e error) { // if replicaPlacement is nil, the superblock will be loaded from disk v = &Volume{dir: dirname, dirIdx: dirIdx, Collection: collection, Id: id, MemoryMapMaxSizeMb: memoryMapMaxSizeMb, asyncRequestsChan: make(chan *needle.AsyncRequest, 128)} v.SuperBlock = super_block.SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl} v.needleMapKind = needleMapKind v.ldbTimeout = ldbTimeout - e = v.load(true, true, needleMapKind, preallocate) + e = v.load(true, true, needleMapKind, preallocate, ver) v.startWorker() return } diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go index 3334159ed..ca690618b 100644 --- a/weed/storage/volume_loading.go +++ b/weed/storage/volume_loading.go @@ -16,15 +16,15 @@ import ( "github.com/seaweedfs/seaweedfs/weed/util" ) -func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapKind) (v *Volume, err error) { +func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapKind, ver needle.Version) (v *Volume, err error) { v = &Volume{dir: dirname, Collection: collection, Id: id} v.SuperBlock = super_block.SuperBlock{} v.needleMapKind = needleMapKind - err = v.load(false, false, needleMapKind, 0) + err = v.load(false, false, needleMapKind, 0, ver) return } -func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapKind, preallocate int64) (err error) { +func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapKind, preallocate int64, ver needle.Version) (err error) { alreadyHasSuperBlock := false hasLoadedVolume := false @@ -105,7 +105,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind if !v.SuperBlock.Initialized() { return fmt.Errorf("volume %s not initialized", v.FileName(".dat")) } - err = v.maybeWriteSuperBlock() + err = v.maybeWriteSuperBlock(ver) } if err == nil && alsoLoadIndex { // adjust for existing volumes with .idx together with .dat files diff --git a/weed/storage/volume_read.go b/weed/storage/volume_read.go index f82e3e72d..26aa8ac8a 100644 --- a/weed/storage/volume_read.go +++ b/weed/storage/volume_read.go @@ -2,10 +2,11 @@ package storage import ( "fmt" - "github.com/seaweedfs/seaweedfs/weed/util/mem" "io" "time" + "github.com/seaweedfs/seaweedfs/weed/util/mem" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/stats" "github.com/seaweedfs/seaweedfs/weed/storage/backend" @@ -225,7 +226,7 @@ func ScanVolumeFile(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapKind, volumeFileScanner VolumeFileScanner) (err error) { var v *Volume - if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind); err != nil { + if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind, needle.GetCurrentVersion()); err != nil { return fmt.Errorf("failed to load volume %d: %v", id, err) } if err = volumeFileScanner.VisitSuperBlock(v.SuperBlock); err != nil { diff --git a/weed/storage/volume_read_test.go b/weed/storage/volume_read_test.go index 8bbd0058b..efedadc31 100644 --- a/weed/storage/volume_read_test.go +++ b/weed/storage/volume_read_test.go @@ -12,7 +12,7 @@ import ( func TestReadNeedMetaWithWritesAndUpdates(t *testing.T) { dir := t.TempDir() - v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0, 0) + v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, needle.GetCurrentVersion(), 0, 0) if err != nil { t.Fatalf("volume creation: %v", err) } @@ -51,7 +51,7 @@ func TestReadNeedMetaWithWritesAndUpdates(t *testing.T) { func TestReadNeedMetaWithDeletesThenWrites(t *testing.T) { dir := t.TempDir() - v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0, 0) + v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, needle.GetCurrentVersion(), 0, 0) if err != nil { t.Fatalf("volume creation: %v", err) } diff --git a/weed/storage/volume_super_block.go b/weed/storage/volume_super_block.go index 1d411471f..baf490af8 100644 --- a/weed/storage/volume_super_block.go +++ b/weed/storage/volume_super_block.go @@ -10,7 +10,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage/super_block" ) -func (v *Volume) maybeWriteSuperBlock() error { +func (v *Volume) maybeWriteSuperBlock(ver needle.Version) error { datSize, _, e := v.DataBackend.GetStat() if e != nil { @@ -18,7 +18,7 @@ func (v *Volume) maybeWriteSuperBlock() error { return e } if datSize == 0 { - v.SuperBlock.Version = needle.GetCurrentVersion() + v.SuperBlock.Version = ver _, e = v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0) if e != nil && os.IsPermission(e) { //read-only, but zero length - recreate it! diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index 9f277d4f5..1d6cdf9e0 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -175,7 +175,7 @@ func (v *Volume) CommitCompact() error { os.RemoveAll(v.FileName(".ldb")) glog.V(3).Infof("Loading volume %d commit file...", v.Id) - if e = v.load(true, false, v.needleMapKind, 0); e != nil { + if e = v.load(true, false, v.needleMapKind, 0, v.Version()); e != nil { return e } glog.V(3).Infof("Finish committing volume %d", v.Id) diff --git a/weed/storage/volume_vacuum_test.go b/weed/storage/volume_vacuum_test.go index 079b7ccf5..797452bb3 100644 --- a/weed/storage/volume_vacuum_test.go +++ b/weed/storage/volume_vacuum_test.go @@ -72,7 +72,7 @@ func TestLDBIndexCompaction(t *testing.T) { func testCompaction(t *testing.T, needleMapKind NeedleMapKind) { dir := t.TempDir() - v, err := NewVolume(dir, dir, "", 1, needleMapKind, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0, 0) + v, err := NewVolume(dir, dir, "", 1, needleMapKind, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, needle.GetCurrentVersion(), 0, 0) if err != nil { t.Fatalf("volume creation: %v", err) } @@ -115,7 +115,7 @@ func testCompaction(t *testing.T, needleMapKind NeedleMapKind) { v.Close() - v, err = NewVolume(dir, dir, "", 1, needleMapKind, nil, nil, 0, 0, 0) + v, err = NewVolume(dir, dir, "", 1, needleMapKind, nil, nil, 0, needle.GetCurrentVersion(), 0, 0) if err != nil { t.Fatalf("volume reloading: %v", err) } diff --git a/weed/storage/volume_write_test.go b/weed/storage/volume_write_test.go index 63815aecd..ce7c184bd 100644 --- a/weed/storage/volume_write_test.go +++ b/weed/storage/volume_write_test.go @@ -3,11 +3,12 @@ package storage import ( "errors" "fmt" - "github.com/stretchr/testify/assert" "os" "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/super_block" "github.com/seaweedfs/seaweedfs/weed/storage/types" @@ -16,7 +17,7 @@ import ( func TestSearchVolumesWithDeletedNeedles(t *testing.T) { dir := t.TempDir() - v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0, 0) + v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, needle.GetCurrentVersion(), 0, 0) if err != nil { t.Fatalf("volume creation: %v", err) } @@ -78,7 +79,7 @@ func assertFileExist(t *testing.T, expected bool, path string) { func TestDestroyEmptyVolumeWithOnlyEmpty(t *testing.T) { dir := t.TempDir() - v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0, 0) + v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, needle.GetCurrentVersion(), 0, 0) if err != nil { t.Fatalf("volume creation: %v", err) } @@ -96,7 +97,7 @@ func TestDestroyEmptyVolumeWithOnlyEmpty(t *testing.T) { func TestDestroyEmptyVolumeWithoutOnlyEmpty(t *testing.T) { dir := t.TempDir() - v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0, 0) + v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, needle.GetCurrentVersion(), 0, 0) if err != nil { t.Fatalf("volume creation: %v", err) } @@ -114,7 +115,7 @@ func TestDestroyEmptyVolumeWithoutOnlyEmpty(t *testing.T) { func TestDestroyNonemptyVolumeWithOnlyEmpty(t *testing.T) { dir := t.TempDir() - v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0, 0) + v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, needle.GetCurrentVersion(), 0, 0) if err != nil { t.Fatalf("volume creation: %v", err) } @@ -144,7 +145,7 @@ func TestDestroyNonemptyVolumeWithOnlyEmpty(t *testing.T) { func TestDestroyNonemptyVolumeWithoutOnlyEmpty(t *testing.T) { dir := t.TempDir() - v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0, 0) + v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, needle.GetCurrentVersion(), 0, 0) if err != nil { t.Fatalf("volume creation: %v", err) }