Browse Source

make Disklocation is concurrent safe

pull/279/head
tnextday 10 years ago
parent
commit
e791e53952
  1. 126
      go/storage/disk_location.go
  2. 99
      go/storage/store.go
  3. 2
      go/storage/store_task_replication.go
  4. 6
      go/storage/volume.go

126
go/storage/disk_location.go

@ -0,0 +1,126 @@
package storage
import (
"io/ioutil"
"strings"
"sync"
"github.com/golang/glog"
)
// DiskLocation is concurrent safe
type DiskLocation struct {
Directory string
MaxVolumeCount int
volumes map[VolumeId]*Volume
lock sync.RWMutex
}
func NewDiskLocation(dir string, maxVolCount int) *DiskLocation {
return &DiskLocation{
Directory: dir,
MaxVolumeCount: maxVolCount,
volumes: make(map[VolumeId]*Volume),
}
}
func (l *DiskLocation) LoadExistingVolumes(needleMapKind NeedleMapType) {
if dirs, err := ioutil.ReadDir(l.Directory); err == nil {
for _, dir := range dirs {
name := dir.Name()
if !dir.IsDir() && strings.HasSuffix(name, ".dat") {
collection := ""
base := name[:len(name)-len(".dat")]
i := strings.LastIndex(base, "_")
if i > 0 {
collection, base = base[0:i], base[i+1:]
}
if vid, err := NewVolumeId(base); err == nil {
if !l.HasVolume(vid) {
if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil); e == nil {
l.AddVolume(vid, v)
glog.V(0).Infof("data file %s, v=%d size=%d ttl=%s", l.Directory+"/"+name, v.Version(), v.Size(), v.Ttl.String())
} else {
glog.V(0).Infof("new volume %s error %s", name, e)
}
}
}
}
}
}
glog.V(0).Infoln("Store started on dir:", l.Directory, "with", len(l.volumes), "volumes", "max", l.MaxVolumeCount)
}
func (l *DiskLocation) AddVolume(vid VolumeId, v *Volume) {
l.lock.Lock()
defer l.lock.Unlock()
l.volumes[vid] = v
}
func (l *DiskLocation) DeleteVolume(vid VolumeId) (e error) {
l.lock.Lock()
defer l.lock.Unlock()
if v, ok := l.volumes[vid]; ok {
e = v.Destroy()
}
delete(l.volumes, vid)
return
}
func (l *DiskLocation) DeleteCollection(collection string) (e error) {
l.lock.Lock()
defer l.lock.Unlock()
for k, v := range l.volumes {
if v.Collection == collection {
e = v.Destroy()
if e != nil {
return
}
delete(l.volumes, k)
}
}
return
}
func (l *DiskLocation) HasVolume(vid VolumeId) bool {
l.lock.RLock()
defer l.lock.RUnlock()
_, ok := l.volumes[vid]
return ok
}
func (l *DiskLocation) GetVolume(vid VolumeId) (v *Volume, ok bool) {
l.lock.RLock()
defer l.lock.RUnlock()
v, ok = l.volumes[vid]
return
}
func (l *DiskLocation) VolumeCount() int {
l.lock.RLock()
defer l.lock.RUnlock()
return len(l.volumes)
}
func (l *DiskLocation) CloseAllVolume() {
l.lock.RLock()
defer l.lock.RUnlock()
for _, v := range l.volumes {
v.Close()
}
}
// break walk when walker fuc return an error
type VolumeWalker func(v *Volume) (e error)
// must not add or delete volume in walker
func (l *DiskLocation) WalkVolume(vw VolumeWalker) (e error) {
l.lock.RLock()
defer l.lock.RUnlock()
for _, v := range l.volumes {
if e = vw(v); e != nil {
return e
}
}
return
}

99
go/storage/store.go

