You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

151 lines
4.9 KiB

9 years ago
  1. package topology
  2. import (
  3. "context"
  4. "time"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. "github.com/chrislusf/seaweedfs/weed/operation"
  7. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  8. "github.com/chrislusf/seaweedfs/weed/storage"
  9. )
  10. func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, garbageThreshold float64) bool {
  11. ch := make(chan bool, locationlist.Length())
  12. for index, dn := range locationlist.list {
  13. go func(index int, url string, vid storage.VolumeId) {
  14. err := operation.WithVolumeServerClient(url, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  15. resp, err := volumeServerClient.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{
  16. VolumdId: uint32(vid),
  17. })
  18. if err != nil {
  19. ch <- false
  20. return err
  21. }
  22. isNeeded := resp.GarbageRatio > garbageThreshold
  23. ch <- isNeeded
  24. return nil
  25. })
  26. if err != nil {
  27. glog.V(0).Infof("Checking vacuuming %d on %s: %v", vid, url, err)
  28. }
  29. }(index, dn.Url(), vid)
  30. }
  31. isCheckSuccess := true
  32. for _ = range locationlist.list {
  33. select {
  34. case canVacuum := <-ch:
  35. isCheckSuccess = isCheckSuccess && canVacuum
  36. case <-time.After(30 * time.Minute):
  37. isCheckSuccess = false
  38. break
  39. }
  40. }
  41. return isCheckSuccess
  42. }
  43. func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, preallocate int64) bool {
  44. vl.removeFromWritable(vid)
  45. ch := make(chan bool, locationlist.Length())
  46. for index, dn := range locationlist.list {
  47. go func(index int, url string, vid storage.VolumeId) {
  48. glog.V(0).Infoln(index, "Start vacuuming", vid, "on", url)
  49. err := operation.WithVolumeServerClient(url, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  50. _, err := volumeServerClient.VacuumVolumeCompact(context.Background(), &volume_server_pb.VacuumVolumeCompactRequest{
  51. VolumdId: uint32(vid),
  52. })
  53. return err
  54. })
  55. if err != nil {
  56. glog.Errorf("Error when vacuuming %d on %s: %v", vid, url, err)
  57. ch <- false
  58. } else {
  59. glog.V(0).Infof("Complete vacuuming %d on %s", vid, url)
  60. ch <- true
  61. }
  62. }(index, dn.Url(), vid)
  63. }
  64. isVacuumSuccess := true
  65. for _ = range locationlist.list {
  66. select {
  67. case canCommit := <-ch:
  68. isVacuumSuccess = isVacuumSuccess && canCommit
  69. case <-time.After(30 * time.Minute):
  70. isVacuumSuccess = false
  71. break
  72. }
  73. }
  74. return isVacuumSuccess
  75. }
  76. func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool {
  77. isCommitSuccess := true
  78. for _, dn := range locationlist.list {
  79. glog.V(0).Infoln("Start Commiting vacuum", vid, "on", dn.Url())
  80. err := operation.WithVolumeServerClient(dn.Url(), func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  81. _, err := volumeServerClient.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{
  82. VolumdId: uint32(vid),
  83. })
  84. return err
  85. })
  86. if err != nil {
  87. glog.Errorf("Error when committing vacuum %d on %s: %v", vid, dn.Url(), err)
  88. isCommitSuccess = false
  89. } else {
  90. glog.V(0).Infof("Complete Commiting vacuum %d on %s", vid, dn.Url())
  91. }
  92. if isCommitSuccess {
  93. vl.SetVolumeAvailable(dn, vid)
  94. }
  95. }
  96. return isCommitSuccess
  97. }
  98. func batchVacuumVolumeCleanup(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) {
  99. for _, dn := range locationlist.list {
  100. glog.V(0).Infoln("Start cleaning up", vid, "on", dn.Url())
  101. err := operation.WithVolumeServerClient(dn.Url(), func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  102. _, err := volumeServerClient.VacuumVolumeCleanup(context.Background(), &volume_server_pb.VacuumVolumeCleanupRequest{
  103. VolumdId: uint32(vid),
  104. })
  105. return err
  106. })
  107. if err != nil {
  108. glog.Errorf("Error when cleaning up vacuum %d on %s: %v", vid, dn.Url(), err)
  109. } else {
  110. glog.V(0).Infof("Complete cleaning up vacuum %d on %s", vid, dn.Url())
  111. }
  112. }
  113. }
  114. func (t *Topology) Vacuum(garbageThreshold float64, preallocate int64) int {
  115. glog.V(0).Infof("Start vacuum on demand with threshold: %f", garbageThreshold)
  116. for _, col := range t.collectionMap.Items() {
  117. c := col.(*Collection)
  118. for _, vl := range c.storageType2VolumeLayout.Items() {
  119. if vl != nil {
  120. volumeLayout := vl.(*VolumeLayout)
  121. for vid, locationlist := range volumeLayout.vid2location {
  122. volumeLayout.accessLock.RLock()
  123. isReadOnly, hasValue := volumeLayout.readonlyVolumes[vid]
  124. volumeLayout.accessLock.RUnlock()
  125. if hasValue && isReadOnly {
  126. continue
  127. }
  128. glog.V(0).Infof("check vacuum on collection:%s volume:%d", c.Name, vid)
  129. if batchVacuumVolumeCheck(volumeLayout, vid, locationlist, garbageThreshold) {
  130. if batchVacuumVolumeCompact(volumeLayout, vid, locationlist, preallocate) {
  131. batchVacuumVolumeCommit(volumeLayout, vid, locationlist)
  132. }
  133. }
  134. }
  135. }
  136. }
  137. }
  138. return 0
  139. }
  140. type VacuumVolumeResult struct {
  141. Result bool
  142. Error string
  143. }