From af68449a26bb0dcb20efb76b1510300eaed1874f Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 1 Apr 2026 01:15:26 -0700 Subject: [PATCH] Process .ecj deletions during EC decode and vacuum decoded volume (#8863) * Process .ecj deletions during EC decode and vacuum decoded volume (#8798) When decoding EC volumes back to normal volumes, deletions recorded in the .ecj journal were not being applied before computing the dat file size or checking for live needles. This caused the decoded volume to include data for deleted files and could produce false positives in the all-deleted check. - Call RebuildEcxFile before HasLiveNeedles/FindDatFileSize in VolumeEcShardsToVolume so .ecj deletions are merged into .ecx first - Vacuum the decoded volume after mounting in ec.decode to compact out deleted needle data from the .dat file - Add integration tests for decoding with non-empty .ecj files * storage: add offline volume compaction helper Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * ec: compact decoded volumes before deleting shards Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * ec: address PR review comments - Fall back to data directory for .ecx when idx directory lacks it - Make compaction failure non-fatal during EC decode - Remove misleading "buffer: 10%" from space check error message * ec: collect .ecj from all shard locations during decode Each server's .ecj only contains deletions for needles whose data resides in shards held by that server. Previously, sources with no new data shards to contribute were skipped entirely, losing their .ecj deletion entries. Now .ecj is always appended from every shard location so RebuildEcxFile sees the full set of deletions. * ec: add integration tests for .ecj collection during decode TestEcDecodePreservesDeletedNeedles: verifies that needles deleted via VolumeEcBlobDelete are excluded from the decoded volume. TestEcDecodeCollectsEcjFromPeer: regression test for the fix in collectEcShards. Deletes a needle only on a peer server that holds no new data shards, then verifies the deletion survives decode via .ecj collection. * ec: address review nits in decode and tests - Remove double error wrapping in mountDecodedVolume - Check VolumeUnmount error in peer ecj test - Assert 404 specifically for deleted needles, fail on 5xx --------- Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- test/volume_server/grpc/ec_decode_ecj_test.go | 294 ++++++++++++++++++ weed/server/volume_grpc_erasure_coding.go | 34 ++ weed/shell/command_ec_decode.go | 54 ++-- .../storage/erasure_coding/ec_decoder_test.go | 212 +++++++++++++ weed/storage/store_vacuum.go | 90 ++++-- weed/storage/volume_loading.go | 14 + weed/storage/volume_vacuum_test.go | 87 ++++++ 7 files changed, 741 insertions(+), 44 deletions(-) create mode 100644 test/volume_server/grpc/ec_decode_ecj_test.go diff --git a/test/volume_server/grpc/ec_decode_ecj_test.go b/test/volume_server/grpc/ec_decode_ecj_test.go new file mode 100644 index 000000000..bd48594d1 --- /dev/null +++ b/test/volume_server/grpc/ec_decode_ecj_test.go @@ -0,0 +1,294 @@ +package volume_server_grpc_test + +import ( + "context" + "net/http" + "strings" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/test/volume_server/framework" + "github.com/seaweedfs/seaweedfs/test/volume_server/matrix" + "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" +) + +// TestEcDecodePreservesDeletedNeedles verifies that needles deleted via +// VolumeEcBlobDelete (recorded in .ecj) are correctly excluded from the +// decoded volume produced by VolumeEcShardsToVolume. +func TestEcDecodePreservesDeletedNeedles(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + cluster := framework.StartVolumeCluster(t, matrix.P1()) + conn, client := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress()) + defer conn.Close() + + const ( + volumeID = uint32(140) + keyA = uint64(990020) + cookieA = uint32(0xDA001122) + keyB = uint64(990021) + cookieB = uint32(0xDA003344) + ) + + framework.AllocateVolume(t, client, volumeID, "") + + httpClient := framework.NewHTTPClient() + fidA := framework.NewFileID(volumeID, keyA, cookieA) + fidB := framework.NewFileID(volumeID, keyB, cookieB) + payloadA := []byte("needle-A-should-be-deleted-after-decode") + payloadB := []byte("needle-B-should-survive-decode") + + // Upload two needles. + for _, tc := range []struct { + fid string + payload []byte + }{ + {fidA, payloadA}, + {fidB, payloadB}, + } { + resp := framework.UploadBytes(t, httpClient, cluster.VolumeAdminURL(), tc.fid, tc.payload) + _ = framework.ReadAllAndClose(t, resp) + if resp.StatusCode != http.StatusCreated { + t.Fatalf("upload %s: expected 201, got %d", tc.fid, resp.StatusCode) + } + } + + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + // EC encode. + _, err := client.VolumeEcShardsGenerate(ctx, &volume_server_pb.VolumeEcShardsGenerateRequest{ + VolumeId: volumeID, Collection: "", + }) + if err != nil { + t.Fatalf("VolumeEcShardsGenerate: %v", err) + } + + // Mount all data shards so the EC volume is usable. + _, err = client.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{ + VolumeId: volumeID, Collection: "", + ShardIds: []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + }) + if err != nil { + t.Fatalf("VolumeEcShardsMount: %v", err) + } + + // Delete needle A via EC path (writes to .ecj). + _, err = client.VolumeEcBlobDelete(ctx, &volume_server_pb.VolumeEcBlobDeleteRequest{ + VolumeId: volumeID, Collection: "", + FileKey: keyA, Version: uint32(needle.GetCurrentVersion()), + }) + if err != nil { + t.Fatalf("VolumeEcBlobDelete needle A: %v", err) + } + + // Unmount the normal volume so decode writes fresh files. + _, err = client.VolumeUnmount(ctx, &volume_server_pb.VolumeUnmountRequest{VolumeId: volumeID}) + if err != nil { + t.Fatalf("VolumeUnmount: %v", err) + } + + // Decode EC shards back to a normal volume. + _, err = client.VolumeEcShardsToVolume(ctx, &volume_server_pb.VolumeEcShardsToVolumeRequest{ + VolumeId: volumeID, Collection: "", + }) + if err != nil { + t.Fatalf("VolumeEcShardsToVolume: %v", err) + } + + // Re-mount the decoded volume. + _, err = client.VolumeMount(ctx, &volume_server_pb.VolumeMountRequest{VolumeId: volumeID}) + if err != nil { + t.Fatalf("VolumeMount: %v", err) + } + + // Needle A should be gone (deleted via .ecj before decode). + respA := framework.ReadBytes(t, httpClient, cluster.VolumeAdminURL(), fidA) + bodyA := framework.ReadAllAndClose(t, respA) + if respA.StatusCode >= 500 { + t.Fatalf("needle A read: server error %d: %s", respA.StatusCode, bodyA) + } + if respA.StatusCode != http.StatusNotFound { + t.Fatalf("needle A should be 404 after decode, got %d", respA.StatusCode) + } + + // Needle B should still be readable. + respB := framework.ReadBytes(t, httpClient, cluster.VolumeAdminURL(), fidB) + bodyB := framework.ReadAllAndClose(t, respB) + if respB.StatusCode != http.StatusOK { + t.Fatalf("needle B read: expected 200, got %d", respB.StatusCode) + } + if string(bodyB) != string(payloadB) { + t.Fatalf("needle B payload mismatch: got %q, want %q", bodyB, payloadB) + } +} + +// TestEcDecodeCollectsEcjFromPeer verifies that .ecj deletion entries from a +// peer server that contributes no new data shards are still collected during +// decode. This is the regression test for the fix in collectEcShards that +// always copies .ecj from every shard location. +// +// Scenario: +// - Server 0 holds all 10 data shards (decode target). +// - Server 1 holds a copy of shard 0 (no new shards for server 0). +// - A needle is deleted ONLY on server 1 (server 0's .ecj is empty). +// - During decode on server 0, server 1's .ecj must be collected and applied. +func TestEcDecodeCollectsEcjFromPeer(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + cluster := framework.StartMultiVolumeClusterAuto(t, matrix.P1(), 2) + conn0, client0 := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(0)) + defer conn0.Close() + conn1, client1 := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(1)) + defer conn1.Close() + + const ( + volumeID = uint32(141) + keyA = uint64(990030) + cookieA = uint32(0xDB001122) + keyB = uint64(990031) + cookieB = uint32(0xDB003344) + ) + + // Allocate and upload on server 0. + framework.AllocateVolume(t, client0, volumeID, "") + + httpClient := framework.NewHTTPClient() + fidA := framework.NewFileID(volumeID, keyA, cookieA) + fidB := framework.NewFileID(volumeID, keyB, cookieB) + payloadB := []byte("needle-B-should-survive-peer-ecj-decode") + + resp := framework.UploadBytes(t, httpClient, cluster.VolumeAdminURL(0), fidA, []byte("needle-A-deleted-on-peer")) + _ = framework.ReadAllAndClose(t, resp) + if resp.StatusCode != http.StatusCreated { + t.Fatalf("upload A: expected 201, got %d", resp.StatusCode) + } + resp = framework.UploadBytes(t, httpClient, cluster.VolumeAdminURL(0), fidB, payloadB) + _ = framework.ReadAllAndClose(t, resp) + if resp.StatusCode != http.StatusCreated { + t.Fatalf("upload B: expected 201, got %d", resp.StatusCode) + } + + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + // EC encode on server 0. + _, err := client0.VolumeEcShardsGenerate(ctx, &volume_server_pb.VolumeEcShardsGenerateRequest{ + VolumeId: volumeID, Collection: "", + }) + if err != nil { + t.Fatalf("VolumeEcShardsGenerate on server 0: %v", err) + } + + // Build the SourceDataNode address for server 0 (format: host:adminPort.grpcPort). + sourceDataNode := cluster.VolumeAdminAddress(0) + "." + + strings.Split(cluster.VolumeGRPCAddress(0), ":")[1] + + // Copy shard 0 + ecx + ecj from server 0 → server 1. + _, err = client1.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{ + VolumeId: volumeID, + Collection: "", + SourceDataNode: sourceDataNode, + ShardIds: []uint32{0}, + CopyEcxFile: true, + CopyEcjFile: true, + CopyVifFile: true, + }) + if err != nil { + t.Fatalf("VolumeEcShardsCopy 0→1: %v", err) + } + + // Mount shard 0 on server 1 so the EC volume can accept deletions. + _, err = client1.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{ + VolumeId: volumeID, Collection: "", + ShardIds: []uint32{0}, + }) + if err != nil { + t.Fatalf("VolumeEcShardsMount on server 1: %v", err) + } + + // Delete needle A on server 1 only (creates .ecj entry on server 1). + _, err = client1.VolumeEcBlobDelete(ctx, &volume_server_pb.VolumeEcBlobDeleteRequest{ + VolumeId: volumeID, Collection: "", + FileKey: keyA, Version: uint32(needle.GetCurrentVersion()), + }) + if err != nil { + t.Fatalf("VolumeEcBlobDelete needle A on server 1: %v", err) + } + + // Mount all data shards on server 0 (the decode target). + _, err = client0.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{ + VolumeId: volumeID, Collection: "", + ShardIds: []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + }) + if err != nil { + t.Fatalf("VolumeEcShardsMount on server 0: %v", err) + } + + // Collect .ecj from server 1 → server 0 with NO new shard IDs. + // This is the critical path: server 1 has shard 0 which server 0 already + // has, so needToCopyShardsInfo would be empty. Before the fix in + // collectEcShards, this copy would be skipped entirely, losing server 1's + // deletion entries. + server1DataNode := cluster.VolumeAdminAddress(1) + "." + + strings.Split(cluster.VolumeGRPCAddress(1), ":")[1] + + _, err = client0.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{ + VolumeId: volumeID, + Collection: "", + SourceDataNode: server1DataNode, + ShardIds: []uint32{}, // No new shards — just .ecj. + CopyEcxFile: false, + CopyEcjFile: true, + CopyVifFile: false, + }) + if err != nil { + t.Fatalf("VolumeEcShardsCopy .ecj from server 1→0: %v", err) + } + + // Unmount the normal volume before decode. + _, err = client0.VolumeUnmount(ctx, &volume_server_pb.VolumeUnmountRequest{VolumeId: volumeID}) + if err != nil { + t.Fatalf("VolumeUnmount on server 0: %v", err) + } + + // Decode on server 0. RebuildEcxFile should see needle A's deletion from + // the .ecj that was collected from server 1. + _, err = client0.VolumeEcShardsToVolume(ctx, &volume_server_pb.VolumeEcShardsToVolumeRequest{ + VolumeId: volumeID, Collection: "", + }) + if err != nil { + t.Fatalf("VolumeEcShardsToVolume on server 0: %v", err) + } + + // Re-mount the decoded normal volume. + _, err = client0.VolumeMount(ctx, &volume_server_pb.VolumeMountRequest{VolumeId: volumeID}) + if err != nil { + t.Fatalf("VolumeMount on server 0: %v", err) + } + + // Needle A should be gone — its deletion was in server 1's .ecj. + respA := framework.ReadBytes(t, httpClient, cluster.VolumeAdminURL(0), fidA) + bodyA := framework.ReadAllAndClose(t, respA) + if respA.StatusCode >= 500 { + t.Fatalf("needle A read: server error %d: %s", respA.StatusCode, bodyA) + } + if respA.StatusCode != http.StatusNotFound { + t.Fatalf("needle A should be 404 (ecj from peer), got %d", respA.StatusCode) + } + + // Needle B should still be readable. + respB := framework.ReadBytes(t, httpClient, cluster.VolumeAdminURL(0), fidB) + bodyB := framework.ReadAllAndClose(t, respB) + if respB.StatusCode != http.StatusOK { + t.Fatalf("needle B read: expected 200, got %d", respB.StatusCode) + } + if string(bodyB) != string(payloadB) { + t.Fatalf("needle B payload mismatch: got %q, want %q", bodyB, payloadB) + } +} diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go index 5eb2ec2f4..dbca04587 100644 --- a/weed/server/volume_grpc_erasure_coding.go +++ b/weed/server/volume_grpc_erasure_coding.go @@ -609,6 +609,17 @@ func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_ } dataBaseFileName, indexBaseFileName := v.DataBaseFileName(), v.IndexBaseFileName() + if !util.FileExists(indexBaseFileName + ".ecx") { + indexBaseFileName = dataBaseFileName + } + + // Merge .ecj deletions into .ecx so that HasLiveNeedles and FindDatFileSize + // see the full set of deleted needles. Without this, needles deleted after the + // last ecx rebuild would still appear live, causing the decoded .dat to include + // data that should be skipped and HasLiveNeedles to return a false positive. + if err := erasure_coding.RebuildEcxFile(indexBaseFileName); err != nil { + return nil, fmt.Errorf("RebuildEcxFile %s: %v", indexBaseFileName, err) + } // If the EC index contains no live entries, decoding should be a no-op: // just allow the caller to purge EC shards and do not generate an empty normal volume. @@ -636,6 +647,29 @@ func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_ return nil, fmt.Errorf("WriteIdxFileFromEcIndex %s: %v", v.IndexBaseFileName(), err) } + var volumeLocation *storage.DiskLocation + for _, location := range vs.store.Locations { + if candidate, found := location.FindEcVolume(needle.VolumeId(req.VolumeId)); found && candidate == v { + volumeLocation = location + break + } + } + if volumeLocation == nil { + return nil, fmt.Errorf("ec volume %d location not found for offline compaction", req.VolumeId) + } + + if err := vs.store.CompactVolumeFiles( + needle.VolumeId(req.VolumeId), + v.Collection, + volumeLocation, + vs.needleMapKind, + vs.ldbTimout, + 0, + vs.compactionBytePerSecond, + ); err != nil { + glog.Errorf("CompactVolumeFiles %s: %v", dataBaseFileName, err) + } + return &volume_server_pb.VolumeEcShardsToVolumeResponse{}, nil } diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go index b903ebf45..5c906c297 100644 --- a/weed/shell/command_ec_decode.go +++ b/weed/shell/command_ec_decode.go @@ -169,8 +169,14 @@ func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collec return fmt.Errorf("generate normal volume %d on %s: %v", vid, targetNodeLocation, err) } + // mount the decoded volume after server-side offline compaction succeeded + err = mountDecodedVolume(commandEnv.option.GrpcDialOption, targetNodeLocation, vid) + if err != nil { + return fmt.Errorf("mount decoded volume %d on %s: %v", vid, targetNodeLocation, err) + } + // delete the previous ec shards - err = mountVolumeAndDeleteEcShards(commandEnv.option.GrpcDialOption, collection, targetNodeLocation, nodeToEcShardsInfo, vid) + err = unmountAndDeleteEcShardsWithPrefix("deleteDecodedEcShards", commandEnv.option.GrpcDialOption, collection, nodeToEcShardsInfo, vid) if err != nil { return fmt.Errorf("delete ec shards for volume %d: %v", vid, err) } @@ -219,23 +225,16 @@ func unmountAndDeleteEcShardsWithPrefix(prefix string, grpcDialOption grpc.DialO return ewg.Wait() } -func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection string, targetNodeLocation pb.ServerAddress, nodeToShardsInfo map[pb.ServerAddress]*erasure_coding.ShardsInfo, vid needle.VolumeId) error { - - // mount volume - if err := operation.WithVolumeServerClient(false, targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { +func mountDecodedVolume(grpcDialOption grpc.DialOption, targetNodeLocation pb.ServerAddress, vid needle.VolumeId) error { + return operation.WithVolumeServerClient(false, targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, mountErr := volumeServerClient.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{ VolumeId: uint32(vid), }) return mountErr - }); err != nil { - return fmt.Errorf("mountVolumeAndDeleteEcShards mount volume %d on %s: %v", vid, targetNodeLocation, err) - } - - return unmountAndDeleteEcShardsWithPrefix("mountVolumeAndDeleteEcShards", grpcDialOption, collection, nodeToShardsInfo, vid) + }) } func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, sourceVolumeServer pb.ServerAddress) error { - fmt.Printf("generateNormalVolume from ec volume %d on %s\n", vid, sourceVolumeServer) err := operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { @@ -280,13 +279,18 @@ func collectEcShards(commandEnv *CommandEnv, nodeToShardsInfo map[pb.ServerAddre } needToCopyShardsInfo := si.Minus(existingShardsInfo).MinusParityShards() - if needToCopyShardsInfo.Count() == 0 { - continue - } err = operation.WithVolumeServerClient(false, targetNodeLocation, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { - fmt.Printf("copy %d.%v %s => %s\n", vid, needToCopyShardsInfo.Ids(), loc, targetNodeLocation) + // Always collect .ecj from every shard location. Each server's .ecj + // only contains deletions for needles whose data resides in shards + // held by that server. Without merging all .ecj files, deletions + // recorded on other servers would be lost during decode. + if needToCopyShardsInfo.Count() > 0 { + fmt.Printf("copy %d.%v %s => %s\n", vid, needToCopyShardsInfo.Ids(), loc, targetNodeLocation) + } else { + fmt.Printf("collect ecj %d %s => %s\n", vid, loc, targetNodeLocation) + } _, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{ VolumeId: uint32(vid), @@ -294,21 +298,23 @@ func collectEcShards(commandEnv *CommandEnv, nodeToShardsInfo map[pb.ServerAddre ShardIds: needToCopyShardsInfo.IdsUint32(), CopyEcxFile: false, CopyEcjFile: true, - CopyVifFile: true, + CopyVifFile: needToCopyShardsInfo.Count() > 0, SourceDataNode: string(loc), }) if copyErr != nil { return fmt.Errorf("copy %d.%v %s => %s : %v\n", vid, needToCopyShardsInfo.Ids(), loc, targetNodeLocation, copyErr) } - fmt.Printf("mount %d.%v on %s\n", vid, needToCopyShardsInfo.Ids(), targetNodeLocation) - _, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{ - VolumeId: uint32(vid), - Collection: collection, - ShardIds: needToCopyShardsInfo.IdsUint32(), - }) - if mountErr != nil { - return fmt.Errorf("mount %d.%v on %s : %v\n", vid, needToCopyShardsInfo.Ids(), targetNodeLocation, mountErr) + if needToCopyShardsInfo.Count() > 0 { + fmt.Printf("mount %d.%v on %s\n", vid, needToCopyShardsInfo.Ids(), targetNodeLocation) + _, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{ + VolumeId: uint32(vid), + Collection: collection, + ShardIds: needToCopyShardsInfo.IdsUint32(), + }) + if mountErr != nil { + return fmt.Errorf("mount %d.%v on %s : %v\n", vid, needToCopyShardsInfo.Ids(), targetNodeLocation, mountErr) + } } return nil diff --git a/weed/storage/erasure_coding/ec_decoder_test.go b/weed/storage/erasure_coding/ec_decoder_test.go index 1ca158f37..f1f54bb25 100644 --- a/weed/storage/erasure_coding/ec_decoder_test.go +++ b/weed/storage/erasure_coding/ec_decoder_test.go @@ -174,6 +174,218 @@ func TestWriteIdxFileFromEcIndex_ProcessesEcjJournal(t *testing.T) { } } +// TestDecodeWithNonEmptyEcj_AllDeleted verifies the full decode pre-processing +// when .ecj contains deletions for ALL live entries in .ecx. +// After RebuildEcxFile merges .ecj into .ecx, HasLiveNeedles must return false +// and WriteIdxFileFromEcIndex must produce an .idx where every entry is tombstoned. +func TestDecodeWithNonEmptyEcj_AllDeleted(t *testing.T) { + dir := t.TempDir() + base := filepath.Join(dir, "test_1") + + // .ecx: two live entries + needle1 := makeNeedleMapEntry(types.NeedleId(1), types.ToOffset(64), types.Size(100)) + needle2 := makeNeedleMapEntry(types.NeedleId(2), types.ToOffset(128), types.Size(200)) + ecxData := append(needle1, needle2...) + if err := os.WriteFile(base+".ecx", ecxData, 0644); err != nil { + t.Fatalf("write ecx: %v", err) + } + + // .ecj: both needles deleted + ecjData := make([]byte, 2*types.NeedleIdSize) + types.NeedleIdToBytes(ecjData[0:types.NeedleIdSize], types.NeedleId(1)) + types.NeedleIdToBytes(ecjData[types.NeedleIdSize:], types.NeedleId(2)) + if err := os.WriteFile(base+".ecj", ecjData, 0644); err != nil { + t.Fatalf("write ecj: %v", err) + } + + // Before rebuild, ecx entries look live + hasLive, err := erasure_coding.HasLiveNeedles(base) + if err != nil { + t.Fatalf("HasLiveNeedles before rebuild: %v", err) + } + if !hasLive { + t.Fatal("expected live entries before rebuild") + } + + // Simulate what VolumeEcShardsToVolume now does: merge .ecj into .ecx + if err := erasure_coding.RebuildEcxFile(base); err != nil { + t.Fatalf("RebuildEcxFile: %v", err) + } + + // .ecj should be removed after rebuild + if _, err := os.Stat(base + ".ecj"); !os.IsNotExist(err) { + t.Fatal("expected .ecj to be removed after RebuildEcxFile") + } + + // After rebuild, HasLiveNeedles must return false + hasLive, err = erasure_coding.HasLiveNeedles(base) + if err != nil { + t.Fatalf("HasLiveNeedles after rebuild: %v", err) + } + if hasLive { + t.Fatal("expected no live entries after rebuild merged all deletions") + } + + // WriteIdxFileFromEcIndex should still work (no .ecj to process) + if err := erasure_coding.WriteIdxFileFromEcIndex(base); err != nil { + t.Fatalf("WriteIdxFileFromEcIndex: %v", err) + } + + idxData, err := os.ReadFile(base + ".idx") + if err != nil { + t.Fatalf("read idx: %v", err) + } + + // .idx should have exactly 2 entries (copied from .ecx, both now tombstoned) + entrySize := types.NeedleIdSize + types.OffsetSize + types.SizeSize + if len(idxData) != 2*entrySize { + t.Fatalf("idx file size: got %d, want %d", len(idxData), 2*entrySize) + } + + // Both entries must be tombstoned + for i := 0; i < 2; i++ { + entry := idxData[i*entrySize : (i+1)*entrySize] + size := types.BytesToSize(entry[types.NeedleIdSize+types.OffsetSize:]) + if !size.IsDeleted() { + t.Fatalf("entry %d: expected tombstone, got size %d", i+1, size) + } + } +} + +// TestDecodeWithNonEmptyEcj_PartiallyDeleted verifies decode pre-processing +// when .ecj deletes only some entries. After RebuildEcxFile, HasLiveNeedles +// must still return true for the surviving entries, and WriteIdxFileFromEcIndex +// must produce an .idx that correctly distinguishes live from deleted needles. +func TestDecodeWithNonEmptyEcj_PartiallyDeleted(t *testing.T) { + dir := t.TempDir() + base := filepath.Join(dir, "test_1") + + // .ecx: three live entries + needle1 := makeNeedleMapEntry(types.NeedleId(1), types.ToOffset(64), types.Size(100)) + needle2 := makeNeedleMapEntry(types.NeedleId(2), types.ToOffset(128), types.Size(200)) + needle3 := makeNeedleMapEntry(types.NeedleId(3), types.ToOffset(256), types.Size(300)) + ecxData := append(append(needle1, needle2...), needle3...) + if err := os.WriteFile(base+".ecx", ecxData, 0644); err != nil { + t.Fatalf("write ecx: %v", err) + } + + // .ecj: only needle 2 is deleted + ecjData := make([]byte, types.NeedleIdSize) + types.NeedleIdToBytes(ecjData, types.NeedleId(2)) + if err := os.WriteFile(base+".ecj", ecjData, 0644); err != nil { + t.Fatalf("write ecj: %v", err) + } + + // Merge .ecj into .ecx + if err := erasure_coding.RebuildEcxFile(base); err != nil { + t.Fatalf("RebuildEcxFile: %v", err) + } + + // HasLiveNeedles must still return true (needles 1 and 3 survive) + hasLive, err := erasure_coding.HasLiveNeedles(base) + if err != nil { + t.Fatalf("HasLiveNeedles: %v", err) + } + if !hasLive { + t.Fatal("expected live entries after partial deletion") + } + + // WriteIdxFileFromEcIndex + if err := erasure_coding.WriteIdxFileFromEcIndex(base); err != nil { + t.Fatalf("WriteIdxFileFromEcIndex: %v", err) + } + + idxData, err := os.ReadFile(base + ".idx") + if err != nil { + t.Fatalf("read idx: %v", err) + } + + entrySize := types.NeedleIdSize + types.OffsetSize + types.SizeSize + if len(idxData) != 3*entrySize { + t.Fatalf("idx file size: got %d, want %d", len(idxData), 3*entrySize) + } + + // Verify each entry + for i := 0; i < 3; i++ { + entry := idxData[i*entrySize : (i+1)*entrySize] + key := types.BytesToNeedleId(entry[0:types.NeedleIdSize]) + size := types.BytesToSize(entry[types.NeedleIdSize+types.OffsetSize:]) + + switch key { + case types.NeedleId(1), types.NeedleId(3): + if size.IsDeleted() { + t.Fatalf("needle %d: should be live, got tombstone", key) + } + case types.NeedleId(2): + if !size.IsDeleted() { + t.Fatalf("needle %d: should be tombstoned, got size %d", key, size) + } + default: + t.Fatalf("unexpected needle id %d", key) + } + } +} + +// TestDecodeWithEmptyEcj verifies that the decode flow is a no-op when +// .ecj exists but is empty (no deletions recorded). +func TestDecodeWithEmptyEcj(t *testing.T) { + dir := t.TempDir() + base := filepath.Join(dir, "test_1") + + // .ecx: one live entry + needle1 := makeNeedleMapEntry(types.NeedleId(1), types.ToOffset(64), types.Size(100)) + if err := os.WriteFile(base+".ecx", needle1, 0644); err != nil { + t.Fatalf("write ecx: %v", err) + } + + // .ecj: empty + if err := os.WriteFile(base+".ecj", []byte{}, 0644); err != nil { + t.Fatalf("write ecj: %v", err) + } + + // RebuildEcxFile with empty .ecj should not change anything + if err := erasure_coding.RebuildEcxFile(base); err != nil { + t.Fatalf("RebuildEcxFile: %v", err) + } + + // HasLiveNeedles must still return true + hasLive, err := erasure_coding.HasLiveNeedles(base) + if err != nil { + t.Fatalf("HasLiveNeedles: %v", err) + } + if !hasLive { + t.Fatal("expected live entries with empty .ecj") + } +} + +// TestDecodeWithNoEcjFile verifies that the decode flow works when no .ecj +// file exists at all. +func TestDecodeWithNoEcjFile(t *testing.T) { + dir := t.TempDir() + base := filepath.Join(dir, "test_1") + + // .ecx: one live entry + needle1 := makeNeedleMapEntry(types.NeedleId(1), types.ToOffset(64), types.Size(100)) + if err := os.WriteFile(base+".ecx", needle1, 0644); err != nil { + t.Fatalf("write ecx: %v", err) + } + + // No .ecj file + + // RebuildEcxFile should be a no-op + if err := erasure_coding.RebuildEcxFile(base); err != nil { + t.Fatalf("RebuildEcxFile: %v", err) + } + + hasLive, err := erasure_coding.HasLiveNeedles(base) + if err != nil { + t.Fatalf("HasLiveNeedles: %v", err) + } + if !hasLive { + t.Fatal("expected live entries without .ecj file") + } +} + // TestEcxFileDeletionVisibleAfterSync verifies that deletions made to .ecx // via MarkNeedleDeleted are visible to other readers after Sync(). // This is a regression test for issue #7751. diff --git a/weed/storage/store_vacuum.go b/weed/storage/store_vacuum.go index 178d96b08..7d4f6dc89 100644 --- a/weed/storage/store_vacuum.go +++ b/weed/storage/store_vacuum.go @@ -19,27 +19,9 @@ func (s *Store) CheckCompactVolume(volumeId needle.VolumeId) (float64, error) { func (s *Store) CompactVolume(vid needle.VolumeId, preallocate int64, compactionBytePerSecond int64, progressFn ProgressFunc) error { if v := s.findVolume(vid); v != nil { - // Get current volume size for space calculation - volumeSize, indexSize, _ := v.FileStat() - - // Calculate space needed for compaction: - // 1. Space for the new compacted volume (approximately same as current volume size) - // 2. Use the larger of preallocate or estimated volume size - estimatedCompactSize := int64(volumeSize + indexSize) - spaceNeeded := preallocate - if estimatedCompactSize > preallocate { - spaceNeeded = estimatedCompactSize + if err := ensureCompactVolumeSpace(v, preallocate); err != nil { + return err } - - diskStatus := stats.NewDiskStatus(v.dir) - if int64(diskStatus.Free) < spaceNeeded { - return fmt.Errorf("insufficient free space for compaction: need %d bytes (volume: %d, index: %d, buffer: 10%%), but only %d bytes available", - spaceNeeded, volumeSize, indexSize, diskStatus.Free) - } - - glog.V(1).Infof("volume %d compaction space check: volume=%d, index=%d, space_needed=%d, free_space=%d", - vid, volumeSize, indexSize, spaceNeeded, diskStatus.Free) - return v.CompactByIndex(&CompactOptions{ PreallocateBytes: preallocate, MaxBytesPerSecond: compactionBytePerSecond, @@ -71,3 +53,71 @@ func (s *Store) CommitCleanupVolume(vid needle.VolumeId) error { } return fmt.Errorf("volume id %d is not found during cleaning up", vid) } + +func ensureCompactVolumeSpace(v *Volume, preallocate int64) error { + // Get current volume size for space calculation + volumeSize, indexSize, _ := v.FileStat() + + // Calculate space needed for compaction: + // 1. Space for the new compacted volume (approximately same as current volume size) + // 2. Use the larger of preallocate or estimated volume size + estimatedCompactSize := int64(volumeSize + indexSize) + spaceNeeded := preallocate + if estimatedCompactSize > preallocate { + spaceNeeded = estimatedCompactSize + } + + diskStatus := stats.NewDiskStatus(v.dir) + if int64(diskStatus.Free) < spaceNeeded { + return fmt.Errorf("insufficient free space for compaction: need %d bytes (volume: %d, index: %d), but only %d bytes available", + spaceNeeded, volumeSize, indexSize, diskStatus.Free) + } + + glog.V(1).Infof("volume %d compaction space check: volume=%d, index=%d, space_needed=%d, free_space=%d", + v.Id, volumeSize, indexSize, spaceNeeded, diskStatus.Free) + + return nil +} + +func (s *Store) CompactVolumeFiles(vid needle.VolumeId, collection string, location *DiskLocation, needleMapKind NeedleMapKind, ldbTimeout int64, preallocate int64, compactionBytePerSecond int64) (err error) { + if location == nil { + return fmt.Errorf("volume %d compaction location is nil", vid) + } + + tempVolume, err := loadVolumeWithoutWorker(location.Directory, location.IdxDirectory, collection, vid, needleMapKind, ldbTimeout) + if err != nil { + return fmt.Errorf("load volume %d for offline compaction: %w", vid, err) + } + tempVolume.location = location + + defer func() { + if tempVolume.tmpNm != nil { + tempVolume.tmpNm.Close() + tempVolume.tmpNm = nil + } + tempVolume.doClose() + }() + + if err := ensureCompactVolumeSpace(tempVolume, preallocate); err != nil { + return err + } + + if err := tempVolume.CompactByIndex(&CompactOptions{ + PreallocateBytes: preallocate, + MaxBytesPerSecond: compactionBytePerSecond, + }); err != nil { + if cleanupErr := tempVolume.cleanupCompact(); cleanupErr != nil { + return fmt.Errorf("compact volume %d: %v (cleanup failed: %v)", vid, err, cleanupErr) + } + return fmt.Errorf("compact volume %d: %w", vid, err) + } + + if err := tempVolume.CommitCompact(); err != nil { + if cleanupErr := tempVolume.cleanupCompact(); cleanupErr != nil { + return fmt.Errorf("commit compact volume %d: %v (cleanup failed: %v)", vid, err, cleanupErr) + } + return fmt.Errorf("commit compact volume %d: %w", vid, err) + } + + return nil +} diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go index 901b4a37a..37be275b6 100644 --- a/weed/storage/volume_loading.go +++ b/weed/storage/volume_loading.go @@ -24,6 +24,20 @@ func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeI return } +func loadVolumeWithoutWorker(dirname string, dirIdx string, collection string, id needle.VolumeId, needleMapKind NeedleMapKind, ldbTimeout int64) (v *Volume, err error) { + v = &Volume{ + dir: dirname, + dirIdx: dirIdx, + Collection: collection, + Id: id, + needleMapKind: needleMapKind, + ldbTimeout: ldbTimeout, + } + v.SuperBlock = super_block.SuperBlock{} + err = v.load(true, false, needleMapKind, 0, needle.GetCurrentVersion()) + return +} + func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapKind, preallocate int64, ver needle.Version) (err error) { alreadyHasSuperBlock := false diff --git a/weed/storage/volume_vacuum_test.go b/weed/storage/volume_vacuum_test.go index 7bf1e16cd..d8d6201eb 100644 --- a/weed/storage/volume_vacuum_test.go +++ b/weed/storage/volume_vacuum_test.go @@ -2,6 +2,8 @@ package storage import ( "math/rand" + "os" + "path/filepath" "reflect" "testing" "time" @@ -9,6 +11,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/super_block" "github.com/seaweedfs/seaweedfs/weed/storage/types" + "github.com/seaweedfs/seaweedfs/weed/util" ) /* @@ -146,6 +149,90 @@ func testCompactionByIndex(t *testing.T, needleMapKind NeedleMapKind) { } } + +func TestCompactVolumeFilesOffline(t *testing.T) { + dir := t.TempDir() + location := NewDiskLocation(dir, 10, util.MinFreeSpace{}, dir, "", nil) + defer location.Close() + + v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, needle.GetCurrentVersion(), 0, 0) + if err != nil { + t.Fatalf("volume creation: %v", err) + } + + infos := make([]*needleInfo, 32) + for i := 1; i <= 32; i++ { + doSomeWritesDeletes(i, v, t, infos) + } + v.Close() + + store := &Store{} + if err := store.CompactVolumeFiles(needle.VolumeId(1), "", location, NeedleMapInMemory, 0, 0, 0); err != nil { + t.Fatalf("CompactVolumeFiles: %v", err) + } + + reloaded, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, nil, nil, 0, needle.GetCurrentVersion(), 0, 0) + if err != nil { + t.Fatalf("volume reload: %v", err) + } + defer reloaded.Close() + + if _, err := os.Stat(filepath.Join(dir, "1.cpd")); !os.IsNotExist(err) { + t.Fatalf("expected no .cpd after successful offline compaction, got err=%v", err) + } + if _, err := os.Stat(filepath.Join(dir, "1.cpx")); !os.IsNotExist(err) { + t.Fatalf("expected no .cpx after successful offline compaction, got err=%v", err) + } +} + +func TestCleanupCompactRemovesTempFiles(t *testing.T) { + dir := t.TempDir() + location := NewDiskLocation(dir, 10, util.MinFreeSpace{}, dir, "", nil) + defer location.Close() + + v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, needle.GetCurrentVersion(), 0, 0) + if err != nil { + t.Fatalf("volume creation: %v", err) + } + + infos := make([]*needleInfo, 16) + for i := 1; i <= 16; i++ { + doSomeWritesDeletes(i, v, t, infos) + } + v.Close() + + if err := os.WriteFile(filepath.Join(dir, "1.cpx"), []byte("broken"), 0o644); err != nil { + t.Fatalf("write broken cpx: %v", err) + } + if err := os.WriteFile(filepath.Join(dir, "1.cpd"), []byte("temp"), 0o644); err != nil { + t.Fatalf("write cpd: %v", err) + } + if err := os.Mkdir(filepath.Join(dir, "1.cpldb"), 0o755); err != nil { + t.Fatalf("mkdir cpldb: %v", err) + } + + tempVolume, err := loadVolumeWithoutWorker(dir, dir, "", needle.VolumeId(1), NeedleMapInMemory, 0) + if err != nil { + t.Fatalf("loadVolumeWithoutWorker: %v", err) + } + tempVolume.location = location + defer tempVolume.doClose() + + if err := tempVolume.cleanupCompact(); err != nil { + t.Fatalf("cleanupCompact: %v", err) + } + + if _, err := os.Stat(filepath.Join(dir, "1.cpd")); !os.IsNotExist(err) { + t.Fatalf("expected cleanup to remove .cpd, got err=%v", err) + } + if _, err := os.Stat(filepath.Join(dir, "1.cpx")); !os.IsNotExist(err) { + t.Fatalf("expected cleanup to remove .cpx, got err=%v", err) + } + if _, err := os.Stat(filepath.Join(dir, "1.cpldb")); !os.IsNotExist(err) { + t.Fatalf("expected cleanup to remove .cpldb, got err=%v", err) + } +} + func doSomeWritesDeletes(i int, v *Volume, t *testing.T, infos []*needleInfo) { n := newRandomNeedle(uint64(i)) _, size, _, err := v.writeNeedle2(n, true, false)