114 lines
3.4 KiB

2 years ago
2 years ago
6 years ago
3 years ago
2 years ago
2 years ago
2 years ago
6 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "strconv"
  5. "time"
  6. "github.com/seaweedfs/seaweedfs/weed/stats"
  7. "runtime"
  8. "github.com/prometheus/procfs"
  9. "github.com/seaweedfs/seaweedfs/weed/glog"
  10. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  11. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  12. )
  13. var numCPU = runtime.NumCPU()
  14. func (vs *VolumeServer) VacuumVolumeCheck(ctx context.Context, req *volume_server_pb.VacuumVolumeCheckRequest) (*volume_server_pb.VacuumVolumeCheckResponse, error) {
  15. resp := &volume_server_pb.VacuumVolumeCheckResponse{}
  16. garbageRatio, err := vs.store.CheckCompactVolume(needle.VolumeId(req.VolumeId))
  17. resp.GarbageRatio = garbageRatio
  18. if err != nil {
  19. glog.V(3).Infof("check volume %d: %v", req.VolumeId, err)
  20. }
  21. return resp, err
  22. }
  23. func (vs *VolumeServer) VacuumVolumeCompact(req *volume_server_pb.VacuumVolumeCompactRequest, stream volume_server_pb.VolumeServer_VacuumVolumeCompactServer) error {
  24. start := time.Now()
  25. defer func(start time.Time) {
  26. stats.VolumeServerVacuumingHistogram.WithLabelValues("compact").Observe(time.Since(start).Seconds())
  27. }(start)
  28. resp := &volume_server_pb.VacuumVolumeCompactResponse{}
  29. reportInterval := int64(1024 * 1024 * 128)
  30. nextReportTarget := reportInterval
  31. fs, fsErr := procfs.NewDefaultFS()
  32. var sendErr error
  33. err := vs.store.CompactVolume(needle.VolumeId(req.VolumeId), req.Preallocate, vs.compactionBytePerSecond, func(processed int64) bool {
  34. if processed > nextReportTarget {
  35. resp.ProcessedBytes = processed
  36. if fsErr == nil && numCPU > 0 {
  37. if fsLa, err := fs.LoadAvg(); err == nil {
  38. resp.LoadAvg_1M = float32(fsLa.Load1 / float64(numCPU))
  39. }
  40. }
  41. if sendErr = stream.Send(resp); sendErr != nil {
  42. return false
  43. }
  44. nextReportTarget = processed + reportInterval
  45. }
  46. return true
  47. })
  48. stats.VolumeServerVacuumingCompactCounter.WithLabelValues(strconv.FormatBool(err == nil && sendErr == nil)).Inc()
  49. if err != nil {
  50. glog.Errorf("failed compact volume %d: %v", req.VolumeId, err)
  51. return err
  52. }
  53. if sendErr != nil {
  54. glog.Errorf("failed compact volume %d report progress: %v", req.VolumeId, sendErr)
  55. return sendErr
  56. }
  57. glog.V(1).Infof("compact volume %d", req.VolumeId)
  58. return nil
  59. }
  60. func (vs *VolumeServer) VacuumVolumeCommit(ctx context.Context, req *volume_server_pb.VacuumVolumeCommitRequest) (*volume_server_pb.VacuumVolumeCommitResponse, error) {
  61. start := time.Now()
  62. defer func(start time.Time) {
  63. stats.VolumeServerVacuumingHistogram.WithLabelValues("commit").Observe(time.Since(start).Seconds())
  64. }(start)
  65. resp := &volume_server_pb.VacuumVolumeCommitResponse{}
  66. readOnly, volumeSize, err := vs.store.CommitCompactVolume(needle.VolumeId(req.VolumeId))
  67. if err != nil {
  68. glog.Errorf("failed commit volume %d: %v", req.VolumeId, err)
  69. } else {
  70. glog.V(1).Infof("commit volume %d", req.VolumeId)
  71. }
  72. stats.VolumeServerVacuumingCommitCounter.WithLabelValues(strconv.FormatBool(err == nil)).Inc()
  73. resp.IsReadOnly = readOnly
  74. resp.VolumeSize = uint64(volumeSize)
  75. return resp, err
  76. }
  77. func (vs *VolumeServer) VacuumVolumeCleanup(ctx context.Context, req *volume_server_pb.VacuumVolumeCleanupRequest) (*volume_server_pb.VacuumVolumeCleanupResponse, error) {
  78. resp := &volume_server_pb.VacuumVolumeCleanupResponse{}
  79. err := vs.store.CommitCleanupVolume(needle.VolumeId(req.VolumeId))
  80. if err != nil {
  81. glog.Errorf("failed cleanup volume %d: %v", req.VolumeId, err)
  82. } else {
  83. glog.V(1).Infof("cleanup volume %d", req.VolumeId)
  84. }
  85. return resp, err
  86. }