
fix some data race problems detected by the Go race detector

pull/279/head
tnextday committed 10 years ago
parent commit 6e20f65528
  1. go/stats/duration_counter.go      (48 lines changed)
  2. go/storage/store.go               (8 lines changed)
  3. go/storage/volume.go              (44 lines changed)
  4. go/storage/volume_super_block.go  (4 lines changed)
  5. go/storage/volume_vacuum.go       (4 lines changed)
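
The races fixed here follow one pattern: one goroutine mutates counter or volume state while another reads it with no synchronization. For context, a minimal, hypothetical sketch (none of this code is from the commit) of the access pattern the race detector reports; run it with: go run -race race_demo.go

package main

import (
	"fmt"
	"sync"
)

type counter struct {
	values []int64 // shared and unsynchronized: this is the race
}

func main() {
	c := &counter{values: make([]int64, 4)}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { // writer goroutine
		defer wg.Done()
		for i := 0; i < 1000; i++ {
			c.values[i%4]++
		}
	}()
	go func() { // reader goroutine: -race flags the conflicting access
		defer wg.Done()
		var sum int64
		for i := 0; i < 1000; i++ {
			sum += c.values[i%4]
		}
		fmt.Println("sum:", sum)
	}()
	wg.Wait()
}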

go/stats/duration_counter.go  (48 lines changed)

@@ -1,6 +1,7 @@
 package stats
 
 import (
+	"sync"
 	"time"
 )
 
@@ -14,28 +15,33 @@ func NewTimedValue(t time.Time, val int64) *TimedValue {
 }
 
 type RoundRobinCounter struct {
-	LastIndex int
-	Values    []int64
-	Counts    []int64
+	lastIndex int
+	values    []int64
+	counts    []int64
+	mutex     sync.RWMutex
 }
 
 func NewRoundRobinCounter(slots int) *RoundRobinCounter {
-	return &RoundRobinCounter{LastIndex: -1, Values: make([]int64, slots), Counts: make([]int64, slots)}
+	return &RoundRobinCounter{lastIndex: -1, values: make([]int64, slots), counts: make([]int64, slots)}
 }
 
 func (rrc *RoundRobinCounter) Add(index int, val int64) {
-	if index >= len(rrc.Values) {
+	rrc.mutex.Lock()
+	defer rrc.mutex.Unlock()
+	if index >= len(rrc.values) {
 		return
 	}
-	for rrc.LastIndex != index {
-		rrc.LastIndex = (rrc.LastIndex + 1) % len(rrc.Values)
-		rrc.Values[rrc.LastIndex] = 0
-		rrc.Counts[rrc.LastIndex] = 0
+	for rrc.lastIndex != index {
+		rrc.lastIndex = (rrc.lastIndex + 1) % len(rrc.values)
+		rrc.values[rrc.lastIndex] = 0
+		rrc.counts[rrc.lastIndex] = 0
 	}
-	rrc.Values[index] += val
-	rrc.Counts[index]++
+	rrc.values[index] += val
+	rrc.counts[index]++
 }
 
 func (rrc *RoundRobinCounter) Max() (max int64) {
-	for _, val := range rrc.Values {
+	rrc.mutex.RLock()
+	defer rrc.mutex.RUnlock()
+	for _, val := range rrc.values {
 		if max < val {
 			max = val
 		}
@@ -43,28 +49,34 @@ func (rrc *RoundRobinCounter) Max() (max int64) {
 	return
 }
 
 func (rrc *RoundRobinCounter) Count() (cnt int64) {
-	for _, c := range rrc.Counts {
+	rrc.mutex.RLock()
+	defer rrc.mutex.RUnlock()
+	for _, c := range rrc.counts {
 		cnt += c
 	}
 	return
 }
 
 func (rrc *RoundRobinCounter) Sum() (sum int64) {
-	for _, val := range rrc.Values {
+	rrc.mutex.RLock()
+	defer rrc.mutex.RUnlock()
+	for _, val := range rrc.values {
 		sum += val
 	}
 	return
 }
 
 func (rrc *RoundRobinCounter) ToList() (ret []int64) {
-	index := rrc.LastIndex
-	step := len(rrc.Values)
+	rrc.mutex.RLock()
+	defer rrc.mutex.RUnlock()
+	index := rrc.lastIndex
+	step := len(rrc.values)
 	for step > 0 {
 		step--
 		index++
-		if index >= len(rrc.Values) {
+		if index >= len(rrc.values) {
 			index = 0
 		}
-		ret = append(ret, rrc.Values[index])
+		ret = append(ret, rrc.values[index])
 	}
 	return
 }
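
With the fields unexported and guarded by the new mutex, concurrent writers and a concurrent reader of the counter no longer race. A minimal usage sketch; the import path is an assumption (the module path may differ by repository vintage), the API is taken from the diff above:

package main

import (
	"fmt"
	"sync"

	"github.com/chrislusf/seaweedfs/go/stats" // assumed import path
)

func main() {
	rrc := stats.NewRoundRobinCounter(60) // e.g. one slot per second of a minute
	var wg sync.WaitGroup
	for g := 0; g < 4; g++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for n := 0; n < 1000; n++ {
				rrc.Add(0, 1) // writers are serialized by the internal mutex
			}
		}()
	}
	// A concurrent reader: with the old exported fields this read raced
	// with the writers above; it is now synchronized via RLock.
	fmt.Println("in-flight sum:", rrc.Sum())
	wg.Wait()
	fmt.Println("final sum:", rrc.Sum(), "count:", rrc.Count())
}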

go/storage/store.go  (8 lines changed)

@@ -190,7 +190,7 @@ func (s *Store) Status() []*VolumeInfo {
 			FileCount:        v.nm.FileCount(),
 			DeleteCount:      v.nm.DeletedCount(),
 			DeletedByteCount: v.nm.DeletedSize(),
-			ReadOnly:         v.readOnly,
+			ReadOnly:         v.IsReadOnly(),
 			Ttl:              v.Ttl}
 		stats = append(stats, s)
 		return nil
@@ -233,7 +233,7 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S
 			FileCount:        proto.Uint64(uint64(v.nm.FileCount())),
 			DeleteCount:      proto.Uint64(uint64(v.nm.DeletedCount())),
 			DeletedByteCount: proto.Uint64(v.nm.DeletedSize()),
-			ReadOnly:         proto.Bool(v.readOnly),
+			ReadOnly:         proto.Bool(v.IsReadOnly()),
 			Version:          proto.Uint32(uint32(v.Version())),
 			Ttl:              proto.Uint32(v.Ttl.ToUint32()),
 		}
@@ -300,7 +300,7 @@ func (s *Store) Close() {
 }
 
 func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
 	if v := s.findVolume(i); v != nil {
-		if v.readOnly {
+		if v.IsReadOnly() {
 			err = fmt.Errorf("Volume %d is read only", i)
 			return
 		}
@@ -322,7 +322,7 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
 	return
 }
 
 func (s *Store) Delete(i VolumeId, n *Needle) (uint32, error) {
-	if v := s.findVolume(i); v != nil && !v.readOnly {
+	if v := s.findVolume(i); v != nil && !v.IsReadOnly() {
 		return v.delete(n)
 	}
 	return 0, nil

go/storage/volume.go  (44 lines changed)

@@ -24,7 +24,7 @@ type Volume struct {
 	SuperBlock
 
-	dataFileAccessLock sync.RWMutex
+	mutex            sync.RWMutex
 	lastModifiedTime uint64 //unix time in seconds
 }
 
@@ -36,6 +36,8 @@ func NewVolume(dirname string, collection string, id VolumeId, needleMapKind Nee
 	return
 }
 
 func (v *Volume) String() string {
+	v.mutex.RLock()
+	defer v.mutex.RUnlock()
 	return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, readOnly:%v", v.Id, v.dir, v.Collection, v.dataFile, v.nm, v.readOnly)
 }
 
@@ -131,20 +133,22 @@ func (v *Volume) Version() Version {
 	return v.SuperBlock.Version()
 }
 
 func (v *Volume) Size() int64 {
+	v.mutex.RLock()
+	defer v.mutex.RUnlock()
 	stat, e := v.dataFile.Stat()
 	if e == nil {
 		return stat.Size()
 	}
-	glog.V(0).Infof("Failed to read file size %s %v", v.dataFile.Name(), e)
+	glog.V(2).Infof("Failed to read file size %s %v", v.dataFile.Name(), e)
 	return -1
 }
 
 // Close cleanly shuts down this volume
 func (v *Volume) Close() {
-	v.dataFileAccessLock.Lock()
-	defer v.dataFileAccessLock.Unlock()
+	v.mutex.Lock()
+	defer v.mutex.Unlock()
 	v.nm.Close()
-	_ = v.dataFile.Close()
+	v.dataFile.Close()
 }
 
 // isFileUnchanged checks whether this needle to write is same as last one.
@@ -171,7 +175,7 @@ func (v *Volume) isFileUnchanged(n *Needle) bool {
 
 // Destroy removes everything related to this volume
 func (v *Volume) Destroy() (err error) {
-	if v.readOnly {
+	if v.IsReadOnly() {
 		err = fmt.Errorf("%s is read-only", v.dataFile.Name())
 		return
 	}
@@ -186,12 +190,12 @@ func (v *Volume) Destroy() (err error) {
 
 // AppendBlob append a blob to end of the data file, used in replication
 func (v *Volume) AppendBlob(b []byte) (offset int64, err error) {
+	v.mutex.Lock()
+	defer v.mutex.Unlock()
 	if v.readOnly {
 		err = fmt.Errorf("%s is read-only", v.dataFile.Name())
 		return
 	}
-	v.dataFileAccessLock.Lock()
-	defer v.dataFileAccessLock.Unlock()
 	if offset, err = v.dataFile.Seek(0, 2); err != nil {
 		glog.V(0).Infof("failed to seek the end of file: %v", err)
 		return
@@ -209,13 +213,13 @@ func (v *Volume) AppendBlob(b []byte) (offset int64, err error) {
 }
 
 func (v *Volume) write(n *Needle) (size uint32, err error) {
+	v.mutex.Lock()
+	defer v.mutex.Unlock()
 	glog.V(4).Infof("writing needle %s", NewFileIdFromNeedle(v.Id, n).String())
 	if v.readOnly {
 		err = fmt.Errorf("%s is read-only", v.dataFile.Name())
 		return
 	}
-	v.dataFileAccessLock.Lock()
-	defer v.dataFileAccessLock.Unlock()
 	if v.isFileUnchanged(n) {
 		size = n.DataSize
 		glog.V(4).Infof("needle is unchanged!")
@@ -255,12 +259,12 @@ func (v *Volume) write(n *Needle) (size uint32, err error) {
 }
 
 func (v *Volume) delete(n *Needle) (uint32, error) {
+	v.mutex.Lock()
+	defer v.mutex.Unlock()
 	glog.V(4).Infof("delete needle %s", NewFileIdFromNeedle(v.Id, n).String())
 	if v.readOnly {
 		return 0, fmt.Errorf("%s is read-only", v.dataFile.Name())
 	}
-	v.dataFileAccessLock.Lock()
-	defer v.dataFileAccessLock.Unlock()
 	nv, ok := v.nm.Get(n.Id)
 	//fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
 	if ok {
@@ -284,9 +288,9 @@ func (v *Volume) readNeedle(n *Needle) (int, error) {
 	if !ok || nv.Offset == 0 {
 		return -1, errors.New("Not Found")
 	}
-	v.dataFileAccessLock.RLock()
+	v.mutex.RLock()
 	err := n.ReadData(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
-	v.dataFileAccessLock.RUnlock()
+	v.mutex.RUnlock()
 	if err != nil {
 		return 0, err
 	}
@@ -392,6 +396,8 @@ func checkFile(filename string) (exists, canRead, canWrite bool, modTime time.Ti
 // or when the volume does not have a ttl
 // or when volumeSizeLimit is 0 when server just starts
 func (v *Volume) expired(volumeSizeLimit uint64) bool {
+	v.mutex.RLock()
+	defer v.mutex.RUnlock()
 	if volumeSizeLimit == 0 {
 		//skip if we don't know size limit
 		return false
@@ -413,6 +419,8 @@ func (v *Volume) expired(volumeSizeLimit uint64) bool {
 
 // wait either maxDelayMinutes or 10% of ttl minutes
 func (v *Volume) expiredLongEnough(maxDelayMinutes uint32) bool {
+	v.mutex.RLock()
+	defer v.mutex.RUnlock()
 	if v.Ttl == nil || v.Ttl.Minutes() == 0 {
 		return false
 	}
@@ -428,6 +436,8 @@ func (v *Volume) expiredLongEnough(maxDelayMinutes uint32) bool {
 }
 
 func (v *Volume) SetReadOnly(isReadOnly bool) error {
+	v.mutex.Lock()
+	defer v.mutex.Unlock()
 	if isReadOnly == false {
 		if fi, e := v.dataFile.Stat(); e != nil {
 			return e
@@ -440,3 +450,9 @@ func (v *Volume) SetReadOnly(isReadOnly bool) error {
 	v.readOnly = isReadOnly
 	return nil
 }
+
+func (v *Volume) IsReadOnly() bool {
+	v.mutex.RLock()
+	defer v.mutex.RUnlock()
+	return v.readOnly
+}
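
The new IsReadOnly accessor is why store.go above stops reading v.readOnly directly: a field read from another goroutine races with SetReadOnly, while a getter that takes the read lock does not. A distilled sketch of the pattern (a hypothetical pared-down type, not the repo's full Volume):

package main

import (
	"fmt"
	"sync"
)

type Volume struct {
	mutex    sync.RWMutex
	readOnly bool
}

func (v *Volume) SetReadOnly(ro bool) {
	v.mutex.Lock()
	defer v.mutex.Unlock()
	v.readOnly = ro
}

func (v *Volume) IsReadOnly() bool {
	v.mutex.RLock()
	defer v.mutex.RUnlock()
	return v.readOnly
}

func main() {
	v := &Volume{}
	go v.SetReadOnly(true)      // concurrent writer
	fmt.Println(v.IsReadOnly()) // reader; the result depends on timing, but there is no race
}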

go/storage/volume_super_block.go  (4 lines changed)

@@ -74,8 +74,8 @@ func (v *Volume) readSuperBlock() (err error) {
 }
 
 func (v *Volume) writeSuperBlock() (err error) {
-	v.dataFileAccessLock.Lock()
-	defer v.dataFileAccessLock.Unlock()
+	v.mutex.Lock()
+	defer v.mutex.Unlock()
 	if _, e := v.dataFile.WriteAt(v.SuperBlock.Bytes(), 0); e != nil {
 		return fmt.Errorf("cannot write volume %d super block: %v", v.Id, e)
 	}

go/storage/volume_vacuum.go  (4 lines changed)

@@ -25,8 +25,8 @@ func (v *Volume) Compact() error {
 }
 
 func (v *Volume) commitCompact() error {
 	glog.V(3).Infof("Committing vacuuming...")
-	v.dataFileAccessLock.Lock()
-	defer v.dataFileAccessLock.Unlock()
+	v.mutex.Lock()
+	defer v.mutex.Unlock()
 	glog.V(3).Infof("Got Committing lock...")
 	_ = v.dataFile.Close()
 	var e error
