From 7354fa87f1e4fb4542323307058878075e44b780 Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Fri, 27 Feb 2026 17:21:13 -0800
Subject: [PATCH] refactor ec shard distribution (#8465)

* refactor ec shard distribution

* fix shard assignment merge and mount errors

* fix mount error aggregation scope

* make WithFields compatible and wrap errors
---
 .../erasure_coding/shard_distribution.go | 367 ++++++++++++++++++
 weed/worker/tasks/erasure_coding/ec_task.go | 265 +------------
 2 files changed, 382 insertions(+), 250 deletions(-)
 create mode 100644 weed/storage/erasure_coding/shard_distribution.go

diff --git a/weed/storage/erasure_coding/shard_distribution.go b/weed/storage/erasure_coding/shard_distribution.go
new file mode 100644
index 000000000..85ed14e56
--- /dev/null
+++ b/weed/storage/erasure_coding/shard_distribution.go
@@ -0,0 +1,367 @@
+package erasure_coding
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"os"
+	"reflect"
+	"strings"
+
+	"github.com/seaweedfs/seaweedfs/weed/glog"
+	"github.com/seaweedfs/seaweedfs/weed/operation"
+	"github.com/seaweedfs/seaweedfs/weed/pb"
+	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
+	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
+	"google.golang.org/grpc"
+)
+
+// ensureLogger returns logger unchanged when it is non-nil and otherwise
+// substitutes a glog-backed fallback, so callers may pass nil.
+func ensureLogger(logger logger) logger {
+	if logger == nil {
+		return &glogFallbackLogger{}
+	}
+	return logger
+}
+
+// logger is the minimal leveled-logging interface the shard distribution
+// helpers write to. Each method takes a message followed by optional
+// arguments.
+type logger interface {
+	Info(string, ...interface{})
+	Warning(string, ...interface{})
+	Error(string, ...interface{})
+}
+
+// withFieldLogger is implemented by loggers whose WithFields method accepts a
+// map[string]interface{} and returns this package's logger type.
+type withFieldLogger interface {
+	WithFields(map[string]interface{}) logger
+}
+
+// withFields returns a logger enriched with fields when log supports it, and
+// log itself otherwise.
+//
+// A direct withFieldLogger assertion is tried first. Failing that, reflection
+// looks up a method named WithFields that takes one argument to which a
+// map[string]interface{} is assignable or convertible and returns one value;
+// the result is used only if it implements logger. A nil fields map, or any
+// mismatch along the way, returns log unchanged.
+func withFields(log logger, fields map[string]interface{}) logger {
+	if fields == nil {
+		return log
+	}
+	if wf, ok := log.(withFieldLogger); ok {
+		return wf.WithFields(fields)
+	}
+
+	// Reflection fallback for WithFields methods whose signature does not
+	// match withFieldLogger exactly (different parameter or result type).
+	val := reflect.ValueOf(log)
+	method := val.MethodByName("WithFields")
+	if !method.IsValid() {
+		return log
+	}
+	methodType := method.Type()
+	if methodType.NumIn() != 1 || methodType.NumOut() != 1 {
+		return log
+	}
+	argType := methodType.In(0)
+	mapType := reflect.TypeOf(map[string]interface{}{})
+	var arg reflect.Value
+	if mapType.AssignableTo(argType) {
+		arg = reflect.ValueOf(fields)
+	} else if mapType.ConvertibleTo(argType) {
+		// Covers parameters declared as a named map type.
+		arg = reflect.ValueOf(fields).Convert(argType)
+	} else {
+		return log
+	}
+	result := method.Call([]reflect.Value{arg})[0]
+	if result.IsValid() && result.CanInterface() {
+		// Only adopt the returned value if it still offers Info/Warning/Error.
+		if enhanced, ok := result.Interface().(logger); ok {
+			return enhanced
+		}
+	}
+	return log
+}
+
+// glogFallbackLogger implements logger on top of the glog package. It defines
+// no WithFields method here, so withFields returns it unchanged and the
+// structured fields are not logged.
+type glogFallbackLogger struct{}
+
+// Info logs at glog's info level. With args, msg is used as a format string;
+// without args, msg is logged as-is.
+func (g *glogFallbackLogger) Info(msg string, args ...interface{}) {
+	if len(args) > 0 {
+		glog.Infof(msg, args...)
+		return
+	}
+	glog.Info(msg)
+}
+
+// Warning logs at glog's warning level. With args, msg is used as a format
+// string; without args, msg is logged as-is.
+func (g *glogFallbackLogger) Warning(msg string, args ...interface{}) {
+	if len(args) > 0 {
+		glog.Warningf(msg, args...)
+		return
+	}
+	glog.Warning(msg)
+}
+
+// Error logs at glog's error level. With args, msg is used as a format
+// string; without args, msg is logged as-is.
+func (g *glogFallbackLogger) Error(msg string, args ...interface{}) {
+	if len(args) > 0 {
+		glog.Errorf(msg, args...)
+		return
+	}
+	glog.Error(msg)
+}
+
+// DistributeEcShards distributes locally generated EC shards to destination servers.
+// Returns the shard assignment map used for mounting.
+//
+// Targets naming the same node are merged into a single de-duplicated list,
+// and every target that receives shards also receives whichever of the ecx,
+// ecj and vif files are present in shardFiles. All assigned names are checked
+// against shardFiles before the first transfer, so a missing file fails the
+// call without sending anything. Destinations are processed in map iteration
+// order; on a transfer error the call stops, returns a nil assignment, and
+// does not undo transfers that already completed. A nil logger falls back to
+// glog.
+func DistributeEcShards(volumeID uint32, collection string, targets []*worker_pb.TaskTarget, shardFiles map[string]string, dialOption grpc.DialOption, logger logger) (map[string][]string, error) {
+	if len(targets) == 0 {
+		return nil, fmt.Errorf("no targets specified for EC shard distribution")
+	}
+	if len(shardFiles) == 0 {
+		return nil, fmt.Errorf("no shard files available for distribution")
+	}
+
+	log := ensureLogger(logger)
+
+	shardAssignment := groupShardsByDestination(targets, shardFiles)
+	if len(shardAssignment) == 0 {
+		return nil, fmt.Errorf("no shard assignments found from planning phase")
+	}
+
+	// Fail before touching the network: a missing local file is detectable
+	// up front, and discovering it mid-loop would leave earlier destinations
+	// with files already transferred.
+	for destNode, assignedShards := range shardAssignment {
+		for _, shardType := range assignedShards {
+			if _, exists := shardFiles[shardType]; !exists {
+				return nil, fmt.Errorf("shard file %s not found for destination %s", shardType, destNode)
+			}
+		}
+	}
+
+	for destNode, assignedShards := range shardAssignment {
+		withFields(log, map[string]interface{}{
+			"destination":     destNode,
+			"assigned_shards": len(assignedShards),
+			"shard_types":     assignedShards,
+		}).Info("Starting shard distribution to destination server")
+
+		var transferredBytes int64
+		for _, shardType := range assignedShards {
+			filePath := shardFiles[shardType]
+
+			// The size is only used for logging; a Stat failure is left for
+			// the send below to report when it opens the file.
+			if info, err := os.Stat(filePath); err == nil {
+				transferredBytes += info.Size()
+				withFields(log, map[string]interface{}{
+					"destination": destNode,
+					"shard_type":  shardType,
+					"file_path":   filePath,
+					"size_bytes":  info.Size(),
+					"size_kb":     float64(info.Size()) / 1024,
+				}).Info("Starting shard file transfer")
+			}
+
+			if err := sendShardFileToDestination(volumeID, collection, dialOption, destNode, filePath, shardType); err != nil {
+				return nil, fmt.Errorf("failed to send %s to %s: %w", shardType, destNode, err)
+			}
+
+			withFields(log, map[string]interface{}{
+				"destination": destNode,
+				"shard_type":  shardType,
+			}).Info("Shard file transfer completed")
+		}
+
+		withFields(log, map[string]interface{}{
+			"destination":        destNode,
+			"shards_transferred": len(assignedShards),
+			"total_bytes":        transferredBytes,
+			"total_mb":           float64(transferredBytes) / (1024 * 1024),
+		}).Info("All shards distributed to destination server")
+	}
+
+	glog.V(1).Infof("Successfully distributed EC shards to %d destinations", len(shardAssignment))
+	return shardAssignment, nil
+}
+
+// groupShardsByDestination builds the node -> file-name assignment from the
+// planned targets. Shard IDs become "ecNN" names; each node that gets at
+// least one shard also gets the metadata files (ecx, ecj, vif) present in
+// shardFiles. Nil targets and targets without shard IDs are skipped, and a
+// name is never listed twice for the same node.
+func groupShardsByDestination(targets []*worker_pb.TaskTarget, shardFiles map[string]string) map[string][]string {
+	assignment := make(map[string][]string)
+	seen := make(map[string]map[string]struct{})
+
+	// add appends shardType to node's list unless it is already there.
+	add := func(node, shardType string) {
+		nodeSeen, ok := seen[node]
+		if !ok {
+			nodeSeen = make(map[string]struct{})
+			seen[node] = nodeSeen
+		}
+		if _, dup := nodeSeen[shardType]; dup {
+			return
+		}
+		nodeSeen[shardType] = struct{}{}
+		assignment[node] = append(assignment[node], shardType)
+	}
+
+	for _, target := range targets {
+		if target == nil || len(target.ShardIds) == 0 {
+			continue
+		}
+		for _, shardID := range target.ShardIds {
+			add(target.Node, fmt.Sprintf("ec%02d", shardID))
+		}
+		for _, meta := range []string{"ecx", "ecj", "vif"} {
+			if _, ok := shardFiles[meta]; ok {
+				add(target.Node, meta)
+			}
+		}
+	}
+	return assignment
+}
+
+// MountEcShards mounts EC shards on destination servers using an assignment map.
+//
+// Entries named "ecNN" are parsed into shard IDs and mounted with one
+// VolumeEcShardsMount call per destination; every other entry is treated as a
+// metadata file and is not mounted. A failure on one destination does not
+// stop the others: each failure (including a malformed "ecNN" name) is logged
+// or recorded and all of them are returned together via errors.Join. A nil or
+// empty assignment is an error. A nil logger falls back to glog.
+func MountEcShards(volumeID uint32, collection string, shardAssignment map[string][]string, dialOption grpc.DialOption, logger logger) error {
+	if len(shardAssignment) == 0 {
+		return fmt.Errorf("shard assignment not available for mounting")
+	}
+
+	log := ensureLogger(logger)
+
+	var mountErrors []error
+	for destNode, assignedShards := range shardAssignment {
+		var shardIds []uint32
+		var metadataFiles []string
+
+		for _, shardType := range assignedShards {
+			shardID, isShard, err := parseEcShardType(shardType)
+			switch {
+			case !isShard:
+				metadataFiles = append(metadataFiles, shardType)
+			case err != nil:
+				// Surface the bad name instead of silently skipping the shard.
+				mountErrors = append(mountErrors, fmt.Errorf("mount %s: %w", destNode, err))
+			default:
+				shardIds = append(shardIds, shardID)
+			}
+		}
+
+		withFields(log, map[string]interface{}{
+			"destination":    destNode,
+			"shard_ids":      shardIds,
+			"shard_count":    len(shardIds),
+			"metadata_files": metadataFiles,
+		}).Info("Starting EC shard mount operation")
+
+		if len(shardIds) == 0 {
+			withFields(log, map[string]interface{}{
+				"destination":    destNode,
+				"metadata_files": metadataFiles,
+			}).Info("No EC shards to mount (only metadata files)")
+			continue
+		}
+
+		err := operation.WithVolumeServerClient(false, pb.ServerAddress(destNode), dialOption,
+			func(client volume_server_pb.VolumeServerClient) error {
+				_, mountErr := client.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
+					VolumeId:   volumeID,
+					Collection: collection,
+					ShardIds:   shardIds,
+				})
+				return mountErr
+			})
+		if err != nil {
+			mountErrors = append(mountErrors, fmt.Errorf("mount %s shards %v: %w", destNode, shardIds, err))
+			withFields(log, map[string]interface{}{
+				"destination": destNode,
+				"shard_ids":   shardIds,
+				"error":       err.Error(),
+			}).Error("Failed to mount EC shards")
+			continue
+		}
+
+		withFields(log, map[string]interface{}{
+			"destination": destNode,
+			"shard_ids":   shardIds,
+			"volume_id":   volumeID,
+			"collection":  collection,
+		}).Info("Successfully mounted EC shards")
+	}
+
+	// errors.Join returns nil when nothing was collected.
+	return errors.Join(mountErrors...)
+}
+
+// parseEcShardType classifies an assignment entry. isShard is true when
+// shardType has the shape of an EC data shard name ("ec" followed by exactly
+// two more characters, e.g. "ec07"); err is non-nil when it has that shape
+// but the two characters are not decimal digits. For anything else (such as
+// "ecx", "ecj" or "vif") isShard is false and err is nil.
+func parseEcShardType(shardType string) (shardID uint32, isShard bool, err error) {
+	if !strings.HasPrefix(shardType, "ec") || len(shardType) != 4 {
+		return 0, false, nil
+	}
+	digits := shardType[2:]
+	for i := 0; i < len(digits); i++ {
+		if digits[i] < '0' || digits[i] > '9' {
+			return 0, true, fmt.Errorf("invalid EC shard type %q: shard ID must be two decimal digits", shardType)
+		}
+	}
+	return uint32(digits[0]-'0')*10 + uint32(digits[1]-'0'), true, nil
+}
+
+// sendShardFileToDestination sends a single shard file to a destination server using ReceiveFile API.
+//
+// shardType is either a metadata name ("ecx", "ecj", "vif"), sent with shard
+// ID 0, or an "ecNN" data shard name; anything else is rejected before a
+// connection is opened. The file is streamed as one info message followed by
+// 64 KiB content chunks, and an error reported in the server's response is
+// returned as an error.
+func sendShardFileToDestination(volumeID uint32, collection string, dialOption grpc.DialOption, destServer, filePath, shardType string) error {
+	var shardID uint32
+	switch shardType {
+	case "ecx", "ecj", "vif":
+		// Metadata files have no shard ID of their own; the request carries 0.
+	default:
+		id, isShard, err := parseEcShardType(shardType)
+		if !isShard {
+			return fmt.Errorf("unknown shard type: %s", shardType)
+		}
+		if err != nil {
+			return err
+		}
+		shardID = id
+	}
+	ext := "." + shardType
+
+	return operation.WithVolumeServerClient(false, pb.ServerAddress(destServer), dialOption,
+		func(client volume_server_pb.VolumeServerClient) error {
+			file, err := os.Open(filePath)
+			if err != nil {
+				return fmt.Errorf("failed to open shard file %s: %w", filePath, err)
+			}
+			defer file.Close()
+
+			fileInfo, err := file.Stat()
+			if err != nil {
+				return fmt.Errorf("failed to get file info for %s: %w", filePath, err)
+			}
+
+			// Cancel the stream's context on every return path so an early
+			// error return does not leave the client stream open.
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+
+			stream, err := client.ReceiveFile(ctx)
+			if err != nil {
+				return fmt.Errorf("failed to create receive stream: %w", err)
+			}
+
+			err = stream.Send(&volume_server_pb.ReceiveFileRequest{
+				Data: &volume_server_pb.ReceiveFileRequest_Info{
+					Info: &volume_server_pb.ReceiveFileInfo{
+						VolumeId:   volumeID,
+						Ext:        ext,
+						Collection: collection,
+						IsEcVolume: true,
+						ShardId:    shardID,
+						FileSize:   uint64(fileInfo.Size()),
+					},
+				},
+			})
+			if err != nil {
+				return fmt.Errorf("failed to send file info: %w", err)
+			}
+
+			buffer := make([]byte, 64*1024)
+			for {
+				// Send whatever was read before looking at readErr: Read may
+				// return data together with io.EOF.
+				n, readErr := file.Read(buffer)
+				if n > 0 {
+					err = stream.Send(&volume_server_pb.ReceiveFileRequest{
+						Data: &volume_server_pb.ReceiveFileRequest_FileContent{
+							FileContent: buffer[:n],
+						},
+					})
+					if err != nil {
+						return fmt.Errorf("failed to send file content: %w", err)
+					}
+				}
+				if readErr == io.EOF {
+					break
+				}
+				if readErr != nil {
+					return fmt.Errorf("failed to read file: %w", readErr)
+				}
+			}
+
+			resp, err := stream.CloseAndRecv()
+			if err != nil {
+				return fmt.Errorf("failed to close stream: %w", err)
+			}
+
+			if resp.Error != "" {
+				return fmt.Errorf("server error: %s", resp.Error)
+			}
+
+			glog.V(2).Infof("Successfully sent %s (%d bytes) to %s", shardType, resp.BytesWritten, destServer)
+			return nil
+		})
+}
diff --git a/weed/worker/tasks/erasure_coding/ec_task.go b/weed/worker/tasks/erasure_coding/ec_task.go
index 0000e22ca..e2e130f1b 100644
--- a/weed/worker/tasks/erasure_coding/ec_task.go
+++ b/weed/worker/tasks/erasure_coding/ec_task.go
@@ -413,6 +413,16 @@ func (t *ErasureCodingTask) generateEcShardsLocally(localFiles map[string]string
 		}).Info("EC index file generated")
 	}

