You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

236 lines
7.2 KiB

package vacuum
import (
"context"
"fmt"
"io"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
"github.com/seaweedfs/seaweedfs/weed/worker/types/base"
"google.golang.org/grpc"
)
// VacuumTask implements the Task interface for vacuuming a single volume on a
// single volume server.
type VacuumTask struct {
	// BaseTask is embedded; the methods of this type call its GetLogger and
	// ReportProgress.
	*base.BaseTask

	// server is the volume server address every vacuum RPC is sent to.
	server string

	// volumeID identifies the volume in each vacuum request.
	volumeID uint32

	// collection is recorded in the task's start log entry.
	collection string

	// garbageThreshold is the garbage ratio a volume must reach to be
	// considered eligible for vacuuming.
	garbageThreshold float64

	// progress is the value handed back by GetProgress.
	progress float64
}
// NewVacuumTask creates a new unified vacuum task instance for the given
// volume on the given server.
//
// The garbage threshold starts at 0.3 (30%); Execute overwrites it with the
// value carried in the task parameters.
func NewVacuumTask(id string, server string, volumeID uint32, collection string) *VacuumTask {
	task := new(VacuumTask)
	task.BaseTask = base.NewBaseTask(id, types.TaskTypeVacuum)
	task.server = server
	task.volumeID = volumeID
	task.collection = collection
	task.garbageThreshold = 0.3 // Default 30% threshold
	return task
}
// Execute implements the UnifiedTask interface.
//
// It takes the garbage threshold from params, asks the volume server whether
// the volume's garbage ratio reaches that threshold and, if it does, performs
// the vacuum and then checks the volume once more. Progress is reported at
// 10, 50, 90 and 100 percent.
//
// An error is returned when params or its vacuum section is missing, when ctx
// is already done before the first RPC or before the compaction starts, or
// when the eligibility check or the vacuum itself fails. Underlying errors
// are wrapped with %w so callers can inspect them with errors.Is/errors.As.
// A volume below the threshold is skipped and reported as success. A failed
// post-vacuum check is only logged, because the vacuum has already succeeded.
func (t *VacuumTask) Execute(ctx context.Context, params *worker_pb.TaskParams) error {
	if params == nil {
		return fmt.Errorf("task parameters are required")
	}
	vacuumParams := params.GetVacuumParams()
	if vacuumParams == nil {
		return fmt.Errorf("vacuum parameters are required")
	}

	// ctxErr reports whether the caller has cancelled or timed out the task.
	// A nil ctx is tolerated because this method previously never touched ctx.
	ctxErr := func() error {
		if ctx == nil {
			return nil
		}
		return ctx.Err()
	}
	if err := ctxErr(); err != nil {
		return fmt.Errorf("vacuum task aborted before start: %w", err)
	}

	t.garbageThreshold = vacuumParams.GarbageThreshold
	t.GetLogger().WithFields(map[string]interface{}{
		"volume_id":         t.volumeID,
		"server":            t.server,
		"collection":        t.collection,
		"garbage_threshold": t.garbageThreshold,
	}).Info("Starting vacuum task")

	// Step 1: Check volume status and garbage ratio
	t.ReportProgress(10.0)
	t.GetLogger().Info("Checking volume status")
	eligible, currentGarbageRatio, err := t.checkVacuumEligibility()
	if err != nil {
		return fmt.Errorf("failed to check vacuum eligibility: %w", err)
	}
	if !eligible {
		t.GetLogger().WithFields(map[string]interface{}{
			"current_garbage_ratio": currentGarbageRatio,
			"required_threshold":    t.garbageThreshold,
		}).Info("Volume does not meet vacuum criteria, skipping")
		t.ReportProgress(100.0)
		return nil
	}

	// Last point at which the task can be abandoned before the volume is
	// modified: the vacuum RPCs below run on their own background context and
	// are not interrupted by ctx.
	if err := ctxErr(); err != nil {
		return fmt.Errorf("vacuum task aborted before compaction: %w", err)
	}

	// Step 2: Perform vacuum operation
	t.ReportProgress(50.0)
	t.GetLogger().WithFields(map[string]interface{}{
		"garbage_ratio": currentGarbageRatio,
		"threshold":     t.garbageThreshold,
	}).Info("Performing vacuum operation")
	if err := t.performVacuum(); err != nil {
		return fmt.Errorf("failed to perform vacuum: %w", err)
	}

	// Step 3: Verify vacuum results
	t.ReportProgress(90.0)
	t.GetLogger().Info("Verifying vacuum results")
	if err := t.verifyVacuumResults(); err != nil {
		glog.Warningf("Vacuum verification failed: %v", err)
		// Don't fail the task - vacuum operation itself succeeded
	}

	t.ReportProgress(100.0)
	glog.Infof("Vacuum task completed successfully: volume %d from %s (garbage ratio was %.2f%%)",
		t.volumeID, t.server, currentGarbageRatio*100)
	return nil
}
// Validate implements the UnifiedTask interface.
//
// It checks that params and its vacuum section are present, that the volume
// ID and server in params match the ones this task was created with, and
// that the garbage threshold is a number in the closed range [0.0, 1.0].
// It returns nil when all checks pass and a descriptive error for the first
// check that fails.
func (t *VacuumTask) Validate(params *worker_pb.TaskParams) error {
	if params == nil {
		return fmt.Errorf("task parameters are required")
	}
	vacuumParams := params.GetVacuumParams()
	if vacuumParams == nil {
		return fmt.Errorf("vacuum parameters are required")
	}
	if params.VolumeId != t.volumeID {
		return fmt.Errorf("volume ID mismatch: expected %d, got %d", t.volumeID, params.VolumeId)
	}
	if params.Server != t.server {
		return fmt.Errorf("source server mismatch: expected %s, got %s", t.server, params.Server)
	}
	// Written as a negated in-range test on purpose: every ordered comparison
	// with NaN is false, so "x < 0 || x > 1" would let a NaN threshold through,
	// and a NaN threshold makes the eligibility comparison false for every volume.
	if threshold := vacuumParams.GarbageThreshold; !(threshold >= 0 && threshold <= 1.0) {
		return fmt.Errorf("invalid garbage threshold: %f (must be between 0.0 and 1.0)", threshold)
	}
	return nil
}
// EstimateTime implements the UnifiedTask interface.
//
// The estimate is a fixed 14 seconds; params is not consulted.
func (t *VacuumTask) EstimateTime(params *worker_pb.TaskParams) time.Duration {
	// Basic estimate based on simulated steps: the sum of all step durations.
	const estimated = 14 * time.Second
	return estimated
}
// GetProgress returns the value of the task's progress field.
//
// NOTE(review): nothing in the code visible in this file assigns the progress
// field (Execute reports progress through ReportProgress only), so this looks
// like it always returns 0 unless the field is updated elsewhere - confirm.
func (t *VacuumTask) GetProgress() float64 {
	current := t.progress
	return current
}
// Helper methods for real vacuum operations

