From 6892842021b660b196d64b32bd296f8186375705 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 13 Sep 2012 01:33:47 -0700 Subject: [PATCH] verify adding columes should work well --- weed-fs/src/pkg/admin/storage_test.go | 2 +- weed-fs/src/pkg/storage/store.go | 23 ++++++++++- weed-fs/src/pkg/storage/volume.go | 22 ++++++++--- weed-fs/src/pkg/storage/volume_info.go | 53 +++++++++++++------------- 4 files changed, 66 insertions(+), 34 deletions(-) diff --git a/weed-fs/src/pkg/admin/storage_test.go b/weed-fs/src/pkg/admin/storage_test.go index f48de202f..ecc2ab22e 100644 --- a/weed-fs/src/pkg/admin/storage_test.go +++ b/weed-fs/src/pkg/admin/storage_test.go @@ -11,7 +11,7 @@ func TestXYZ(t *testing.T) { dn := topology.NewDataNode("server1") dn.Ip = "localhost" dn.Port = 8080 - vid, _:= storage.NewVolumeId("5") + vid, _:= storage.NewVolumeId("6") out := AllocateVolume(dn,vid,storage.Copy00) log.Println(out) } diff --git a/weed-fs/src/pkg/storage/store.go b/weed-fs/src/pkg/storage/store.go index b2691e180..6bc1ec028 100644 --- a/weed-fs/src/pkg/storage/store.go +++ b/weed-fs/src/pkg/storage/store.go @@ -3,6 +3,7 @@ package storage import ( "encoding/json" "errors" + "io/ioutil" "log" "net/url" "pkg/util" @@ -22,6 +23,7 @@ func NewStore(port int, publicUrl, dirname string, volumeListString string) (s * s = &Store{Port: port, PublicUrl: publicUrl, dir: dirname} s.volumes = make(map[VolumeId]*Volume) + s.loadExistingVolumes() s.AddVolume(volumeListString, "00") log.Println("Store started on dir:", dirname, "with", len(s.volumes), "volumes", volumeListString) @@ -48,7 +50,7 @@ func (s *Store) AddVolume(volumeListString string, replicationType string) (e er } for id := start; id <= end; id++ { if err := s.addVolume(VolumeId(id), NewReplicationType(replicationType)); err != nil { - e = err + e = err } } } @@ -59,10 +61,27 @@ func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) error { if s.volumes[vid] != nil { return errors.New("Volume Id " + vid.String() + " already exists!") } - log.Println("In dir", s.dir, "adds volume = ", vid, ", replicationType =", replicationType) + log.Println("In dir", s.dir, "adds volume =", vid, ", replicationType =", replicationType) s.volumes[vid] = NewVolume(s.dir, vid, replicationType) return nil } +func (s *Store) loadExistingVolumes() { + if dirs, err := ioutil.ReadDir(s.dir); err == nil { + for _, dir := range dirs { + name := dir.Name() + if !dir.IsDir() && strings.HasSuffix(name, ".dat") { + base := name[:len(name)-len(".dat")] + if vid, err := NewVolumeId(base); err == nil { + if s.volumes[vid] == nil { + v := NewVolume(s.dir, vid, CopyNil) + s.volumes[vid] = v + log.Println("In dir", s.dir, "reads volume = ", vid, ", replicationType =", v.replicaType) + } + } + } + } + } +} func (s *Store) Status() *[]*VolumeInfo { stats := new([]*VolumeInfo) for k, v := range s.volumes { diff --git a/weed-fs/src/pkg/storage/volume.go b/weed-fs/src/pkg/storage/volume.go index de39043b3..fd68d2bc3 100644 --- a/weed-fs/src/pkg/storage/volume.go +++ b/weed-fs/src/pkg/storage/volume.go @@ -18,23 +18,27 @@ type Volume struct { dataFile *os.File nm *NeedleMap - replicaType ReplicationType + replicaType ReplicationType accessLock sync.Mutex - + //transient locations []string } func NewVolume(dirname string, id VolumeId, replicationType ReplicationType) (v *Volume) { var e error - v = &Volume{dir: dirname, Id: id, replicaType:replicationType} + v = &Volume{dir: dirname, Id: id, replicaType: replicationType} fileName := id.String() v.dataFile, e = os.OpenFile(path.Join(v.dir, fileName+".dat"), os.O_RDWR|os.O_CREATE, 0644) if e != nil { log.Fatalf("New Volume [ERROR] %s\n", e) } - v.maybeWriteSuperBlock() + if replicationType == CopyNil { + v.readSuperBlock() + } else { + v.maybeWriteSuperBlock() + } indexFile, ie := os.OpenFile(path.Join(v.dir, fileName+".idx"), os.O_RDWR|os.O_CREATE, 0644) if ie != nil { log.Fatalf("Write Volume Index [ERROR] %s\n", ie) @@ -58,10 +62,18 @@ func (v *Volume) maybeWriteSuperBlock() { stat, _ := v.dataFile.Stat() if stat.Size() == 0 { header := make([]byte, SuperBlockSize) - header[0] = byte(v.replicaType) + header[0] = 1 + header[1] = byte(v.replicaType) v.dataFile.Write(header) } } +func (v *Volume) readSuperBlock() { + v.dataFile.Seek(0, 0) + header := make([]byte, SuperBlockSize) + if _, error := v.dataFile.Read(header); error == nil { + v.replicaType = ReplicationType(header[1]) + } +} func (v *Volume) write(n *Needle) uint32 { v.accessLock.Lock() diff --git a/weed-fs/src/pkg/storage/volume_info.go b/weed-fs/src/pkg/storage/volume_info.go index 4bf2b2171..c9ed5265c 100644 --- a/weed-fs/src/pkg/storage/volume_info.go +++ b/weed-fs/src/pkg/storage/volume_info.go @@ -16,37 +16,38 @@ const ( Copy11 = ReplicationType(11) // 3 copies, 2 on different racks and local data center, 1 on different data center Copy20 = ReplicationType(20) // 3 copies, each on dffereint data center LengthRelicationType = 5 + CopyNil = ReplicationType(255) // nil value ) func NewReplicationType(t string) ReplicationType { - switch t { - case "00": - return Copy00 - case "01": - return Copy01 - case "10": - return Copy10 - case "11": - return Copy11 - case "20": - return Copy20 - } - return Copy00 + switch t { + case "00": + return Copy00 + case "01": + return Copy01 + case "10": + return Copy10 + case "11": + return Copy11 + case "20": + return Copy20 + } + return Copy00 } func (r *ReplicationType) String() string { - switch *r { - case Copy00: - return "00" - case Copy01: - return "01" - case Copy10: - return "10" - case Copy11: - return "11" - case Copy20: - return "20" - } - return "00" + switch *r { + case Copy00: + return "00" + case Copy01: + return "01" + case Copy10: + return "10" + case Copy11: + return "11" + case Copy20: + return "20" + } + return "00" } func GetReplicationLevelIndex(v *VolumeInfo) int {