You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
236 lines
7.2 KiB
236 lines
7.2 KiB
package vacuum
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/operation"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/worker/types"
|
|
"github.com/seaweedfs/seaweedfs/weed/worker/types/base"
|
|
"google.golang.org/grpc"
|
|
)
|
|
|
|
// VacuumTask implements the Task interface
|
|
type VacuumTask struct {
|
|
*base.BaseTask
|
|
server string
|
|
volumeID uint32
|
|
collection string
|
|
garbageThreshold float64
|
|
progress float64
|
|
}
|
|
|
|
// NewVacuumTask creates a new unified vacuum task instance
|
|
func NewVacuumTask(id string, server string, volumeID uint32, collection string) *VacuumTask {
|
|
return &VacuumTask{
|
|
BaseTask: base.NewBaseTask(id, types.TaskTypeVacuum),
|
|
server: server,
|
|
volumeID: volumeID,
|
|
collection: collection,
|
|
garbageThreshold: 0.3, // Default 30% threshold
|
|
}
|
|
}
|
|
|
|
// Execute implements the UnifiedTask interface
|
|
func (t *VacuumTask) Execute(ctx context.Context, params *worker_pb.TaskParams) error {
|
|
if params == nil {
|
|
return fmt.Errorf("task parameters are required")
|
|
}
|
|
|
|
vacuumParams := params.GetVacuumParams()
|
|
if vacuumParams == nil {
|
|
return fmt.Errorf("vacuum parameters are required")
|
|
}
|
|
|
|
t.garbageThreshold = vacuumParams.GarbageThreshold
|
|
|
|
t.GetLogger().WithFields(map[string]interface{}{
|
|
"volume_id": t.volumeID,
|
|
"server": t.server,
|
|
"collection": t.collection,
|
|
"garbage_threshold": t.garbageThreshold,
|
|
}).Info("Starting vacuum task")
|
|
|
|
// Step 1: Check volume status and garbage ratio
|
|
t.ReportProgress(10.0)
|
|
t.GetLogger().Info("Checking volume status")
|
|
eligible, currentGarbageRatio, err := t.checkVacuumEligibility()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to check vacuum eligibility: %v", err)
|
|
}
|
|
|
|
if !eligible {
|
|
t.GetLogger().WithFields(map[string]interface{}{
|
|
"current_garbage_ratio": currentGarbageRatio,
|
|
"required_threshold": t.garbageThreshold,
|
|
}).Info("Volume does not meet vacuum criteria, skipping")
|
|
t.ReportProgress(100.0)
|
|
return nil
|
|
}
|
|
|
|
// Step 2: Perform vacuum operation
|
|
t.ReportProgress(50.0)
|
|
t.GetLogger().WithFields(map[string]interface{}{
|
|
"garbage_ratio": currentGarbageRatio,
|
|
"threshold": t.garbageThreshold,
|
|
}).Info("Performing vacuum operation")
|
|
|
|
if err := t.performVacuum(); err != nil {
|
|
return fmt.Errorf("failed to perform vacuum: %v", err)
|
|
}
|
|
|
|
// Step 3: Verify vacuum results
|
|
t.ReportProgress(90.0)
|
|
t.GetLogger().Info("Verifying vacuum results")
|
|
if err := t.verifyVacuumResults(); err != nil {
|
|
glog.Warningf("Vacuum verification failed: %v", err)
|
|
// Don't fail the task - vacuum operation itself succeeded
|
|
}
|
|
|
|
t.ReportProgress(100.0)
|
|
glog.Infof("Vacuum task completed successfully: volume %d from %s (garbage ratio was %.2f%%)",
|
|
t.volumeID, t.server, currentGarbageRatio*100)
|
|
return nil
|
|
}
|
|
|
|
// Validate implements the UnifiedTask interface
|
|
func (t *VacuumTask) Validate(params *worker_pb.TaskParams) error {
|
|
if params == nil {
|
|
return fmt.Errorf("task parameters are required")
|
|
}
|
|
|
|
vacuumParams := params.GetVacuumParams()
|
|
if vacuumParams == nil {
|
|
return fmt.Errorf("vacuum parameters are required")
|
|
}
|
|
|
|
if params.VolumeId != t.volumeID {
|
|
return fmt.Errorf("volume ID mismatch: expected %d, got %d", t.volumeID, params.VolumeId)
|
|
}
|
|
|
|
if params.Server != t.server {
|
|
return fmt.Errorf("source server mismatch: expected %s, got %s", t.server, params.Server)
|
|
}
|
|
|
|
if vacuumParams.GarbageThreshold < 0 || vacuumParams.GarbageThreshold > 1.0 {
|
|
return fmt.Errorf("invalid garbage threshold: %f (must be between 0.0 and 1.0)", vacuumParams.GarbageThreshold)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// EstimateTime implements the UnifiedTask interface
|
|
func (t *VacuumTask) EstimateTime(params *worker_pb.TaskParams) time.Duration {
|
|
// Basic estimate based on simulated steps
|
|
return 14 * time.Second // Sum of all step durations
|
|
}
|
|
|
|
// GetProgress returns current progress
|
|
func (t *VacuumTask) GetProgress() float64 {
|
|
return t.progress
|
|
}
|
|
|
|
// Helper methods for real vacuum operations
|
|
|
|
// checkVacuumEligibility checks if the volume meets vacuum criteria
|
|
func (t *VacuumTask) checkVacuumEligibility() (bool, float64, error) {
|
|
var garbageRatio float64
|
|
|
|
err := operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), grpc.WithInsecure(),
|
|
func(client volume_server_pb.VolumeServerClient) error {
|
|
resp, err := client.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{
|
|
VolumeId: t.volumeID,
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to check volume vacuum status: %v", err)
|
|
}
|
|
|
|
garbageRatio = resp.GarbageRatio
|
|
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return false, 0, err
|
|
}
|
|
|
|
eligible := garbageRatio >= t.garbageThreshold
|
|
glog.V(1).Infof("Volume %d garbage ratio: %.2f%%, threshold: %.2f%%, eligible: %v",
|
|
t.volumeID, garbageRatio*100, t.garbageThreshold*100, eligible)
|
|
|
|
return eligible, garbageRatio, nil
|
|
}
|
|
|
|
// performVacuum executes the actual vacuum operation
|
|
func (t *VacuumTask) performVacuum() error {
|
|
return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), grpc.WithInsecure(),
|
|
func(client volume_server_pb.VolumeServerClient) error {
|
|
// Step 1: Compact the volume
|
|
t.GetLogger().Info("Compacting volume")
|
|
stream, err := client.VacuumVolumeCompact(context.Background(), &volume_server_pb.VacuumVolumeCompactRequest{
|
|
VolumeId: t.volumeID,
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("vacuum compact failed: %v", err)
|
|
}
|
|
|
|
// Read compact progress
|
|
for {
|
|
resp, recvErr := stream.Recv()
|
|
if recvErr != nil {
|
|
if recvErr == io.EOF {
|
|
break
|
|
}
|
|
return fmt.Errorf("vacuum compact stream error: %v", recvErr)
|
|
}
|
|
glog.V(2).Infof("Volume %d compact progress: %d bytes processed", t.volumeID, resp.ProcessedBytes)
|
|
}
|
|
|
|
// Step 2: Commit the vacuum
|
|
t.GetLogger().Info("Committing vacuum operation")
|
|
_, err = client.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{
|
|
VolumeId: t.volumeID,
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("vacuum commit failed: %v", err)
|
|
}
|
|
|
|
// Step 3: Cleanup old files
|
|
t.GetLogger().Info("Cleaning up vacuum files")
|
|
_, err = client.VacuumVolumeCleanup(context.Background(), &volume_server_pb.VacuumVolumeCleanupRequest{
|
|
VolumeId: t.volumeID,
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("vacuum cleanup failed: %v", err)
|
|
}
|
|
|
|
glog.V(1).Infof("Volume %d vacuum operation completed successfully", t.volumeID)
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// verifyVacuumResults checks the volume status after vacuum
|
|
func (t *VacuumTask) verifyVacuumResults() error {
|
|
return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), grpc.WithInsecure(),
|
|
func(client volume_server_pb.VolumeServerClient) error {
|
|
resp, err := client.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{
|
|
VolumeId: t.volumeID,
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to verify vacuum results: %v", err)
|
|
}
|
|
|
|
postVacuumGarbageRatio := resp.GarbageRatio
|
|
|
|
glog.V(1).Infof("Volume %d post-vacuum garbage ratio: %.2f%%",
|
|
t.volumeID, postVacuumGarbageRatio*100)
|
|
|
|
return nil
|
|
})
|
|
}
|