From 7fbdb9b7b714d0567eedc8410426aaa468fc7eba Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 20 Mar 2026 23:52:12 -0700 Subject: [PATCH] feat(shell): add volume.tier.compact command to reclaim cloud storage space (#8715) * feat(shell): add volume.tier.compact command to reclaim cloud storage space Adds a new shell command that automates compaction of cloud tier volumes. When files are deleted from remote-tiered volumes, space is not reclaimed on the cloud storage. This command orchestrates: download from remote, compact locally, and re-upload to reclaim deleted space. Closes #8563 * fix: log cleanup errors in compactVolumeOnServer instead of discarding them Helps operators diagnose leftover temp files (.cpd/.cpx) if cleanup fails after a compaction or commit failure. * fix: return aggregate error from loop and use regex for collection filter - Track and return error count when one or more volumes fail to compact, so callers see partial failures instead of always getting nil. - Use compileCollectionPattern for -collection in -volumeId mode too, so regex patterns work consistently with the flag description. Empty pattern (no -collection given) matches all collections. 
---
 weed/shell/command_volume_tier_compact.go     | 350 ++++++++++++++++++
 .../shell/command_volume_tier_compact_test.go | 268 ++++++++++++++
 2 files changed, 618 insertions(+)
 create mode 100644 weed/shell/command_volume_tier_compact.go
 create mode 100644 weed/shell/command_volume_tier_compact_test.go

diff --git a/weed/shell/command_volume_tier_compact.go b/weed/shell/command_volume_tier_compact.go
new file mode 100644
index 000000000..1dd501833
--- /dev/null
+++ b/weed/shell/command_volume_tier_compact.go
@@ -0,0 +1,350 @@
+package shell
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"io"
+
+	"google.golang.org/grpc"
+
+	"github.com/seaweedfs/seaweedfs/weed/operation"
+	"github.com/seaweedfs/seaweedfs/weed/pb"
+	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
+	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
+	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
+)
+
+func init() {
+	Commands = append(Commands, &commandVolumeTierCompact{})
+}
+
+// commandVolumeTierCompact implements the "volume.tier.compact" shell command.
+type commandVolumeTierCompact struct {
+}
+
+// Name returns the name used to invoke the command from the shell.
+func (c *commandVolumeTierCompact) Name() string {
+	return "volume.tier.compact"
+}
+
+// Help returns the usage text for the command.
+func (c *commandVolumeTierCompact) Help() string {
+	return `compact remote volumes to reclaim space on cloud storage
+
+	volume.tier.compact [-volumeId=<volume id>]
+	volume.tier.compact [-collection=""] [-garbageThreshold=0.3]
+
+	e.g.:
+	volume.tier.compact -volumeId=7
+	volume.tier.compact -collection="mybucket" -garbageThreshold=0.2
+
+	This command compacts cloud tier volumes by:
+	1. Downloading the .dat file from remote storage to local
+	2. Running compaction to remove deleted data
+	3. Uploading the compacted .dat file back to remote storage
+
+	This reclaims space on remote storage that was used by deleted files.
+
+`
+}
+
+// HasTag reports whether the command carries the given tag; it carries none.
+func (c *commandVolumeTierCompact) HasTag(CommandTag) bool {
+	return false
+}
+
+// remoteVolumeInfo identifies one remote-tiered volume and the volume server
+// that reported it in the topology.
+type remoteVolumeInfo struct {
+	vid               needle.VolumeId
+	collection        string
+	remoteStorageName string
+	serverAddress     pb.ServerAddress
+	serverUrl         string
+}
+
+// Do parses the flags, collects the matching remote volumes from the topology
+// and compacts them one by one. A failure on one volume is printed and counted
+// but does not stop the loop; an aggregate error is returned at the end.
+func (c *commandVolumeTierCompact) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+	tierCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+	volumeId := tierCommand.Int("volumeId", 0, "the volume id")
+	collection := tierCommand.String("collection", "", "the collection name (supports regex)")
+	garbageThreshold := tierCommand.Float64("garbageThreshold", 0.3, "compact when garbage ratio exceeds this value")
+	if err = tierCommand.Parse(args); err != nil {
+		return nil // NOTE(review): flag errors are swallowed here; the FlagSet prints them itself
+	}
+
+	if err = commandEnv.confirmIsLocked(args); err != nil {
+		return
+	}
+
+	// volume ids are sent to the volume server as uint32, so reject values
+	// that would silently wrap around to a different volume.
+	if *volumeId < 0 || int64(*volumeId) > int64(^uint32(0)) {
+		return fmt.Errorf("invalid volume id %d", *volumeId)
+	}
+	vid := needle.VolumeId(*volumeId)
+
+	// collect topology information
+	topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
+	if err != nil {
+		return err
+	}
+
+	// find remote volumes
+	var remoteVolumes []remoteVolumeInfo
+	if vid != 0 {
+		rv, found, findErr := findRemoteVolumeInTopology(topologyInfo, vid, *collection)
+		if findErr != nil {
+			return findErr
+		}
+		if !found {
+			return fmt.Errorf("remote volume %d not found", vid)
+		}
+		remoteVolumes = append(remoteVolumes, rv)
+	} else {
+		remoteVolumes, err = collectRemoteVolumesWithInfo(topologyInfo, *collection)
+		if err != nil {
+			return err
+		}
+	}
+
+	if len(remoteVolumes) == 0 {
+		fmt.Fprintf(writer, "no remote volumes found\n")
+		return nil
+	}
+
+	fmt.Fprintf(writer, "found %d remote volume(s) to check for compaction\n", len(remoteVolumes))
+
+	var failedCount int
+	for _, rv := range remoteVolumes {
+		if err = doVolumeTierCompact(commandEnv, writer, rv, *garbageThreshold); err != nil {
+			fmt.Fprintf(writer, "error compacting volume %d: %v\n", rv.vid, err)
+			failedCount++
+		}
+	}
+
+	if failedCount > 0 {
+		return fmt.Errorf("%d of %d volume(s) failed to compact", failedCount, len(remoteVolumes))
+	}
+	return nil
+}
+
+// findRemoteVolumeInTopology returns the first topology entry for volume vid
+// that has both a remote storage name and key. An empty collectionPattern
+// matches every collection; an invalid pattern is returned as an error.
+func findRemoteVolumeInTopology(topoInfo *master_pb.TopologyInfo, vid needle.VolumeId, collectionPattern string) (remoteVolumeInfo, bool, error) {
+	// when collectionPattern is provided, compile and use as regex filter
+	var matchesCollection func(string) bool
+	if collectionPattern != "" {
+		collectionRegex, err := compileCollectionPattern(collectionPattern)
+		if err != nil {
+			return remoteVolumeInfo{}, false, fmt.Errorf("invalid collection pattern '%s': %w", collectionPattern, err)
+		}
+		matchesCollection = collectionRegex.MatchString
+	} else {
+		matchesCollection = func(string) bool { return true }
+	}
+
+	var result remoteVolumeInfo
+	found := false
+	eachDataNode(topoInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
+		if found {
+			return
+		}
+		for _, diskInfo := range dn.DiskInfos {
+			for _, v := range diskInfo.VolumeInfos {
+				if needle.VolumeId(v.Id) == vid && v.RemoteStorageName != "" && v.RemoteStorageKey != "" {
+					if !matchesCollection(v.Collection) {
+						continue
+					}
+					result = remoteVolumeInfo{
+						vid:               vid,
+						collection:        v.Collection,
+						remoteStorageName: v.RemoteStorageName,
+						serverAddress:     pb.NewServerAddressWithGrpcPort(dn.Id, int(dn.GrpcPort)),
+						serverUrl:         dn.Id,
+					}
+					found = true
+					return
+				}
+			}
+		}
+	})
+	return result, found, nil
+}
+
+// collectRemoteVolumesWithInfo lists the remote-tiered volumes whose collection
+// matches collectionPattern, keeping only the first entry seen per volume id.
+// An invalid pattern is returned as an error.
+func collectRemoteVolumesWithInfo(topoInfo *master_pb.TopologyInfo, collectionPattern string) ([]remoteVolumeInfo, error) {
+	collectionRegex, err := compileCollectionPattern(collectionPattern)
+	if err != nil {
+		return nil, fmt.Errorf("invalid collection pattern '%s': %w", collectionPattern, err)
+	}
+
+	seen := make(map[uint32]bool)
+	var result []remoteVolumeInfo
+	eachDataNode(topoInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
+		for _, diskInfo := range dn.DiskInfos {
+			for _, v := range diskInfo.VolumeInfos {
+				if v.RemoteStorageName == "" || v.RemoteStorageKey == "" {
+					continue
+				}
+				if !collectionRegex.MatchString(v.Collection) {
+					continue
+				}
+				if seen[v.Id] {
+					continue
+				}
+				seen[v.Id] = true
+				result = append(result, remoteVolumeInfo{
+					vid:               needle.VolumeId(v.Id),
+					collection:        v.Collection,
+					remoteStorageName: v.RemoteStorageName,
+					serverAddress:     pb.NewServerAddressWithGrpcPort(dn.Id, int(dn.GrpcPort)),
+					serverUrl:         dn.Id,
+				})
+			}
+		}
+	})
+
+	return result, nil
+}
+
+// doVolumeTierCompact compacts one remote volume if its garbage ratio reaches
+// garbageThreshold: download from remote, compact locally, upload again. If
+// compaction fails, the uncompacted volume is uploaded back and an error is
+// still returned.
+func doVolumeTierCompact(commandEnv *CommandEnv, writer io.Writer, rv remoteVolumeInfo, garbageThreshold float64) error {
+	grpcDialOption := commandEnv.option.GrpcDialOption
+
+	// step 1: check garbage level
+	garbageRatio, err := checkVolumeGarbage(grpcDialOption, rv.vid, rv.serverAddress)
+	if err != nil {
+		return fmt.Errorf("check garbage for volume %d: %w", rv.vid, err)
+	}
+
+	if garbageRatio < garbageThreshold {
+		fmt.Fprintf(writer, "volume %d garbage ratio %.4f below threshold %.4f, skipping\n",
+			rv.vid, garbageRatio, garbageThreshold)
+		return nil
+	}
+
+	fmt.Fprintf(writer, "volume %d garbage ratio %.4f, starting compaction...\n", rv.vid, garbageRatio)
+
+	// step 2: download .dat from remote to local
+	// this deletes the remote file and reloads the volume as local
+	fmt.Fprintf(writer, " downloading volume %d from %s to local...\n", rv.vid, rv.remoteStorageName)
+	err = downloadDatFromRemoteTier(grpcDialOption, writer, rv.vid, rv.collection, rv.serverAddress)
+	if err != nil {
+		return fmt.Errorf("download volume %d from remote: %w", rv.vid, err)
+	}
+
+	// step 3: compact the local volume
+	fmt.Fprintf(writer, " compacting volume %d...\n", rv.vid)
+	err = compactVolumeOnServer(grpcDialOption, writer, rv.vid, rv.serverAddress)
+	if err != nil {
+		// compaction failed, but volume is now local without remote reference
+		// upload the uncompacted volume back to restore cloud tier state
+		fmt.Fprintf(writer, " compaction failed: %v\n", err)
+		fmt.Fprintf(writer, " re-uploading volume %d to %s without compaction...\n", rv.vid, rv.remoteStorageName)
+		uploadErr := uploadDatToRemoteTier(grpcDialOption, writer, rv.vid, rv.collection, rv.serverAddress, rv.remoteStorageName, false)
+		if uploadErr != nil {
+			return fmt.Errorf("compaction failed (%w) and re-upload also failed (%v), volume %d remains local",
+				err, uploadErr, rv.vid)
+		}
+		return fmt.Errorf("compaction failed (%w), volume %d re-uploaded to %s without compaction",
+			err, rv.vid, rv.remoteStorageName)
+	}
+
+	// step 4: upload compacted volume back to remote
+	fmt.Fprintf(writer, " uploading compacted volume %d to %s...\n", rv.vid, rv.remoteStorageName)
+	err = uploadDatToRemoteTier(grpcDialOption, writer, rv.vid, rv.collection, rv.serverAddress, rv.remoteStorageName, false)
+	if err != nil {
+		return fmt.Errorf("upload compacted volume %d to %s: %w (volume remains local with compacted data)",
+			rv.vid, rv.remoteStorageName, err)
+	}
+
+	fmt.Fprintf(writer, "volume %d compacted and uploaded to %s successfully\n", rv.vid, rv.remoteStorageName)
+	return nil
+}
+
+// checkVolumeGarbage asks the volume server for the garbage ratio of volume
+// vid via the VacuumVolumeCheck RPC.
+func checkVolumeGarbage(grpcDialOption grpc.DialOption, vid needle.VolumeId, server pb.ServerAddress) (float64, error) {
+	var garbageRatio float64
+	err := operation.WithVolumeServerClient(false, server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+		resp, err := client.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{
+			VolumeId: uint32(vid),
+		})
+		if err != nil {
+			return err
+		}
+		garbageRatio = resp.GarbageRatio
+		return nil
+	})
+	return garbageRatio, err
+}
+
+// compactVolumeOnServer runs VacuumVolumeCompact followed by VacuumVolumeCommit
+// on the volume server, printing progress to writer. If either step fails,
+// VacuumVolumeCleanup is attempted before the error is returned.
+func compactVolumeOnServer(grpcDialOption grpc.DialOption, writer io.Writer, vid needle.VolumeId, server pb.ServerAddress) error {
+	// compact
+	err := operation.WithVolumeServerClient(true, server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+		stream, err := client.VacuumVolumeCompact(context.Background(), &volume_server_pb.VacuumVolumeCompactRequest{
+			VolumeId: uint32(vid),
+		})
+		if err != nil {
+			return err
+		}
+		for {
+			resp, recvErr := stream.Recv()
+			if recvErr != nil {
+				if recvErr == io.EOF {
+					break
+				}
+				return recvErr
+			}
+			fmt.Fprintf(writer, " compacted %d bytes\n", resp.ProcessedBytes)
+		}
+		return nil
+	})
+	if err != nil {
+		if cleanupErr := vacuumVolumeCleanup(grpcDialOption, vid, server); cleanupErr != nil {
+			fmt.Fprintf(writer, " cleanup after compaction failure also failed: %v\n", cleanupErr)
+		}
+		return fmt.Errorf("compact: %w", err)
+	}
+
+	// commit
+	err = operation.WithVolumeServerClient(false, server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+		_, err := client.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{
+			VolumeId: uint32(vid),
+		})
+		return err
+	})
+	if err != nil {
+		if cleanupErr := vacuumVolumeCleanup(grpcDialOption, vid, server); cleanupErr != nil {
+			fmt.Fprintf(writer, " cleanup after commit failure also failed: %v\n", cleanupErr)
+		}
+		return fmt.Errorf("commit: %w", err)
+	}
+
+	return nil
+}
+
+// vacuumVolumeCleanup calls the VacuumVolumeCleanup RPC for volume vid on the
+// given volume server.
+func vacuumVolumeCleanup(grpcDialOption grpc.DialOption, vid needle.VolumeId, server pb.ServerAddress) error {
+	return operation.WithVolumeServerClient(false, server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+		_, err := client.VacuumVolumeCleanup(context.Background(), &volume_server_pb.VacuumVolumeCleanupRequest{
+			VolumeId: uint32(vid),
+		})
+		return err
+	})
+}
diff --git a/weed/shell/command_volume_tier_compact_test.go b/weed/shell/command_volume_tier_compact_test.go
new file mode 100644
index 000000000..1de7802e3
--- /dev/null
+++ b/weed/shell/command_volume_tier_compact_test.go
@@ -0,0 +1,268 @@
+package shell
+
+import (
+	"testing"
+
+	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
+	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
+	"github.com/stretchr/testify/assert"
+)
+
+// buildTestTopologyWithRemoteVolumes returns a two-data-center topology with
+// remote-tiered volumes 1, 3 (server1) and 4 (server2), plus local-only
+// volume 2 on server1.
+func buildTestTopologyWithRemoteVolumes() *master_pb.TopologyInfo {
+	return &master_pb.TopologyInfo{
+		Id: "topo",
+		DataCenterInfos: []*master_pb.DataCenterInfo{
+			{
+				Id: "dc1",
+				RackInfos: []*master_pb.RackInfo{
+					{
+						Id: "rack1",
+						DataNodeInfos: []*master_pb.DataNodeInfo{
+							{
+								Id:       "server1:8080",
+								GrpcPort: 18080,
+								DiskInfos: map[string]*master_pb.DiskInfo{
+									"": {
+										VolumeInfos: []*master_pb.VolumeInformationMessage{
+											{
+												Id:                1,
+												Collection:        "col1",
+												RemoteStorageName: "s3.default",
+												RemoteStorageKey:  "col1/1.dat",
+												DeleteCount:       100,
+												DeletedByteCount:  1000,
+												Size:              5000,
+											},
+											{
+												Id:                2,
+												Collection:        "col1",
+												RemoteStorageName: "",
+												RemoteStorageKey:  "",
+												Size:              3000,
+											},
+											{
+												Id:                3,
+												Collection:        "col2",
+												RemoteStorageName: "s3.archive",
+												RemoteStorageKey:  "col2/3.dat",
+												DeleteCount:       50,
+												DeletedByteCount:  500,
+												Size:              2000,
+											},
+										},
+									},
+								},
+							},
+						},
+					},
+				},
+			},
+			{
+				Id: "dc2",
+				RackInfos: []*master_pb.RackInfo{
+					{
+						Id: "rack1",
+						DataNodeInfos: []*master_pb.DataNodeInfo{
+							{
+								Id:       "server2:8080",
+								GrpcPort: 18080,
+								DiskInfos: map[string]*master_pb.DiskInfo{
+									"": {
+										VolumeInfos: []*master_pb.VolumeInformationMessage{
+											{
+												Id:                4,
+												Collection:        "bucket1",
+												RemoteStorageName: "s3.default",
+												RemoteStorageKey:  "bucket1/4.dat",
+												Size:              8000,
+											},
+										},
+									},
+								},
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+}
+
+// TestFindRemoteVolumeInTopology covers lookups by volume id, with and without
+// a collection filter.
+func TestFindRemoteVolumeInTopology(t *testing.T) {
+	topo := buildTestTopologyWithRemoteVolumes()
+
+	tests := []struct {
+		name       string
+		vid        needle.VolumeId
+		collection string
+		wantFound  bool
+		wantDest   string
+	}{
+		{
+			name:      "find existing remote volume",
+			vid:       1,
+			wantFound: true,
+			wantDest:  "s3.default",
+		},
+		{
+			name:      "find remote volume with different backend",
+			vid:       3,
+			wantFound: true,
+			wantDest:  "s3.archive",
+		},
+		{
+			name:      "find remote volume on another server",
+			vid:       4,
+			wantFound: true,
+			wantDest:  "s3.default",
+		},
+		{
+			name:      "local volume not found as remote",
+			vid:       2,
+			wantFound: false,
+		},
+		{
+			name:      "non-existent volume not found",
+			vid:       999,
+			wantFound: false,
+		},
+		{
+			name:       "filter by matching collection exact",
+			vid:        1,
+			collection: "^col1$",
+			wantFound:  true,
+			wantDest:   "s3.default",
+		},
+		{
+			name:       "filter by matching collection regex",
+			vid:        1,
+			collection: "col.*",
+			wantFound:  true,
+			wantDest:   "s3.default",
+		},
+		{
+			name:       "filter by non-matching collection",
+			vid:        1,
+			collection: "^col2$",
+			wantFound:  false,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			rv, found, err := findRemoteVolumeInTopology(topo, tt.vid, tt.collection)
+			assert.NoError(t, err)
+			assert.Equal(t, tt.wantFound, found)
+			if found {
+				assert.Equal(t, tt.vid, rv.vid)
+				assert.Equal(t, tt.wantDest, rv.remoteStorageName)
+			}
+		})
+	}
+}
+
+// TestFindRemoteVolumeInTopologyInvalidPattern checks that a malformed
+// collection pattern is reported as an error.
+func TestFindRemoteVolumeInTopologyInvalidPattern(t *testing.T) {
+	topo := buildTestTopologyWithRemoteVolumes()
+
+	_, _, err := findRemoteVolumeInTopology(topo, 1, "[invalid")
+	assert.Error(t, err)
+}
+
+// TestCollectRemoteVolumesWithInfo checks which remote volumes are selected
+// for various collection patterns.
+func TestCollectRemoteVolumesWithInfo(t *testing.T) {
+	topo := buildTestTopologyWithRemoteVolumes()
+
+	tests := []struct {
+		name              string
+		collectionPattern string
+		wantCount         int
+		wantVids          []needle.VolumeId
+	}{
+		{
+			name:              "empty pattern matches empty collection only",
+			collectionPattern: "",
+			wantCount:         0,
+		},
+		{
+			name:              "match all collections",
+			collectionPattern: ".*",
+			wantCount:         3, // volumes 1, 3, 4
+			wantVids:          []needle.VolumeId{1, 3, 4},
+		},
+		{
+			name:              "match specific collection",
+			collectionPattern: "^col1$",
+			wantCount:         1,
+			wantVids:          []needle.VolumeId{1},
+		},
+		{
+			name:              "match collection prefix",
+			collectionPattern: "col.*",
+			wantCount:         2, // volumes 1, 3
+			wantVids:          []needle.VolumeId{1, 3},
+		},
+		{
+			name:              "match bucket collection",
+			collectionPattern: "bucket1",
+			wantCount:         1,
+			wantVids:          []needle.VolumeId{4},
+		},
+		{
+			name:              "no match",
+			collectionPattern: "^nonexistent$",
+			wantCount:         0,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			result, err := collectRemoteVolumesWithInfo(topo, tt.collectionPattern)
+			assert.NoError(t, err)
+			assert.Equal(t, tt.wantCount, len(result))
+
+			if tt.wantVids != nil {
+				gotVids := make(map[needle.VolumeId]bool)
+				for _, rv := range result {
+					gotVids[rv.vid] = true
+				}
+				for _, vid := range tt.wantVids {
+					assert.True(t, gotVids[vid], "expected volume %d in results", vid)
+				}
+			}
+		})
+	}
+}
+
+// TestCollectRemoteVolumesWithInfoCaptures checks the fields recorded for a
+// selected volume.
+func TestCollectRemoteVolumesWithInfoCaptures(t *testing.T) {
+	topo := buildTestTopologyWithRemoteVolumes()
+
+	result, err := collectRemoteVolumesWithInfo(topo, "^col1$")
+	assert.NoError(t, err)
+	assert.Equal(t, 1, len(result))
+
+	rv := result[0]
+	assert.Equal(t, needle.VolumeId(1), rv.vid)
+	assert.Equal(t, "col1", rv.collection)
+	assert.Equal(t, "s3.default", rv.remoteStorageName)
+	assert.Equal(t, "server1:8080", rv.serverUrl)
+	assert.NotEmpty(t, rv.serverAddress)
+}
+
+// TestCollectRemoteVolumesWithInfoInvalidPattern checks that a malformed
+// collection pattern is reported as an error.
+func TestCollectRemoteVolumesWithInfoInvalidPattern(t *testing.T) {
+	topo := buildTestTopologyWithRemoteVolumes()
+
+	_, err := collectRemoteVolumesWithInfo(topo, "[invalid")
+	assert.Error(t, err)
+}