You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

162 lines
4.3 KiB

10 years ago
10 years ago
  1. package storage
  2. import (
  3. "fmt"
  4. "os"
  5. "time"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. )
  8. func (v *Volume) garbageLevel() float64 {
  9. return float64(v.nm.DeletedSize()) / float64(v.ContentSize())
  10. }
  11. func (v *Volume) Compact() error {
  12. glog.V(3).Infof("Compacting ...")
  13. //no need to lock for copy on write
  14. //v.accessLock.Lock()
  15. //defer v.accessLock.Unlock()
  16. //glog.V(3).Infof("Got Compaction lock...")
  17. filePath := v.FileName()
  18. glog.V(3).Infof("creating copies for volume %d ...", v.Id)
  19. return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx")
  20. }
  21. func (v *Volume) Compact2() error {
  22. glog.V(3).Infof("Compact2 ...")
  23. filePath := v.FileName()
  24. glog.V(3).Infof("creating copies for volume %d ...", v.Id)
  25. return v.copyDataBasedOnIndexFile(filePath+".cpd", filePath+".cpx")
  26. }
  27. func (v *Volume) commitCompact() error {
  28. glog.V(3).Infof("Committing vacuuming...")
  29. v.dataFileAccessLock.Lock()
  30. defer v.dataFileAccessLock.Unlock()
  31. glog.V(3).Infof("Got Committing lock...")
  32. v.nm.Close()
  33. _ = v.dataFile.Close()
  34. var e error
  35. if e = os.Rename(v.FileName()+".cpd", v.FileName()+".dat"); e != nil {
  36. return e
  37. }
  38. if e = os.Rename(v.FileName()+".cpx", v.FileName()+".idx"); e != nil {
  39. return e
  40. }
  41. //glog.V(3).Infof("Pretending to be vacuuming...")
  42. //time.Sleep(20 * time.Second)
  43. glog.V(3).Infof("Loading Commit file...")
  44. if e = v.load(true, false, v.needleMapKind); e != nil {
  45. return e
  46. }
  47. return nil
  48. }
  49. func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err error) {
  50. var (
  51. dst, idx *os.File
  52. )
  53. if dst, err = os.OpenFile(dstName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil {
  54. return
  55. }
  56. defer dst.Close()
  57. if idx, err = os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil {
  58. return
  59. }
  60. defer idx.Close()
  61. nm := NewNeedleMap(idx)
  62. new_offset := int64(SuperBlockSize)
  63. now := uint64(time.Now().Unix())
  64. err = ScanVolumeFile(v.dir, v.Collection, v.Id, v.needleMapKind,
  65. func(superBlock SuperBlock) error {
  66. superBlock.CompactRevision++
  67. _, err = dst.Write(superBlock.Bytes())
  68. return err
  69. }, true, func(n *Needle, offset int64) error {
  70. if n.HasTtl() && now >= n.LastModified+uint64(v.Ttl.Minutes()*60) {
  71. return nil
  72. }
  73. nv, ok := v.nm.Get(n.Id)
  74. glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv)
  75. if ok && int64(nv.Offset)*NeedlePaddingSize == offset && nv.Size > 0 {
  76. if err = nm.Put(n.Id, uint32(new_offset/NeedlePaddingSize), n.Size); err != nil {
  77. return fmt.Errorf("cannot put needle: %s", err)
  78. }
  79. if _, err = n.Append(dst, v.Version()); err != nil {
  80. return fmt.Errorf("cannot append needle: %s", err)
  81. }
  82. new_offset += n.DiskSize()
  83. glog.V(3).Infoln("saving key", n.Id, "volume offset", offset, "=>", new_offset, "data_size", n.Size)
  84. }
  85. return nil
  86. })
  87. return
  88. }
  89. func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) {
  90. var (
  91. dst, idx, oldIndexFile *os.File
  92. )
  93. if dst, err = os.OpenFile(dstName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil {
  94. return
  95. }
  96. defer dst.Close()
  97. if idx, err = os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil {
  98. return
  99. }
  100. defer idx.Close()
  101. if oldIndexFile, err = os.OpenFile(v.FileName()+".idx", os.O_RDONLY, 0644); err != nil {
  102. return
  103. }
  104. defer oldIndexFile.Close()
  105. nm := NewNeedleMap(idx)
  106. now := uint64(time.Now().Unix())
  107. v.SuperBlock.CompactRevision++
  108. dst.Write(v.SuperBlock.Bytes())
  109. new_offset := int64(SuperBlockSize)
  110. WalkIndexFile(oldIndexFile, func(key uint64, offset, size uint32) error {
  111. if size <= 0 {
  112. return nil
  113. }
  114. nv, ok := v.nm.Get(key)
  115. if !ok {
  116. return nil
  117. }
  118. n := new(Needle)
  119. n.ReadData(v.dataFile, int64(offset)*NeedlePaddingSize, size, v.Version())
  120. defer n.ReleaseMemory()
  121. if n.HasTtl() && now >= n.LastModified+uint64(v.Ttl.Minutes()*60) {
  122. return nil
  123. }
  124. glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv)
  125. if nv.Offset == offset && nv.Size > 0 {
  126. if err = nm.Put(n.Id, uint32(new_offset/NeedlePaddingSize), n.Size); err != nil {
  127. return fmt.Errorf("cannot put needle: %s", err)
  128. }
  129. if _, err = n.Append(dst, v.Version()); err != nil {
  130. return fmt.Errorf("cannot append needle: %s", err)
  131. }
  132. new_offset += n.DiskSize()
  133. glog.V(3).Infoln("saving key", n.Id, "volume offset", offset, "=>", new_offset, "data_size", n.Size)
  134. }
  135. return nil
  136. })
  137. return
  138. }