diff --git a/go/replication/volume_growth_test.go b/go/replication/volume_growth_test.go index 3fbeebc9e..a4104716e 100644 --- a/go/replication/volume_growth_test.go +++ b/go/replication/volume_growth_test.go @@ -80,7 +80,11 @@ func setup(topologyLayout string) *topology.Topology { fmt.Println("data:", data) //need to connect all nodes first before server adding volumes - topo := topology.NewTopology("mynetwork", "/etc/weedfs/weedfs.conf", "/tmp", "testing", 32*1024, 5) + topo, err := topology.NewTopology("mynetwork", "/etc/weedfs/weedfs.conf", + "/tmp", "testing", 32*1024, 5) + if err != nil { + panic("error: " + err.Error()) + } mTopology := data.(map[string]interface{}) for dcKey, dcValue := range mTopology { dc := topology.NewDataCenter(dcKey) diff --git a/go/storage/store.go b/go/storage/store.go index 142ee84e2..ccb6a89fb 100644 --- a/go/storage/store.go +++ b/go/storage/store.go @@ -120,9 +120,10 @@ func (s *Store) loadExistingVolumes() { func (s *Store) Status() []*VolumeInfo { var stats []*VolumeInfo for k, v := range s.volumes { - s := new(VolumeInfo) - s.Id, s.Size, s.RepType, s.Version, s.FileCount, s.DeleteCount, s.DeletedByteCount = - VolumeId(k), v.ContentSize(), v.ReplicaType, v.Version(), v.nm.fileCounter, v.nm.deletionCounter, v.nm.deletionByteCounter + s := &VolumeInfo{Id: VolumeId(k), Size: v.ContentSize(), + RepType: v.ReplicaType, Version: v.Version(), FileCount: v.nm.fileCounter, + DeleteCount: v.nm.deletionCounter, DeletedByteCount: v.nm.deletionByteCounter, + ReadOnly: v.readOnly} stats = append(stats, s) } return stats @@ -138,9 +139,10 @@ func (s *Store) SetMaster(mserver string) { func (s *Store) Join() error { stats := new([]*VolumeInfo) for k, v := range s.volumes { - s := new(VolumeInfo) - s.Id, s.Size, s.RepType, s.Version, s.FileCount, s.DeleteCount, s.DeletedByteCount = - VolumeId(k), uint64(v.Size()), v.ReplicaType, v.Version(), v.nm.fileCounter, v.nm.deletionCounter, v.nm.deletionByteCounter + s := &VolumeInfo{Id: VolumeId(k), Size: uint64(v.Size()), + RepType: v.ReplicaType, Version: v.Version(), FileCount: v.nm.fileCounter, + DeleteCount: v.nm.deletionCounter, DeletedByteCount: v.nm.deletionByteCounter, + ReadOnly: v.readOnly} *stats = append(*stats, s) } bytes, _ := json.Marshal(stats) @@ -171,7 +173,7 @@ func (s *Store) Close() { } } func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) { - if v := s.volumes[i]; v != nil { + if v := s.volumes[i]; v != nil && !v.readOnly { size, err = v.write(n) if err != nil && s.volumeSizeLimit < v.ContentSize()+uint64(size) && s.volumeSizeLimit >= v.ContentSize() { log.Println("volume", i, "size is", v.ContentSize(), "close to", s.volumeSizeLimit) @@ -185,7 +187,7 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) { return } func (s *Store) Delete(i VolumeId, n *Needle) (uint32, error) { - if v := s.volumes[i]; v != nil { + if v := s.volumes[i]; v != nil && !v.readOnly { return v.delete(n) } return 0, nil diff --git a/go/storage/volume.go b/go/storage/volume.go index b9c7484d7..42a931a6d 100644 --- a/go/storage/volume.go +++ b/go/storage/volume.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "io" + "log" "os" "path" "sync" @@ -30,6 +31,7 @@ type Volume struct { dir string dataFile *os.File nm *NeedleMap + readOnly bool SuperBlock @@ -53,7 +55,14 @@ func (v *Volume) load(alsoLoadIndex bool) error { fileName := path.Join(v.dir, v.Id.String()) v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644) if e != nil { - return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e) + if !os.IsPermission(e) { + return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e) + } + if v.dataFile, e = os.Open(fileName + ".dat"); e != nil { + return fmt.Errorf("cannot open Volume Data %s.dat: %s", fileName, e) + } + log.Printf("opening " + fileName + ".dat in READONLY mode") + v.readOnly = true } if v.ReplicaType == CopyNil { e = v.readSuperBlock() @@ -97,6 +106,14 @@ func (v *Volume) maybeWriteSuperBlock() error { if stat.Size() == 0 { v.SuperBlock.Version = CurrentVersion _, e = v.dataFile.Write(v.SuperBlock.Bytes()) + if e != nil && os.IsPermission(e) { + //read-only, but zero length - recreate it! + if v.dataFile, e = os.Create(v.dataFile.Name()); e == nil { + if _, e = v.dataFile.Write(v.SuperBlock.Bytes()); e == nil { + v.readOnly = false + } + } + } } return e } @@ -123,6 +140,10 @@ func (v *Volume) NeedToReplicate() bool { } func (v *Volume) write(n *Needle) (size uint32, err error) { + if v.readOnly { + err = fmt.Errorf("%s is read-only", v.dataFile) + return + } v.accessLock.Lock() defer v.accessLock.Unlock() var offset int64 @@ -142,6 +163,9 @@ func (v *Volume) write(n *Needle) (size uint32, err error) { return } func (v *Volume) delete(n *Needle) (uint32, error) { + if v.readOnly { + return 0, fmt.Errorf("%s is read-only", v.dataFile) + } v.accessLock.Lock() defer v.accessLock.Unlock() nv, ok := v.nm.Get(n.Id) diff --git a/go/storage/volume_info.go b/go/storage/volume_info.go index e4c5f6ec4..5a83b6e36 100644 --- a/go/storage/volume_info.go +++ b/go/storage/volume_info.go @@ -10,4 +10,5 @@ type VolumeInfo struct { FileCount int DeleteCount int DeletedByteCount uint64 + ReadOnly bool } diff --git a/go/topology/volume_layout.go b/go/topology/volume_layout.go index 0791bed20..cd725c132 100644 --- a/go/topology/volume_layout.go +++ b/go/topology/volume_layout.go @@ -39,13 +39,15 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { } func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool { - return uint64(v.Size) < vl.volumeSizeLimit && v.Version == storage.CurrentVersion + return uint64(v.Size) < vl.volumeSizeLimit && + v.Version == storage.CurrentVersion && + !v.ReadOnly } func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode { - if location := vl.vid2location[vid]; location != nil { - return location.list - } + if location := vl.vid2location[vid]; location != nil { + return location.list + } return nil }