You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

193 lines
6.9 KiB

package ec_vacuum
import (
"context"
"fmt"
"time"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
)
// performSafetyChecks runs the pre-cleanup safety verifications in a fixed
// order and stops at the first one that fails.
//
// It returns an error immediately when no master address is set. A failing
// check's error is wrapped with a prefix naming that check. nil is returned
// only after every check has passed, at which point a summary entry is logged.
func (t *EcVacuumTask) performSafetyChecks() error {
	// The master address is expected to have been resolved earlier in the task.
	if t.masterAddress == "" {
		return fmt.Errorf("CRITICAL: cannot perform safety checks - master address not available (should have been fetched during task initialization)")
	}

	// Checks in execution order; failure is the prefix placed on a failing
	// check's error.
	checks := []struct {
		failure string
		run     func() error
	}{
		// 1: master reachable.
		{"master connectivity check failed", t.verifyMasterConnectivity},
		// 2: master reports the target generation as active.
		{"active generation verification failed", t.verifyNewGenerationActive},
		// 3: master does not report the source generation as active.
		{"old generation activity check failed", t.verifyOldGenerationInactive},
		// 4: target generation has enough shards.
		{"new generation readiness check failed", t.verifyNewGenerationReadiness},
		// 5: no operations expected on the old generation.
		{"active operations check failed", t.verifyNoActiveOperations},
	}
	for _, check := range checks {
		if err := check.run(); err != nil {
			return fmt.Errorf("%s: %w", check.failure, err)
		}
	}

	summary := map[string]interface{}{
		"volume_id":         t.volumeID,
		"source_generation": t.sourceGeneration,
		"target_generation": t.targetGeneration,
		"safety_checks":     len(checks),
		"status":            "SAFE_TO_CLEANUP",
	}
	t.LogInfo("🛡️ ALL SAFETY CHECKS PASSED - Cleanup approved", summary)
	return nil
}
// verifyMasterConnectivity confirms the master answers a request by issuing a
// Statistics call with a 10-second timeout.
//
// It returns a wrapped error when the call fails and logs a confirmation when
// it succeeds.
func (t *EcVacuumTask) verifyMasterConnectivity() error {
	ping := func(client master_pb.SeaweedClient) error {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()

		// The response body is irrelevant; only reachability matters here.
		if _, err := client.Statistics(ctx, &master_pb.StatisticsRequest{}); err != nil {
			return fmt.Errorf("master ping failed: %w", err)
		}

		t.LogInfo("✅ Safety Check 1: Master connectivity verified", nil)
		return nil
	}

	return operation.WithMasterServerClient(false, t.masterAddress, t.grpcDialOption, ping)
}
// verifyNewGenerationActive asks the master which generation of the EC volume
// is active and requires it to equal the task's target generation.
//
// It returns an error when the lookup fails (10-second timeout) or when the
// master reports any other generation as active.
func (t *EcVacuumTask) verifyNewGenerationActive() error {
	check := func(client master_pb.SeaweedClient) error {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()

		// No generation is set on the request; the reply's ActiveGeneration
		// is the value under test.
		req := &master_pb.LookupEcVolumeRequest{VolumeId: t.volumeID}
		resp, err := client.LookupEcVolume(ctx, req)
		if err != nil {
			return fmt.Errorf("failed to lookup EC volume from master: %w", err)
		}

		active := resp.ActiveGeneration
		if active == t.targetGeneration {
			t.LogInfo("✅ Safety Check 2: New generation is active on master", map[string]interface{}{
				"volume_id":         t.volumeID,
				"active_generation": active,
			})
			return nil
		}

		return fmt.Errorf("CRITICAL: master active generation is %d, expected %d - ABORTING CLEANUP",
			active, t.targetGeneration)
	}

	return operation.WithMasterServerClient(false, t.masterAddress, t.grpcDialOption, check)
}
// verifyOldGenerationInactive asks the master which generation of the EC
// volume is active and rejects the cleanup if it is still the source (old)
// generation.
//
// It returns an error when the lookup fails (10-second timeout) or when the
// source generation is reported as active.
func (t *EcVacuumTask) verifyOldGenerationInactive() error {
	check := func(client master_pb.SeaweedClient) error {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()

		req := &master_pb.LookupEcVolumeRequest{VolumeId: t.volumeID}
		resp, err := client.LookupEcVolume(ctx, req)
		if err != nil {
			return fmt.Errorf("failed to lookup EC volume from master: %w", err)
		}

		active := resp.ActiveGeneration
		// Removing a generation the master still serves from would lose data.
		if active == t.sourceGeneration {
			return fmt.Errorf("CRITICAL: old generation %d is still active - ABORTING CLEANUP to prevent data loss",
				t.sourceGeneration)
		}

		fields := map[string]interface{}{
			"volume_id":         t.volumeID,
			"source_generation": t.sourceGeneration,
			"active_generation": active,
		}
		t.LogInfo("✅ Safety Check 3: Old generation is inactive", fields)
		return nil
	}

	return operation.WithMasterServerClient(false, t.masterAddress, t.grpcDialOption, check)
}
// verifyNewGenerationReadiness checks that the master knows enough shards of
// the target (new) generation before the old generation is cleaned up.
//
// It looks up the volume with the target generation set explicitly (10-second
// timeout) and counts the shard entries that have at least one location. The
// check fails when the lookup fails or when fewer than 10 such shards are
// found.
func (t *EcVacuumTask) verifyNewGenerationReadiness() error {
	// Need at least 10 data shards for safety.
	const minShards = 10

	return operation.WithMasterServerClient(false, t.masterAddress, t.grpcDialOption, func(client master_pb.SeaweedClient) error {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()

		resp, err := client.LookupEcVolume(ctx, &master_pb.LookupEcVolumeRequest{
			VolumeId:   t.volumeID,
			Generation: t.targetGeneration, // Explicitly request new generation
		})
		if err != nil {
			return fmt.Errorf("failed to lookup new generation %d from master: %w", t.targetGeneration, err)
		}

		// Count only shards that at least one server is reported to hold. An
		// entry with an empty location list cannot be read from, so it must
		// not count towards the minimum.
		// NOTE(review): whether the master ever returns such location-less
		// entries is not visible here; if it never does, this count equals
		// len(resp.ShardIdLocations).
		shardCount := 0
		for _, shard := range resp.ShardIdLocations {
			if len(shard.GetLocations()) > 0 {
				shardCount++
			}
		}

		if shardCount < minShards {
			return fmt.Errorf("CRITICAL: new generation %d has only %d shards (need ≥%d) - ABORTING CLEANUP",
				t.targetGeneration, shardCount, minShards)
		}

		t.LogInfo("✅ Safety Check 4: New generation has sufficient shards", map[string]interface{}{
			"volume_id":         t.volumeID,
			"target_generation": t.targetGeneration,
			"shard_count":       shardCount,
			"minimum_required":  minShards,
		})
		return nil
	})
}
// verifyNoActiveOperations is the placeholder for checking that nothing is
// still using the old generation.
//
// It performs no actual inspection: it logs that the grace period is relied on
// for quiescence and always returns nil. It could later be extended to consult
// real operation metrics or locks.
func (t *EcVacuumTask) verifyNoActiveOperations() error {
	fields := map[string]interface{}{
		"volume_id":         t.volumeID,
		"source_generation": t.sourceGeneration,
		"grace_period":      t.cleanupGracePeriod,
		"assumption":        "grace period ensures operation quiescence",
	}
	t.LogInfo("✅ Safety Check 5: Grace period completed - no active operations expected", fields)
	return nil
}
// finalSafetyCheck is the last-moment guard meant to run before each unmount
// operation: it re-queries the master and refuses to proceed if the source
// generation is reported as the active one.
//
// When no master address is set the check is skipped and nil is returned, on
// the expectation that the earlier safety checks have already passed. It
// returns an error when the lookup fails (5-second timeout) or when the source
// generation is active.
func (t *EcVacuumTask) finalSafetyCheck() error {
	if t.masterAddress == "" {
		// Without master access there is nothing to query; skip the check.
		return nil
	}

	recheck := func(client master_pb.SeaweedClient) error {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		req := &master_pb.LookupEcVolumeRequest{VolumeId: t.volumeID}
		resp, err := client.LookupEcVolume(ctx, req)
		if err != nil {
			return fmt.Errorf("final safety lookup failed: %w", err)
		}

		if active := resp.ActiveGeneration; active == t.sourceGeneration {
			return fmt.Errorf("ABORT: active generation is %d (same as source %d) - PREVENTING DELETION",
				active, t.sourceGeneration)
		}
		return nil
	}

	return operation.WithMasterServerClient(false, t.masterAddress, t.grpcDialOption, recheck)
}