Browse Source

implement todos

add-ec-vacuum
chrislu 4 months ago
parent
commit
cac6a51cbf
  1. 78
      weed/worker/tasks/ec_vacuum/ec_vacuum_task.go

78
weed/worker/tasks/ec_vacuum/ec_vacuum_task.go

@ -18,6 +18,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
storage_types "github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/storage/volume_info"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
"github.com/seaweedfs/seaweedfs/weed/worker/types/base"
@ -392,7 +393,7 @@ func (t *EcVacuumTask) mergeEcjFiles() error {
// Merge all .ecj files into a single file
// Each .ecj file contains deleted needle IDs from a specific server
deletedNeedles := make(map[string]bool) // Track unique deleted needles
deletedNeedles := make(map[storage_types.NeedleId]bool) // Track unique deleted needles
for _, ecjFile := range ecjFiles {
err := t.processEcjFile(ecjFile, deletedNeedles)
@ -421,35 +422,39 @@ func (t *EcVacuumTask) mergeEcjFiles() error {
}
// processEcjFile reads a .ecj file and adds deleted needle IDs to the set
func (t *EcVacuumTask) processEcjFile(ecjFile string, deletedNeedles map[string]bool) error {
// TODO: Implement proper .ecj file parsing
// .ecj files contain binary data with deleted needle IDs
// For now, we'll use a placeholder implementation
func (t *EcVacuumTask) processEcjFile(ecjFile string, deletedNeedles map[storage_types.NeedleId]bool) error {
t.LogInfo("Processing .ecj file for deleted needle IDs", map[string]interface{}{
"file": ecjFile,
})
// Get base name for the file (remove .ecj extension) for IterateEcjFile
baseName := strings.TrimSuffix(ecjFile, ".ecj")
deletedCount := 0
err := erasure_coding.IterateEcjFile(baseName, func(needleId storage_types.NeedleId) error {
deletedNeedles[needleId] = true
deletedCount++
return nil
})
file, err := os.Open(ecjFile)
if err != nil {
return fmt.Errorf("failed to open .ecj file %s: %w", ecjFile, err)
return fmt.Errorf("failed to iterate .ecj file %s: %w", ecjFile, err)
}
defer file.Close()
// Simple implementation: if file exists and has content, we assume some deletions
// Real implementation would parse the binary format to extract actual needle IDs
info, err := file.Stat()
if err == nil && info.Size() > 0 {
// For now, just log that we found a non-empty .ecj file
t.LogInfo("Found non-empty .ecj file with deletions", map[string]interface{}{
"file": ecjFile,
"size": info.Size(),
})
}
t.LogInfo("Successfully processed .ecj file", map[string]interface{}{
"file": ecjFile,
"deleted_needles": deletedCount,
})
return nil
}
// writeMergedEcjFile writes the merged deletion information to a new .ecj file
func (t *EcVacuumTask) writeMergedEcjFile(mergedEcjFile string, deletedNeedles map[string]bool) error {
// TODO: Implement proper .ecj file writing
// For now, create an empty file since we don't have proper parsing yet
func (t *EcVacuumTask) writeMergedEcjFile(mergedEcjFile string, deletedNeedles map[storage_types.NeedleId]bool) error {
t.LogInfo("Writing merged .ecj file", map[string]interface{}{
"file": mergedEcjFile,
"deleted_needles": len(deletedNeedles),
})
file, err := os.Create(mergedEcjFile)
if err != nil {
@ -457,9 +462,28 @@ func (t *EcVacuumTask) writeMergedEcjFile(mergedEcjFile string, deletedNeedles m
}
defer file.Close()
t.LogInfo("Created merged .ecj file", map[string]interface{}{
// Write each deleted needle ID as binary data
writtenCount := 0
needleBytes := make([]byte, storage_types.NeedleIdSize)
for needleId := range deletedNeedles {
storage_types.NeedleIdToBytes(needleBytes, needleId)
_, err := file.Write(needleBytes)
if err != nil {
return fmt.Errorf("failed to write needle ID to .ecj file: %w", err)
}
writtenCount++
}
// Sync to ensure data is written to disk
err = file.Sync()
if err != nil {
return fmt.Errorf("failed to sync .ecj file: %w", err)
}
t.LogInfo("Successfully wrote merged .ecj file", map[string]interface{}{
"file": mergedEcjFile,
"deleted_needles": len(deletedNeedles),
"deleted_needles": writtenCount,
"file_size": writtenCount * storage_types.NeedleIdSize,
})
return nil
@ -574,11 +598,13 @@ func (t *EcVacuumTask) distributeNewEcShards() error {
targetBaseFileName := filepath.Join(t.tempDir, fmt.Sprintf("%s_%d_g%d", t.collection, t.volumeID, t.targetGeneration))
// Step 1: Find one server with dedicated index folder for shared index files (.ecx, .vif, .ecj)
// Step 1: Find best server for shared index files (.vif, .ecj)
// Note: .ecx files are skipped per user guidance - they can be regenerated
var indexServer pb.ServerAddress
for serverAddr := range t.sourceNodes {
// For now, use the first server as index server
// TODO: Check if server has dedicated index folder capability
// Use the first server as index server
// Future enhancement: Query volume server capabilities to find servers with dedicated index folders
// This could be done via a new gRPC call or by checking server configuration
indexServer = serverAddr
break
}

Loading…
Cancel
Save