@ -4,7 +4,6 @@ import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"math/rand"
"strconv"
"strings"
@ -20,15 +19,6 @@ const (
MAX_TTL_VOLUME_REMOVAL_DELAY = 10 // 10 minutes
)
type DiskLocation struct {
Directory string
MaxVolumeCount int
volumes map[VolumeId]*Volume
}
func (mn *DiskLocation) reset() {
}
type MasterNodes struct {
nodes []string
lastNode int
@ -107,9 +97,8 @@ func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts
}
s.Locations = make([]*DiskLocation, 0)
for i := 0; i < len(dirnames); i++ {
location := &DiskLocation{Directory: dirnames[i], MaxVolumeCount: maxVolumeCounts[i]}
location.volumes = make(map[VolumeId]*Volume)
location.loadExistingVolumes(needleMapKind)
location := NewDiskLocation(dirnames[i], maxVolumeCounts[i])
location.LoadExistingVolumes(needleMapKind)
s.Locations = append(s.Locations, location)
}
return
@ -148,29 +137,17 @@ func (s *Store) AddVolume(volumeListString string, collection string, ttlString
}
func (s *Store) DeleteCollection(collection string) (e error) {
for _, location := range s.Locations {
for k, v := range location.volumes {
if v.Collection == collection {
e = v.Destroy()
if e != nil {
return
}
delete(location.volumes, k)
}
}
location.DeleteCollection(collection)
}
return
}
func (s *Store) DeleteVolume(volumes map[VolumeId]*Volume, v *Volume) (e error) {
e = v.Destroy()
if e != nil {
return
}
delete(volumes, v.Id)
return
func (s *Store) DeleteVolume(dl *DiskLocation, v *Volume) (e error) {
return dl.DeleteVolume(v.Id)
}
func (s *Store) findVolume(vid VolumeId) *Volume {
for _, location := range s.Locations {
if v, found := location.volumes[vid]; found {
if v, found := location.GetVolume(vid); found {
return v
}
}
@ -179,7 +156,7 @@ func (s *Store) findVolume(vid VolumeId) *Volume {
func (s *Store) findFreeLocation() (ret *DiskLocation) {
max := 0
for _, location := range s.Locations {
currentFreeCount := location.MaxVolumeCount - len(location.volumes)
currentFreeCount := location.MaxVolumeCount - location.VolumeCount()
if currentFreeCount > max {
max = currentFreeCount
ret = location
@ -195,7 +172,7 @@ func (s *Store) addVolume(vid VolumeId, collection string, ttl *TTL) error {
glog.V(0).Infof("In dir %s adds volume:%v collection:%s ttl:%v",
location.Directory, vid, collection, ttl)
if volume, err := NewVolume(location.Directory, collection, vid, s.needleMapKind, ttl); err == nil {
location.volumes[vid] = volume
location.AddVolume(vid, volume)
return nil
} else {
return err
@ -204,38 +181,12 @@ func (s *Store) addVolume(vid VolumeId, collection string, ttl *TTL) error {
return fmt.Errorf("No more free space left")
}
func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) {
if dirs, err := ioutil.ReadDir(l.Directory); err == nil {
for _, dir := range dirs {
name := dir.Name()
if !dir.IsDir() && strings.HasSuffix(name, ".dat") {
collection := ""
base := name[:len(name)-len(".dat")]
i := strings.LastIndex(base, "_")
if i > 0 {
collection, base = base[0:i], base[i+1:]
}
if vid, err := NewVolumeId(base); err == nil {
if l.volumes[vid] == nil {
if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil); e == nil {
l.volumes[vid] = v
glog.V(0).Infof("data file %s, v=%d size=%d ttl=%s", l.Directory+"/"+name, v.Version(), v.Size(), v.Ttl.String())
} else {
glog.V(0).Infof("new volume %s error %s", name, e)
}
}
}
}
}
}
glog.V(0).Infoln("Store started on dir:", l.Directory, "with", len(l.volumes), "volumes", "max", l.MaxVolumeCount)
}
func (s *Store) Status() []*VolumeInfo {
var stats []*VolumeInfo
for _, location := range s.Locations {
for k, v := range location.volumes {
location.WalkVolume(func(v *Volume) (e error) {
s := &VolumeInfo{
Id: VolumeId(k),
Id: VolumeId(v.Id),
Size: v.ContentSize(),
Collection: v.Collection,
Version: v.Version(),
@ -245,7 +196,8 @@ func (s *Store) Status() []*VolumeInfo {
ReadOnly: v.readOnly,
Ttl: v.Ttl}
stats = append(stats, s)
}
return nil
})
}
sortVolumeInfos(stats)
return stats
@ -271,13 +223,14 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S
var maxFileKey uint64
for _, location := range s.Locations {
maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
for k, v := range location.volumes {
volumeToDelete := []VolumeId{}
location.WalkVolume(func(v *Volume) (e error) {
if maxFileKey < v.nm.MaxFileKey() {
maxFileKey = v.nm.MaxFileKey()
}
if !v.expired(s.volumeSizeLimit) {
volumeMessage := &operation.VolumeInformationMessage{
Id: proto.Uint32(uint32(k)),
Id: proto.Uint32(uint32(v.Id)),
Size: proto.Uint64(uint64(v.Size())),
Collection: proto.String(v.Collection),
FileCount: proto.Uint64(uint64(v.nm.FileCount())),
@ -289,13 +242,17 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S
}
volumeMessages = append(volumeMessages, volumeMessage)
} else {
if v.exiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
s.DeleteVolume(location.volumes, v)
if v.expiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
volumeToDelete = append(volumeToDelete, v.Id)
glog.V(0).Infoln("volume", v.Id, "is deleted.")
} else {
glog.V(0).Infoln("volume", v.Id, "is expired.")
}
}
return nil
})
for _, vid := range volumeToDelete {
location.DeleteVolume(vid)
}
}
@ -341,9 +298,7 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S
}
func (s *Store) Close() {
for _, location := range s.Locations {
for _, v := range location.volumes {
v.Close()
}
location.CloseAllVolume()
}
}
func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
@ -390,14 +345,10 @@ func (s *Store) HasVolume(i VolumeId) bool {
return v != nil
}
type VolumeWalker func(v *Volume) (e error)
func (s *Store) WalkVolume(walker VolumeWalker) error {
for _, location := range s.Locations {
for _, v := range location.volumes {
if e := walker(v); e != nil {
return e
}
if e := location.WalkVolume(walker); e != nil {
return e
}
}
return nil

2
go/storage/store_task_replication.go

@ -88,7 +88,7 @@ func (t *ReplicaTask) Commit() error {
}
volume, e = NewVolume(t.location.Directory, t.Collection, t.VID, t.s.needleMapKind, nil)
if e == nil {
t.location.volumes[t.VID] = volume
t.location.AddVolume(t.VID, volume)
t.s.SendHeartbeatToMaster()
}
return e

6
go/storage/volume.go

@ -24,7 +24,7 @@ type Volume struct {
SuperBlock
dataFileAccessLock sync.Mutex
dataFileAccessLock sync.RWMutex
lastModifiedTime uint64 //unix time in seconds
}
@ -284,7 +284,9 @@ func (v *Volume) readNeedle(n *Needle) (int, error) {
if !ok || nv.Offset == 0 {
return -1, errors.New("Not Found")
}
v.dataFileAccessLock.RLock()
err := n.ReadData(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
v.dataFileAccessLock.RUnlock()
if err != nil {
return 0, err
}
@ -410,7 +412,7 @@ func (v *Volume) expired(volumeSizeLimit uint64) bool {
}
// wait either maxDelayMinutes or 10% of ttl minutes
func (v *Volume) exiredLongEnough(maxDelayMinutes uint32) bool {
func (v *Volume) expiredLongEnough(maxDelayMinutes uint32) bool {
if v.Ttl == nil || v.Ttl.Minutes() == 0 {
return false
}

Loading…
Cancel
Save