From 9e2e600b6d215cdac0161f0a8b0842332a0aa7c8 Mon Sep 17 00:00:00 2001 From: chrislu Date: Sun, 10 Aug 2025 14:14:37 -0700 Subject: [PATCH] VolumeEcShardsGenerate updated for generation-specific file creation --- weed/server/volume_grpc_erasure_coding.go | 41 ++++++++++++++----- weed/shell/command_ec_encode.go | 1 + weed/storage/erasure_coding/ec_encoder.go | 10 ++++- weed/worker/tasks/ec_vacuum/ec_vacuum_task.go | 1 + 4 files changed, 40 insertions(+), 13 deletions(-) diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go index 6d4ff623e..c5dc2a951 100644 --- a/weed/server/volume_grpc_erasure_coding.go +++ b/weed/server/volume_grpc_erasure_coding.go @@ -7,6 +7,7 @@ import ( "math" "os" "path" + "path/filepath" "strings" "time" @@ -44,31 +45,49 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_ if v == nil { return nil, fmt.Errorf("volume %d not found", req.VolumeId) } - baseFileName := v.DataFileName() if v.Collection != req.Collection { return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection) } + // Generate output filenames with generation suffix + generation := req.Generation + // Extract base names by removing file extensions + dataFileName := v.DataFileName() // e.g., "/data/collection_123.dat" + indexFileName := v.IndexFileName() // e.g., "/index/collection_123.idx" + + // Remove the .dat and .idx extensions to get base filenames + dataBaseName := dataFileName[:len(dataFileName)-4] // removes ".dat" + indexBaseName := indexFileName[:len(indexFileName)-4] // removes ".idx" + + // Apply generation naming + dataBaseFileName := erasure_coding.EcShardFileNameWithGeneration(v.Collection, filepath.Dir(dataBaseName), int(req.VolumeId), generation) + indexBaseFileName := erasure_coding.EcShardFileNameWithGeneration(v.Collection, filepath.Dir(indexBaseName), int(req.VolumeId), generation) + + glog.V(1).Infof("VolumeEcShardsGenerate: generating EC shards with generation %d: data=%s, index=%s", + generation, dataBaseFileName, indexBaseFileName) + shouldCleanup := true defer func() { if !shouldCleanup { return } + // Clean up generation-specific files on error for i := 0; i < erasure_coding.TotalShardsCount; i++ { - os.Remove(fmt.Sprintf("%s.ec%2d", baseFileName, i)) + os.Remove(fmt.Sprintf("%s.ec%02d", dataBaseFileName, i)) } - os.Remove(v.IndexFileName() + ".ecx") + os.Remove(indexBaseFileName + ".ecx") + os.Remove(dataBaseFileName + ".vif") }() - // write .ec00 ~ .ec13 files - if err := erasure_coding.WriteEcFiles(baseFileName); err != nil { - return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err) + // write .ec00 ~ .ec13 files with generation-specific names + if err := erasure_coding.WriteEcFiles(dataBaseFileName); err != nil { + return nil, fmt.Errorf("WriteEcFiles %s: %v", dataBaseFileName, err) } - // write .ecx file - if err := erasure_coding.WriteSortedFileFromIdx(v.IndexFileName(), ".ecx"); err != nil { - return nil, fmt.Errorf("WriteSortedFileFromIdx %s: %v", v.IndexFileName(), err) + // write .ecx file with generation-specific name + if err := erasure_coding.WriteSortedFileFromIdxToTarget(v.IndexFileName(), indexBaseFileName+".ecx"); err != nil { + return nil, fmt.Errorf("WriteSortedFileFromIdxToTarget %s: %v", indexBaseFileName, err) } // write .vif files @@ -84,8 +103,8 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_ datSize, _, _ := v.FileStat() volumeInfo.DatFileSize = int64(datSize) - if err := volume_info.SaveVolumeInfo(baseFileName+".vif", volumeInfo); err != nil { - return nil, fmt.Errorf("SaveVolumeInfo %s: %v", baseFileName, err) + if err := volume_info.SaveVolumeInfo(dataBaseFileName+".vif", volumeInfo); err != nil { + return nil, fmt.Errorf("SaveVolumeInfo %s: %v", dataBaseFileName, err) } shouldCleanup = false diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index a0794294e..42c9b942b 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -262,6 +262,7 @@ func generateEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, _, genErr := volumeServerClient.VolumeEcShardsGenerate(context.Background(), &volume_server_pb.VolumeEcShardsGenerateRequest{ VolumeId: uint32(volumeId), Collection: collection, + Generation: 0, // shell commands operate on existing (generation 0) volumes }) return genErr }) diff --git a/weed/storage/erasure_coding/ec_encoder.go b/weed/storage/erasure_coding/ec_encoder.go index eeeb156e6..8987e5aa2 100644 --- a/weed/storage/erasure_coding/ec_encoder.go +++ b/weed/storage/erasure_coding/ec_encoder.go @@ -26,8 +26,14 @@ const ( // WriteSortedFileFromIdx generates .ecx file from existing .idx file // all keys are sorted in ascending order func WriteSortedFileFromIdx(baseFileName string, ext string) (e error) { + return WriteSortedFileFromIdxToTarget(baseFileName, baseFileName+ext) +} + +// WriteSortedFileFromIdxToTarget generates .ecx file from existing .idx file to specified target +// all keys are sorted in ascending order +func WriteSortedFileFromIdxToTarget(sourceBaseFileName string, targetFileName string) (e error) { - nm, err := readNeedleMap(baseFileName) + nm, err := readNeedleMap(sourceBaseFileName) if nm != nil { defer nm.Close() } @@ -35,7 +41,7 @@ func WriteSortedFileFromIdx(baseFileName string, ext string) (e error) { return fmt.Errorf("readNeedleMap: %w", err) } - ecxFile, err := os.OpenFile(baseFileName+ext, os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644) + ecxFile, err := os.OpenFile(targetFileName, os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { return fmt.Errorf("failed to open ecx file: %w", err) } diff --git a/weed/worker/tasks/ec_vacuum/ec_vacuum_task.go b/weed/worker/tasks/ec_vacuum/ec_vacuum_task.go index 0525a7ccf..7a7367aba 100644 --- a/weed/worker/tasks/ec_vacuum/ec_vacuum_task.go +++ b/weed/worker/tasks/ec_vacuum/ec_vacuum_task.go @@ -205,6 +205,7 @@ func (t *EcVacuumTask) encodeVolumeToEcShards(targetNode pb.ServerAddress) error _, err := client.VolumeEcShardsGenerate(context.Background(), &volume_server_pb.VolumeEcShardsGenerateRequest{ VolumeId: t.volumeID, Collection: t.collection, + Generation: 1, // TODO: implement proper generation tracking in vacuum task }) return err })