diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index 65a37805b..914c432d2 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -23,14 +23,13 @@ jobs: runs-on: ubuntu-22.04 timeout-minutes: 30 steps: - - name: Set up Go 1.x - uses: actions/setup-go@a5f9b05d2d216f63e13859e0d847461041025775 # v2 - with: - go-version: ^1.13 - id: go - - name: Check out code into the Go module directory - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v2 + uses: actions/checkout@v6 + + - name: Set up Go + uses: actions/setup-go@v6 + with: + go-version-file: 'go.mod' - name: Set up Docker Buildx uses: docker/setup-buildx-action@v4 diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index b15556c1a..334e75ca0 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -19,13 +19,12 @@ jobs: name: Go Vet runs-on: ubuntu-latest steps: - - name: Set up Go 1.x - uses: actions/setup-go@a5f9b05d2d216f63e13859e0d847461041025775 # v2 - with: - go-version: ^1.13 - id: go - name: Check out code into the Go module directory - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v2 + uses: actions/checkout@v6 + - name: Set up Go + uses: actions/setup-go@v6 + with: + go-version-file: 'go.mod' - name: Get dependencies run: | cd weed; go get -v -t -d ./... @@ -42,13 +41,12 @@ jobs: name: Build runs-on: ubuntu-latest steps: - - name: Set up Go 1.x - uses: actions/setup-go@a5f9b05d2d216f63e13859e0d847461041025775 # v2 - with: - go-version: ^1.13 - id: go - name: Check out code into the Go module directory - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v2 + uses: actions/checkout@v6 + - name: Set up Go + uses: actions/setup-go@v6 + with: + go-version-file: 'go.mod' - name: Build run: cd weed; go build -tags "elastic gocdk sqlite ydb tarantool tikv rclone" -v . @@ -56,12 +54,11 @@ jobs: name: Test runs-on: ubuntu-latest steps: - - name: Set up Go 1.x - uses: actions/setup-go@a5f9b05d2d216f63e13859e0d847461041025775 # v2 - with: - go-version: ^1.13 - id: go - name: Check out code into the Go module directory - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v2 + uses: actions/checkout@v6 + - name: Set up Go + uses: actions/setup-go@v6 + with: + go-version-file: 'go.mod' - name: Test run: cd weed; go test -tags "elastic gocdk sqlite ydb tarantool tikv rclone" -v ./... diff --git a/.github/workflows/metadata-subscribe-tests.yml b/.github/workflows/metadata-subscribe-tests.yml index bfeac2042..569d0e43c 100644 --- a/.github/workflows/metadata-subscribe-tests.yml +++ b/.github/workflows/metadata-subscribe-tests.yml @@ -30,7 +30,6 @@ permissions: contents: read env: - GO_VERSION: '1.24' TEST_TIMEOUT: '10m' jobs: @@ -43,10 +42,10 @@ jobs: - name: Checkout code uses: actions/checkout@v6 - - name: Set up Go ${{ env.GO_VERSION }} + - name: Set up Go uses: actions/setup-go@v6 with: - go-version: ${{ env.GO_VERSION }} + go-version-file: 'go.mod' - name: Build SeaweedFS run: | diff --git a/.github/workflows/sftp-tests.yml b/.github/workflows/sftp-tests.yml index fe513835b..b5277ed50 100644 --- a/.github/workflows/sftp-tests.yml +++ b/.github/workflows/sftp-tests.yml @@ -24,7 +24,6 @@ permissions: contents: read env: - GO_VERSION: '1.24' TEST_TIMEOUT: '15m' jobs: @@ -37,10 +36,10 @@ jobs: - name: Checkout code uses: actions/checkout@v6 - - name: Set up Go ${{ env.GO_VERSION }} + - name: Set up Go uses: actions/setup-go@v6 with: - go-version: ${{ env.GO_VERSION }} + go-version-file: 'go.mod' - name: Install dependencies run: | diff --git a/.github/workflows/volume-server-integration-tests.yml b/.github/workflows/volume-server-integration-tests.yml index fcb266ea8..70c6e21c8 100644 --- a/.github/workflows/volume-server-integration-tests.yml +++ b/.github/workflows/volume-server-integration-tests.yml @@ -28,7 +28,6 @@ permissions: contents: read env: - GO_VERSION: '1.24' TEST_TIMEOUT: '30m' jobs: @@ -46,10 +45,10 @@ jobs: - name: Checkout code uses: actions/checkout@v6 - - name: Set up Go ${{ env.GO_VERSION }} + - name: Set up Go uses: actions/setup-go@v6 with: - go-version: ${{ env.GO_VERSION }} + go-version-file: 'go.mod' - name: Build SeaweedFS binary run: | diff --git a/weed/admin/plugin/plugin_scheduler.go b/weed/admin/plugin/plugin_scheduler.go index 5b6eba7ab..00c43283e 100644 --- a/weed/admin/plugin/plugin_scheduler.go +++ b/weed/admin/plugin/plugin_scheduler.go @@ -32,6 +32,7 @@ const ( defaultClusterContextTimeout = 10 * time.Second defaultWaitingBacklogFloor = 8 defaultWaitingBacklogMultiplier = 4 + maxEstimatedRuntimeCap = 8 * time.Hour ) type schedulerPolicy struct { @@ -293,6 +294,26 @@ func (r *Plugin) runJobTypeIteration(jobType string, policy schedulerPolicy) boo r.setSchedulerLoopState(jobType, "executing") + // Scan proposals for the maximum estimated_runtime_seconds so the + // execution phase gets enough time for large jobs (e.g. vacuum on + // big volumes). If any proposal needs more time than the remaining + // JobTypeMaxRuntime, extend the execution context accordingly. + var maxEstimatedRuntime time.Duration + for _, p := range filtered { + if p.Parameters != nil { + if est, ok := p.Parameters["estimated_runtime_seconds"]; ok { + if v := est.GetInt64Value(); v > 0 { + if d := time.Duration(v) * time.Second; d > maxEstimatedRuntime { + maxEstimatedRuntime = d + } + } + } + } + } + if maxEstimatedRuntime > maxEstimatedRuntimeCap { + maxEstimatedRuntime = maxEstimatedRuntimeCap + } + remaining = time.Until(start.Add(maxRuntime)) if remaining <= 0 { r.appendActivity(JobActivity{ @@ -306,6 +327,17 @@ func (r *Plugin) runJobTypeIteration(jobType string, policy schedulerPolicy) boo return detected } + // If the longest estimated job exceeds the remaining JobTypeMaxRuntime, + // create a new execution context with enough headroom instead of using + // jobCtx which would cancel too early. + execCtx := jobCtx + execCancel := context.CancelFunc(func() {}) + if maxEstimatedRuntime > 0 && maxEstimatedRuntime > remaining { + execCtx, execCancel = context.WithTimeout(context.Background(), maxEstimatedRuntime) + remaining = maxEstimatedRuntime + } + defer execCancel() + execPolicy := policy if execPolicy.ExecutionTimeout <= 0 { execPolicy.ExecutionTimeout = defaultScheduledExecutionTimeout @@ -314,10 +346,10 @@ func (r *Plugin) runJobTypeIteration(jobType string, policy schedulerPolicy) boo execPolicy.ExecutionTimeout = remaining } - successCount, errorCount, canceledCount := r.dispatchScheduledProposals(jobCtx, jobType, filtered, clusterContext, execPolicy) + successCount, errorCount, canceledCount := r.dispatchScheduledProposals(execCtx, jobType, filtered, clusterContext, execPolicy) status := "success" - if jobCtx.Err() != nil { + if execCtx.Err() != nil { status = "timeout" } else if errorCount > 0 || canceledCount > 0 { status = "error" @@ -937,7 +969,24 @@ func (r *Plugin) executeScheduledJobWithExecutor( if parent == nil { parent = context.Background() } - execCtx, cancel := context.WithTimeout(parent, policy.ExecutionTimeout) + // Use the job's estimated runtime if provided and larger than the + // default execution timeout. This lets handlers like vacuum scale + // the timeout based on volume size so large volumes are not killed. + timeout := policy.ExecutionTimeout + if job.Parameters != nil { + if est, ok := job.Parameters["estimated_runtime_seconds"]; ok { + if v := est.GetInt64Value(); v > 0 { + estimated := time.Duration(v) * time.Second + if estimated > maxEstimatedRuntimeCap { + estimated = maxEstimatedRuntimeCap + } + if estimated > timeout { + timeout = estimated + } + } + } + } + execCtx, cancel := context.WithTimeout(parent, timeout) _, err := r.executeJobWithExecutor(execCtx, executor, job, clusterContext, int32(attempt)) cancel() if err == nil { diff --git a/weed/plugin/worker/vacuum_handler.go b/weed/plugin/worker/vacuum_handler.go index d1a93d299..1ad58b30d 100644 --- a/weed/plugin/worker/vacuum_handler.go +++ b/weed/plugin/worker/vacuum_handler.go @@ -544,6 +544,10 @@ func buildVacuumProposal(result *workertypes.TaskDetectionResult) (*plugin_pb.Jo summary = summary + " on " + result.Server } + // Estimate runtime: 5 min/GB (compact + commit + overhead) + volumeSizeGB := int64(result.TypedParams.VolumeSize/1024/1024/1024) + 1 + estimatedRuntimeSeconds := volumeSizeGB * 5 * 60 + return &plugin_pb.JobProposal{ ProposalId: proposalID, DedupeKey: dedupeKey, @@ -564,6 +568,9 @@ func buildVacuumProposal(result *workertypes.TaskDetectionResult) (*plugin_pb.Jo "collection": { Kind: &plugin_pb.ConfigValue_StringValue{StringValue: result.Collection}, }, + "estimated_runtime_seconds": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: estimatedRuntimeSeconds}, + }, }, Labels: map[string]string{ "task_type": "vacuum", diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index 9ff7d5e5b..b8cb85516 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -300,9 +300,13 @@ func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e er var wg sync.WaitGroup wg.Add(2) go func() { - for _, v := range delVolsMap { + for k, v := range delVolsMap { if err := v.Destroy(false); err != nil { errChain <- err + } else { + l.volumesLock.Lock() + delete(l.volumes, k) + l.volumesLock.Unlock() } } wg.Done() @@ -383,14 +387,10 @@ func (l *DiskLocation) UnloadVolume(vid needle.VolumeId) error { func (l *DiskLocation) unmountVolumeByCollection(collectionName string) map[needle.VolumeId]*Volume { deltaVols := make(map[needle.VolumeId]*Volume, 0) for k, v := range l.volumes { - if v.Collection == collectionName && !v.isCompacting && !v.isCommitCompacting { + if v.Collection == collectionName && !v.isCompactionInProgress.Load() { deltaVols[k] = v } } - - for k := range deltaVols { - delete(l.volumes, k) - } return deltaVols } diff --git a/weed/storage/volume.go b/weed/storage/volume.go index dd8ecbdce..48149f4d9 100644 --- a/weed/storage/volume.go +++ b/weed/storage/volume.go @@ -5,6 +5,7 @@ import ( "path" "strconv" "sync" + "sync/atomic" "time" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" @@ -45,8 +46,7 @@ type Volume struct { lastCompactRevision uint16 ldbTimeout int64 - isCompacting bool - isCommitCompacting bool + isCompactionInProgress atomic.Bool volumeInfoRWLock sync.RWMutex volumeInfo *volume_server_pb.VolumeInfo @@ -224,6 +224,16 @@ func (v *Volume) SyncToDisk() { // Close cleanly shuts down this volume func (v *Volume) Close() { + // Wait for any in-progress compaction to finish and claim the flag so no + // new compaction can start. This must happen BEFORE acquiring + // dataFileAccessLock to avoid deadlocking with CommitCompact which holds + // the flag while waiting for the lock. + for !v.isCompactionInProgress.CompareAndSwap(false, true) { + time.Sleep(521 * time.Millisecond) + glog.Warningf("Volume Close wait for compaction %d", v.Id) + } + defer v.isCompactionInProgress.Store(false) + v.dataFileAccessLock.Lock() defer v.dataFileAccessLock.Unlock() @@ -231,11 +241,6 @@ func (v *Volume) Close() { } func (v *Volume) doClose() { - for v.isCommitCompacting { - time.Sleep(521 * time.Millisecond) - glog.Warningf("Volume Close wait for compaction %d", v.Id) - } - if v.nm != nil { if err := v.nm.Sync(); err != nil { glog.Warningf("Volume Close fail to sync volume idx %d", v.Id) diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index 2c20ce634..e97342597 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -64,22 +64,26 @@ func (v *Volume) CompactByVolumeData(opts *CompactOptions) error { //v.accessLock.Lock() //defer v.accessLock.Unlock() //glog.V(3).Infof("Got Compaction lock...") - if v.isCompacting || v.isCommitCompacting { + if !v.isCompactionInProgress.CompareAndSwap(false, true) { glog.V(0).Infof("Volume %d is already compacting...", v.Id) return nil } - v.isCompacting = true - defer func() { - v.isCompacting = false - }() + defer v.isCompactionInProgress.Store(false) v.lastCompactIndexOffset = v.IndexFileSize() v.lastCompactRevision = v.SuperBlock.CompactionRevision glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset) + if v.DataBackend == nil { + return fmt.Errorf("volume %d backend is empty remote:%v", v.Id, v.HasRemoteFile()) + } + nm := v.nm + if nm == nil { + return fmt.Errorf("volume %d needle map is nil", v.Id) + } if err := v.DataBackend.Sync(); err != nil { glog.V(0).Infof("compact failed to sync volume %d", v.Id) } - if err := v.nm.Sync(); err != nil { + if err := nm.Sync(); err != nil { glog.V(0).Infof("compact failed to sync volume idx %d", v.Id) } @@ -102,14 +106,11 @@ func (v *Volume) CompactByIndex(opts *CompactOptions) error { } glog.V(3).Infof("Compact2 volume %d ...", v.Id) - if v.isCompacting || v.isCommitCompacting { + if !v.isCompactionInProgress.CompareAndSwap(false, true) { glog.V(0).Infof("Volume %d is already compacting2 ...", v.Id) return nil } - v.isCompacting = true - defer func() { - v.isCompacting = false - }() + defer v.isCompactionInProgress.Store(false) v.lastCompactIndexOffset = v.IndexFileSize() v.lastCompactRevision = v.SuperBlock.CompactionRevision @@ -117,10 +118,14 @@ func (v *Volume) CompactByIndex(opts *CompactOptions) error { if v.DataBackend == nil { return fmt.Errorf("volume %d backend is empty remote:%v", v.Id, v.HasRemoteFile()) } + nm := v.nm + if nm == nil { + return fmt.Errorf("volume %d needle map is nil", v.Id) + } if err := v.DataBackend.Sync(); err != nil { glog.V(0).Infof("compact2 failed to sync volume dat %d: %v", v.Id, err) } - if err := v.nm.Sync(); err != nil { + if err := nm.Sync(); err != nil { glog.V(0).Infof("compact2 failed to sync volume idx %d: %v", v.Id, err) } @@ -139,14 +144,11 @@ func (v *Volume) CommitCompact() error { } glog.V(0).Infof("Committing volume %d vacuuming...", v.Id) - if v.isCommitCompacting { - glog.V(0).Infof("Volume %d is already commit compacting ...", v.Id) + if !v.isCompactionInProgress.CompareAndSwap(false, true) { + glog.V(0).Infof("Volume %d is already compacting ...", v.Id) return nil } - v.isCommitCompacting = true - defer func() { - v.isCommitCompacting = false - }() + defer v.isCompactionInProgress.Store(false) v.dataFileAccessLock.Lock() defer v.dataFileAccessLock.Unlock() @@ -530,7 +532,7 @@ func (v *Volume) copyDataBasedOnIndexFile(opts *CompactOptions) (err error) { if err != nil { return err } - if v.Ttl.String() == "" { + if v.Ttl.String() == "" && v.nm != nil { dstDatSize, _, err := dstDatBackend.GetStat() if err != nil { return err diff --git a/weed/storage/volume_write.go b/weed/storage/volume_write.go index 8cb00bc15..2f832e1f7 100644 --- a/weed/storage/volume_write.go +++ b/weed/storage/volume_write.go @@ -70,7 +70,7 @@ func (v *Volume) Destroy(onlyEmpty bool) (err error) { return } } - if v.isCompacting || v.isCommitCompacting { + if !v.isCompactionInProgress.CompareAndSwap(false, true) { err = fmt.Errorf("volume %d is compacting", v.Id) return } diff --git a/weed/worker/tasks/vacuum/vacuum_task.go b/weed/worker/tasks/vacuum/vacuum_task.go index 4b890fada..3b226334b 100644 --- a/weed/worker/tasks/vacuum/vacuum_task.go +++ b/weed/worker/tasks/vacuum/vacuum_task.go @@ -25,6 +25,7 @@ type VacuumTask struct { garbageThreshold float64 progress float64 grpcDialOption grpc.DialOption + volumeSize uint64 } // NewVacuumTask creates a new unified vacuum task instance @@ -51,6 +52,7 @@ func (t *VacuumTask) Execute(ctx context.Context, params *worker_pb.TaskParams) } t.garbageThreshold = vacuumParams.GarbageThreshold + t.volumeSize = params.VolumeSize t.GetLogger().WithFields(map[string]interface{}{ "volume_id": t.volumeID, @@ -62,7 +64,7 @@ func (t *VacuumTask) Execute(ctx context.Context, params *worker_pb.TaskParams) // Step 1: Check volume status and garbage ratio t.ReportProgress(10.0) t.GetLogger().Info("Checking volume status") - eligible, currentGarbageRatio, err := t.checkVacuumEligibility() + eligible, currentGarbageRatio, err := t.checkVacuumEligibility(ctx) if err != nil { return fmt.Errorf("failed to check vacuum eligibility: %v", err) } @@ -83,14 +85,14 @@ func (t *VacuumTask) Execute(ctx context.Context, params *worker_pb.TaskParams) "threshold": t.garbageThreshold, }).Info("Performing vacuum operation") - if err := t.performVacuum(); err != nil { + if err := t.performVacuum(ctx); err != nil { return fmt.Errorf("failed to perform vacuum: %v", err) } // Step 3: Verify vacuum results t.ReportProgress(90.0) t.GetLogger().Info("Verifying vacuum results") - if err := t.verifyVacuumResults(); err != nil { + if err := t.verifyVacuumResults(ctx); err != nil { glog.Warningf("Vacuum verification failed: %v", err) // Don't fail the task - vacuum operation itself succeeded } @@ -146,15 +148,28 @@ func (t *VacuumTask) GetProgress() float64 { return t.progress } +// vacuumTimeout returns a dynamic timeout scaled by volume size, matching the +// topology vacuum approach. base is the per-GB multiplier (e.g. 1 minute for +// check, 3 minutes for compact). +func (t *VacuumTask) vacuumTimeout(base time.Duration) time.Duration { + if t.volumeSize == 0 { + glog.V(1).Infof("volume %d has no size metric, using minimum timeout", t.volumeID) + } + sizeGB := int64(t.volumeSize/1024/1024/1024) + 1 + return base * time.Duration(sizeGB) +} + // Helper methods for real vacuum operations // checkVacuumEligibility checks if the volume meets vacuum criteria -func (t *VacuumTask) checkVacuumEligibility() (bool, float64, error) { +func (t *VacuumTask) checkVacuumEligibility(ctx context.Context) (bool, float64, error) { var garbageRatio float64 err := operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { - resp, err := client.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{ + checkCtx, cancel := context.WithTimeout(ctx, t.vacuumTimeout(time.Minute)) + defer cancel() + resp, err := client.VacuumVolumeCheck(checkCtx, &volume_server_pb.VacuumVolumeCheckRequest{ VolumeId: t.volumeID, }) if err != nil { @@ -178,12 +193,14 @@ func (t *VacuumTask) checkVacuumEligibility() (bool, float64, error) { } // performVacuum executes the actual vacuum operation -func (t *VacuumTask) performVacuum() error { +func (t *VacuumTask) performVacuum(ctx context.Context) error { return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { - // Step 1: Compact the volume + // Step 1: Compact the volume (3 min per GB, matching topology vacuum) t.GetLogger().Info("Compacting volume") - stream, err := client.VacuumVolumeCompact(context.Background(), &volume_server_pb.VacuumVolumeCompactRequest{ + compactCtx, compactCancel := context.WithTimeout(ctx, t.vacuumTimeout(3*time.Minute)) + defer compactCancel() + stream, err := client.VacuumVolumeCompact(compactCtx, &volume_server_pb.VacuumVolumeCompactRequest{ VolumeId: t.volumeID, }) if err != nil { @@ -202,18 +219,22 @@ func (t *VacuumTask) performVacuum() error { glog.V(2).Infof("Volume %d compact progress: %d bytes processed", t.volumeID, resp.ProcessedBytes) } - // Step 2: Commit the vacuum + // Step 2: Commit the vacuum (1 min per GB) t.GetLogger().Info("Committing vacuum operation") - _, err = client.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{ + commitCtx, commitCancel := context.WithTimeout(ctx, t.vacuumTimeout(time.Minute)) + defer commitCancel() + _, err = client.VacuumVolumeCommit(commitCtx, &volume_server_pb.VacuumVolumeCommitRequest{ VolumeId: t.volumeID, }) if err != nil { return fmt.Errorf("vacuum commit failed: %v", err) } - // Step 3: Cleanup old files + // Step 3: Cleanup old files (1 min per GB) t.GetLogger().Info("Cleaning up vacuum files") - _, err = client.VacuumVolumeCleanup(context.Background(), &volume_server_pb.VacuumVolumeCleanupRequest{ + cleanupCtx, cleanupCancel := context.WithTimeout(ctx, t.vacuumTimeout(time.Minute)) + defer cleanupCancel() + _, err = client.VacuumVolumeCleanup(cleanupCtx, &volume_server_pb.VacuumVolumeCleanupRequest{ VolumeId: t.volumeID, }) if err != nil { @@ -226,10 +247,12 @@ func (t *VacuumTask) performVacuum() error { } // verifyVacuumResults checks the volume status after vacuum -func (t *VacuumTask) verifyVacuumResults() error { +func (t *VacuumTask) verifyVacuumResults(ctx context.Context) error { return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { - resp, err := client.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{ + verifyCtx, cancel := context.WithTimeout(ctx, t.vacuumTimeout(time.Minute)) + defer cancel() + resp, err := client.VacuumVolumeCheck(verifyCtx, &volume_server_pb.VacuumVolumeCheckRequest{ VolumeId: t.volumeID, }) if err != nil {