diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go index 803aed937..5eb2ec2f4 100644 --- a/weed/server/volume_grpc_erasure_coding.go +++ b/weed/server/volume_grpc_erasure_coding.go @@ -144,38 +144,69 @@ func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_s var rebuiltShardIds []uint32 + // Find the rebuild location: the location with the most shards and an .ecx file. + // With multi-disk servers, shards may be spread across different locations. + var rebuildLocation *storage.DiskLocation + var rebuildShardCount int + var otherLocationsWithShards []*storage.DiskLocation + for _, location := range vs.store.Locations { _, _, existingShardCount, err := checkEcVolumeStatus(baseFileName, location) if err != nil { return nil, err } - if existingShardCount == 0 { - continue - } - indexBaseFileName := path.Join(location.IdxDirectory, baseFileName) if !util.FileExists(indexBaseFileName+".ecx") && location.IdxDirectory != location.Directory { - // .ecx may be in the data directory if created before -dir.idx was configured indexBaseFileName = path.Join(location.Directory, baseFileName) } - if util.FileExists(indexBaseFileName + ".ecx") { - // write .ec00 ~ .ec13 files - dataBaseFileName := path.Join(location.Directory, baseFileName) - if generatedShardIds, err := erasure_coding.RebuildEcFiles(dataBaseFileName); err != nil { - return nil, fmt.Errorf("RebuildEcFiles %s: %v", dataBaseFileName, err) - } else { - rebuiltShardIds = generatedShardIds - } + hasEcx := util.FileExists(indexBaseFileName + ".ecx") - if err := erasure_coding.RebuildEcxFile(indexBaseFileName); err != nil { - return nil, fmt.Errorf("RebuildEcxFile %s: %v", indexBaseFileName, err) - } + // Skip locations that have neither shard files nor an .ecx file. + if existingShardCount == 0 && !hasEcx { + continue + } - break + if hasEcx && (rebuildLocation == nil || existingShardCount > rebuildShardCount) { + if rebuildLocation != nil { + otherLocationsWithShards = append(otherLocationsWithShards, rebuildLocation) + } + rebuildLocation = location + rebuildShardCount = existingShardCount + } else { + otherLocationsWithShards = append(otherLocationsWithShards, location) } } + if rebuildLocation == nil { + return &volume_server_pb.VolumeEcShardsRebuildResponse{}, nil + } + + // Collect additional directories where shard files may exist. + // On multi-disk servers, existing local shards may be on a different disk + // than where copied shards were placed during ec.rebuild. + rebuildDataDir := rebuildLocation.Directory + var additionalDirs []string + for _, otherLocation := range otherLocationsWithShards { + additionalDirs = append(additionalDirs, otherLocation.Directory) + } + + // Rebuild missing EC files, searching all disk locations for input shards + dataBaseFileName := path.Join(rebuildDataDir, baseFileName) + if generatedShardIds, err := erasure_coding.RebuildEcFiles(dataBaseFileName, additionalDirs...); err != nil { + return nil, fmt.Errorf("RebuildEcFiles %s: %v", dataBaseFileName, err) + } else { + rebuiltShardIds = generatedShardIds + } + + indexBaseFileName := path.Join(rebuildLocation.IdxDirectory, baseFileName) + if !util.FileExists(indexBaseFileName+".ecx") && rebuildLocation.IdxDirectory != rebuildLocation.Directory { + indexBaseFileName = path.Join(rebuildLocation.Directory, baseFileName) + } + if err := erasure_coding.RebuildEcxFile(indexBaseFileName); err != nil { + return nil, fmt.Errorf("RebuildEcxFile %s: %v", indexBaseFileName, err) + } + return &volume_server_pb.VolumeEcShardsRebuildResponse{ RebuiltShardIds: rebuiltShardIds, }, nil @@ -191,8 +222,20 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv var location *storage.DiskLocation - // Use disk_id if provided (disk-aware storage) - if req.DiskId > 0 || (req.DiskId == 0 && len(vs.store.Locations) > 0) { + // Select the target location for storing EC shard files. + // + // When req.DiskId > 0 the caller is explicitly choosing a disk: + // location = vs.store.Locations[req.DiskId] + // (DiskId=1 → Locations[1], DiskId=2 → Locations[2], etc.) + // + // When req.DiskId == 0 (the protobuf default, meaning "not specified") + // we auto-select location by preferring the disk that already holds EC + // shards for this volume, then falling back to any HDD, then any disk. + // + // Note: Locations[0] cannot be targeted explicitly via DiskId because 0 + // is indistinguishable from "unset". It can still be chosen by the + // auto-select logic. + if req.DiskId > 0 { // Validate disk ID is within bounds if int(req.DiskId) >= len(vs.store.Locations) { return nil, fmt.Errorf("invalid disk_id %d: only have %d disks", req.DiskId, len(vs.store.Locations)) @@ -202,13 +245,21 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv location = vs.store.Locations[req.DiskId] glog.V(1).Infof("Using disk %d for EC shard copy: %s", req.DiskId, location.Directory) } else { - // Fallback to old behavior for backward compatibility - if req.CopyEcxFile { - location = vs.store.FindFreeLocation(func(location *storage.DiskLocation) bool { - return location.DiskType == types.HardDriveType + // Prefer a location that already has shards for this volume, + // so all shards end up on the same disk for rebuild. + location = vs.store.FindFreeLocation(func(loc *storage.DiskLocation) bool { + _, found := loc.FindEcVolume(needle.VolumeId(req.VolumeId)) + return found + }) + if location == nil { + // Fall back to any HDD location with free space + location = vs.store.FindFreeLocation(func(loc *storage.DiskLocation) bool { + return loc.DiskType == types.HardDriveType }) - } else { - location = vs.store.FindFreeLocation(func(location *storage.DiskLocation) bool { + } + if location == nil { + // Fall back to any location with free space + location = vs.store.FindFreeLocation(func(loc *storage.DiskLocation) bool { return true }) } diff --git a/weed/shell/command_ec_rebuild_test.go b/weed/shell/command_ec_rebuild_test.go index 3baaee325..4dccf29e5 100644 --- a/weed/shell/command_ec_rebuild_test.go +++ b/weed/shell/command_ec_rebuild_test.go @@ -80,7 +80,7 @@ func TestEcShardMapShardCount(t *testing.T) { } } -// TestRebuildEcVolumesInsufficientShards tests error handling for unrepairable volumes +// TestRebuildEcVolumesInsufficientShards tests that unrepairable volumes are skipped func TestRebuildEcVolumesInsufficientShards(t *testing.T) { var logBuffer bytes.Buffer @@ -101,11 +101,13 @@ func TestRebuildEcVolumesInsufficientShards(t *testing.T) { erb.rebuildEcVolumes("c1") err := erb.ewg.Wait() - if err == nil { - t.Fatal("Expected error for insufficient shards, got nil") + // Unrepairable volumes should be skipped (not cause an error) + if err != nil { + t.Fatalf("Expected no error for unrepairable volume (should be skipped), got: %v", err) } - if !strings.Contains(err.Error(), "unrepairable") { - t.Errorf("Expected 'unrepairable' in error message, got: %s", err.Error()) + // Verify the skip message was logged + if !strings.Contains(logBuffer.String(), "unrepairable") { + t.Errorf("Expected 'unrepairable' in log output, got: %s", logBuffer.String()) } } diff --git a/weed/storage/erasure_coding/ec_encoder.go b/weed/storage/erasure_coding/ec_encoder.go index 81ebffdcb..cd4d95fd8 100644 --- a/weed/storage/erasure_coding/ec_encoder.go +++ b/weed/storage/erasure_coding/ec_encoder.go @@ -4,6 +4,7 @@ import ( "fmt" "io" "os" + "path/filepath" "github.com/klauspost/reedsolomon" @@ -67,7 +68,10 @@ func WriteEcFilesWithContext(baseFileName string, ctx *ECContext) error { return generateEcFiles(baseFileName, 256*1024, ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, ctx) } -func RebuildEcFiles(baseFileName string) ([]uint32, error) { +// RebuildEcFiles rebuilds missing EC shard files. +// additionalDirs are extra directories to search for existing shard files, +// which handles multi-disk servers where shards may be spread across disks. +func RebuildEcFiles(baseFileName string, additionalDirs ...string) ([]uint32, error) { // Attempt to load EC config from .vif file to preserve original configuration var ctx *ECContext if volumeInfo, _, found, _ := volume_info.MaybeLoadVolumeInfo(baseFileName + ".vif"); found && volumeInfo.EcShardConfig != nil { @@ -90,12 +94,13 @@ func RebuildEcFiles(baseFileName string) ([]uint32, error) { ctx = NewDefaultECContext("", 0) } - return RebuildEcFilesWithContext(baseFileName, ctx) + return RebuildEcFilesWithContext(baseFileName, ctx, additionalDirs...) } -// RebuildEcFilesWithContext rebuilds missing EC files using the provided context -func RebuildEcFilesWithContext(baseFileName string, ctx *ECContext) ([]uint32, error) { - return generateMissingEcFiles(baseFileName, 256*1024, ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, ctx) +// RebuildEcFilesWithContext rebuilds missing EC files using the provided context. +// additionalDirs are extra directories to search for existing shard files. +func RebuildEcFilesWithContext(baseFileName string, ctx *ECContext, additionalDirs ...string) ([]uint32, error) { + return generateMissingEcFiles(baseFileName, 256*1024, ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, ctx, additionalDirs) } func ToExt(ecIndex int) string { @@ -122,30 +127,71 @@ func generateEcFiles(baseFileName string, bufferSize int, largeBlockSize int64, return nil } -func generateMissingEcFiles(baseFileName string, bufferSize int, largeBlockSize int64, smallBlockSize int64, ctx *ECContext) (generatedShardIds []uint32, err error) { +// findShardFile looks for a shard file at baseFileName+ext, then in additionalDirs. +func findShardFile(baseFileName string, ext string, additionalDirs []string) string { + primary := baseFileName + ext + if util.FileExists(primary) { + return primary + } + baseName := filepath.Base(baseFileName) + for _, dir := range additionalDirs { + candidate := filepath.Join(dir, baseName+ext) + if util.FileExists(candidate) { + return candidate + } + } + return "" +} + +func generateMissingEcFiles(baseFileName string, bufferSize int, largeBlockSize int64, smallBlockSize int64, ctx *ECContext, additionalDirs []string) (generatedShardIds []uint32, err error) { + // Pass 1: discover which shards exist and which are missing, + // opening input files but NOT creating output files yet. shardHasData := make([]bool, ctx.Total()) + shardPaths := make([]string, ctx.Total()) // non-empty for present shards inputFiles := make([]*os.File, ctx.Total()) - outputFiles := make([]*os.File, ctx.Total()) + presentCount := 0 for shardId := 0; shardId < ctx.Total(); shardId++ { - shardFileName := baseFileName + ctx.ToExt(shardId) - if util.FileExists(shardFileName) { + ext := ctx.ToExt(shardId) + shardPath := findShardFile(baseFileName, ext, additionalDirs) + if shardPath != "" { shardHasData[shardId] = true - inputFiles[shardId], err = os.OpenFile(shardFileName, os.O_RDONLY, 0) + shardPaths[shardId] = shardPath + inputFiles[shardId], err = os.OpenFile(shardPath, os.O_RDONLY, 0) if err != nil { return nil, err } defer inputFiles[shardId].Close() + presentCount++ } else { - outputFiles[shardId], err = os.OpenFile(shardFileName, os.O_TRUNC|os.O_WRONLY|os.O_CREATE, 0644) - if err != nil { - return nil, err - } - defer outputFiles[shardId].Close() generatedShardIds = append(generatedShardIds, uint32(shardId)) } } + // Pre-check: bail out before creating any output files. + if presentCount < ctx.DataShards { + return nil, fmt.Errorf("not enough shards to rebuild %s: found %d shards, need at least %d (data shards), missing shards: %v", + baseFileName, presentCount, ctx.DataShards, generatedShardIds) + } + + glog.V(0).Infof("rebuilding %s: %d shards present, %d missing %v, config %s", + baseFileName, presentCount, len(generatedShardIds), generatedShardIds, ctx.String()) + + // Pass 2: create output files for missing shards now that we know + // reconstruction is possible. + outputFiles := make([]*os.File, ctx.Total()) + for shardId := 0; shardId < ctx.Total(); shardId++ { + if shardHasData[shardId] { + continue + } + outputFileName := baseFileName + ctx.ToExt(shardId) + outputFiles[shardId], err = os.OpenFile(outputFileName, os.O_TRUNC|os.O_WRONLY|os.O_CREATE, 0644) + if err != nil { + return nil, err + } + defer outputFiles[shardId].Close() + } + err = rebuildEcFiles(shardHasData, inputFiles, outputFiles, ctx) if err != nil { return nil, fmt.Errorf("rebuildEcFiles: %w", err)