// checkVacuumEligibility asks the volume server for the volume's current
// garbage ratio and compares it with the task's garbage threshold.
//
// It returns whether the ratio is greater than or equal to the threshold, the
// ratio itself, and any error from the RPC. On error the other two results
// are false and 0. The RPC error is wrapped with %w so it stays in the
// error chain.
//
// NOTE(review): grpc.WithInsecure is marked deprecated in grpc-go; switching
// to transport credentials needs a new import, so it is left as is here.
func (t *VacuumTask) checkVacuumEligibility() (bool, float64, error) {
	var garbageRatio float64
	err := operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), grpc.WithInsecure(),
		func(client volume_server_pb.VolumeServerClient) error {
			resp, err := client.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{
				VolumeId: t.volumeID,
			})
			if err != nil {
				return fmt.Errorf("failed to check volume vacuum status: %w", err)
			}
			garbageRatio = resp.GarbageRatio
			return nil
		})
	if err != nil {
		return false, 0, err
	}

	eligible := garbageRatio >= t.garbageThreshold
	glog.V(1).Infof("Volume %d garbage ratio: %.2f%%, threshold: %.2f%%, eligible: %v",
		t.volumeID, garbageRatio*100, t.garbageThreshold*100, eligible)
	return eligible, garbageRatio, nil
}
// performVacuum executes the actual vacuum operation against the task's
// volume server as three consecutive RPCs on one client: compact, commit and
// cleanup.
//
// The compact call is a stream; it is drained until io.EOF, logging the
// processed byte count of each message. The first failing step aborts the
// sequence and its error is returned, wrapped with %w so the underlying RPC
// error stays in the error chain. It returns nil when all three steps succeed.
//
// NOTE(review): when compaction fails, no cleanup RPC is issued - confirm
// whether temporary compaction output should be removed on that path.
func (t *VacuumTask) performVacuum() error {
	return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), grpc.WithInsecure(),
		func(client volume_server_pb.VolumeServerClient) error {
			// Step 1: Compact the volume
			t.GetLogger().Info("Compacting volume")
			stream, err := client.VacuumVolumeCompact(context.Background(), &volume_server_pb.VacuumVolumeCompactRequest{
				VolumeId: t.volumeID,
			})
			if err != nil {
				return fmt.Errorf("vacuum compact failed: %w", err)
			}

			// Read compact progress until the server closes the stream.
			for {
				resp, recvErr := stream.Recv()
				if recvErr != nil {
					if recvErr == io.EOF {
						break
					}
					return fmt.Errorf("vacuum compact stream error: %w", recvErr)
				}
				glog.V(2).Infof("Volume %d compact progress: %d bytes processed", t.volumeID, resp.ProcessedBytes)
			}

			// Step 2: Commit the vacuum
			t.GetLogger().Info("Committing vacuum operation")
			_, err = client.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{
				VolumeId: t.volumeID,
			})
			if err != nil {
				return fmt.Errorf("vacuum commit failed: %w", err)
			}

			// Step 3: Cleanup old files
			t.GetLogger().Info("Cleaning up vacuum files")
			_, err = client.VacuumVolumeCleanup(context.Background(), &volume_server_pb.VacuumVolumeCleanupRequest{
				VolumeId: t.volumeID,
			})
			if err != nil {
				return fmt.Errorf("vacuum cleanup failed: %w", err)
			}

			glog.V(1).Infof("Volume %d vacuum operation completed successfully", t.volumeID)
			return nil
		})
}
// verifyVacuumResults checks the volume status after vacuum by querying the
// volume server for the volume's garbage ratio and logging it.
//
// The ratio is only logged, not compared against any threshold, so the only
// failure this reports is the RPC itself failing. That error is wrapped with
// %w so it stays in the error chain.
func (t *VacuumTask) verifyVacuumResults() error {
	return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), grpc.WithInsecure(),
		func(client volume_server_pb.VolumeServerClient) error {
			resp, err := client.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{
				VolumeId: t.volumeID,
			})
			if err != nil {
				return fmt.Errorf("failed to verify vacuum results: %w", err)
			}

			postVacuumGarbageRatio := resp.GarbageRatio
			glog.V(1).Infof("Volume %d post-vacuum garbage ratio: %.2f%%",
				t.volumeID, postVacuumGarbageRatio*100)
			return nil
		})
}