You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
193 lines
6.9 KiB
193 lines
6.9 KiB
package ec_vacuum
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/operation"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
|
)
|
|
|
|
// performSafetyChecks performs comprehensive safety verification before cleanup
|
|
func (t *EcVacuumTask) performSafetyChecks() error {
|
|
// Master address should have been fetched early in execution
|
|
if t.masterAddress == "" {
|
|
return fmt.Errorf("CRITICAL: cannot perform safety checks - master address not available (should have been fetched during task initialization)")
|
|
}
|
|
|
|
// Safety Check 1: Verify master connectivity and volume existence
|
|
if err := t.verifyMasterConnectivity(); err != nil {
|
|
return fmt.Errorf("master connectivity check failed: %w", err)
|
|
}
|
|
|
|
// Safety Check 2: Verify new generation is active on master
|
|
if err := t.verifyNewGenerationActive(); err != nil {
|
|
return fmt.Errorf("active generation verification failed: %w", err)
|
|
}
|
|
|
|
// Safety Check 3: Verify old generation is not the active generation
|
|
if err := t.verifyOldGenerationInactive(); err != nil {
|
|
return fmt.Errorf("old generation activity check failed: %w", err)
|
|
}
|
|
|
|
// Safety Check 4: Verify new generation has sufficient shards
|
|
if err := t.verifyNewGenerationReadiness(); err != nil {
|
|
return fmt.Errorf("new generation readiness check failed: %w", err)
|
|
}
|
|
|
|
// Safety Check 5: Verify no active read operations on old generation
|
|
if err := t.verifyNoActiveOperations(); err != nil {
|
|
return fmt.Errorf("active operations check failed: %w", err)
|
|
}
|
|
|
|
t.LogInfo("🛡️ ALL SAFETY CHECKS PASSED - Cleanup approved", map[string]interface{}{
|
|
"volume_id": t.volumeID,
|
|
"source_generation": t.sourceGeneration,
|
|
"target_generation": t.targetGeneration,
|
|
"safety_checks": 5,
|
|
"status": "SAFE_TO_CLEANUP",
|
|
})
|
|
return nil
|
|
}
|
|
|
|
// verifyMasterConnectivity ensures we can communicate with the master
|
|
func (t *EcVacuumTask) verifyMasterConnectivity() error {
|
|
return operation.WithMasterServerClient(false, t.masterAddress, t.grpcDialOption, func(client master_pb.SeaweedClient) error {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
_, err := client.Statistics(ctx, &master_pb.StatisticsRequest{})
|
|
if err != nil {
|
|
return fmt.Errorf("master ping failed: %w", err)
|
|
}
|
|
|
|
t.LogInfo("✅ Safety Check 1: Master connectivity verified", nil)
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// verifyNewGenerationActive checks with master that the new generation is active
|
|
func (t *EcVacuumTask) verifyNewGenerationActive() error {
|
|
return operation.WithMasterServerClient(false, t.masterAddress, t.grpcDialOption, func(client master_pb.SeaweedClient) error {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
resp, err := client.LookupEcVolume(ctx, &master_pb.LookupEcVolumeRequest{
|
|
VolumeId: t.volumeID,
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to lookup EC volume from master: %w", err)
|
|
}
|
|
|
|
if resp.ActiveGeneration != t.targetGeneration {
|
|
return fmt.Errorf("CRITICAL: master active generation is %d, expected %d - ABORTING CLEANUP",
|
|
resp.ActiveGeneration, t.targetGeneration)
|
|
}
|
|
|
|
t.LogInfo("✅ Safety Check 2: New generation is active on master", map[string]interface{}{
|
|
"volume_id": t.volumeID,
|
|
"active_generation": resp.ActiveGeneration,
|
|
})
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// verifyOldGenerationInactive ensures the old generation is not active
|
|
func (t *EcVacuumTask) verifyOldGenerationInactive() error {
|
|
return operation.WithMasterServerClient(false, t.masterAddress, t.grpcDialOption, func(client master_pb.SeaweedClient) error {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
resp, err := client.LookupEcVolume(ctx, &master_pb.LookupEcVolumeRequest{
|
|
VolumeId: t.volumeID,
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to lookup EC volume from master: %w", err)
|
|
}
|
|
|
|
if resp.ActiveGeneration == t.sourceGeneration {
|
|
return fmt.Errorf("CRITICAL: old generation %d is still active - ABORTING CLEANUP to prevent data loss",
|
|
t.sourceGeneration)
|
|
}
|
|
|
|
t.LogInfo("✅ Safety Check 3: Old generation is inactive", map[string]interface{}{
|
|
"volume_id": t.volumeID,
|
|
"source_generation": t.sourceGeneration,
|
|
"active_generation": resp.ActiveGeneration,
|
|
})
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// verifyNewGenerationReadiness checks that the new generation has enough shards
|
|
func (t *EcVacuumTask) verifyNewGenerationReadiness() error {
|
|
return operation.WithMasterServerClient(false, t.masterAddress, t.grpcDialOption, func(client master_pb.SeaweedClient) error {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
resp, err := client.LookupEcVolume(ctx, &master_pb.LookupEcVolumeRequest{
|
|
VolumeId: t.volumeID,
|
|
Generation: t.targetGeneration, // Explicitly request new generation
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to lookup new generation %d from master: %w", t.targetGeneration, err)
|
|
}
|
|
|
|
shardCount := len(resp.ShardIdLocations)
|
|
if shardCount < 10 { // Need at least 10 data shards for safety
|
|
return fmt.Errorf("CRITICAL: new generation %d has only %d shards (need ≥10) - ABORTING CLEANUP",
|
|
t.targetGeneration, shardCount)
|
|
}
|
|
|
|
t.LogInfo("✅ Safety Check 4: New generation has sufficient shards", map[string]interface{}{
|
|
"volume_id": t.volumeID,
|
|
"target_generation": t.targetGeneration,
|
|
"shard_count": shardCount,
|
|
"minimum_required": 10,
|
|
})
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// verifyNoActiveOperations checks that no active operations are using the old generation
|
|
func (t *EcVacuumTask) verifyNoActiveOperations() error {
|
|
// For now, this is a simple time-based check (grace period serves this purpose)
|
|
// In the future, this could be enhanced to check actual operation metrics or locks
|
|
|
|
t.LogInfo("✅ Safety Check 5: Grace period completed - no active operations expected", map[string]interface{}{
|
|
"volume_id": t.volumeID,
|
|
"source_generation": t.sourceGeneration,
|
|
"grace_period": t.cleanupGracePeriod,
|
|
"assumption": "grace period ensures operation quiescence",
|
|
})
|
|
return nil
|
|
}
|
|
|
|
// finalSafetyCheck performs one last verification before each unmount operation
|
|
func (t *EcVacuumTask) finalSafetyCheck() error {
|
|
if t.masterAddress == "" {
|
|
// If we don't have master access, we can't do this check
|
|
// but other safety checks should have already passed
|
|
return nil
|
|
}
|
|
|
|
return operation.WithMasterServerClient(false, t.masterAddress, t.grpcDialOption, func(client master_pb.SeaweedClient) error {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
|
|
resp, err := client.LookupEcVolume(ctx, &master_pb.LookupEcVolumeRequest{
|
|
VolumeId: t.volumeID,
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("final safety lookup failed: %w", err)
|
|
}
|
|
|
|
if resp.ActiveGeneration == t.sourceGeneration {
|
|
return fmt.Errorf("ABORT: active generation is %d (same as source %d) - PREVENTING DELETION",
|
|
resp.ActiveGeneration, t.sourceGeneration)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|