From 4c88fbfd5e72cf64a7c26f515ab9fe708066d6ba Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 10 Mar 2026 13:31:45 -0700 Subject: [PATCH 1/3] Fix nil pointer crash during concurrent vacuum compaction (#8592) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * check for nil needle map before compaction sync When CommitCompact runs concurrently, it sets v.nm = nil under dataFileAccessLock. CompactByIndex does not hold that lock, so v.nm.Sync() can hit a nil pointer. Add an early nil check to return an error instead of crashing. Fixes #8591 * guard copyDataBasedOnIndexFile size check against nil needle map The post-compaction size validation at line 538 accesses v.nm.ContentSize() and v.nm.DeletedSize(). If CommitCompact has concurrently set v.nm to nil, this causes a SIGSEGV. Skip the validation when v.nm is nil since the actual data copy uses local needle maps (oldNm/newNm) and is unaffected. Fixes #8591 * use atomic.Bool for compaction flags to prevent concurrent vacuum races The isCompacting and isCommitCompacting flags were plain bools read and written from multiple goroutines without synchronization. This allowed concurrent vacuums on the same volume to pass the guard checks and run simultaneously, leading to the nil pointer crash. Using atomic.Bool with CompareAndSwap ensures only one compaction or commit can run per volume at a time. Fixes #8591 * use go-version-file in CI workflows instead of hardcoded versions Use go-version-file: 'go.mod' so CI automatically picks up the Go version from go.mod, avoiding future version drift. Reordered checkout before setup-go in go.yml and e2e.yml so go.mod is available. Removed the now-unused GO_VERSION env vars. * capture v.nm locally in CompactByIndex to close TOCTOU race A bare nil check on v.nm followed by v.nm.Sync() has a race window where CommitCompact can set v.nm = nil between the two. Snapshot the pointer into a local variable so the nil check and Sync operate on the same reference. * add dynamic timeouts to plugin worker vacuum gRPC calls All vacuum gRPC calls used context.Background() with no deadline, so the plugin scheduler's execution timeout could kill a job while a large volume compact was still in progress. Use volume-size-scaled timeouts matching the topology vacuum approach: 3 min/GB for compact, 1 min/GB for check, commit, and cleanup. Fixes #8591 * Revert "add dynamic timeouts to plugin worker vacuum gRPC calls" This reverts commit 80951934c37416bc4f6c1472a5d3f8d204a637d9. * unify compaction lifecycle into single atomic flag Replace separate isCompacting and isCommitCompacting flags with a single isCompactionInProgress atomic.Bool. This ensures CompactBy*, CommitCompact, Close, and Destroy are mutually exclusive — only one can run at a time per volume. Key changes: - All entry points use CompareAndSwap(false, true) to claim exclusive access. CompactByVolumeData and CompactByIndex now also guard v.nm and v.DataBackend with local captures. - Close() waits for the flag outside dataFileAccessLock to avoid deadlocking with CommitCompact (which holds the flag while waiting for the lock). It claims the flag before acquiring the lock so no new compaction can start. - Destroy() uses CAS instead of a racy Load check, preventing concurrent compaction from racing with volume teardown. - unmountVolumeByCollection no longer deletes from the map; DeleteCollectionFromDiskLocation removes entries only after successful Destroy, preventing orphaned volumes on failure. Fixes #8591 --- .github/workflows/e2e.yml | 13 +++--- .github/workflows/go.yml | 33 +++++++-------- .../workflows/metadata-subscribe-tests.yml | 5 +-- .github/workflows/sftp-tests.yml | 5 +-- .../volume-server-integration-tests.yml | 5 +-- weed/storage/disk_location.go | 12 +++--- weed/storage/volume.go | 19 +++++---- weed/storage/volume_vacuum.go | 40 ++++++++++--------- weed/storage/volume_write.go | 2 +- 9 files changed, 67 insertions(+), 67 deletions(-) diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index 65a37805b..914c432d2 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -23,14 +23,13 @@ jobs: runs-on: ubuntu-22.04 timeout-minutes: 30 steps: - - name: Set up Go 1.x - uses: actions/setup-go@a5f9b05d2d216f63e13859e0d847461041025775 # v2 - with: - go-version: ^1.13 - id: go - - name: Check out code into the Go module directory - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v2 + uses: actions/checkout@v6 + + - name: Set up Go + uses: actions/setup-go@v6 + with: + go-version-file: 'go.mod' - name: Set up Docker Buildx uses: docker/setup-buildx-action@v4 diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index b15556c1a..334e75ca0 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -19,13 +19,12 @@ jobs: name: Go Vet runs-on: ubuntu-latest steps: - - name: Set up Go 1.x - uses: actions/setup-go@a5f9b05d2d216f63e13859e0d847461041025775 # v2 - with: - go-version: ^1.13 - id: go - name: Check out code into the Go module directory - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v2 + uses: actions/checkout@v6 + - name: Set up Go + uses: actions/setup-go@v6 + with: + go-version-file: 'go.mod' - name: Get dependencies run: | cd weed; go get -v -t -d ./... @@ -42,13 +41,12 @@ jobs: name: Build runs-on: ubuntu-latest steps: - - name: Set up Go 1.x - uses: actions/setup-go@a5f9b05d2d216f63e13859e0d847461041025775 # v2 - with: - go-version: ^1.13 - id: go - name: Check out code into the Go module directory - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v2 + uses: actions/checkout@v6 + - name: Set up Go + uses: actions/setup-go@v6 + with: + go-version-file: 'go.mod' - name: Build run: cd weed; go build -tags "elastic gocdk sqlite ydb tarantool tikv rclone" -v . @@ -56,12 +54,11 @@ jobs: name: Test runs-on: ubuntu-latest steps: - - name: Set up Go 1.x - uses: actions/setup-go@a5f9b05d2d216f63e13859e0d847461041025775 # v2 - with: - go-version: ^1.13 - id: go - name: Check out code into the Go module directory - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v2 + uses: actions/checkout@v6 + - name: Set up Go + uses: actions/setup-go@v6 + with: + go-version-file: 'go.mod' - name: Test run: cd weed; go test -tags "elastic gocdk sqlite ydb tarantool tikv rclone" -v ./... diff --git a/.github/workflows/metadata-subscribe-tests.yml b/.github/workflows/metadata-subscribe-tests.yml index bfeac2042..569d0e43c 100644 --- a/.github/workflows/metadata-subscribe-tests.yml +++ b/.github/workflows/metadata-subscribe-tests.yml @@ -30,7 +30,6 @@ permissions: contents: read env: - GO_VERSION: '1.24' TEST_TIMEOUT: '10m' jobs: @@ -43,10 +42,10 @@ jobs: - name: Checkout code uses: actions/checkout@v6 - - name: Set up Go ${{ env.GO_VERSION }} + - name: Set up Go uses: actions/setup-go@v6 with: - go-version: ${{ env.GO_VERSION }} + go-version-file: 'go.mod' - name: Build SeaweedFS run: | diff --git a/.github/workflows/sftp-tests.yml b/.github/workflows/sftp-tests.yml index fe513835b..b5277ed50 100644 --- a/.github/workflows/sftp-tests.yml +++ b/.github/workflows/sftp-tests.yml @@ -24,7 +24,6 @@ permissions: contents: read env: - GO_VERSION: '1.24' TEST_TIMEOUT: '15m' jobs: @@ -37,10 +36,10 @@ jobs: - name: Checkout code uses: actions/checkout@v6 - - name: Set up Go ${{ env.GO_VERSION }} + - name: Set up Go uses: actions/setup-go@v6 with: - go-version: ${{ env.GO_VERSION }} + go-version-file: 'go.mod' - name: Install dependencies run: | diff --git a/.github/workflows/volume-server-integration-tests.yml b/.github/workflows/volume-server-integration-tests.yml index fcb266ea8..70c6e21c8 100644 --- a/.github/workflows/volume-server-integration-tests.yml +++ b/.github/workflows/volume-server-integration-tests.yml @@ -28,7 +28,6 @@ permissions: contents: read env: - GO_VERSION: '1.24' TEST_TIMEOUT: '30m' jobs: @@ -46,10 +45,10 @@ jobs: - name: Checkout code uses: actions/checkout@v6 - - name: Set up Go ${{ env.GO_VERSION }} + - name: Set up Go uses: actions/setup-go@v6 with: - go-version: ${{ env.GO_VERSION }} + go-version-file: 'go.mod' - name: Build SeaweedFS binary run: | diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index 9ff7d5e5b..b8cb85516 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -300,9 +300,13 @@ func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e er var wg sync.WaitGroup wg.Add(2) go func() { - for _, v := range delVolsMap { + for k, v := range delVolsMap { if err := v.Destroy(false); err != nil { errChain <- err + } else { + l.volumesLock.Lock() + delete(l.volumes, k) + l.volumesLock.Unlock() } } wg.Done() @@ -383,14 +387,10 @@ func (l *DiskLocation) UnloadVolume(vid needle.VolumeId) error { func (l *DiskLocation) unmountVolumeByCollection(collectionName string) map[needle.VolumeId]*Volume { deltaVols := make(map[needle.VolumeId]*Volume, 0) for k, v := range l.volumes { - if v.Collection == collectionName && !v.isCompacting && !v.isCommitCompacting { + if v.Collection == collectionName && !v.isCompactionInProgress.Load() { deltaVols[k] = v } } - - for k := range deltaVols { - delete(l.volumes, k) - } return deltaVols } diff --git a/weed/storage/volume.go b/weed/storage/volume.go index dd8ecbdce..48149f4d9 100644 --- a/weed/storage/volume.go +++ b/weed/storage/volume.go @@ -5,6 +5,7 @@ import ( "path" "strconv" "sync" + "sync/atomic" "time" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" @@ -45,8 +46,7 @@ type Volume struct { lastCompactRevision uint16 ldbTimeout int64 - isCompacting bool - isCommitCompacting bool + isCompactionInProgress atomic.Bool volumeInfoRWLock sync.RWMutex volumeInfo *volume_server_pb.VolumeInfo @@ -224,6 +224,16 @@ func (v *Volume) SyncToDisk() { // Close cleanly shuts down this volume func (v *Volume) Close() { + // Wait for any in-progress compaction to finish and claim the flag so no + // new compaction can start. This must happen BEFORE acquiring + // dataFileAccessLock to avoid deadlocking with CommitCompact which holds + // the flag while waiting for the lock. + for !v.isCompactionInProgress.CompareAndSwap(false, true) { + time.Sleep(521 * time.Millisecond) + glog.Warningf("Volume Close wait for compaction %d", v.Id) + } + defer v.isCompactionInProgress.Store(false) + v.dataFileAccessLock.Lock() defer v.dataFileAccessLock.Unlock() @@ -231,11 +241,6 @@ func (v *Volume) Close() { } func (v *Volume) doClose() { - for v.isCommitCompacting { - time.Sleep(521 * time.Millisecond) - glog.Warningf("Volume Close wait for compaction %d", v.Id) - } - if v.nm != nil { if err := v.nm.Sync(); err != nil { glog.Warningf("Volume Close fail to sync volume idx %d", v.Id) diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index 2c20ce634..e97342597 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -64,22 +64,26 @@ func (v *Volume) CompactByVolumeData(opts *CompactOptions) error { //v.accessLock.Lock() //defer v.accessLock.Unlock() //glog.V(3).Infof("Got Compaction lock...") - if v.isCompacting || v.isCommitCompacting { + if !v.isCompactionInProgress.CompareAndSwap(false, true) { glog.V(0).Infof("Volume %d is already compacting...", v.Id) return nil } - v.isCompacting = true - defer func() { - v.isCompacting = false - }() + defer v.isCompactionInProgress.Store(false) v.lastCompactIndexOffset = v.IndexFileSize() v.lastCompactRevision = v.SuperBlock.CompactionRevision glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset) + if v.DataBackend == nil { + return fmt.Errorf("volume %d backend is empty remote:%v", v.Id, v.HasRemoteFile()) + } + nm := v.nm + if nm == nil { + return fmt.Errorf("volume %d needle map is nil", v.Id) + } if err := v.DataBackend.Sync(); err != nil { glog.V(0).Infof("compact failed to sync volume %d", v.Id) } - if err := v.nm.Sync(); err != nil { + if err := nm.Sync(); err != nil { glog.V(0).Infof("compact failed to sync volume idx %d", v.Id) } @@ -102,14 +106,11 @@ func (v *Volume) CompactByIndex(opts *CompactOptions) error { } glog.V(3).Infof("Compact2 volume %d ...", v.Id) - if v.isCompacting || v.isCommitCompacting { + if !v.isCompactionInProgress.CompareAndSwap(false, true) { glog.V(0).Infof("Volume %d is already compacting2 ...", v.Id) return nil } - v.isCompacting = true - defer func() { - v.isCompacting = false - }() + defer v.isCompactionInProgress.Store(false) v.lastCompactIndexOffset = v.IndexFileSize() v.lastCompactRevision = v.SuperBlock.CompactionRevision @@ -117,10 +118,14 @@ func (v *Volume) CompactByIndex(opts *CompactOptions) error { if v.DataBackend == nil { return fmt.Errorf("volume %d backend is empty remote:%v", v.Id, v.HasRemoteFile()) } + nm := v.nm + if nm == nil { + return fmt.Errorf("volume %d needle map is nil", v.Id) + } if err := v.DataBackend.Sync(); err != nil { glog.V(0).Infof("compact2 failed to sync volume dat %d: %v", v.Id, err) } - if err := v.nm.Sync(); err != nil { + if err := nm.Sync(); err != nil { glog.V(0).Infof("compact2 failed to sync volume idx %d: %v", v.Id, err) } @@ -139,14 +144,11 @@ func (v *Volume) CommitCompact() error { } glog.V(0).Infof("Committing volume %d vacuuming...", v.Id) - if v.isCommitCompacting { - glog.V(0).Infof("Volume %d is already commit compacting ...", v.Id) + if !v.isCompactionInProgress.CompareAndSwap(false, true) { + glog.V(0).Infof("Volume %d is already compacting ...", v.Id) return nil } - v.isCommitCompacting = true - defer func() { - v.isCommitCompacting = false - }() + defer v.isCompactionInProgress.Store(false) v.dataFileAccessLock.Lock() defer v.dataFileAccessLock.Unlock() @@ -530,7 +532,7 @@ func (v *Volume) copyDataBasedOnIndexFile(opts *CompactOptions) (err error) { if err != nil { return err } - if v.Ttl.String() == "" { + if v.Ttl.String() == "" && v.nm != nil { dstDatSize, _, err := dstDatBackend.GetStat() if err != nil { return err diff --git a/weed/storage/volume_write.go b/weed/storage/volume_write.go index 8cb00bc15..2f832e1f7 100644 --- a/weed/storage/volume_write.go +++ b/weed/storage/volume_write.go @@ -70,7 +70,7 @@ func (v *Volume) Destroy(onlyEmpty bool) (err error) { return } } - if v.isCompacting || v.isCommitCompacting { + if !v.isCompactionInProgress.CompareAndSwap(false, true) { err = fmt.Errorf("volume %d is compacting", v.Id) return } From b17e2b411a183369bc21eb4b37e57cc90309efb8 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 10 Mar 2026 13:48:42 -0700 Subject: [PATCH 2/3] Add dynamic timeouts to plugin worker vacuum gRPC calls (#8593) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * add dynamic timeouts to plugin worker vacuum gRPC calls All vacuum gRPC calls used context.Background() with no deadline, so the plugin scheduler's execution timeout could kill a job while a large volume compact was still in progress. Use volume-size-scaled timeouts matching the topology vacuum approach: 3 min/GB for compact, 1 min/GB for check, commit, and cleanup. Fixes #8591 * scale scheduler execution timeout by volume size The scheduler's per-job execution timeout (default 240s) would kill vacuum jobs on large volumes before they finish. Three changes: 1. Vacuum detection now includes estimated_runtime_seconds in job proposals, computed as 5 min/GB of volume size. 2. The scheduler checks for estimated_runtime_seconds in job parameters and uses it as the execution timeout when larger than the default — a generic mechanism any handler can use. 3. Vacuum task gRPC calls now use the passed-in ctx as parent instead of context.Background(), so scheduler cancellation propagates to in-flight RPCs. * extend job type runtime when proposals need more time The JobTypeMaxRuntime (default 30 min) wraps both detection and execution. Its context is the parent of all per-job execution contexts, so even with per-job estimated_runtime_seconds, jobCtx would cancel everything when it expires. After detection, scan proposals for the maximum estimated_runtime_seconds. If any proposal needs more time than the remaining JobTypeMaxRuntime, create a new execution context with enough headroom. This lets large vacuum jobs complete without being killed by the job type deadline while still respecting the configured limit for normal-sized jobs. * log missing volume size metric, remove dead minimum runtime guard Add a debug log in vacuumTimeout when t.volumeSize is 0 so operators can investigate why metrics are missing for a volume. Remove the unreachable estimatedRuntimeSeconds < 180 check in buildVacuumProposal — volumeSizeGB always >= 1 (due to +1 floor), so estimatedRuntimeSeconds is always >= 300. * cap estimated runtime and fix status check context - Cap maxEstimatedRuntime and per-job timeout overrides to 8 hours to prevent unbounded timeouts from bad metrics. - Check execCtx.Err() instead of jobCtx.Err() for status reporting, since dispatch runs under execCtx which may have a longer deadline. A successful dispatch under execCtx was misreported as "timeout" when jobCtx had expired. --- weed/admin/plugin/plugin_scheduler.go | 55 +++++++++++++++++++++++-- weed/plugin/worker/vacuum_handler.go | 7 ++++ weed/worker/tasks/vacuum/vacuum_task.go | 51 ++++++++++++++++------- 3 files changed, 96 insertions(+), 17 deletions(-) diff --git a/weed/admin/plugin/plugin_scheduler.go b/weed/admin/plugin/plugin_scheduler.go index 5b6eba7ab..00c43283e 100644 --- a/weed/admin/plugin/plugin_scheduler.go +++ b/weed/admin/plugin/plugin_scheduler.go @@ -32,6 +32,7 @@ const ( defaultClusterContextTimeout = 10 * time.Second defaultWaitingBacklogFloor = 8 defaultWaitingBacklogMultiplier = 4 + maxEstimatedRuntimeCap = 8 * time.Hour ) type schedulerPolicy struct { @@ -293,6 +294,26 @@ func (r *Plugin) runJobTypeIteration(jobType string, policy schedulerPolicy) boo r.setSchedulerLoopState(jobType, "executing") + // Scan proposals for the maximum estimated_runtime_seconds so the + // execution phase gets enough time for large jobs (e.g. vacuum on + // big volumes). If any proposal needs more time than the remaining + // JobTypeMaxRuntime, extend the execution context accordingly. + var maxEstimatedRuntime time.Duration + for _, p := range filtered { + if p.Parameters != nil { + if est, ok := p.Parameters["estimated_runtime_seconds"]; ok { + if v := est.GetInt64Value(); v > 0 { + if d := time.Duration(v) * time.Second; d > maxEstimatedRuntime { + maxEstimatedRuntime = d + } + } + } + } + } + if maxEstimatedRuntime > maxEstimatedRuntimeCap { + maxEstimatedRuntime = maxEstimatedRuntimeCap + } + remaining = time.Until(start.Add(maxRuntime)) if remaining <= 0 { r.appendActivity(JobActivity{ @@ -306,6 +327,17 @@ func (r *Plugin) runJobTypeIteration(jobType string, policy schedulerPolicy) boo return detected } + // If the longest estimated job exceeds the remaining JobTypeMaxRuntime, + // create a new execution context with enough headroom instead of using + // jobCtx which would cancel too early. + execCtx := jobCtx + execCancel := context.CancelFunc(func() {}) + if maxEstimatedRuntime > 0 && maxEstimatedRuntime > remaining { + execCtx, execCancel = context.WithTimeout(context.Background(), maxEstimatedRuntime) + remaining = maxEstimatedRuntime + } + defer execCancel() + execPolicy := policy if execPolicy.ExecutionTimeout <= 0 { execPolicy.ExecutionTimeout = defaultScheduledExecutionTimeout @@ -314,10 +346,10 @@ func (r *Plugin) runJobTypeIteration(jobType string, policy schedulerPolicy) boo execPolicy.ExecutionTimeout = remaining } - successCount, errorCount, canceledCount := r.dispatchScheduledProposals(jobCtx, jobType, filtered, clusterContext, execPolicy) + successCount, errorCount, canceledCount := r.dispatchScheduledProposals(execCtx, jobType, filtered, clusterContext, execPolicy) status := "success" - if jobCtx.Err() != nil { + if execCtx.Err() != nil { status = "timeout" } else if errorCount > 0 || canceledCount > 0 { status = "error" @@ -937,7 +969,24 @@ func (r *Plugin) executeScheduledJobWithExecutor( if parent == nil { parent = context.Background() } - execCtx, cancel := context.WithTimeout(parent, policy.ExecutionTimeout) + // Use the job's estimated runtime if provided and larger than the + // default execution timeout. This lets handlers like vacuum scale + // the timeout based on volume size so large volumes are not killed. + timeout := policy.ExecutionTimeout + if job.Parameters != nil { + if est, ok := job.Parameters["estimated_runtime_seconds"]; ok { + if v := est.GetInt64Value(); v > 0 { + estimated := time.Duration(v) * time.Second + if estimated > maxEstimatedRuntimeCap { + estimated = maxEstimatedRuntimeCap + } + if estimated > timeout { + timeout = estimated + } + } + } + } + execCtx, cancel := context.WithTimeout(parent, timeout) _, err := r.executeJobWithExecutor(execCtx, executor, job, clusterContext, int32(attempt)) cancel() if err == nil { diff --git a/weed/plugin/worker/vacuum_handler.go b/weed/plugin/worker/vacuum_handler.go index d1a93d299..1ad58b30d 100644 --- a/weed/plugin/worker/vacuum_handler.go +++ b/weed/plugin/worker/vacuum_handler.go @@ -544,6 +544,10 @@ func buildVacuumProposal(result *workertypes.TaskDetectionResult) (*plugin_pb.Jo summary = summary + " on " + result.Server } + // Estimate runtime: 5 min/GB (compact + commit + overhead) + volumeSizeGB := int64(result.TypedParams.VolumeSize/1024/1024/1024) + 1 + estimatedRuntimeSeconds := volumeSizeGB * 5 * 60 + return &plugin_pb.JobProposal{ ProposalId: proposalID, DedupeKey: dedupeKey, @@ -564,6 +568,9 @@ func buildVacuumProposal(result *workertypes.TaskDetectionResult) (*plugin_pb.Jo "collection": { Kind: &plugin_pb.ConfigValue_StringValue{StringValue: result.Collection}, }, + "estimated_runtime_seconds": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: estimatedRuntimeSeconds}, + }, }, Labels: map[string]string{ "task_type": "vacuum", diff --git a/weed/worker/tasks/vacuum/vacuum_task.go b/weed/worker/tasks/vacuum/vacuum_task.go index 4b890fada..3b226334b 100644 --- a/weed/worker/tasks/vacuum/vacuum_task.go +++ b/weed/worker/tasks/vacuum/vacuum_task.go @@ -25,6 +25,7 @@ type VacuumTask struct { garbageThreshold float64 progress float64 grpcDialOption grpc.DialOption + volumeSize uint64 } // NewVacuumTask creates a new unified vacuum task instance @@ -51,6 +52,7 @@ func (t *VacuumTask) Execute(ctx context.Context, params *worker_pb.TaskParams) } t.garbageThreshold = vacuumParams.GarbageThreshold + t.volumeSize = params.VolumeSize t.GetLogger().WithFields(map[string]interface{}{ "volume_id": t.volumeID, @@ -62,7 +64,7 @@ func (t *VacuumTask) Execute(ctx context.Context, params *worker_pb.TaskParams) // Step 1: Check volume status and garbage ratio t.ReportProgress(10.0) t.GetLogger().Info("Checking volume status") - eligible, currentGarbageRatio, err := t.checkVacuumEligibility() + eligible, currentGarbageRatio, err := t.checkVacuumEligibility(ctx) if err != nil { return fmt.Errorf("failed to check vacuum eligibility: %v", err) } @@ -83,14 +85,14 @@ func (t *VacuumTask) Execute(ctx context.Context, params *worker_pb.TaskParams) "threshold": t.garbageThreshold, }).Info("Performing vacuum operation") - if err := t.performVacuum(); err != nil { + if err := t.performVacuum(ctx); err != nil { return fmt.Errorf("failed to perform vacuum: %v", err) } // Step 3: Verify vacuum results t.ReportProgress(90.0) t.GetLogger().Info("Verifying vacuum results") - if err := t.verifyVacuumResults(); err != nil { + if err := t.verifyVacuumResults(ctx); err != nil { glog.Warningf("Vacuum verification failed: %v", err) // Don't fail the task - vacuum operation itself succeeded } @@ -146,15 +148,28 @@ func (t *VacuumTask) GetProgress() float64 { return t.progress } +// vacuumTimeout returns a dynamic timeout scaled by volume size, matching the +// topology vacuum approach. base is the per-GB multiplier (e.g. 1 minute for +// check, 3 minutes for compact). +func (t *VacuumTask) vacuumTimeout(base time.Duration) time.Duration { + if t.volumeSize == 0 { + glog.V(1).Infof("volume %d has no size metric, using minimum timeout", t.volumeID) + } + sizeGB := int64(t.volumeSize/1024/1024/1024) + 1 + return base * time.Duration(sizeGB) +} + // Helper methods for real vacuum operations // checkVacuumEligibility checks if the volume meets vacuum criteria -func (t *VacuumTask) checkVacuumEligibility() (bool, float64, error) { +func (t *VacuumTask) checkVacuumEligibility(ctx context.Context) (bool, float64, error) { var garbageRatio float64 err := operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { - resp, err := client.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{ + checkCtx, cancel := context.WithTimeout(ctx, t.vacuumTimeout(time.Minute)) + defer cancel() + resp, err := client.VacuumVolumeCheck(checkCtx, &volume_server_pb.VacuumVolumeCheckRequest{ VolumeId: t.volumeID, }) if err != nil { @@ -178,12 +193,14 @@ func (t *VacuumTask) checkVacuumEligibility() (bool, float64, error) { } // performVacuum executes the actual vacuum operation -func (t *VacuumTask) performVacuum() error { +func (t *VacuumTask) performVacuum(ctx context.Context) error { return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { - // Step 1: Compact the volume + // Step 1: Compact the volume (3 min per GB, matching topology vacuum) t.GetLogger().Info("Compacting volume") - stream, err := client.VacuumVolumeCompact(context.Background(), &volume_server_pb.VacuumVolumeCompactRequest{ + compactCtx, compactCancel := context.WithTimeout(ctx, t.vacuumTimeout(3*time.Minute)) + defer compactCancel() + stream, err := client.VacuumVolumeCompact(compactCtx, &volume_server_pb.VacuumVolumeCompactRequest{ VolumeId: t.volumeID, }) if err != nil { @@ -202,18 +219,22 @@ func (t *VacuumTask) performVacuum() error { glog.V(2).Infof("Volume %d compact progress: %d bytes processed", t.volumeID, resp.ProcessedBytes) } - // Step 2: Commit the vacuum + // Step 2: Commit the vacuum (1 min per GB) t.GetLogger().Info("Committing vacuum operation") - _, err = client.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{ + commitCtx, commitCancel := context.WithTimeout(ctx, t.vacuumTimeout(time.Minute)) + defer commitCancel() + _, err = client.VacuumVolumeCommit(commitCtx, &volume_server_pb.VacuumVolumeCommitRequest{ VolumeId: t.volumeID, }) if err != nil { return fmt.Errorf("vacuum commit failed: %v", err) } - // Step 3: Cleanup old files + // Step 3: Cleanup old files (1 min per GB) t.GetLogger().Info("Cleaning up vacuum files") - _, err = client.VacuumVolumeCleanup(context.Background(), &volume_server_pb.VacuumVolumeCleanupRequest{ + cleanupCtx, cleanupCancel := context.WithTimeout(ctx, t.vacuumTimeout(time.Minute)) + defer cleanupCancel() + _, err = client.VacuumVolumeCleanup(cleanupCtx, &volume_server_pb.VacuumVolumeCleanupRequest{ VolumeId: t.volumeID, }) if err != nil { @@ -226,10 +247,12 @@ func (t *VacuumTask) performVacuum() error { } // verifyVacuumResults checks the volume status after vacuum -func (t *VacuumTask) verifyVacuumResults() error { +func (t *VacuumTask) verifyVacuumResults(ctx context.Context) error { return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { - resp, err := client.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{ + verifyCtx, cancel := context.WithTimeout(ctx, t.vacuumTimeout(time.Minute)) + defer cancel() + resp, err := client.VacuumVolumeCheck(verifyCtx, &volume_server_pb.VacuumVolumeCheckRequest{ VolumeId: t.volumeID, }) if err != nil { From 47cad59c70b5a415a350dbd9e487bab21ff35bc5 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 10 Mar 2026 15:55:14 -0700 Subject: [PATCH 3/3] Remove misleading Workers sub-menu items from admin sidebar (#8594) * Remove misleading Workers sub-menu items from admin sidebar The sidebar sub-items (Job Detection, Job Queue, Job Execution, Configuration) always navigated to the first job type's tabs (typically EC Encoding) rather than showing cross-job-type views. This was confusing as noted in #8590. Since the in-page tabs already provide this navigation, remove the redundant sidebar sub-items and keep only the top-level Workers link. Fixes #8590 * Update layout_templ.go --- weed/admin/view/layout/layout.templ | 44 ------------ weed/admin/view/layout/layout_templ.go | 98 +++++--------------------- 2 files changed, 19 insertions(+), 123 deletions(-) diff --git a/weed/admin/view/layout/layout.templ b/weed/admin/view/layout/layout.templ index d717548fa..44277f185 100644 --- a/weed/admin/view/layout/layout.templ +++ b/weed/admin/view/layout/layout.templ @@ -269,50 +269,6 @@ templ Layout(view ViewContext, content templ.Component) { } - - - - diff --git a/weed/admin/view/layout/layout_templ.go b/weed/admin/view/layout/layout_templ.go index ed7b510eb..913ed03b0 100644 --- a/weed/admin/view/layout/layout_templ.go +++ b/weed/admin/view/layout/layout_templ.go @@ -269,67 +269,7 @@ func Layout(view ViewContext, content templ.Component) templ.Component { return templ_7745c5c3_Err } } - templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 27, "
  • ") - if templ_7745c5c3_Err != nil { - return templ_7745c5c3_Err - } - if currentPath == "/plugin/detection" { - templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 28, "Job Detection") - if templ_7745c5c3_Err != nil { - return templ_7745c5c3_Err - } - } else { - templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 29, "Job Detection") - if templ_7745c5c3_Err != nil { - return templ_7745c5c3_Err - } - } - templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 30, "
  • ") - if templ_7745c5c3_Err != nil { - return templ_7745c5c3_Err - } - if currentPath == "/plugin/queue" { - templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 31, "Job Queue") - if templ_7745c5c3_Err != nil { - return templ_7745c5c3_Err - } - } else { - templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 32, "Job Queue") - if templ_7745c5c3_Err != nil { - return templ_7745c5c3_Err - } - } - templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 33, "
  • ") - if templ_7745c5c3_Err != nil { - return templ_7745c5c3_Err - } - if currentPath == "/plugin/execution" { - templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 34, "Job Execution") - if templ_7745c5c3_Err != nil { - return templ_7745c5c3_Err - } - } else { - templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 35, "Job Execution") - if templ_7745c5c3_Err != nil { - return templ_7745c5c3_Err - } - } - templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 36, "
  • ") - if templ_7745c5c3_Err != nil { - return templ_7745c5c3_Err - } - if currentPath == "/plugin/configuration" { - templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 37, "Configuration") - if templ_7745c5c3_Err != nil { - return templ_7745c5c3_Err - } - } else { - templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 38, "Configuration") - if templ_7745c5c3_Err != nil { - return templ_7745c5c3_Err - } - } - templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 39, "
  • ") + templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 27, "
    ") if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } @@ -337,43 +277,43 @@ func Layout(view ViewContext, content templ.Component) templ.Component { if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } - templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 40, "
    © ") + templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 28, "
    © ") if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } var templ_7745c5c3_Var14 string templ_7745c5c3_Var14, templ_7745c5c3_Err = templ.JoinStringErrs(fmt.Sprintf("%d", time.Now().Year())) if templ_7745c5c3_Err != nil { - return templ.Error{Err: templ_7745c5c3_Err, FileName: `view/layout/layout.templ`, Line: 336, Col: 60} + return templ.Error{Err: templ_7745c5c3_Err, FileName: `view/layout/layout.templ`, Line: 292, Col: 60} } _, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var14)) if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } - templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 41, " SeaweedFS Admin v") + templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 29, " SeaweedFS Admin v") if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } var templ_7745c5c3_Var15 string templ_7745c5c3_Var15, templ_7745c5c3_Err = templ.JoinStringErrs(version.VERSION_NUMBER) if templ_7745c5c3_Err != nil { - return templ.Error{Err: templ_7745c5c3_Err, FileName: `view/layout/layout.templ`, Line: 336, Col: 102} + return templ.Error{Err: templ_7745c5c3_Err, FileName: `view/layout/layout.templ`, Line: 292, Col: 102} } _, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var15)) if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } - templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 42, " ") + templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 30, " ") if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } if !strings.Contains(version.VERSION, "enterprise") { - templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 43, " Enterprise Version Available") + templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 31, " Enterprise Version Available") if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } } - templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 44, "
    ") + templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 32, "") if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } @@ -402,69 +342,69 @@ func LoginForm(title string, errorMessage string, csrfToken string) templ.Compon templ_7745c5c3_Var16 = templ.NopComponent } ctx = templ.ClearChildren(ctx) - templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 45, "") + templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 33, "<!doctype html><html lang=\"en\"><head><meta charset=\"UTF-8\"><title>") if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } var templ_7745c5c3_Var17 string templ_7745c5c3_Var17, templ_7745c5c3_Err = templ.JoinStringErrs(title) if templ_7745c5c3_Err != nil { - return templ.Error{Err: templ_7745c5c3_Err, FileName: `view/layout/layout.templ`, Line: 364, Col: 17} + return templ.Error{Err: templ_7745c5c3_Err, FileName: `view/layout/layout.templ`, Line: 320, Col: 17} } _, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var17)) if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } - templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 46, " - Login

    ") + templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 34, " - Login

    ") if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } var templ_7745c5c3_Var18 string templ_7745c5c3_Var18, templ_7745c5c3_Err = templ.JoinStringErrs(title) if templ_7745c5c3_Err != nil { - return templ.Error{Err: templ_7745c5c3_Err, FileName: `view/layout/layout.templ`, Line: 378, Col: 57} + return templ.Error{Err: templ_7745c5c3_Err, FileName: `view/layout/layout.templ`, Line: 334, Col: 57} } _, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var18)) if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } - templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 47, "

    Please sign in to continue

    ") + templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 35, "

    Please sign in to continue

    ") if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } if errorMessage != "" { - templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 48, "
    ") + templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 36, "
    ") if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } var templ_7745c5c3_Var19 string templ_7745c5c3_Var19, templ_7745c5c3_Err = templ.JoinStringErrs(errorMessage) if templ_7745c5c3_Err != nil { - return templ.Error{Err: templ_7745c5c3_Err, FileName: `view/layout/layout.templ`, Line: 385, Col: 45} + return templ.Error{Err: templ_7745c5c3_Err, FileName: `view/layout/layout.templ`, Line: 341, Col: 45} } _, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var19)) if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } - templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 49, "
    ") + templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 37, "
    ") if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } } - templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 50, "
    ") + templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 39, "\">
    ") if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err }