+	ecjFile := baseName + ".ecj"
+	if info, err := os.Stat(ecjFile); err == nil {
+		shardFiles["ecj"] = ecjFile
+		t.GetLogger().WithFields(map[string]interface{}{
+			"file_type":  "ecj",
+			"file_path":  ecjFile,
+			"size_bytes": info.Size(),
+		}).Info("EC journal file generated")
+	}
+
 	// Generate .vif file (volume info)
 	vifFile := baseName + ".vif"
 	volumeInfo := &volume_server_pb.VolumeInfo{
@@ -444,262 +454,17 @@ func (t *ErasureCodingTask) generateEcShardsLocally(localFiles map[string]string
 // distributeEcShards distributes locally generated EC shards to destination servers
 // using pre-assigned shard IDs from planning phase
 func (t *ErasureCodingTask) distributeEcShards(shardFiles map[string]string) error {
-	if len(t.targets) == 0 {
-		return fmt.Errorf("no targets specified for EC shard distribution")
-	}
-
-	if len(shardFiles) == 0 {
-		return fmt.Errorf("no shard files available for distribution")
-	}
-
-	// Build shard assignment from pre-assigned target shard IDs (from planning phase)
-	shardAssignment := make(map[string][]string)
-
-	for _, target := range t.targets {
-		if len(target.ShardIds) == 0 {
-			continue // Skip targets with no assigned shards
-		}
-
-		var assignedShards []string
-
-		// Convert shard IDs to shard file names (e.g., 0 → "ec00", 1 → "ec01")
-		for _, shardId := range target.ShardIds {
-			shardType := fmt.Sprintf("ec%02d", shardId)
-			assignedShards = append(assignedShards, shardType)
-		}
-
-		// Add metadata files (.ecx, .vif) to targets that have shards
-		if len(assignedShards) > 0 {
-			if _, hasEcx := shardFiles["ecx"]; hasEcx {
-				assignedShards = append(assignedShards, "ecx")
-			}
-			if _, hasVif := shardFiles["vif"]; hasVif {
-				assignedShards = append(assignedShards, "vif")
-			}
-		}
-
-		shardAssignment[target.Node] = assignedShards
-	}
-
-	if len(shardAssignment) == 0 {
-		return fmt.Errorf("no shard assignments found from planning phase")
-	}
-
-	// Store assignment for use during mounting
-	t.shardAssignment = shardAssignment
-
-	// Send assigned shards to each destination
-	for destNode, assignedShards := range shardAssignment {
-		t.GetLogger().WithFields(map[string]interface{}{
-			"destination":     destNode,
-			"assigned_shards": len(assignedShards),
-			"shard_types":     assignedShards,
-		}).Info("Starting shard distribution to destination server")
-
-		// Send only the assigned shards to this destination
-		var transferredBytes int64
-		for _, shardType := range assignedShards {
-			filePath, exists := shardFiles[shardType]
-			if !exists {
-				return fmt.Errorf("shard file %s not found for destination %s", shardType, destNode)
-			}
-
-			// Log file size before transfer
-			if info, err := os.Stat(filePath); err == nil {
-				transferredBytes += info.Size()
-				t.GetLogger().WithFields(map[string]interface{}{
-					"destination": destNode,
-					"shard_type":  shardType,
-					"file_path":   filePath,
-					"size_bytes":  info.Size(),
-					"size_kb":     float64(info.Size()) / 1024,
-				}).Info("Starting shard file transfer")
-			}
-
-			if err := t.sendShardFileToDestination(destNode, filePath, shardType); err != nil {
-				return fmt.Errorf("failed to send %s to %s: %v", shardType, destNode, err)
-			}
-
-			t.GetLogger().WithFields(map[string]interface{}{
-				"destination": destNode,
-				"shard_type":  shardType,
-			}).Info("Shard file transfer completed")
-		}
-
-		// Log summary for this destination
-		t.GetLogger().WithFields(map[string]interface{}{
-			"destination":        destNode,
-			"shards_transferred": len(assignedShards),
-			"total_bytes":        transferredBytes,
-			"total_mb":           float64(transferredBytes) / (1024 * 1024),
-		}).Info("All shards distributed to destination server")
+	assignment, err := erasure_coding.DistributeEcShards(t.volumeID, t.collection, t.targets, shardFiles, t.grpcDialOption, t.GetLogger())
+	if err != nil {
+		return err
 	}
-
-	glog.V(1).Infof("Successfully distributed EC shards to %d destinations", len(shardAssignment))
+	t.shardAssignment = assignment
 	return nil
 }

-// sendShardFileToDestination sends a single shard file to a destination server using ReceiveFile API
-func (t *ErasureCodingTask) sendShardFileToDestination(destServer, filePath, shardType string) error {
-	return operation.WithVolumeServerClient(false, pb.ServerAddress(destServer), t.grpcDialOption,
-		func(client volume_server_pb.VolumeServerClient) error {
-			// Open the local shard file
-			file, err := os.Open(filePath)
-			if err != nil {
-				return fmt.Errorf("failed to open shard file %s: %v", filePath, err)
-			}
-			defer file.Close()
-
-			// Get file size
-			fileInfo, err := file.Stat()
-			if err != nil {
-				return fmt.Errorf("failed to get file info for %s: %v", filePath, err)
-			}
-
-			// Determine file extension and shard ID
-			var ext string
-			var shardId uint32
-			if shardType == "ecx" {
-				ext = ".ecx"
-				shardId = 0 // ecx file doesn't have a specific shard ID
-			} else if shardType == "vif" {
-				ext = ".vif"
-				shardId = 0 // vif file doesn't have a specific shard ID
-			} else if strings.HasPrefix(shardType, "ec") && len(shardType) == 4 {
-				// EC shard file like "ec00", "ec01", etc.
-				ext = "." + shardType
-				fmt.Sscanf(shardType[2:], "%d", &shardId)
-			} else {
-				return fmt.Errorf("unknown shard type: %s", shardType)
-			}
-
-			// Create streaming client
-			stream, err := client.ReceiveFile(context.Background())
-			if err != nil {
-				return fmt.Errorf("failed to create receive stream: %v", err)
-			}
-
-			// Send file info first
-			err = stream.Send(&volume_server_pb.ReceiveFileRequest{
-				Data: &volume_server_pb.ReceiveFileRequest_Info{
-					Info: &volume_server_pb.ReceiveFileInfo{
-						VolumeId:   t.volumeID,
-						Ext:        ext,
-						Collection: t.collection,
-						IsEcVolume: true,
-						ShardId:    shardId,
-						FileSize:   uint64(fileInfo.Size()),
-					},
-				},
-			})
-			if err != nil {
-				return fmt.Errorf("failed to send file info: %v", err)
-			}
-
-			// Send file content in chunks
-			buffer := make([]byte, 64*1024) // 64KB chunks
-			for {
-				n, readErr := file.Read(buffer)
-				if n > 0 {
-					err = stream.Send(&volume_server_pb.ReceiveFileRequest{
-						Data: &volume_server_pb.ReceiveFileRequest_FileContent{
-							FileContent: buffer[:n],
-						},
-					})
-					if err != nil {
-						return fmt.Errorf("failed to send file content: %v", err)
-					}
-				}
-				if readErr == io.EOF {
-					break
-				}
-				if readErr != nil {
-					return fmt.Errorf("failed to read file: %v", readErr)
-				}
-			}
-
-			// Close stream and get response
-			resp, err := stream.CloseAndRecv()
-			if err != nil {
-				return fmt.Errorf("failed to close stream: %v", err)
-			}
-
-			if resp.Error != "" {
-				return fmt.Errorf("server error: %s", resp.Error)
-			}
-
-			glog.V(2).Infof("Successfully sent %s (%d bytes) to %s", shardType, resp.BytesWritten, destServer)
-			return nil
-		})
-}
-
 // mountEcShards mounts EC shards on destination servers
 func (t *ErasureCodingTask) mountEcShards() error {
-	if t.shardAssignment == nil {
-		return fmt.Errorf("shard assignment not available for mounting")
-	}
-
-	// Mount only assigned shards on each destination
-	for destNode, assignedShards := range t.shardAssignment {
-		// Convert shard names to shard IDs for mounting
-		var shardIds []uint32
-		var metadataFiles []string
-
-		for _, shardType := range assignedShards {
-			// Skip metadata files (.ecx, .vif) - only mount EC shards
-			if strings.HasPrefix(shardType, "ec") && len(shardType) == 4 {
-				// Parse shard ID from "ec00", "ec01", etc.
-				var shardId uint32
-				if _, err := fmt.Sscanf(shardType[2:], "%d", &shardId); err == nil {
-					shardIds = append(shardIds, shardId)
-				}
-			} else {
-				metadataFiles = append(metadataFiles, shardType)
-			}
-		}
-
-		t.GetLogger().WithFields(map[string]interface{}{
-			"destination":    destNode,
-			"shard_ids":      shardIds,
-			"shard_count":    len(shardIds),
-			"metadata_files": metadataFiles,
-		}).Info("Starting EC shard mount operation")
-
-		if len(shardIds) == 0 {
-			t.GetLogger().WithFields(map[string]interface{}{
-				"destination":    destNode,
-				"metadata_files": metadataFiles,
-			}).Info("No EC shards to mount (only metadata files)")
-			continue
-		}
-
-		err := operation.WithVolumeServerClient(false, pb.ServerAddress(destNode), t.grpcDialOption,
-			func(client volume_server_pb.VolumeServerClient) error {
-				_, mountErr := client.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
-					VolumeId:   t.volumeID,
-					Collection: t.collection,
-					ShardIds:   shardIds,
-				})
-				return mountErr
-			})
-
-		if err != nil {
-			t.GetLogger().WithFields(map[string]interface{}{
-				"destination": destNode,
-				"shard_ids":   shardIds,
-				"error":       err.Error(),
-			}).Error("Failed to mount EC shards")
-		} else {
-			t.GetLogger().WithFields(map[string]interface{}{
-				"destination": destNode,
-				"shard_ids":   shardIds,
-				"volume_id":   t.volumeID,
-				"collection":  t.collection,
-			}).Info("Successfully mounted EC shards")
-		}
-	}
-
-	return nil
+	return erasure_coding.MountEcShards(t.volumeID, t.collection, t.shardAssignment, t.grpcDialOption, t.GetLogger())
 }

 // deleteOriginalVolume deletes the original volume and all its replicas from all servers