diff --git a/test/volume_server/grpc/ec_decode_ecj_test.go b/test/volume_server/grpc/ec_decode_ecj_test.go new file mode 100644 index 000000000..bd48594d1 --- /dev/null +++ b/test/volume_server/grpc/ec_decode_ecj_test.go @@ -0,0 +1,294 @@ +package volume_server_grpc_test + +import ( + "context" + "net/http" + "strings" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/test/volume_server/framework" + "github.com/seaweedfs/seaweedfs/test/volume_server/matrix" + "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" +) + +// TestEcDecodePreservesDeletedNeedles verifies that needles deleted via +// VolumeEcBlobDelete (recorded in .ecj) are correctly excluded from the +// decoded volume produced by VolumeEcShardsToVolume. +func TestEcDecodePreservesDeletedNeedles(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + cluster := framework.StartVolumeCluster(t, matrix.P1()) + conn, client := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress()) + defer conn.Close() + + const ( + volumeID = uint32(140) + keyA = uint64(990020) + cookieA = uint32(0xDA001122) + keyB = uint64(990021) + cookieB = uint32(0xDA003344) + ) + + framework.AllocateVolume(t, client, volumeID, "") + + httpClient := framework.NewHTTPClient() + fidA := framework.NewFileID(volumeID, keyA, cookieA) + fidB := framework.NewFileID(volumeID, keyB, cookieB) + payloadA := []byte("needle-A-should-be-deleted-after-decode") + payloadB := []byte("needle-B-should-survive-decode") + + // Upload two needles. + for _, tc := range []struct { + fid string + payload []byte + }{ + {fidA, payloadA}, + {fidB, payloadB}, + } { + resp := framework.UploadBytes(t, httpClient, cluster.VolumeAdminURL(), tc.fid, tc.payload) + _ = framework.ReadAllAndClose(t, resp) + if resp.StatusCode != http.StatusCreated { + t.Fatalf("upload %s: expected 201, got %d", tc.fid, resp.StatusCode) + } + } + + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + // EC encode. + _, err := client.VolumeEcShardsGenerate(ctx, &volume_server_pb.VolumeEcShardsGenerateRequest{ + VolumeId: volumeID, Collection: "", + }) + if err != nil { + t.Fatalf("VolumeEcShardsGenerate: %v", err) + } + + // Mount all data shards so the EC volume is usable. + _, err = client.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{ + VolumeId: volumeID, Collection: "", + ShardIds: []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + }) + if err != nil { + t.Fatalf("VolumeEcShardsMount: %v", err) + } + + // Delete needle A via EC path (writes to .ecj). + _, err = client.VolumeEcBlobDelete(ctx, &volume_server_pb.VolumeEcBlobDeleteRequest{ + VolumeId: volumeID, Collection: "", + FileKey: keyA, Version: uint32(needle.GetCurrentVersion()), + }) + if err != nil { + t.Fatalf("VolumeEcBlobDelete needle A: %v", err) + } + + // Unmount the normal volume so decode writes fresh files. + _, err = client.VolumeUnmount(ctx, &volume_server_pb.VolumeUnmountRequest{VolumeId: volumeID}) + if err != nil { + t.Fatalf("VolumeUnmount: %v", err) + } + + // Decode EC shards back to a normal volume. + _, err = client.VolumeEcShardsToVolume(ctx, &volume_server_pb.VolumeEcShardsToVolumeRequest{ + VolumeId: volumeID, Collection: "", + }) + if err != nil { + t.Fatalf("VolumeEcShardsToVolume: %v", err) + } + + // Re-mount the decoded volume. 
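+	// At this point the server should have rebuilt the normal volume files
+	// from the data shards plus the merged index. Conceptually (a sketch of
+	// the expected flow, not something this test asserts directly):
+	//
+	//	140.ec00 … 140.ec09  --Reed-Solomon decode-->      140.dat
+	//	140.ecx + 140.ecj    --WriteIdxFileFromEcIndex-->  140.idx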
+ _, err = client.VolumeMount(ctx, &volume_server_pb.VolumeMountRequest{VolumeId: volumeID}) + if err != nil { + t.Fatalf("VolumeMount: %v", err) + } + + // Needle A should be gone (deleted via .ecj before decode). + respA := framework.ReadBytes(t, httpClient, cluster.VolumeAdminURL(), fidA) + bodyA := framework.ReadAllAndClose(t, respA) + if respA.StatusCode >= 500 { + t.Fatalf("needle A read: server error %d: %s", respA.StatusCode, bodyA) + } + if respA.StatusCode != http.StatusNotFound { + t.Fatalf("needle A should be 404 after decode, got %d", respA.StatusCode) + } + + // Needle B should still be readable. + respB := framework.ReadBytes(t, httpClient, cluster.VolumeAdminURL(), fidB) + bodyB := framework.ReadAllAndClose(t, respB) + if respB.StatusCode != http.StatusOK { + t.Fatalf("needle B read: expected 200, got %d", respB.StatusCode) + } + if string(bodyB) != string(payloadB) { + t.Fatalf("needle B payload mismatch: got %q, want %q", bodyB, payloadB) + } +} + +// TestEcDecodeCollectsEcjFromPeer verifies that .ecj deletion entries from a +// peer server that contributes no new data shards are still collected during +// decode. This is the regression test for the fix in collectEcShards that +// always copies .ecj from every shard location. +// +// Scenario: +// - Server 0 holds all 10 data shards (decode target). +// - Server 1 holds a copy of shard 0 (no new shards for server 0). +// - A needle is deleted ONLY on server 1 (server 0's .ecj is empty). +// - During decode on server 0, server 1's .ecj must be collected and applied. +func TestEcDecodeCollectsEcjFromPeer(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + cluster := framework.StartMultiVolumeClusterAuto(t, matrix.P1(), 2) + conn0, client0 := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(0)) + defer conn0.Close() + conn1, client1 := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(1)) + defer conn1.Close() + + const ( + volumeID = uint32(141) + keyA = uint64(990030) + cookieA = uint32(0xDB001122) + keyB = uint64(990031) + cookieB = uint32(0xDB003344) + ) + + // Allocate and upload on server 0. + framework.AllocateVolume(t, client0, volumeID, "") + + httpClient := framework.NewHTTPClient() + fidA := framework.NewFileID(volumeID, keyA, cookieA) + fidB := framework.NewFileID(volumeID, keyB, cookieB) + payloadB := []byte("needle-B-should-survive-peer-ecj-decode") + + resp := framework.UploadBytes(t, httpClient, cluster.VolumeAdminURL(0), fidA, []byte("needle-A-deleted-on-peer")) + _ = framework.ReadAllAndClose(t, resp) + if resp.StatusCode != http.StatusCreated { + t.Fatalf("upload A: expected 201, got %d", resp.StatusCode) + } + resp = framework.UploadBytes(t, httpClient, cluster.VolumeAdminURL(0), fidB, payloadB) + _ = framework.ReadAllAndClose(t, resp) + if resp.StatusCode != http.StatusCreated { + t.Fatalf("upload B: expected 201, got %d", resp.StatusCode) + } + + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + // EC encode on server 0. + _, err := client0.VolumeEcShardsGenerate(ctx, &volume_server_pb.VolumeEcShardsGenerateRequest{ + VolumeId: volumeID, Collection: "", + }) + if err != nil { + t.Fatalf("VolumeEcShardsGenerate on server 0: %v", err) + } + + // Build the SourceDataNode address for server 0 (format: host:adminPort.grpcPort). + sourceDataNode := cluster.VolumeAdminAddress(0) + "." + + strings.Split(cluster.VolumeGRPCAddress(0), ":")[1] + + // Copy shard 0 + ecx + ecj from server 0 → server 1. 
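+	// (For illustration: a hypothetical admin address "127.0.0.1:8080" with
+	// gRPC port 18080 yields sourceDataNode "127.0.0.1:8080.18080", the
+	// combined form that pb.ServerAddress is expected to parse back apart.)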
+ _, err = client1.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{ + VolumeId: volumeID, + Collection: "", + SourceDataNode: sourceDataNode, + ShardIds: []uint32{0}, + CopyEcxFile: true, + CopyEcjFile: true, + CopyVifFile: true, + }) + if err != nil { + t.Fatalf("VolumeEcShardsCopy 0→1: %v", err) + } + + // Mount shard 0 on server 1 so the EC volume can accept deletions. + _, err = client1.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{ + VolumeId: volumeID, Collection: "", + ShardIds: []uint32{0}, + }) + if err != nil { + t.Fatalf("VolumeEcShardsMount on server 1: %v", err) + } + + // Delete needle A on server 1 only (creates .ecj entry on server 1). + _, err = client1.VolumeEcBlobDelete(ctx, &volume_server_pb.VolumeEcBlobDeleteRequest{ + VolumeId: volumeID, Collection: "", + FileKey: keyA, Version: uint32(needle.GetCurrentVersion()), + }) + if err != nil { + t.Fatalf("VolumeEcBlobDelete needle A on server 1: %v", err) + } + + // Mount all data shards on server 0 (the decode target). + _, err = client0.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{ + VolumeId: volumeID, Collection: "", + ShardIds: []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + }) + if err != nil { + t.Fatalf("VolumeEcShardsMount on server 0: %v", err) + } + + // Collect .ecj from server 1 → server 0 with NO new shard IDs. + // This is the critical path: server 1 has shard 0 which server 0 already + // has, so needToCopyShardsInfo would be empty. Before the fix in + // collectEcShards, this copy would be skipped entirely, losing server 1's + // deletion entries. + server1DataNode := cluster.VolumeAdminAddress(1) + "." + + strings.Split(cluster.VolumeGRPCAddress(1), ":")[1] + + _, err = client0.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{ + VolumeId: volumeID, + Collection: "", + SourceDataNode: server1DataNode, + ShardIds: []uint32{}, // No new shards — just .ecj. + CopyEcxFile: false, + CopyEcjFile: true, + CopyVifFile: false, + }) + if err != nil { + t.Fatalf("VolumeEcShardsCopy .ecj from server 1→0: %v", err) + } + + // Unmount the normal volume before decode. + _, err = client0.VolumeUnmount(ctx, &volume_server_pb.VolumeUnmountRequest{VolumeId: volumeID}) + if err != nil { + t.Fatalf("VolumeUnmount on server 0: %v", err) + } + + // Decode on server 0. RebuildEcxFile should see needle A's deletion from + // the .ecj that was collected from server 1. + _, err = client0.VolumeEcShardsToVolume(ctx, &volume_server_pb.VolumeEcShardsToVolumeRequest{ + VolumeId: volumeID, Collection: "", + }) + if err != nil { + t.Fatalf("VolumeEcShardsToVolume on server 0: %v", err) + } + + // Re-mount the decoded normal volume. + _, err = client0.VolumeMount(ctx, &volume_server_pb.VolumeMountRequest{VolumeId: volumeID}) + if err != nil { + t.Fatalf("VolumeMount on server 0: %v", err) + } + + // Needle A should be gone — its deletion was in server 1's .ecj. + respA := framework.ReadBytes(t, httpClient, cluster.VolumeAdminURL(0), fidA) + bodyA := framework.ReadAllAndClose(t, respA) + if respA.StatusCode >= 500 { + t.Fatalf("needle A read: server error %d: %s", respA.StatusCode, bodyA) + } + if respA.StatusCode != http.StatusNotFound { + t.Fatalf("needle A should be 404 (ecj from peer), got %d", respA.StatusCode) + } + + // Needle B should still be readable. 
+	respB := framework.ReadBytes(t, httpClient, cluster.VolumeAdminURL(0), fidB)
+	bodyB := framework.ReadAllAndClose(t, respB)
+	if respB.StatusCode != http.StatusOK {
+		t.Fatalf("needle B read: expected 200, got %d", respB.StatusCode)
+	}
+	if string(bodyB) != string(payloadB) {
+		t.Fatalf("needle B payload mismatch: got %q, want %q", bodyB, payloadB)
+	}
+}
diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go
index 5eb2ec2f4..dbca04587 100644
--- a/weed/server/volume_grpc_erasure_coding.go
+++ b/weed/server/volume_grpc_erasure_coding.go
@@ -609,6 +609,17 @@ func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_
 	}
 	dataBaseFileName, indexBaseFileName := v.DataBaseFileName(), v.IndexBaseFileName()
+	if !util.FileExists(indexBaseFileName + ".ecx") {
+		indexBaseFileName = dataBaseFileName
+	}
+
+	// Merge .ecj deletions into .ecx so that HasLiveNeedles and FindDatFileSize
+	// see the full set of deleted needles. Without this, needles deleted after the
+	// last ecx rebuild would still appear live, causing the decoded .dat to include
+	// data that should be skipped and HasLiveNeedles to return a false positive.
+	if err := erasure_coding.RebuildEcxFile(indexBaseFileName); err != nil {
+		return nil, fmt.Errorf("RebuildEcxFile %s: %v", indexBaseFileName, err)
+	}

 	// If the EC index contains no live entries, decoding should be a no-op:
 	// just allow the caller to purge EC shards and do not generate an empty normal volume.
@@ -636,6 +647,29 @@ func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_
 		return nil, fmt.Errorf("WriteIdxFileFromEcIndex %s: %v", v.IndexBaseFileName(), err)
 	}

+	var volumeLocation *storage.DiskLocation
+	for _, location := range vs.store.Locations {
+		if candidate, found := location.FindEcVolume(needle.VolumeId(req.VolumeId)); found && candidate == v {
+			volumeLocation = location
+			break
+		}
+	}
+	if volumeLocation == nil {
+		return nil, fmt.Errorf("ec volume %d location not found for offline compaction", req.VolumeId)
+	}
+
+	if err := vs.store.CompactVolumeFiles(
+		needle.VolumeId(req.VolumeId),
+		v.Collection,
+		volumeLocation,
+		vs.needleMapKind,
+		vs.ldbTimeout,
+		0,
+		vs.compactionBytePerSecond,
+	); err != nil {
+		glog.Errorf("CompactVolumeFiles %s: %v", dataBaseFileName, err)
+	}
+
 	return &volume_server_pb.VolumeEcShardsToVolumeResponse{}, nil
 }
diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go
index b903ebf45..5c906c297 100644
--- a/weed/shell/command_ec_decode.go
+++ b/weed/shell/command_ec_decode.go
@@ -169,8 +169,14 @@ func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collec
 		return fmt.Errorf("generate normal volume %d on %s: %v", vid, targetNodeLocation, err)
 	}

+	// mount the decoded volume after server-side offline compaction succeeded
+	err = mountDecodedVolume(commandEnv.option.GrpcDialOption, targetNodeLocation, vid)
+	if err != nil {
+		return fmt.Errorf("mount decoded volume %d on %s: %v", vid, targetNodeLocation, err)
+	}
+
 	// delete the previous ec shards
-	err = mountVolumeAndDeleteEcShards(commandEnv.option.GrpcDialOption, collection, targetNodeLocation, nodeToEcShardsInfo, vid)
+	err = unmountAndDeleteEcShardsWithPrefix("deleteDecodedEcShards", commandEnv.option.GrpcDialOption, collection, nodeToEcShardsInfo, vid)
 	if err != nil {
 		return fmt.Errorf("delete ec shards for volume %d: %v", vid, err)
 	}
@@ -219,23 +225,16 @@ func unmountAndDeleteEcShardsWithPrefix(prefix string, grpcDialOption grpc.DialO
 	return ewg.Wait()
 }

-func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection string, targetNodeLocation pb.ServerAddress, nodeToShardsInfo map[pb.ServerAddress]*erasure_coding.ShardsInfo, vid needle.VolumeId) error {
-
-	// mount volume
-	if err := operation.WithVolumeServerClient(false, targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+func mountDecodedVolume(grpcDialOption grpc.DialOption, targetNodeLocation pb.ServerAddress, vid needle.VolumeId) error {
+	return operation.WithVolumeServerClient(false, targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
 		_, mountErr := volumeServerClient.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{
 			VolumeId: uint32(vid),
 		})
 		return mountErr
-	}); err != nil {
-		return fmt.Errorf("mountVolumeAndDeleteEcShards mount volume %d on %s: %v", vid, targetNodeLocation, err)
-	}
-
-	return unmountAndDeleteEcShardsWithPrefix("mountVolumeAndDeleteEcShards", grpcDialOption, collection, nodeToShardsInfo, vid)
+	})
 }

 func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, sourceVolumeServer pb.ServerAddress) error {
-
 	fmt.Printf("generateNormalVolume from ec volume %d on %s\n", vid, sourceVolumeServer)

 	err := operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
@@ -280,13 +279,18 @@ func collectEcShards(commandEnv *CommandEnv, nodeToShardsInfo map[pb.ServerAddre
 	}

 	needToCopyShardsInfo := si.Minus(existingShardsInfo).MinusParityShards()
-	if needToCopyShardsInfo.Count() == 0 {
-		continue
-	}

 	err = operation.WithVolumeServerClient(false, targetNodeLocation, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
-		fmt.Printf("copy %d.%v %s => %s\n", vid, needToCopyShardsInfo.Ids(), loc, targetNodeLocation)
+		// Always collect .ecj from every shard location. Each server's .ecj
+		// only contains deletions for needles whose data resides in shards
+		// held by that server. Without merging all .ecj files, deletions
+		// recorded on other servers would be lost during decode.
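+		// Conceptually the decode target accumulates the union of every
+		// location's journal; RebuildEcxFile later applies that union to
+		// .ecx as tombstones. A sketch with hypothetical helper names:
+		//
+		//	deleted := map[types.NeedleId]bool{}
+		//	for _, loc := range shardLocations {
+		//		for _, id := range readEcjEntries(loc) { // one NeedleId per entry
+		//			deleted[id] = true
+		//		}
+		//	}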
+ if needToCopyShardsInfo.Count() > 0 { + fmt.Printf("copy %d.%v %s => %s\n", vid, needToCopyShardsInfo.Ids(), loc, targetNodeLocation) + } else { + fmt.Printf("collect ecj %d %s => %s\n", vid, loc, targetNodeLocation) + } _, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{ VolumeId: uint32(vid), @@ -294,21 +298,23 @@ func collectEcShards(commandEnv *CommandEnv, nodeToShardsInfo map[pb.ServerAddre ShardIds: needToCopyShardsInfo.IdsUint32(), CopyEcxFile: false, CopyEcjFile: true, - CopyVifFile: true, + CopyVifFile: needToCopyShardsInfo.Count() > 0, SourceDataNode: string(loc), }) if copyErr != nil { return fmt.Errorf("copy %d.%v %s => %s : %v\n", vid, needToCopyShardsInfo.Ids(), loc, targetNodeLocation, copyErr) } - fmt.Printf("mount %d.%v on %s\n", vid, needToCopyShardsInfo.Ids(), targetNodeLocation) - _, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{ - VolumeId: uint32(vid), - Collection: collection, - ShardIds: needToCopyShardsInfo.IdsUint32(), - }) - if mountErr != nil { - return fmt.Errorf("mount %d.%v on %s : %v\n", vid, needToCopyShardsInfo.Ids(), targetNodeLocation, mountErr) + if needToCopyShardsInfo.Count() > 0 { + fmt.Printf("mount %d.%v on %s\n", vid, needToCopyShardsInfo.Ids(), targetNodeLocation) + _, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{ + VolumeId: uint32(vid), + Collection: collection, + ShardIds: needToCopyShardsInfo.IdsUint32(), + }) + if mountErr != nil { + return fmt.Errorf("mount %d.%v on %s : %v\n", vid, needToCopyShardsInfo.Ids(), targetNodeLocation, mountErr) + } } return nil diff --git a/weed/storage/erasure_coding/ec_decoder_test.go b/weed/storage/erasure_coding/ec_decoder_test.go index 1ca158f37..f1f54bb25 100644 --- a/weed/storage/erasure_coding/ec_decoder_test.go +++ b/weed/storage/erasure_coding/ec_decoder_test.go @@ -174,6 +174,218 @@ func TestWriteIdxFileFromEcIndex_ProcessesEcjJournal(t *testing.T) { } } +// TestDecodeWithNonEmptyEcj_AllDeleted verifies the full decode pre-processing +// when .ecj contains deletions for ALL live entries in .ecx. +// After RebuildEcxFile merges .ecj into .ecx, HasLiveNeedles must return false +// and WriteIdxFileFromEcIndex must produce an .idx where every entry is tombstoned. +func TestDecodeWithNonEmptyEcj_AllDeleted(t *testing.T) { + dir := t.TempDir() + base := filepath.Join(dir, "test_1") + + // .ecx: two live entries + needle1 := makeNeedleMapEntry(types.NeedleId(1), types.ToOffset(64), types.Size(100)) + needle2 := makeNeedleMapEntry(types.NeedleId(2), types.ToOffset(128), types.Size(200)) + ecxData := append(needle1, needle2...) 
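+	// Each .ecx row is NeedleIdSize+OffsetSize+SizeSize bytes, the same
+	// entrySize the assertions below compute, so this index holds exactly
+	// two needle-map entries.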
+ if err := os.WriteFile(base+".ecx", ecxData, 0644); err != nil { + t.Fatalf("write ecx: %v", err) + } + + // .ecj: both needles deleted + ecjData := make([]byte, 2*types.NeedleIdSize) + types.NeedleIdToBytes(ecjData[0:types.NeedleIdSize], types.NeedleId(1)) + types.NeedleIdToBytes(ecjData[types.NeedleIdSize:], types.NeedleId(2)) + if err := os.WriteFile(base+".ecj", ecjData, 0644); err != nil { + t.Fatalf("write ecj: %v", err) + } + + // Before rebuild, ecx entries look live + hasLive, err := erasure_coding.HasLiveNeedles(base) + if err != nil { + t.Fatalf("HasLiveNeedles before rebuild: %v", err) + } + if !hasLive { + t.Fatal("expected live entries before rebuild") + } + + // Simulate what VolumeEcShardsToVolume now does: merge .ecj into .ecx + if err := erasure_coding.RebuildEcxFile(base); err != nil { + t.Fatalf("RebuildEcxFile: %v", err) + } + + // .ecj should be removed after rebuild + if _, err := os.Stat(base + ".ecj"); !os.IsNotExist(err) { + t.Fatal("expected .ecj to be removed after RebuildEcxFile") + } + + // After rebuild, HasLiveNeedles must return false + hasLive, err = erasure_coding.HasLiveNeedles(base) + if err != nil { + t.Fatalf("HasLiveNeedles after rebuild: %v", err) + } + if hasLive { + t.Fatal("expected no live entries after rebuild merged all deletions") + } + + // WriteIdxFileFromEcIndex should still work (no .ecj to process) + if err := erasure_coding.WriteIdxFileFromEcIndex(base); err != nil { + t.Fatalf("WriteIdxFileFromEcIndex: %v", err) + } + + idxData, err := os.ReadFile(base + ".idx") + if err != nil { + t.Fatalf("read idx: %v", err) + } + + // .idx should have exactly 2 entries (copied from .ecx, both now tombstoned) + entrySize := types.NeedleIdSize + types.OffsetSize + types.SizeSize + if len(idxData) != 2*entrySize { + t.Fatalf("idx file size: got %d, want %d", len(idxData), 2*entrySize) + } + + // Both entries must be tombstoned + for i := 0; i < 2; i++ { + entry := idxData[i*entrySize : (i+1)*entrySize] + size := types.BytesToSize(entry[types.NeedleIdSize+types.OffsetSize:]) + if !size.IsDeleted() { + t.Fatalf("entry %d: expected tombstone, got size %d", i+1, size) + } + } +} + +// TestDecodeWithNonEmptyEcj_PartiallyDeleted verifies decode pre-processing +// when .ecj deletes only some entries. After RebuildEcxFile, HasLiveNeedles +// must still return true for the surviving entries, and WriteIdxFileFromEcIndex +// must produce an .idx that correctly distinguishes live from deleted needles. +func TestDecodeWithNonEmptyEcj_PartiallyDeleted(t *testing.T) { + dir := t.TempDir() + base := filepath.Join(dir, "test_1") + + // .ecx: three live entries + needle1 := makeNeedleMapEntry(types.NeedleId(1), types.ToOffset(64), types.Size(100)) + needle2 := makeNeedleMapEntry(types.NeedleId(2), types.ToOffset(128), types.Size(200)) + needle3 := makeNeedleMapEntry(types.NeedleId(3), types.ToOffset(256), types.Size(300)) + ecxData := append(append(needle1, needle2...), needle3...) 
+ if err := os.WriteFile(base+".ecx", ecxData, 0644); err != nil { + t.Fatalf("write ecx: %v", err) + } + + // .ecj: only needle 2 is deleted + ecjData := make([]byte, types.NeedleIdSize) + types.NeedleIdToBytes(ecjData, types.NeedleId(2)) + if err := os.WriteFile(base+".ecj", ecjData, 0644); err != nil { + t.Fatalf("write ecj: %v", err) + } + + // Merge .ecj into .ecx + if err := erasure_coding.RebuildEcxFile(base); err != nil { + t.Fatalf("RebuildEcxFile: %v", err) + } + + // HasLiveNeedles must still return true (needles 1 and 3 survive) + hasLive, err := erasure_coding.HasLiveNeedles(base) + if err != nil { + t.Fatalf("HasLiveNeedles: %v", err) + } + if !hasLive { + t.Fatal("expected live entries after partial deletion") + } + + // WriteIdxFileFromEcIndex + if err := erasure_coding.WriteIdxFileFromEcIndex(base); err != nil { + t.Fatalf("WriteIdxFileFromEcIndex: %v", err) + } + + idxData, err := os.ReadFile(base + ".idx") + if err != nil { + t.Fatalf("read idx: %v", err) + } + + entrySize := types.NeedleIdSize + types.OffsetSize + types.SizeSize + if len(idxData) != 3*entrySize { + t.Fatalf("idx file size: got %d, want %d", len(idxData), 3*entrySize) + } + + // Verify each entry + for i := 0; i < 3; i++ { + entry := idxData[i*entrySize : (i+1)*entrySize] + key := types.BytesToNeedleId(entry[0:types.NeedleIdSize]) + size := types.BytesToSize(entry[types.NeedleIdSize+types.OffsetSize:]) + + switch key { + case types.NeedleId(1), types.NeedleId(3): + if size.IsDeleted() { + t.Fatalf("needle %d: should be live, got tombstone", key) + } + case types.NeedleId(2): + if !size.IsDeleted() { + t.Fatalf("needle %d: should be tombstoned, got size %d", key, size) + } + default: + t.Fatalf("unexpected needle id %d", key) + } + } +} + +// TestDecodeWithEmptyEcj verifies that the decode flow is a no-op when +// .ecj exists but is empty (no deletions recorded). +func TestDecodeWithEmptyEcj(t *testing.T) { + dir := t.TempDir() + base := filepath.Join(dir, "test_1") + + // .ecx: one live entry + needle1 := makeNeedleMapEntry(types.NeedleId(1), types.ToOffset(64), types.Size(100)) + if err := os.WriteFile(base+".ecx", needle1, 0644); err != nil { + t.Fatalf("write ecx: %v", err) + } + + // .ecj: empty + if err := os.WriteFile(base+".ecj", []byte{}, 0644); err != nil { + t.Fatalf("write ecj: %v", err) + } + + // RebuildEcxFile with empty .ecj should not change anything + if err := erasure_coding.RebuildEcxFile(base); err != nil { + t.Fatalf("RebuildEcxFile: %v", err) + } + + // HasLiveNeedles must still return true + hasLive, err := erasure_coding.HasLiveNeedles(base) + if err != nil { + t.Fatalf("HasLiveNeedles: %v", err) + } + if !hasLive { + t.Fatal("expected live entries with empty .ecj") + } +} + +// TestDecodeWithNoEcjFile verifies that the decode flow works when no .ecj +// file exists at all. 
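+// For reference, the journal format the tests above build by hand is just a
+// flat sequence of NeedleIds, NeedleIdSize bytes each, with no header:
+//
+//	ecjData := make([]byte, types.NeedleIdSize) // one deletion entry
+//	types.NeedleIdToBytes(ecjData, types.NeedleId(2))
+//
+// Here the file is absent entirely, so RebuildEcxFile must treat it as empty.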
+func TestDecodeWithNoEcjFile(t *testing.T) { + dir := t.TempDir() + base := filepath.Join(dir, "test_1") + + // .ecx: one live entry + needle1 := makeNeedleMapEntry(types.NeedleId(1), types.ToOffset(64), types.Size(100)) + if err := os.WriteFile(base+".ecx", needle1, 0644); err != nil { + t.Fatalf("write ecx: %v", err) + } + + // No .ecj file + + // RebuildEcxFile should be a no-op + if err := erasure_coding.RebuildEcxFile(base); err != nil { + t.Fatalf("RebuildEcxFile: %v", err) + } + + hasLive, err := erasure_coding.HasLiveNeedles(base) + if err != nil { + t.Fatalf("HasLiveNeedles: %v", err) + } + if !hasLive { + t.Fatal("expected live entries without .ecj file") + } +} + // TestEcxFileDeletionVisibleAfterSync verifies that deletions made to .ecx // via MarkNeedleDeleted are visible to other readers after Sync(). // This is a regression test for issue #7751. diff --git a/weed/storage/store_vacuum.go b/weed/storage/store_vacuum.go index 178d96b08..7d4f6dc89 100644 --- a/weed/storage/store_vacuum.go +++ b/weed/storage/store_vacuum.go @@ -19,27 +19,9 @@ func (s *Store) CheckCompactVolume(volumeId needle.VolumeId) (float64, error) { func (s *Store) CompactVolume(vid needle.VolumeId, preallocate int64, compactionBytePerSecond int64, progressFn ProgressFunc) error { if v := s.findVolume(vid); v != nil { - // Get current volume size for space calculation - volumeSize, indexSize, _ := v.FileStat() - - // Calculate space needed for compaction: - // 1. Space for the new compacted volume (approximately same as current volume size) - // 2. Use the larger of preallocate or estimated volume size - estimatedCompactSize := int64(volumeSize + indexSize) - spaceNeeded := preallocate - if estimatedCompactSize > preallocate { - spaceNeeded = estimatedCompactSize + if err := ensureCompactVolumeSpace(v, preallocate); err != nil { + return err } - - diskStatus := stats.NewDiskStatus(v.dir) - if int64(diskStatus.Free) < spaceNeeded { - return fmt.Errorf("insufficient free space for compaction: need %d bytes (volume: %d, index: %d, buffer: 10%%), but only %d bytes available", - spaceNeeded, volumeSize, indexSize, diskStatus.Free) - } - - glog.V(1).Infof("volume %d compaction space check: volume=%d, index=%d, space_needed=%d, free_space=%d", - vid, volumeSize, indexSize, spaceNeeded, diskStatus.Free) - return v.CompactByIndex(&CompactOptions{ PreallocateBytes: preallocate, MaxBytesPerSecond: compactionBytePerSecond, @@ -71,3 +53,71 @@ func (s *Store) CommitCleanupVolume(vid needle.VolumeId) error { } return fmt.Errorf("volume id %d is not found during cleaning up", vid) } + +func ensureCompactVolumeSpace(v *Volume, preallocate int64) error { + // Get current volume size for space calculation + volumeSize, indexSize, _ := v.FileStat() + + // Calculate space needed for compaction: + // 1. Space for the new compacted volume (approximately same as current volume size) + // 2. 
Use the larger of preallocate or estimated volume size + estimatedCompactSize := int64(volumeSize + indexSize) + spaceNeeded := preallocate + if estimatedCompactSize > preallocate { + spaceNeeded = estimatedCompactSize + } + + diskStatus := stats.NewDiskStatus(v.dir) + if int64(diskStatus.Free) < spaceNeeded { + return fmt.Errorf("insufficient free space for compaction: need %d bytes (volume: %d, index: %d), but only %d bytes available", + spaceNeeded, volumeSize, indexSize, diskStatus.Free) + } + + glog.V(1).Infof("volume %d compaction space check: volume=%d, index=%d, space_needed=%d, free_space=%d", + v.Id, volumeSize, indexSize, spaceNeeded, diskStatus.Free) + + return nil +} + +func (s *Store) CompactVolumeFiles(vid needle.VolumeId, collection string, location *DiskLocation, needleMapKind NeedleMapKind, ldbTimeout int64, preallocate int64, compactionBytePerSecond int64) (err error) { + if location == nil { + return fmt.Errorf("volume %d compaction location is nil", vid) + } + + tempVolume, err := loadVolumeWithoutWorker(location.Directory, location.IdxDirectory, collection, vid, needleMapKind, ldbTimeout) + if err != nil { + return fmt.Errorf("load volume %d for offline compaction: %w", vid, err) + } + tempVolume.location = location + + defer func() { + if tempVolume.tmpNm != nil { + tempVolume.tmpNm.Close() + tempVolume.tmpNm = nil + } + tempVolume.doClose() + }() + + if err := ensureCompactVolumeSpace(tempVolume, preallocate); err != nil { + return err + } + + if err := tempVolume.CompactByIndex(&CompactOptions{ + PreallocateBytes: preallocate, + MaxBytesPerSecond: compactionBytePerSecond, + }); err != nil { + if cleanupErr := tempVolume.cleanupCompact(); cleanupErr != nil { + return fmt.Errorf("compact volume %d: %v (cleanup failed: %v)", vid, err, cleanupErr) + } + return fmt.Errorf("compact volume %d: %w", vid, err) + } + + if err := tempVolume.CommitCompact(); err != nil { + if cleanupErr := tempVolume.cleanupCompact(); cleanupErr != nil { + return fmt.Errorf("commit compact volume %d: %v (cleanup failed: %v)", vid, err, cleanupErr) + } + return fmt.Errorf("commit compact volume %d: %w", vid, err) + } + + return nil +} diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go index 901b4a37a..37be275b6 100644 --- a/weed/storage/volume_loading.go +++ b/weed/storage/volume_loading.go @@ -24,6 +24,20 @@ func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeI return } +func loadVolumeWithoutWorker(dirname string, dirIdx string, collection string, id needle.VolumeId, needleMapKind NeedleMapKind, ldbTimeout int64) (v *Volume, err error) { + v = &Volume{ + dir: dirname, + dirIdx: dirIdx, + Collection: collection, + Id: id, + needleMapKind: needleMapKind, + ldbTimeout: ldbTimeout, + } + v.SuperBlock = super_block.SuperBlock{} + err = v.load(true, false, needleMapKind, 0, needle.GetCurrentVersion()) + return +} + func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapKind, preallocate int64, ver needle.Version) (err error) { alreadyHasSuperBlock := false diff --git a/weed/storage/volume_vacuum_test.go b/weed/storage/volume_vacuum_test.go index 7bf1e16cd..d8d6201eb 100644 --- a/weed/storage/volume_vacuum_test.go +++ b/weed/storage/volume_vacuum_test.go @@ -2,6 +2,8 @@ package storage import ( "math/rand" + "os" + "path/filepath" "reflect" "testing" "time" @@ -9,6 +11,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/super_block" 
"github.com/seaweedfs/seaweedfs/weed/storage/types" + "github.com/seaweedfs/seaweedfs/weed/util" ) /* @@ -146,6 +149,90 @@ func testCompactionByIndex(t *testing.T, needleMapKind NeedleMapKind) { } } + +func TestCompactVolumeFilesOffline(t *testing.T) { + dir := t.TempDir() + location := NewDiskLocation(dir, 10, util.MinFreeSpace{}, dir, "", nil) + defer location.Close() + + v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, needle.GetCurrentVersion(), 0, 0) + if err != nil { + t.Fatalf("volume creation: %v", err) + } + + infos := make([]*needleInfo, 32) + for i := 1; i <= 32; i++ { + doSomeWritesDeletes(i, v, t, infos) + } + v.Close() + + store := &Store{} + if err := store.CompactVolumeFiles(needle.VolumeId(1), "", location, NeedleMapInMemory, 0, 0, 0); err != nil { + t.Fatalf("CompactVolumeFiles: %v", err) + } + + reloaded, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, nil, nil, 0, needle.GetCurrentVersion(), 0, 0) + if err != nil { + t.Fatalf("volume reload: %v", err) + } + defer reloaded.Close() + + if _, err := os.Stat(filepath.Join(dir, "1.cpd")); !os.IsNotExist(err) { + t.Fatalf("expected no .cpd after successful offline compaction, got err=%v", err) + } + if _, err := os.Stat(filepath.Join(dir, "1.cpx")); !os.IsNotExist(err) { + t.Fatalf("expected no .cpx after successful offline compaction, got err=%v", err) + } +} + +func TestCleanupCompactRemovesTempFiles(t *testing.T) { + dir := t.TempDir() + location := NewDiskLocation(dir, 10, util.MinFreeSpace{}, dir, "", nil) + defer location.Close() + + v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, needle.GetCurrentVersion(), 0, 0) + if err != nil { + t.Fatalf("volume creation: %v", err) + } + + infos := make([]*needleInfo, 16) + for i := 1; i <= 16; i++ { + doSomeWritesDeletes(i, v, t, infos) + } + v.Close() + + if err := os.WriteFile(filepath.Join(dir, "1.cpx"), []byte("broken"), 0o644); err != nil { + t.Fatalf("write broken cpx: %v", err) + } + if err := os.WriteFile(filepath.Join(dir, "1.cpd"), []byte("temp"), 0o644); err != nil { + t.Fatalf("write cpd: %v", err) + } + if err := os.Mkdir(filepath.Join(dir, "1.cpldb"), 0o755); err != nil { + t.Fatalf("mkdir cpldb: %v", err) + } + + tempVolume, err := loadVolumeWithoutWorker(dir, dir, "", needle.VolumeId(1), NeedleMapInMemory, 0) + if err != nil { + t.Fatalf("loadVolumeWithoutWorker: %v", err) + } + tempVolume.location = location + defer tempVolume.doClose() + + if err := tempVolume.cleanupCompact(); err != nil { + t.Fatalf("cleanupCompact: %v", err) + } + + if _, err := os.Stat(filepath.Join(dir, "1.cpd")); !os.IsNotExist(err) { + t.Fatalf("expected cleanup to remove .cpd, got err=%v", err) + } + if _, err := os.Stat(filepath.Join(dir, "1.cpx")); !os.IsNotExist(err) { + t.Fatalf("expected cleanup to remove .cpx, got err=%v", err) + } + if _, err := os.Stat(filepath.Join(dir, "1.cpldb")); !os.IsNotExist(err) { + t.Fatalf("expected cleanup to remove .cpldb, got err=%v", err) + } +} + func doSomeWritesDeletes(i int, v *Volume, t *testing.T, infos []*needleInfo) { n := newRandomNeedle(uint64(i)) _, size, _, err := v.writeNeedle2(n, true, false)