Browse Source

fix: sync EC volume files before copying to fix deleted needles not being marked when decoding (#7755)

* fix: sync EC volume files before copying to fix deleted needles not being marked when decoding (#7751)

When a file is deleted from an EC volume, the deletion is written to both
the .ecx and .ecj files. However, these writes were not synced to disk
before the files were copied during ec.decode. This caused the copied
files to miss the deletion markers, resulting in 'leaked' space where
deleted files were not properly tracked after decoding.

This fix:
1. Adds a Sync() method to EcVolume that flushes .ecx and .ecj files
   to disk without closing them
2. Calls Sync() in CopyFile before copying EC volume files to ensure
   all deletions are visible to the copy operation

Fixes #7751

* test: add integration tests for EC volume deletion sync (issue #7751)

Add comprehensive tests to verify that deleted needles are properly
visible after EcVolume.Sync() is called. These tests cover:

1. TestWriteIdxFileFromEcIndex_PreservesDeletedNeedles
   - Verifies that WriteIdxFileFromEcIndex preserves deletion markers
     from .ecx files when generating .idx files

2. TestWriteIdxFileFromEcIndex_ProcessesEcjJournal
   - Verifies that deletions from .ecj journal file are correctly
     appended to the generated .idx file

3. TestEcxFileDeletionVisibleAfterSync
   - Verifies that MarkNeedleDeleted changes are visible after Sync()

4. TestEcxFileDeletionWithSeparateHandles
   - Tests that synced changes are visible across separate file handles

5. TestEcVolumeSyncEnsuresDeletionsVisible
   - Integration test for the full EcVolume.DeleteNeedleFromEcx +
     Sync() workflow that validates the fix for issue #7751

* refactor: log sync errors in EcVolume.Sync() instead of ignoring them

Per code review feedback: sync errors could reintroduce the bug this PR
fixes, so logging warnings helps with debugging.
pull/7760/head
Chris Lu 3 weeks ago
committed by GitHub
parent
commit
32a9a1f46f
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 7
      weed/server/volume_grpc_copy.go
  2. 274
      weed/storage/erasure_coding/ec_decoder_test.go
  3. 18
      weed/storage/erasure_coding/ec_volume.go

7
weed/server/volume_grpc_copy.go

@ -343,6 +343,13 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v
v.SyncToDisk()
fileName = v.FileName(req.Ext)
} else {
// Sync EC volume files to disk before copying to ensure deletions are visible
// This fixes issue #7751 where deleted files in encoded volumes were not
// properly marked as deleted when decoded.
if ecVolume, found := vs.store.FindEcVolume(needle.VolumeId(req.VolumeId)); found {
ecVolume.Sync()
}
baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId)) + req.Ext
for _, location := range vs.store.Locations {
tName := util.Join(location.Directory, baseFileName)

274
weed/storage/erasure_coding/ec_decoder_test.go

@ -6,6 +6,7 @@ import (
"testing"
erasure_coding "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
)
@ -79,3 +80,276 @@ func makeNeedleMapEntry(key types.NeedleId, offset types.Offset, size types.Size
types.SizeToBytes(b[types.NeedleIdSize+types.OffsetSize:types.NeedleIdSize+types.OffsetSize+types.SizeSize], size)
return b
}
// TestWriteIdxFileFromEcIndex_PreservesDeletedNeedles verifies that WriteIdxFileFromEcIndex
// correctly marks deleted needles in the generated .idx file.
// This tests the fix for issue #7751 where deleted files in encoded volumes
// were not properly marked as deleted when decoded.
func TestWriteIdxFileFromEcIndex_PreservesDeletedNeedles(t *testing.T) {
dir := t.TempDir()
base := filepath.Join(dir, "foo_1")
// Create an .ecx file with one live needle and one deleted needle
needle1 := makeNeedleMapEntry(types.NeedleId(1), types.ToOffset(64), types.Size(100))
needle2 := makeNeedleMapEntry(types.NeedleId(2), types.ToOffset(128), types.TombstoneFileSize) // deleted
ecxData := append(needle1, needle2...)
if err := os.WriteFile(base+".ecx", ecxData, 0644); err != nil {
t.Fatalf("write ecx: %v", err)
}
// Generate .idx from .ecx
if err := erasure_coding.WriteIdxFileFromEcIndex(base); err != nil {
t.Fatalf("WriteIdxFileFromEcIndex: %v", err)
}
// Verify .idx file has the same content
idxData, err := os.ReadFile(base + ".idx")
if err != nil {
t.Fatalf("read idx: %v", err)
}
if len(idxData) != len(ecxData) {
t.Fatalf("idx file size mismatch: got %d, want %d", len(idxData), len(ecxData))
}
// Verify the second needle is still marked as deleted
entrySize := types.NeedleIdSize + types.OffsetSize + types.SizeSize
entry2 := idxData[entrySize : entrySize*2]
size2 := types.BytesToSize(entry2[types.NeedleIdSize+types.OffsetSize:])
if !size2.IsDeleted() {
t.Fatalf("expected needle 2 to be marked as deleted, got size: %d", size2)
}
}
// TestWriteIdxFileFromEcIndex_ProcessesEcjJournal verifies that WriteIdxFileFromEcIndex
// correctly processes deletions from the .ecj journal file.
func TestWriteIdxFileFromEcIndex_ProcessesEcjJournal(t *testing.T) {
dir := t.TempDir()
base := filepath.Join(dir, "foo_1")
// Create an .ecx file with two live needles
needle1 := makeNeedleMapEntry(types.NeedleId(1), types.ToOffset(64), types.Size(100))
needle2 := makeNeedleMapEntry(types.NeedleId(2), types.ToOffset(128), types.Size(200))
ecxData := append(needle1, needle2...)
if err := os.WriteFile(base+".ecx", ecxData, 0644); err != nil {
t.Fatalf("write ecx: %v", err)
}
// Create an .ecj file that records needle 2 as deleted
ecjData := make([]byte, types.NeedleIdSize)
types.NeedleIdToBytes(ecjData, types.NeedleId(2))
if err := os.WriteFile(base+".ecj", ecjData, 0644); err != nil {
t.Fatalf("write ecj: %v", err)
}
// Generate .idx from .ecx and .ecj
if err := erasure_coding.WriteIdxFileFromEcIndex(base); err != nil {
t.Fatalf("WriteIdxFileFromEcIndex: %v", err)
}
// Verify .idx file has 3 entries: 2 from .ecx + 1 deletion from .ecj
idxData, err := os.ReadFile(base + ".idx")
if err != nil {
t.Fatalf("read idx: %v", err)
}
entrySize := types.NeedleIdSize + types.OffsetSize + types.SizeSize
expectedSize := entrySize * 3 // 2 from ecx + 1 deletion append from ecj
if len(idxData) != expectedSize {
t.Fatalf("idx file size mismatch: got %d, want %d", len(idxData), expectedSize)
}
// The third entry should be the deletion record for needle 2
entry3 := idxData[entrySize*2 : entrySize*3]
key3 := types.BytesToNeedleId(entry3[0:types.NeedleIdSize])
size3 := types.BytesToSize(entry3[types.NeedleIdSize+types.OffsetSize:])
if key3 != types.NeedleId(2) {
t.Fatalf("expected needle id 2 in deletion record, got: %d", key3)
}
if !size3.IsDeleted() {
t.Fatalf("expected deletion record to have tombstone size, got: %d", size3)
}
}
// TestEcxFileDeletionVisibleAfterSync verifies that deletions made to .ecx
// via MarkNeedleDeleted are visible to other readers after Sync().
// This is a regression test for issue #7751.
func TestEcxFileDeletionVisibleAfterSync(t *testing.T) {
dir := t.TempDir()
base := filepath.Join(dir, "foo_1")
// Create an .ecx file with one live needle
needle1 := makeNeedleMapEntry(types.NeedleId(1), types.ToOffset(64), types.Size(100))
if err := os.WriteFile(base+".ecx", needle1, 0644); err != nil {
t.Fatalf("write ecx: %v", err)
}
// Open the file for writing (simulating what EcVolume does)
ecxFile, err := os.OpenFile(base+".ecx", os.O_RDWR, 0644)
if err != nil {
t.Fatalf("open ecx: %v", err)
}
// Mark needle as deleted using MarkNeedleDeleted
err = erasure_coding.MarkNeedleDeleted(ecxFile, 0)
if err != nil {
ecxFile.Close()
t.Fatalf("MarkNeedleDeleted: %v", err)
}
// Sync the file to ensure changes are visible to other readers
if err := ecxFile.Sync(); err != nil {
ecxFile.Close()
t.Fatalf("Sync: %v", err)
}
ecxFile.Close()
// Now open with a new file handle and verify deletion is visible
data, err := os.ReadFile(base + ".ecx")
if err != nil {
t.Fatalf("read ecx: %v", err)
}
size := types.BytesToSize(data[types.NeedleIdSize+types.OffsetSize:])
if !size.IsDeleted() {
t.Fatalf("expected needle to be marked as deleted after sync, got size: %d", size)
}
}
// TestEcxFileDeletionNotVisibleWithoutSync verifies that without Sync(),
// deletions may not be visible to other readers (demonstrating the bug).
// Note: This test may be flaky depending on OS caching behavior, but it
// documents the expected behavior.
func TestEcxFileDeletionWithSeparateHandles(t *testing.T) {
dir := t.TempDir()
base := filepath.Join(dir, "foo_1")
// Create an .ecx file with one live needle
needle1 := makeNeedleMapEntry(types.NeedleId(1), types.ToOffset(64), types.Size(100))
if err := os.WriteFile(base+".ecx", needle1, 0644); err != nil {
t.Fatalf("write ecx: %v", err)
}
// Open the file for writing (writer handle)
writerFile, err := os.OpenFile(base+".ecx", os.O_RDWR, 0644)
if err != nil {
t.Fatalf("open ecx for write: %v", err)
}
defer writerFile.Close()
// Open the file for reading (reader handle - simulating CopyFile behavior)
readerFile, err := os.OpenFile(base+".ecx", os.O_RDONLY, 0644)
if err != nil {
t.Fatalf("open ecx for read: %v", err)
}
defer readerFile.Close()
// Mark needle as deleted via writer handle
err = erasure_coding.MarkNeedleDeleted(writerFile, 0)
if err != nil {
t.Fatalf("MarkNeedleDeleted: %v", err)
}
// Sync the writer to flush changes
if err := writerFile.Sync(); err != nil {
t.Fatalf("Sync: %v", err)
}
// Read via reader handle - after sync, changes should be visible
data := make([]byte, types.NeedleIdSize+types.OffsetSize+types.SizeSize)
if _, err := readerFile.ReadAt(data, 0); err != nil {
t.Fatalf("ReadAt: %v", err)
}
size := types.BytesToSize(data[types.NeedleIdSize+types.OffsetSize:])
if !size.IsDeleted() {
t.Fatalf("expected deletion to be visible after Sync(), got size: %d", size)
}
}
// TestEcVolumeSyncEnsuresDeletionsVisible is an integration test for issue #7751
// that verifies the EcVolume.Sync() method ensures deleted needles are visible
// to external readers (like ec.decode's CopyFile).
func TestEcVolumeSyncEnsuresDeletionsVisible(t *testing.T) {
dir := t.TempDir()
collection := "test"
vid := 1
base := filepath.Join(dir, collection+"_1")
// Create initial .ecx file with live needle
needle1 := makeNeedleMapEntry(types.NeedleId(1), types.ToOffset(64), types.Size(100))
needle2 := makeNeedleMapEntry(types.NeedleId(2), types.ToOffset(128), types.Size(200))
ecxData := append(needle1, needle2...)
if err := os.WriteFile(base+".ecx", ecxData, 0644); err != nil {
t.Fatalf("write ecx: %v", err)
}
// Create empty .ecj file
if err := os.WriteFile(base+".ecj", []byte{}, 0644); err != nil {
t.Fatalf("write ecj: %v", err)
}
// Create minimal EC shard file to allow EcVolume creation
// Shards need super block header (8 bytes)
shardData := make([]byte, 8)
if err := os.WriteFile(base+".ec00", shardData, 0644); err != nil {
t.Fatalf("write ec00: %v", err)
}
// Create .vif file
if err := os.WriteFile(base+".vif", []byte{}, 0644); err != nil {
t.Fatalf("write vif: %v", err)
}
// Create EcVolume
ecVolume, err := erasure_coding.NewEcVolume(
"hdd",
dir, // data dir
dir, // idx dir
collection,
needle.VolumeId(vid),
)
if err != nil {
t.Fatalf("NewEcVolume: %v", err)
}
defer ecVolume.Close()
// Delete needle 2 via EcVolume (this writes to .ecx and .ecj)
if err := ecVolume.DeleteNeedleFromEcx(types.NeedleId(2)); err != nil {
t.Fatalf("DeleteNeedleFromEcx: %v", err)
}
// Before Sync: open a new reader handle to simulate CopyFile
readerBefore, err := os.OpenFile(base+".ecx", os.O_RDONLY, 0644)
if err != nil {
t.Fatalf("open ecx for read (before sync): %v", err)
}
defer readerBefore.Close()
// Now call Sync() - this is what fixes issue #7751
ecVolume.Sync()
// After Sync: open another reader handle
readerAfter, err := os.OpenFile(base+".ecx", os.O_RDONLY, 0644)
if err != nil {
t.Fatalf("open ecx for read (after sync): %v", err)
}
defer readerAfter.Close()
// Read needle 2's entry from the after-sync handle
entrySize := types.NeedleIdSize + types.OffsetSize + types.SizeSize
data := make([]byte, entrySize)
if _, err := readerAfter.ReadAt(data, int64(entrySize)); err != nil {
t.Fatalf("ReadAt after sync: %v", err)
}
// Verify needle 2 is marked as deleted
size := types.BytesToSize(data[types.NeedleIdSize+types.OffsetSize:])
if !size.IsDeleted() {
t.Fatalf("expected needle 2 to be deleted after Sync(), got size: %d", size)
}
}

18
weed/storage/erasure_coding/ec_volume.go

@ -166,6 +166,24 @@ func (ev *EcVolume) Close() {
}
}
// Sync flushes the .ecx and .ecj files to disk without closing them.
// This ensures that deletions made via DeleteNeedleFromEcx are visible
// to other processes/file handles that may read these files.
func (ev *EcVolume) Sync() {
if ev.ecjFile != nil {
ev.ecjFileAccessLock.Lock()
if err := ev.ecjFile.Sync(); err != nil {
glog.Warningf("failed to sync ecj file for volume %d: %v", ev.VolumeId, err)
}
ev.ecjFileAccessLock.Unlock()
}
if ev.ecxFile != nil {
if err := ev.ecxFile.Sync(); err != nil {
glog.Warningf("failed to sync ecx file for volume %d: %v", ev.VolumeId, err)
}
}
}
func (ev *EcVolume) Destroy() {
ev.Close()

Loading…
Cancel
Save