|
@ -50,29 +50,39 @@ func parseCollectionVolumeId(base string) (collection string, vid needle.VolumeI |
|
|
return collection, vol, err |
|
|
return collection, vol, err |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind NeedleMapType) { |
|
|
|
|
|
|
|
|
func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind NeedleMapType) bool { |
|
|
name := fileInfo.Name() |
|
|
name := fileInfo.Name() |
|
|
if !fileInfo.IsDir() && strings.HasSuffix(name, ".idx") { |
|
|
if !fileInfo.IsDir() && strings.HasSuffix(name, ".idx") { |
|
|
vid, collection, err := l.volumeIdFromPath(fileInfo) |
|
|
vid, collection, err := l.volumeIdFromPath(fileInfo) |
|
|
if err == nil { |
|
|
|
|
|
|
|
|
if err != nil { |
|
|
|
|
|
glog.Warningf("get volume id failed, %s, err : %s", name, err) |
|
|
|
|
|
return false |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// void loading one volume more than once
|
|
|
l.volumesLock.RLock() |
|
|
l.volumesLock.RLock() |
|
|
_, found := l.volumes[vid] |
|
|
_, found := l.volumes[vid] |
|
|
l.volumesLock.RUnlock() |
|
|
l.volumesLock.RUnlock() |
|
|
if !found { |
|
|
|
|
|
if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil, 0, 0); e == nil { |
|
|
|
|
|
|
|
|
if found { |
|
|
|
|
|
glog.V(1).Infof("loaded volume, %v", vid) |
|
|
|
|
|
return true |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil, 0, 0) |
|
|
|
|
|
if e != nil { |
|
|
|
|
|
glog.V(0).Infof("new volume %s error %s", name, e) |
|
|
|
|
|
return false |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
l.volumesLock.Lock() |
|
|
l.volumesLock.Lock() |
|
|
l.volumes[vid] = v |
|
|
l.volumes[vid] = v |
|
|
l.volumesLock.Unlock() |
|
|
l.volumesLock.Unlock() |
|
|
size, _, _ := v.FileStat() |
|
|
size, _, _ := v.FileStat() |
|
|
glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", |
|
|
glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", |
|
|
l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), size, v.Ttl.String()) |
|
|
l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), size, v.Ttl.String()) |
|
|
// println("volume", vid, "last append at", v.lastAppendAtNs)
|
|
|
|
|
|
} else { |
|
|
|
|
|
glog.V(0).Infof("new volume %s error %s", name, e) |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
return true |
|
|
} |
|
|
} |
|
|
|
|
|
return false |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, concurrency int) { |
|
|
func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, concurrency int) { |
|
@ -93,7 +103,7 @@ func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, con |
|
|
go func() { |
|
|
go func() { |
|
|
defer wg.Done() |
|
|
defer wg.Done() |
|
|
for dir := range task_queue { |
|
|
for dir := range task_queue { |
|
|
l.loadExistingVolume(dir, needleMapKind) |
|
|
|
|
|
|
|
|
_ = l.loadExistingVolume(dir, needleMapKind) |
|
|
} |
|
|
} |
|
|
}() |
|
|
}() |
|
|
} |
|
|
} |
|
@ -172,8 +182,7 @@ func (l *DiskLocation) deleteVolumeById(vid needle.VolumeId) (e error) { |
|
|
|
|
|
|
|
|
func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapType) bool { |
|
|
func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapType) bool { |
|
|
if fileInfo, found := l.LocateVolume(vid); found { |
|
|
if fileInfo, found := l.LocateVolume(vid); found { |
|
|
l.loadExistingVolume(fileInfo, needleMapKind) |
|
|
|
|
|
return true |
|
|
|
|
|
|
|
|
return l.loadExistingVolume(fileInfo, needleMapKind) |
|
|
} |
|
|
} |
|
|
return false |
|
|
return false |
|
|
} |
|
|
} |
|
|