From 50cc17e8fa5f99791ea042354ced8ad2e857ffd2 Mon Sep 17 00:00:00 2001 From: chrislu Date: Sun, 10 Aug 2025 13:11:47 -0700 Subject: [PATCH] ActivateEcGeneration RPC implemented --- weed/server/master_grpc_server_volume.go | 72 ++++++++++++++++++++++++ weed/topology/topology_ec.go | 26 +++++++++ 2 files changed, 98 insertions(+) diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index 48d03e9f1..41589b507 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -10,6 +10,7 @@ import ( "time" "github.com/seaweedfs/seaweedfs/weed/stats" + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" "github.com/seaweedfs/seaweedfs/weed/topology" @@ -291,6 +292,77 @@ func (ms *MasterServer) LookupEcVolume(ctx context.Context, req *master_pb.Looku return resp, nil } +func (ms *MasterServer) ActivateEcGeneration(ctx context.Context, req *master_pb.ActivateEcGenerationRequest) (*master_pb.ActivateEcGenerationResponse, error) { + if !ms.Topo.IsLeader() { + return &master_pb.ActivateEcGenerationResponse{ + Success: false, + Error: "not leader", + }, nil + } + + // Basic request validation + if req.VolumeId == 0 { + return &master_pb.ActivateEcGenerationResponse{ + Success: false, + Error: "invalid volume ID: cannot be 0", + }, nil + } + + volumeId := needle.VolumeId(req.VolumeId) + targetGeneration := req.Generation + + glog.V(1).Infof("ActivateEcGeneration: Activating generation %d for EC volume %d in collection %s", + targetGeneration, req.VolumeId, req.Collection) + + // Validate that the target generation exists and has sufficient shards + ready, availableShards, err := ms.Topo.ValidateEcGenerationReadiness(volumeId, targetGeneration) + if err != nil { + errMsg := err.Error() + glog.Warningf("ActivateEcGeneration: %s", errMsg) + return &master_pb.ActivateEcGenerationResponse{ + Success: false, + Error: errMsg, + }, nil + } + + if !ready { + errMsg := fmt.Sprintf("generation %d for EC volume %d not ready: has %d shards, needs %d", + targetGeneration, req.VolumeId, availableShards, erasure_coding.DataShardsCount) + glog.Warningf("ActivateEcGeneration: %s", errMsg) + return &master_pb.ActivateEcGenerationResponse{ + Success: false, + Error: errMsg, + }, nil + } + + glog.V(2).Infof("ActivateEcGeneration: Generation %d for volume %d is ready with %d available shards", + targetGeneration, req.VolumeId, availableShards) + + // Check current active generation for logging + var currentActiveGeneration uint32 + if current, exists := ms.Topo.GetEcActiveGeneration(volumeId); exists { + currentActiveGeneration = current + if current == targetGeneration { + glog.V(2).Infof("ActivateEcGeneration: Generation %d is already active for volume %d", targetGeneration, req.VolumeId) + return &master_pb.ActivateEcGenerationResponse{ + Success: true, + Error: "", + }, nil + } + } + + // Perform the atomic activation + ms.Topo.SetEcActiveGeneration(volumeId, targetGeneration) + + glog.V(0).Infof("ActivateEcGeneration: Successfully activated generation %d for EC volume %d (was: %d)", + targetGeneration, req.VolumeId, currentActiveGeneration) + + return &master_pb.ActivateEcGenerationResponse{ + Success: true, + Error: "", + }, nil +} + func (ms *MasterServer) VacuumVolume(ctx context.Context, req *master_pb.VacuumVolumeRequest) (*master_pb.VacuumVolumeResponse, error) { if !ms.Topo.IsLeader() { diff --git a/weed/topology/topology_ec.go b/weed/topology/topology_ec.go index 0cab86395..fa31ebfe4 100644 --- a/weed/topology/topology_ec.go +++ b/weed/topology/topology_ec.go @@ -338,3 +338,29 @@ func (t *Topology) LookupEcShardsWithFallback(vid needle.VolumeId, requestedGene return nil, 0, false } + +// ValidateEcGenerationReadiness checks if an EC generation has sufficient shards for activation +// Returns true if the generation has at least erasure_coding.DataShardsCount shards available +func (t *Topology) ValidateEcGenerationReadiness(vid needle.VolumeId, generation uint32) (ready bool, availableShards int, err error) { + t.ecShardMapLock.RLock() + defer t.ecShardMapLock.RUnlock() + + key := EcVolumeGenerationKey{VolumeId: vid, Generation: generation} + ecLocations, found := t.ecShardMap[key] + if !found { + return false, 0, fmt.Errorf("generation %d not found for EC volume %d", generation, vid) + } + + // Count available shards + availableShards = 0 + for _, locations := range ecLocations.Locations { + if len(locations) > 0 { + availableShards++ + } + } + + // Need at least DataShardsCount shards to reconstruct data + ready = availableShards >= erasure_coding.DataShardsCount + + return ready, availableShards, nil +}