Browse Source

add lock variable

pull/1148/head
Chris Lu 5 years ago
parent
commit
6383b45bd0
  1. 2
      weed/server/volume_server_handlers_admin.go
  2. 2
      weed/server/volume_server_handlers_ui.go
  3. 40
      weed/storage/disk_location.go
  4. 14
      weed/storage/store.go

2
weed/server/volume_server_handlers_admin.go

@ -12,7 +12,7 @@ import (
func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) { func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]interface{}) m := make(map[string]interface{})
m["Version"] = util.VERSION m["Version"] = util.VERSION
m["Volumes"] = vs.store.Status()
m["Volumes"] = vs.store.VolumeInfos()
writeJsonQuiet(w, r, http.StatusOK, m) writeJsonQuiet(w, r, http.StatusOK, m)
} }

2
weed/server/volume_server_handlers_ui.go

@ -31,7 +31,7 @@ func (vs *VolumeServer) uiStatusHandler(w http.ResponseWriter, r *http.Request)
}{ }{
util.VERSION, util.VERSION,
vs.SeedMasterNodes, vs.SeedMasterNodes,
vs.store.Status(),
vs.store.VolumeInfos(),
vs.store.EcVolumes(), vs.store.EcVolumes(),
ds, ds,
infos, infos,

40
weed/storage/disk_location.go

@ -17,7 +17,7 @@ type DiskLocation struct {
Directory string Directory string
MaxVolumeCount int MaxVolumeCount int
volumes map[needle.VolumeId]*Volume volumes map[needle.VolumeId]*Volume
sync.RWMutex
volumesLock sync.RWMutex
// erasure coding // erasure coding
ecVolumes map[needle.VolumeId]*erasure_coding.EcVolume ecVolumes map[needle.VolumeId]*erasure_coding.EcVolume
@ -56,14 +56,14 @@ func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind Ne
if !fileInfo.IsDir() && strings.HasSuffix(name, ".idx") { if !fileInfo.IsDir() && strings.HasSuffix(name, ".idx") {
vid, collection, err := l.volumeIdFromPath(fileInfo) vid, collection, err := l.volumeIdFromPath(fileInfo)
if err == nil { if err == nil {
l.RLock()
l.volumesLock.RLock()
_, found := l.volumes[vid] _, found := l.volumes[vid]
l.RUnlock()
l.volumesLock.RUnlock()
if !found { if !found {
if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil, 0, 0); e == nil { if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil, 0, 0); e == nil {
l.Lock()
l.volumesLock.Lock()
l.volumes[vid] = v l.volumes[vid] = v
l.Unlock()
l.volumesLock.Unlock()
size, _, _ := v.FileStat() size, _, _ := v.FileStat()
glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s",
l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), size, v.Ttl.String()) l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), size, v.Ttl.String())
@ -115,17 +115,17 @@ func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) {
func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e error) { func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e error) {
l.Lock()
l.volumesLock.Lock()
for k, v := range l.volumes { for k, v := range l.volumes {
if v.Collection == collection { if v.Collection == collection {
e = l.deleteVolumeById(k) e = l.deleteVolumeById(k)
if e != nil { if e != nil {
l.Unlock()
l.volumesLock.Unlock()
return return
} }
} }
} }
l.Unlock()
l.volumesLock.Unlock()
l.ecVolumesLock.Lock() l.ecVolumesLock.Lock()
for k, v := range l.ecVolumes { for k, v := range l.ecVolumes {
@ -170,8 +170,8 @@ func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapTy
} }
func (l *DiskLocation) DeleteVolume(vid needle.VolumeId) error { func (l *DiskLocation) DeleteVolume(vid needle.VolumeId) error {
l.Lock()
defer l.Unlock()
l.volumesLock.Lock()
defer l.volumesLock.Unlock()
_, ok := l.volumes[vid] _, ok := l.volumes[vid]
if !ok { if !ok {
@ -181,8 +181,8 @@ func (l *DiskLocation) DeleteVolume(vid needle.VolumeId) error {
} }
func (l *DiskLocation) UnloadVolume(vid needle.VolumeId) error { func (l *DiskLocation) UnloadVolume(vid needle.VolumeId) error {
l.Lock()
defer l.Unlock()
l.volumesLock.Lock()
defer l.volumesLock.Unlock()
v, ok := l.volumes[vid] v, ok := l.volumes[vid]
if !ok { if !ok {
@ -194,33 +194,33 @@ func (l *DiskLocation) UnloadVolume(vid needle.VolumeId) error {
} }
func (l *DiskLocation) SetVolume(vid needle.VolumeId, volume *Volume) { func (l *DiskLocation) SetVolume(vid needle.VolumeId, volume *Volume) {
l.Lock()
defer l.Unlock()
l.volumesLock.Lock()
defer l.volumesLock.Unlock()
l.volumes[vid] = volume l.volumes[vid] = volume
} }
func (l *DiskLocation) FindVolume(vid needle.VolumeId) (*Volume, bool) { func (l *DiskLocation) FindVolume(vid needle.VolumeId) (*Volume, bool) {
l.RLock()
defer l.RUnlock()
l.volumesLock.RLock()
defer l.volumesLock.RUnlock()
v, ok := l.volumes[vid] v, ok := l.volumes[vid]
return v, ok return v, ok
} }
func (l *DiskLocation) VolumesLen() int { func (l *DiskLocation) VolumesLen() int {
l.RLock()
defer l.RUnlock()
l.volumesLock.RLock()
defer l.volumesLock.RUnlock()
return len(l.volumes) return len(l.volumes)
} }
func (l *DiskLocation) Close() { func (l *DiskLocation) Close() {
l.Lock()
l.volumesLock.Lock()
for _, v := range l.volumes { for _, v := range l.volumes {
v.Close() v.Close()
} }
l.Unlock()
l.volumesLock.Unlock()
l.ecVolumesLock.Lock() l.ecVolumesLock.Lock()
for _, ecVolume := range l.ecVolumes { for _, ecVolume := range l.ecVolumes {

14
weed/storage/store.go

@ -126,10 +126,10 @@ func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind
return fmt.Errorf("No more free space left") return fmt.Errorf("No more free space left")
} }
func (s *Store) Status() []*VolumeInfo {
func (s *Store) VolumeInfos() []*VolumeInfo {
var stats []*VolumeInfo var stats []*VolumeInfo
for _, location := range s.Locations { for _, location := range s.Locations {
location.RLock()
location.volumesLock.RLock()
for k, v := range location.volumes { for k, v := range location.volumes {
s := &VolumeInfo{ s := &VolumeInfo{
Id: needle.VolumeId(k), Id: needle.VolumeId(k),
@ -146,7 +146,7 @@ func (s *Store) Status() []*VolumeInfo {
} }
stats = append(stats, s) stats = append(stats, s)
} }
location.RUnlock()
location.volumesLock.RUnlock()
} }
sortVolumeInfos(stats) sortVolumeInfos(stats)
return stats return stats
@ -167,7 +167,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
for _, location := range s.Locations { for _, location := range s.Locations {
var deleteVids []needle.VolumeId var deleteVids []needle.VolumeId
maxVolumeCount = maxVolumeCount + location.MaxVolumeCount maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
location.RLock()
location.volumesLock.RLock()
for _, v := range location.volumes { for _, v := range location.volumes {
if maxFileKey < v.MaxFileKey() { if maxFileKey < v.MaxFileKey() {
maxFileKey = v.MaxFileKey() maxFileKey = v.MaxFileKey()
@ -184,16 +184,16 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
fileSize, _, _ := v.FileStat() fileSize, _, _ := v.FileStat()
collectionVolumeSize[v.Collection] += fileSize collectionVolumeSize[v.Collection] += fileSize
} }
location.RUnlock()
location.volumesLock.RUnlock()
if len(deleteVids) > 0 { if len(deleteVids) > 0 {
// delete expired volumes. // delete expired volumes.
location.Lock()
location.volumesLock.Lock()
for _, vid := range deleteVids { for _, vid := range deleteVids {
location.deleteVolumeById(vid) location.deleteVolumeById(vid)
glog.V(0).Infoln("volume", vid, "is deleted.") glog.V(0).Infoln("volume", vid, "is deleted.")
} }
location.Unlock()
location.volumesLock.Unlock()
} }
} }

Loading…
Cancel
Save