From e791e5395204ead6d05a9e6da81dff2714ba66d4 Mon Sep 17 00:00:00 2001 From: tnextday Date: Sun, 28 Feb 2016 10:52:50 +0800 Subject: [PATCH] make Disklocation is concurrent safe --- go/storage/disk_location.go | 126 +++++++++++++++++++++++++++ go/storage/store.go | 99 ++++++--------------- go/storage/store_task_replication.go | 2 +- go/storage/volume.go | 6 +- 4 files changed, 156 insertions(+), 77 deletions(-) create mode 100644 go/storage/disk_location.go diff --git a/go/storage/disk_location.go b/go/storage/disk_location.go new file mode 100644 index 000000000..9267343e4 --- /dev/null +++ b/go/storage/disk_location.go @@ -0,0 +1,126 @@ +package storage + +import ( + "io/ioutil" + "strings" + "sync" + + "github.com/golang/glog" +) + +// DiskLocation is concurrent safe +type DiskLocation struct { + Directory string + MaxVolumeCount int + volumes map[VolumeId]*Volume + lock sync.RWMutex +} + +func NewDiskLocation(dir string, maxVolCount int) *DiskLocation { + return &DiskLocation{ + Directory: dir, + MaxVolumeCount: maxVolCount, + volumes: make(map[VolumeId]*Volume), + } +} + +func (l *DiskLocation) LoadExistingVolumes(needleMapKind NeedleMapType) { + if dirs, err := ioutil.ReadDir(l.Directory); err == nil { + for _, dir := range dirs { + name := dir.Name() + if !dir.IsDir() && strings.HasSuffix(name, ".dat") { + collection := "" + base := name[:len(name)-len(".dat")] + i := strings.LastIndex(base, "_") + if i > 0 { + collection, base = base[0:i], base[i+1:] + } + if vid, err := NewVolumeId(base); err == nil { + if !l.HasVolume(vid) { + if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil); e == nil { + l.AddVolume(vid, v) + glog.V(0).Infof("data file %s, v=%d size=%d ttl=%s", l.Directory+"/"+name, v.Version(), v.Size(), v.Ttl.String()) + } else { + glog.V(0).Infof("new volume %s error %s", name, e) + } + } + } + } + } + } + glog.V(0).Infoln("Store started on dir:", l.Directory, "with", len(l.volumes), "volumes", "max", l.MaxVolumeCount) +} + +func (l *DiskLocation) AddVolume(vid VolumeId, v *Volume) { + l.lock.Lock() + defer l.lock.Unlock() + l.volumes[vid] = v +} + +func (l *DiskLocation) DeleteVolume(vid VolumeId) (e error) { + l.lock.Lock() + defer l.lock.Unlock() + if v, ok := l.volumes[vid]; ok { + e = v.Destroy() + } + delete(l.volumes, vid) + return +} + +func (l *DiskLocation) DeleteCollection(collection string) (e error) { + l.lock.Lock() + defer l.lock.Unlock() + for k, v := range l.volumes { + if v.Collection == collection { + e = v.Destroy() + if e != nil { + return + } + delete(l.volumes, k) + } + } + return +} + +func (l *DiskLocation) HasVolume(vid VolumeId) bool { + l.lock.RLock() + defer l.lock.RUnlock() + _, ok := l.volumes[vid] + return ok +} + +func (l *DiskLocation) GetVolume(vid VolumeId) (v *Volume, ok bool) { + l.lock.RLock() + defer l.lock.RUnlock() + v, ok = l.volumes[vid] + return +} + +func (l *DiskLocation) VolumeCount() int { + l.lock.RLock() + defer l.lock.RUnlock() + return len(l.volumes) +} + +func (l *DiskLocation) CloseAllVolume() { + l.lock.RLock() + defer l.lock.RUnlock() + for _, v := range l.volumes { + v.Close() + } +} + +// break walk when walker fuc return an error +type VolumeWalker func(v *Volume) (e error) + +// must not add or delete volume in walker +func (l *DiskLocation) WalkVolume(vw VolumeWalker) (e error) { + l.lock.RLock() + defer l.lock.RUnlock() + for _, v := range l.volumes { + if e = vw(v); e != nil { + return e + } + } + return +} diff --git a/go/storage/store.go b/go/storage/store.go index 702db99fa..872d5e44b 100644 --- a/go/storage/store.go +++ b/go/storage/store.go @@ -4,7 +4,6 @@ import ( "encoding/json" "errors" "fmt" - "io/ioutil" "math/rand" "strconv" "strings" @@ -20,15 +19,6 @@ const ( MAX_TTL_VOLUME_REMOVAL_DELAY = 10 // 10 minutes ) -type DiskLocation struct { - Directory string - MaxVolumeCount int - volumes map[VolumeId]*Volume -} - -func (mn *DiskLocation) reset() { -} - type MasterNodes struct { nodes []string lastNode int @@ -107,9 +97,8 @@ func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts } s.Locations = make([]*DiskLocation, 0) for i := 0; i < len(dirnames); i++ { - location := &DiskLocation{Directory: dirnames[i], MaxVolumeCount: maxVolumeCounts[i]} - location.volumes = make(map[VolumeId]*Volume) - location.loadExistingVolumes(needleMapKind) + location := NewDiskLocation(dirnames[i], maxVolumeCounts[i]) + location.LoadExistingVolumes(needleMapKind) s.Locations = append(s.Locations, location) } return @@ -148,29 +137,17 @@ func (s *Store) AddVolume(volumeListString string, collection string, ttlString } func (s *Store) DeleteCollection(collection string) (e error) { for _, location := range s.Locations { - for k, v := range location.volumes { - if v.Collection == collection { - e = v.Destroy() - if e != nil { - return - } - delete(location.volumes, k) - } - } + location.DeleteCollection(collection) } return } -func (s *Store) DeleteVolume(volumes map[VolumeId]*Volume, v *Volume) (e error) { - e = v.Destroy() - if e != nil { - return - } - delete(volumes, v.Id) - return +func (s *Store) DeleteVolume(dl *DiskLocation, v *Volume) (e error) { + + return dl.DeleteVolume(v.Id) } func (s *Store) findVolume(vid VolumeId) *Volume { for _, location := range s.Locations { - if v, found := location.volumes[vid]; found { + if v, found := location.GetVolume(vid); found { return v } } @@ -179,7 +156,7 @@ func (s *Store) findVolume(vid VolumeId) *Volume { func (s *Store) findFreeLocation() (ret *DiskLocation) { max := 0 for _, location := range s.Locations { - currentFreeCount := location.MaxVolumeCount - len(location.volumes) + currentFreeCount := location.MaxVolumeCount - location.VolumeCount() if currentFreeCount > max { max = currentFreeCount ret = location @@ -195,7 +172,7 @@ func (s *Store) addVolume(vid VolumeId, collection string, ttl *TTL) error { glog.V(0).Infof("In dir %s adds volume:%v collection:%s ttl:%v", location.Directory, vid, collection, ttl) if volume, err := NewVolume(location.Directory, collection, vid, s.needleMapKind, ttl); err == nil { - location.volumes[vid] = volume + location.AddVolume(vid, volume) return nil } else { return err @@ -204,38 +181,12 @@ func (s *Store) addVolume(vid VolumeId, collection string, ttl *TTL) error { return fmt.Errorf("No more free space left") } -func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) { - if dirs, err := ioutil.ReadDir(l.Directory); err == nil { - for _, dir := range dirs { - name := dir.Name() - if !dir.IsDir() && strings.HasSuffix(name, ".dat") { - collection := "" - base := name[:len(name)-len(".dat")] - i := strings.LastIndex(base, "_") - if i > 0 { - collection, base = base[0:i], base[i+1:] - } - if vid, err := NewVolumeId(base); err == nil { - if l.volumes[vid] == nil { - if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil); e == nil { - l.volumes[vid] = v - glog.V(0).Infof("data file %s, v=%d size=%d ttl=%s", l.Directory+"/"+name, v.Version(), v.Size(), v.Ttl.String()) - } else { - glog.V(0).Infof("new volume %s error %s", name, e) - } - } - } - } - } - } - glog.V(0).Infoln("Store started on dir:", l.Directory, "with", len(l.volumes), "volumes", "max", l.MaxVolumeCount) -} func (s *Store) Status() []*VolumeInfo { var stats []*VolumeInfo for _, location := range s.Locations { - for k, v := range location.volumes { + location.WalkVolume(func(v *Volume) (e error) { s := &VolumeInfo{ - Id: VolumeId(k), + Id: VolumeId(v.Id), Size: v.ContentSize(), Collection: v.Collection, Version: v.Version(), @@ -245,7 +196,8 @@ func (s *Store) Status() []*VolumeInfo { ReadOnly: v.readOnly, Ttl: v.Ttl} stats = append(stats, s) - } + return nil + }) } sortVolumeInfos(stats) return stats @@ -271,13 +223,14 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S var maxFileKey uint64 for _, location := range s.Locations { maxVolumeCount = maxVolumeCount + location.MaxVolumeCount - for k, v := range location.volumes { + volumeToDelete := []VolumeId{} + location.WalkVolume(func(v *Volume) (e error) { if maxFileKey < v.nm.MaxFileKey() { maxFileKey = v.nm.MaxFileKey() } if !v.expired(s.volumeSizeLimit) { volumeMessage := &operation.VolumeInformationMessage{ - Id: proto.Uint32(uint32(k)), + Id: proto.Uint32(uint32(v.Id)), Size: proto.Uint64(uint64(v.Size())), Collection: proto.String(v.Collection), FileCount: proto.Uint64(uint64(v.nm.FileCount())), @@ -289,13 +242,17 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S } volumeMessages = append(volumeMessages, volumeMessage) } else { - if v.exiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) { - s.DeleteVolume(location.volumes, v) + if v.expiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) { + volumeToDelete = append(volumeToDelete, v.Id) glog.V(0).Infoln("volume", v.Id, "is deleted.") } else { glog.V(0).Infoln("volume", v.Id, "is expired.") } } + return nil + }) + for _, vid := range volumeToDelete { + location.DeleteVolume(vid) } } @@ -341,9 +298,7 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S } func (s *Store) Close() { for _, location := range s.Locations { - for _, v := range location.volumes { - v.Close() - } + location.CloseAllVolume() } } func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) { @@ -390,14 +345,10 @@ func (s *Store) HasVolume(i VolumeId) bool { return v != nil } -type VolumeWalker func(v *Volume) (e error) - func (s *Store) WalkVolume(walker VolumeWalker) error { for _, location := range s.Locations { - for _, v := range location.volumes { - if e := walker(v); e != nil { - return e - } + if e := location.WalkVolume(walker); e != nil { + return e } } return nil diff --git a/go/storage/store_task_replication.go b/go/storage/store_task_replication.go index 0931c831e..aa01e79bf 100644 --- a/go/storage/store_task_replication.go +++ b/go/storage/store_task_replication.go @@ -88,7 +88,7 @@ func (t *ReplicaTask) Commit() error { } volume, e = NewVolume(t.location.Directory, t.Collection, t.VID, t.s.needleMapKind, nil) if e == nil { - t.location.volumes[t.VID] = volume + t.location.AddVolume(t.VID, volume) t.s.SendHeartbeatToMaster() } return e diff --git a/go/storage/volume.go b/go/storage/volume.go index 33c77a38f..a51543cc8 100644 --- a/go/storage/volume.go +++ b/go/storage/volume.go @@ -24,7 +24,7 @@ type Volume struct { SuperBlock - dataFileAccessLock sync.Mutex + dataFileAccessLock sync.RWMutex lastModifiedTime uint64 //unix time in seconds } @@ -284,7 +284,9 @@ func (v *Volume) readNeedle(n *Needle) (int, error) { if !ok || nv.Offset == 0 { return -1, errors.New("Not Found") } + v.dataFileAccessLock.RLock() err := n.ReadData(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version()) + v.dataFileAccessLock.RUnlock() if err != nil { return 0, err } @@ -410,7 +412,7 @@ func (v *Volume) expired(volumeSizeLimit uint64) bool { } // wait either maxDelayMinutes or 10% of ttl minutes -func (v *Volume) exiredLongEnough(maxDelayMinutes uint32) bool { +func (v *Volume) expiredLongEnough(maxDelayMinutes uint32) bool { if v.Ttl == nil || v.Ttl.Minutes() == 0 { return false }