From c4d642b8aa4b9470dd0c6eb594573a9bdebdb1ae Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 14 Mar 2026 20:59:47 -0700 Subject: [PATCH] fix(ec): gather shards from all disk locations before rebuild (#8633) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(ec): gather shards from all disk locations before rebuild (#8631) Fix "too few shards given" error during ec.rebuild on multi-disk volume servers. The root cause has two parts: 1. VolumeEcShardsRebuild only looked at a single disk location for shard files. On multi-disk servers, the existing local shards could be on one disk while copied shards were placed on another, causing the rebuild to see fewer shards than actually available. 2. VolumeEcShardsCopy had a DiskId condition (req.DiskId == 0 && len(vs.store.Locations) > 0) that was always true, making the FindFreeLocation fallback dead code. This meant copies always went to Locations[0] regardless of where existing shards were. Changes: - VolumeEcShardsRebuild now finds the location with the most shards, then gathers shard files from other locations via hard links (or symlinks for cross-device) before rebuilding. Gathered files are cleaned up after rebuild. - VolumeEcShardsCopy now only uses Locations[DiskId] when DiskId > 0 (explicitly set). Otherwise, it prefers the location that already has the EC volume, falling back to HDD then any free location. - generateMissingEcFiles now logs shard counts and provides a clear error message when not enough shards are found, instead of passing through to the opaque reedsolomon "too few shards given" error. * fix(ec): update test to match skip behavior for unrepairable volumes The test expected an error for volumes with insufficient shards, but commit 5acb4578a changed unrepairable volumes to be skipped with a log message instead of returning an error. Update the test to verify the skip behavior and log output. * fix(ec): address PR review comments - Add comment clarifying DiskId=0 means "not specified" (protobuf default), callers must use DiskId >= 1 to target a specific disk. - Log warnings on cleanup failures for gathered shard links. * fix(ec): read shard files from other disks directly instead of linking Replace the hard link / symlink gathering approach with passing additional search directories into RebuildEcFiles. The rebuild function now opens shard files directly from whichever disk they live on, avoiding filesystem link operations and cleanup. RebuildEcFiles and RebuildEcFilesWithContext gain a variadic additionalDirs parameter (backward compatible with existing callers). * fix(ec): clarify DiskId selection semantics in VolumeEcShardsCopy comment * fix(ec): avoid empty files on failed rebuild; don't skip ecx-only locations - generateMissingEcFiles: two-pass approach — first discover present/missing shards and check reconstructability, only then create output files. This avoids leaving behind empty truncated shard files when there are too few shards to rebuild. - VolumeEcShardsRebuild: compute hasEcx before skipping zero-shard locations. A location with an .ecx file but no shard files (all shards on other disks) is now a valid rebuild candidate instead of being silently skipped. * fix(ec): select ecx-only location as rebuildLocation when none chosen yet When rebuildLocation is nil and a location has hasEcx=true but existingShardCount=0 (all shards on other disks), the condition 0 > 0 was false so it was never promoted to rebuildLocation. Add rebuildLocation == nil to the predicate so the first location with an .ecx file is always selected as a candidate. --- weed/server/volume_grpc_erasure_coding.go | 101 ++++++++++++++++------ weed/shell/command_ec_rebuild_test.go | 12 +-- weed/storage/erasure_coding/ec_encoder.go | 76 ++++++++++++---- 3 files changed, 144 insertions(+), 45 deletions(-) diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go index 803aed937..5eb2ec2f4 100644 --- a/weed/server/volume_grpc_erasure_coding.go +++ b/weed/server/volume_grpc_erasure_coding.go @@ -144,38 +144,69 @@ func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_s var rebuiltShardIds []uint32 + // Find the rebuild location: the location with the most shards and an .ecx file. + // With multi-disk servers, shards may be spread across different locations. + var rebuildLocation *storage.DiskLocation + var rebuildShardCount int + var otherLocationsWithShards []*storage.DiskLocation + for _, location := range vs.store.Locations { _, _, existingShardCount, err := checkEcVolumeStatus(baseFileName, location) if err != nil { return nil, err } - if existingShardCount == 0 { - continue - } - indexBaseFileName := path.Join(location.IdxDirectory, baseFileName) if !util.FileExists(indexBaseFileName+".ecx") && location.IdxDirectory != location.Directory { - // .ecx may be in the data directory if created before -dir.idx was configured indexBaseFileName = path.Join(location.Directory, baseFileName) } - if util.FileExists(indexBaseFileName + ".ecx") { - // write .ec00 ~ .ec13 files - dataBaseFileName := path.Join(location.Directory, baseFileName) - if generatedShardIds, err := erasure_coding.RebuildEcFiles(dataBaseFileName); err != nil { - return nil, fmt.Errorf("RebuildEcFiles %s: %v", dataBaseFileName, err) - } else { - rebuiltShardIds = generatedShardIds - } + hasEcx := util.FileExists(indexBaseFileName + ".ecx") - if err := erasure_coding.RebuildEcxFile(indexBaseFileName); err != nil { - return nil, fmt.Errorf("RebuildEcxFile %s: %v", indexBaseFileName, err) - } + // Skip locations that have neither shard files nor an .ecx file. + if existingShardCount == 0 && !hasEcx { + continue + } - break + if hasEcx && (rebuildLocation == nil || existingShardCount > rebuildShardCount) { + if rebuildLocation != nil { + otherLocationsWithShards = append(otherLocationsWithShards, rebuildLocation) + } + rebuildLocation = location + rebuildShardCount = existingShardCount + } else { + otherLocationsWithShards = append(otherLocationsWithShards, location) } } + if rebuildLocation == nil { + return &volume_server_pb.VolumeEcShardsRebuildResponse{}, nil + } + + // Collect additional directories where shard files may exist. + // On multi-disk servers, existing local shards may be on a different disk + // than where copied shards were placed during ec.rebuild. + rebuildDataDir := rebuildLocation.Directory + var additionalDirs []string + for _, otherLocation := range otherLocationsWithShards { + additionalDirs = append(additionalDirs, otherLocation.Directory) + } + + // Rebuild missing EC files, searching all disk locations for input shards + dataBaseFileName := path.Join(rebuildDataDir, baseFileName) + if generatedShardIds, err := erasure_coding.RebuildEcFiles(dataBaseFileName, additionalDirs...); err != nil { + return nil, fmt.Errorf("RebuildEcFiles %s: %v", dataBaseFileName, err) + } else { + rebuiltShardIds = generatedShardIds + } + + indexBaseFileName := path.Join(rebuildLocation.IdxDirectory, baseFileName) + if !util.FileExists(indexBaseFileName+".ecx") && rebuildLocation.IdxDirectory != rebuildLocation.Directory { + indexBaseFileName = path.Join(rebuildLocation.Directory, baseFileName) + } + if err := erasure_coding.RebuildEcxFile(indexBaseFileName); err != nil { + return nil, fmt.Errorf("RebuildEcxFile %s: %v", indexBaseFileName, err) + } + return &volume_server_pb.VolumeEcShardsRebuildResponse{ RebuiltShardIds: rebuiltShardIds, }, nil @@ -191,8 +222,20 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv var location *storage.DiskLocation - // Use disk_id if provided (disk-aware storage) - if req.DiskId > 0 || (req.DiskId == 0 && len(vs.store.Locations) > 0) { + // Select the target location for storing EC shard files. + // + // When req.DiskId > 0 the caller is explicitly choosing a disk: + // location = vs.store.Locations[req.DiskId] + // (DiskId=1 → Locations[1], DiskId=2 → Locations[2], etc.) + // + // When req.DiskId == 0 (the protobuf default, meaning "not specified") + // we auto-select location by preferring the disk that already holds EC + // shards for this volume, then falling back to any HDD, then any disk. + // + // Note: Locations[0] cannot be targeted explicitly via DiskId because 0 + // is indistinguishable from "unset". It can still be chosen by the + // auto-select logic. + if req.DiskId > 0 { // Validate disk ID is within bounds if int(req.DiskId) >= len(vs.store.Locations) { return nil, fmt.Errorf("invalid disk_id %d: only have %d disks", req.DiskId, len(vs.store.Locations)) @@ -202,13 +245,21 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv location = vs.store.Locations[req.DiskId] glog.V(1).Infof("Using disk %d for EC shard copy: %s", req.DiskId, location.Directory) } else { - // Fallback to old behavior for backward compatibility - if req.CopyEcxFile { - location = vs.store.FindFreeLocation(func(location *storage.DiskLocation) bool { - return location.DiskType == types.HardDriveType + // Prefer a location that already has shards for this volume, + // so all shards end up on the same disk for rebuild. + location = vs.store.FindFreeLocation(func(loc *storage.DiskLocation) bool { + _, found := loc.FindEcVolume(needle.VolumeId(req.VolumeId)) + return found + }) + if location == nil { + // Fall back to any HDD location with free space + location = vs.store.FindFreeLocation(func(loc *storage.DiskLocation) bool { + return loc.DiskType == types.HardDriveType }) - } else { - location = vs.store.FindFreeLocation(func(location *storage.DiskLocation) bool { + } + if location == nil { + // Fall back to any location with free space + location = vs.store.FindFreeLocation(func(loc *storage.DiskLocation) bool { return true }) } diff --git a/weed/shell/command_ec_rebuild_test.go b/weed/shell/command_ec_rebuild_test.go index 3baaee325..4dccf29e5 100644 --- a/weed/shell/command_ec_rebuild_test.go +++ b/weed/shell/command_ec_rebuild_test.go @@ -80,7 +80,7 @@ func TestEcShardMapShardCount(t *testing.T) { } } -// TestRebuildEcVolumesInsufficientShards tests error handling for unrepairable volumes +// TestRebuildEcVolumesInsufficientShards tests that unrepairable volumes are skipped func TestRebuildEcVolumesInsufficientShards(t *testing.T) { var logBuffer bytes.Buffer @@ -101,11 +101,13 @@ func TestRebuildEcVolumesInsufficientShards(t *testing.T) { erb.rebuildEcVolumes("c1") err := erb.ewg.Wait() - if err == nil { - t.Fatal("Expected error for insufficient shards, got nil") + // Unrepairable volumes should be skipped (not cause an error) + if err != nil { + t.Fatalf("Expected no error for unrepairable volume (should be skipped), got: %v", err) } - if !strings.Contains(err.Error(), "unrepairable") { - t.Errorf("Expected 'unrepairable' in error message, got: %s", err.Error()) + // Verify the skip message was logged + if !strings.Contains(logBuffer.String(), "unrepairable") { + t.Errorf("Expected 'unrepairable' in log output, got: %s", logBuffer.String()) } } diff --git a/weed/storage/erasure_coding/ec_encoder.go b/weed/storage/erasure_coding/ec_encoder.go index 81ebffdcb..cd4d95fd8 100644 --- a/weed/storage/erasure_coding/ec_encoder.go +++ b/weed/storage/erasure_coding/ec_encoder.go @@ -4,6 +4,7 @@ import ( "fmt" "io" "os" + "path/filepath" "github.com/klauspost/reedsolomon" @@ -67,7 +68,10 @@ func WriteEcFilesWithContext(baseFileName string, ctx *ECContext) error { return generateEcFiles(baseFileName, 256*1024, ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, ctx) } -func RebuildEcFiles(baseFileName string) ([]uint32, error) { +// RebuildEcFiles rebuilds missing EC shard files. +// additionalDirs are extra directories to search for existing shard files, +// which handles multi-disk servers where shards may be spread across disks. +func RebuildEcFiles(baseFileName string, additionalDirs ...string) ([]uint32, error) { // Attempt to load EC config from .vif file to preserve original configuration var ctx *ECContext if volumeInfo, _, found, _ := volume_info.MaybeLoadVolumeInfo(baseFileName + ".vif"); found && volumeInfo.EcShardConfig != nil { @@ -90,12 +94,13 @@ func RebuildEcFiles(baseFileName string) ([]uint32, error) { ctx = NewDefaultECContext("", 0) } - return RebuildEcFilesWithContext(baseFileName, ctx) + return RebuildEcFilesWithContext(baseFileName, ctx, additionalDirs...) } -// RebuildEcFilesWithContext rebuilds missing EC files using the provided context -func RebuildEcFilesWithContext(baseFileName string, ctx *ECContext) ([]uint32, error) { - return generateMissingEcFiles(baseFileName, 256*1024, ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, ctx) +// RebuildEcFilesWithContext rebuilds missing EC files using the provided context. +// additionalDirs are extra directories to search for existing shard files. +func RebuildEcFilesWithContext(baseFileName string, ctx *ECContext, additionalDirs ...string) ([]uint32, error) { + return generateMissingEcFiles(baseFileName, 256*1024, ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, ctx, additionalDirs) } func ToExt(ecIndex int) string { @@ -122,30 +127,71 @@ func generateEcFiles(baseFileName string, bufferSize int, largeBlockSize int64, return nil } -func generateMissingEcFiles(baseFileName string, bufferSize int, largeBlockSize int64, smallBlockSize int64, ctx *ECContext) (generatedShardIds []uint32, err error) { +// findShardFile looks for a shard file at baseFileName+ext, then in additionalDirs. +func findShardFile(baseFileName string, ext string, additionalDirs []string) string { + primary := baseFileName + ext + if util.FileExists(primary) { + return primary + } + baseName := filepath.Base(baseFileName) + for _, dir := range additionalDirs { + candidate := filepath.Join(dir, baseName+ext) + if util.FileExists(candidate) { + return candidate + } + } + return "" +} + +func generateMissingEcFiles(baseFileName string, bufferSize int, largeBlockSize int64, smallBlockSize int64, ctx *ECContext, additionalDirs []string) (generatedShardIds []uint32, err error) { + // Pass 1: discover which shards exist and which are missing, + // opening input files but NOT creating output files yet. shardHasData := make([]bool, ctx.Total()) + shardPaths := make([]string, ctx.Total()) // non-empty for present shards inputFiles := make([]*os.File, ctx.Total()) - outputFiles := make([]*os.File, ctx.Total()) + presentCount := 0 for shardId := 0; shardId < ctx.Total(); shardId++ { - shardFileName := baseFileName + ctx.ToExt(shardId) - if util.FileExists(shardFileName) { + ext := ctx.ToExt(shardId) + shardPath := findShardFile(baseFileName, ext, additionalDirs) + if shardPath != "" { shardHasData[shardId] = true - inputFiles[shardId], err = os.OpenFile(shardFileName, os.O_RDONLY, 0) + shardPaths[shardId] = shardPath + inputFiles[shardId], err = os.OpenFile(shardPath, os.O_RDONLY, 0) if err != nil { return nil, err } defer inputFiles[shardId].Close() + presentCount++ } else { - outputFiles[shardId], err = os.OpenFile(shardFileName, os.O_TRUNC|os.O_WRONLY|os.O_CREATE, 0644) - if err != nil { - return nil, err - } - defer outputFiles[shardId].Close() generatedShardIds = append(generatedShardIds, uint32(shardId)) } } + // Pre-check: bail out before creating any output files. + if presentCount < ctx.DataShards { + return nil, fmt.Errorf("not enough shards to rebuild %s: found %d shards, need at least %d (data shards), missing shards: %v", + baseFileName, presentCount, ctx.DataShards, generatedShardIds) + } + + glog.V(0).Infof("rebuilding %s: %d shards present, %d missing %v, config %s", + baseFileName, presentCount, len(generatedShardIds), generatedShardIds, ctx.String()) + + // Pass 2: create output files for missing shards now that we know + // reconstruction is possible. + outputFiles := make([]*os.File, ctx.Total()) + for shardId := 0; shardId < ctx.Total(); shardId++ { + if shardHasData[shardId] { + continue + } + outputFileName := baseFileName + ctx.ToExt(shardId) + outputFiles[shardId], err = os.OpenFile(outputFileName, os.O_TRUNC|os.O_WRONLY|os.O_CREATE, 0644) + if err != nil { + return nil, err + } + defer outputFiles[shardId].Close() + } + err = rebuildEcFiles(shardHasData, inputFiles, outputFiles, ctx) if err != nil { return nil, fmt.Errorf("rebuildEcFiles: %w", err)