diff --git a/weed/worker/tasks/ec_vacuum/ec_vacuum_task.go b/weed/worker/tasks/ec_vacuum/ec_vacuum_task.go index 450901105..5692f04a1 100644 --- a/weed/worker/tasks/ec_vacuum/ec_vacuum_task.go +++ b/weed/worker/tasks/ec_vacuum/ec_vacuum_task.go @@ -18,6 +18,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" "github.com/seaweedfs/seaweedfs/weed/storage/needle" + storage_types "github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/storage/volume_info" "github.com/seaweedfs/seaweedfs/weed/worker/types" "github.com/seaweedfs/seaweedfs/weed/worker/types/base" @@ -392,7 +393,7 @@ func (t *EcVacuumTask) mergeEcjFiles() error { // Merge all .ecj files into a single file // Each .ecj file contains deleted needle IDs from a specific server - deletedNeedles := make(map[string]bool) // Track unique deleted needles + deletedNeedles := make(map[storage_types.NeedleId]bool) // Track unique deleted needles for _, ecjFile := range ecjFiles { err := t.processEcjFile(ecjFile, deletedNeedles) @@ -421,35 +422,39 @@ func (t *EcVacuumTask) mergeEcjFiles() error { } // processEcjFile reads a .ecj file and adds deleted needle IDs to the set -func (t *EcVacuumTask) processEcjFile(ecjFile string, deletedNeedles map[string]bool) error { - // TODO: Implement proper .ecj file parsing - // .ecj files contain binary data with deleted needle IDs - // For now, we'll use a placeholder implementation +func (t *EcVacuumTask) processEcjFile(ecjFile string, deletedNeedles map[storage_types.NeedleId]bool) error { + t.LogInfo("Processing .ecj file for deleted needle IDs", map[string]interface{}{ + "file": ecjFile, + }) + + // Get base name for the file (remove .ecj extension) for IterateEcjFile + baseName := strings.TrimSuffix(ecjFile, ".ecj") + + deletedCount := 0 + err := erasure_coding.IterateEcjFile(baseName, func(needleId storage_types.NeedleId) error { + deletedNeedles[needleId] = true + deletedCount++ + return nil + }) - file, err := os.Open(ecjFile) if err != nil { - return fmt.Errorf("failed to open .ecj file %s: %w", ecjFile, err) + return fmt.Errorf("failed to iterate .ecj file %s: %w", ecjFile, err) } - defer file.Close() - // Simple implementation: if file exists and has content, we assume some deletions - // Real implementation would parse the binary format to extract actual needle IDs - info, err := file.Stat() - if err == nil && info.Size() > 0 { - // For now, just log that we found a non-empty .ecj file - t.LogInfo("Found non-empty .ecj file with deletions", map[string]interface{}{ - "file": ecjFile, - "size": info.Size(), - }) - } + t.LogInfo("Successfully processed .ecj file", map[string]interface{}{ + "file": ecjFile, + "deleted_needles": deletedCount, + }) return nil } // writeMergedEcjFile writes the merged deletion information to a new .ecj file -func (t *EcVacuumTask) writeMergedEcjFile(mergedEcjFile string, deletedNeedles map[string]bool) error { - // TODO: Implement proper .ecj file writing - // For now, create an empty file since we don't have proper parsing yet +func (t *EcVacuumTask) writeMergedEcjFile(mergedEcjFile string, deletedNeedles map[storage_types.NeedleId]bool) error { + t.LogInfo("Writing merged .ecj file", map[string]interface{}{ + "file": mergedEcjFile, + "deleted_needles": len(deletedNeedles), + }) file, err := os.Create(mergedEcjFile) if err != nil { @@ -457,9 +462,28 @@ func (t *EcVacuumTask) writeMergedEcjFile(mergedEcjFile string, deletedNeedles m } defer file.Close() - t.LogInfo("Created merged .ecj file", map[string]interface{}{ + // Write each deleted needle ID as binary data + writtenCount := 0 + needleBytes := make([]byte, storage_types.NeedleIdSize) + for needleId := range deletedNeedles { + storage_types.NeedleIdToBytes(needleBytes, needleId) + _, err := file.Write(needleBytes) + if err != nil { + return fmt.Errorf("failed to write needle ID to .ecj file: %w", err) + } + writtenCount++ + } + + // Sync to ensure data is written to disk + err = file.Sync() + if err != nil { + return fmt.Errorf("failed to sync .ecj file: %w", err) + } + + t.LogInfo("Successfully wrote merged .ecj file", map[string]interface{}{ "file": mergedEcjFile, - "deleted_needles": len(deletedNeedles), + "deleted_needles": writtenCount, + "file_size": writtenCount * storage_types.NeedleIdSize, }) return nil @@ -574,11 +598,13 @@ func (t *EcVacuumTask) distributeNewEcShards() error { targetBaseFileName := filepath.Join(t.tempDir, fmt.Sprintf("%s_%d_g%d", t.collection, t.volumeID, t.targetGeneration)) - // Step 1: Find one server with dedicated index folder for shared index files (.ecx, .vif, .ecj) + // Step 1: Find best server for shared index files (.vif, .ecj) + // Note: .ecx files are skipped per user guidance - they can be regenerated var indexServer pb.ServerAddress for serverAddr := range t.sourceNodes { - // For now, use the first server as index server - // TODO: Check if server has dedicated index folder capability + // Use the first server as index server + // Future enhancement: Query volume server capabilities to find servers with dedicated index folders + // This could be done via a new gRPC call or by checking server configuration indexServer = serverAddr break }