Browse Source

Merge branch 'master' of https://github.com/seaweedfs/seaweedfs

pull/8596/head
Chris Lu 3 days ago
parent
commit
b20eae697e
  1. 13
      .github/workflows/e2e.yml
  2. 33
      .github/workflows/go.yml
  3. 5
      .github/workflows/metadata-subscribe-tests.yml
  4. 5
      .github/workflows/sftp-tests.yml
  5. 5
      .github/workflows/volume-server-integration-tests.yml
  6. 55
      weed/admin/plugin/plugin_scheduler.go
  7. 7
      weed/plugin/worker/vacuum_handler.go
  8. 12
      weed/storage/disk_location.go
  9. 19
      weed/storage/volume.go
  10. 40
      weed/storage/volume_vacuum.go
  11. 2
      weed/storage/volume_write.go
  12. 51
      weed/worker/tasks/vacuum/vacuum_task.go

13
.github/workflows/e2e.yml

@ -23,14 +23,13 @@ jobs:
runs-on: ubuntu-22.04
timeout-minutes: 30
steps:
- name: Set up Go 1.x
uses: actions/setup-go@a5f9b05d2d216f63e13859e0d847461041025775 # v2
with:
go-version: ^1.13
id: go
- name: Check out code into the Go module directory
uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v2
uses: actions/checkout@v6
- name: Set up Go
uses: actions/setup-go@v6
with:
go-version-file: 'go.mod'
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v4

33
.github/workflows/go.yml

@ -19,13 +19,12 @@ jobs:
name: Go Vet
runs-on: ubuntu-latest
steps:
- name: Set up Go 1.x
uses: actions/setup-go@a5f9b05d2d216f63e13859e0d847461041025775 # v2
with:
go-version: ^1.13
id: go
- name: Check out code into the Go module directory
uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v2
uses: actions/checkout@v6
- name: Set up Go
uses: actions/setup-go@v6
with:
go-version-file: 'go.mod'
- name: Get dependencies
run: |
cd weed; go get -v -t -d ./...
@ -42,13 +41,12 @@ jobs:
name: Build
runs-on: ubuntu-latest
steps:
- name: Set up Go 1.x
uses: actions/setup-go@a5f9b05d2d216f63e13859e0d847461041025775 # v2
with:
go-version: ^1.13
id: go
- name: Check out code into the Go module directory
uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v2
uses: actions/checkout@v6
- name: Set up Go
uses: actions/setup-go@v6
with:
go-version-file: 'go.mod'
- name: Build
run: cd weed; go build -tags "elastic gocdk sqlite ydb tarantool tikv rclone" -v .
@ -56,12 +54,11 @@ jobs:
name: Test
runs-on: ubuntu-latest
steps:
- name: Set up Go 1.x
uses: actions/setup-go@a5f9b05d2d216f63e13859e0d847461041025775 # v2
with:
go-version: ^1.13
id: go
- name: Check out code into the Go module directory
uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v2
uses: actions/checkout@v6
- name: Set up Go
uses: actions/setup-go@v6
with:
go-version-file: 'go.mod'
- name: Test
run: cd weed; go test -tags "elastic gocdk sqlite ydb tarantool tikv rclone" -v ./...

5
.github/workflows/metadata-subscribe-tests.yml

@ -30,7 +30,6 @@ permissions:
contents: read
env:
GO_VERSION: '1.24'
TEST_TIMEOUT: '10m'
jobs:
@ -43,10 +42,10 @@ jobs:
- name: Checkout code
uses: actions/checkout@v6
- name: Set up Go ${{ env.GO_VERSION }}
- name: Set up Go
uses: actions/setup-go@v6
with:
go-version: ${{ env.GO_VERSION }}
go-version-file: 'go.mod'
- name: Build SeaweedFS
run: |

5
.github/workflows/sftp-tests.yml

@ -24,7 +24,6 @@ permissions:
contents: read
env:
GO_VERSION: '1.24'
TEST_TIMEOUT: '15m'
jobs:
@ -37,10 +36,10 @@ jobs:
- name: Checkout code
uses: actions/checkout@v6
- name: Set up Go ${{ env.GO_VERSION }}
- name: Set up Go
uses: actions/setup-go@v6
with:
go-version: ${{ env.GO_VERSION }}
go-version-file: 'go.mod'
- name: Install dependencies
run: |

5
.github/workflows/volume-server-integration-tests.yml

@ -28,7 +28,6 @@ permissions:
contents: read
env:
GO_VERSION: '1.24'
TEST_TIMEOUT: '30m'
jobs:
@ -46,10 +45,10 @@ jobs:
- name: Checkout code
uses: actions/checkout@v6
- name: Set up Go ${{ env.GO_VERSION }}
- name: Set up Go
uses: actions/setup-go@v6
with:
go-version: ${{ env.GO_VERSION }}
go-version-file: 'go.mod'
- name: Build SeaweedFS binary
run: |

55
weed/admin/plugin/plugin_scheduler.go

@ -32,6 +32,7 @@ const (
defaultClusterContextTimeout = 10 * time.Second
defaultWaitingBacklogFloor = 8
defaultWaitingBacklogMultiplier = 4
maxEstimatedRuntimeCap = 8 * time.Hour
)
type schedulerPolicy struct {
@ -293,6 +294,26 @@ func (r *Plugin) runJobTypeIteration(jobType string, policy schedulerPolicy) boo
r.setSchedulerLoopState(jobType, "executing")
// Scan proposals for the maximum estimated_runtime_seconds so the
// execution phase gets enough time for large jobs (e.g. vacuum on
// big volumes). If any proposal needs more time than the remaining
// JobTypeMaxRuntime, extend the execution context accordingly.
var maxEstimatedRuntime time.Duration
for _, p := range filtered {
if p.Parameters != nil {
if est, ok := p.Parameters["estimated_runtime_seconds"]; ok {
if v := est.GetInt64Value(); v > 0 {
if d := time.Duration(v) * time.Second; d > maxEstimatedRuntime {
maxEstimatedRuntime = d
}
}
}
}
}
if maxEstimatedRuntime > maxEstimatedRuntimeCap {
maxEstimatedRuntime = maxEstimatedRuntimeCap
}
remaining = time.Until(start.Add(maxRuntime))
if remaining <= 0 {
r.appendActivity(JobActivity{
@ -306,6 +327,17 @@ func (r *Plugin) runJobTypeIteration(jobType string, policy schedulerPolicy) boo
return detected
}
// If the longest estimated job exceeds the remaining JobTypeMaxRuntime,
// create a new execution context with enough headroom instead of using
// jobCtx which would cancel too early.
execCtx := jobCtx
execCancel := context.CancelFunc(func() {})
if maxEstimatedRuntime > 0 && maxEstimatedRuntime > remaining {
execCtx, execCancel = context.WithTimeout(context.Background(), maxEstimatedRuntime)
remaining = maxEstimatedRuntime
}
defer execCancel()
execPolicy := policy
if execPolicy.ExecutionTimeout <= 0 {
execPolicy.ExecutionTimeout = defaultScheduledExecutionTimeout
@ -314,10 +346,10 @@ func (r *Plugin) runJobTypeIteration(jobType string, policy schedulerPolicy) boo
execPolicy.ExecutionTimeout = remaining
}
successCount, errorCount, canceledCount := r.dispatchScheduledProposals(jobCtx, jobType, filtered, clusterContext, execPolicy)
successCount, errorCount, canceledCount := r.dispatchScheduledProposals(execCtx, jobType, filtered, clusterContext, execPolicy)
status := "success"
if jobCtx.Err() != nil {
if execCtx.Err() != nil {
status = "timeout"
} else if errorCount > 0 || canceledCount > 0 {
status = "error"
@ -937,7 +969,24 @@ func (r *Plugin) executeScheduledJobWithExecutor(
if parent == nil {
parent = context.Background()
}
execCtx, cancel := context.WithTimeout(parent, policy.ExecutionTimeout)
// Use the job's estimated runtime if provided and larger than the
// default execution timeout. This lets handlers like vacuum scale
// the timeout based on volume size so large volumes are not killed.
timeout := policy.ExecutionTimeout
if job.Parameters != nil {
if est, ok := job.Parameters["estimated_runtime_seconds"]; ok {
if v := est.GetInt64Value(); v > 0 {
estimated := time.Duration(v) * time.Second
if estimated > maxEstimatedRuntimeCap {
estimated = maxEstimatedRuntimeCap
}
if estimated > timeout {
timeout = estimated
}
}
}
}
execCtx, cancel := context.WithTimeout(parent, timeout)
_, err := r.executeJobWithExecutor(execCtx, executor, job, clusterContext, int32(attempt))
cancel()
if err == nil {

7
weed/plugin/worker/vacuum_handler.go

@ -544,6 +544,10 @@ func buildVacuumProposal(result *workertypes.TaskDetectionResult) (*plugin_pb.Jo
summary = summary + " on " + result.Server
}
// Estimate runtime: 5 min/GB (compact + commit + overhead)
volumeSizeGB := int64(result.TypedParams.VolumeSize/1024/1024/1024) + 1
estimatedRuntimeSeconds := volumeSizeGB * 5 * 60
return &plugin_pb.JobProposal{
ProposalId: proposalID,
DedupeKey: dedupeKey,
@ -564,6 +568,9 @@ func buildVacuumProposal(result *workertypes.TaskDetectionResult) (*plugin_pb.Jo
"collection": {
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: result.Collection},
},
"estimated_runtime_seconds": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: estimatedRuntimeSeconds},
},
},
Labels: map[string]string{
"task_type": "vacuum",

12
weed/storage/disk_location.go

@ -300,9 +300,13 @@ func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e er
var wg sync.WaitGroup
wg.Add(2)
go func() {
for _, v := range delVolsMap {
for k, v := range delVolsMap {
if err := v.Destroy(false); err != nil {
errChain <- err
} else {
l.volumesLock.Lock()
delete(l.volumes, k)
l.volumesLock.Unlock()
}
}
wg.Done()
@ -383,14 +387,10 @@ func (l *DiskLocation) UnloadVolume(vid needle.VolumeId) error {
func (l *DiskLocation) unmountVolumeByCollection(collectionName string) map[needle.VolumeId]*Volume {
deltaVols := make(map[needle.VolumeId]*Volume, 0)
for k, v := range l.volumes {
if v.Collection == collectionName && !v.isCompacting && !v.isCommitCompacting {
if v.Collection == collectionName && !v.isCompactionInProgress.Load() {
deltaVols[k] = v
}
}
for k := range deltaVols {
delete(l.volumes, k)
}
return deltaVols
}

19
weed/storage/volume.go

@ -5,6 +5,7 @@ import (
"path"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
@ -45,8 +46,7 @@ type Volume struct {
lastCompactRevision uint16
ldbTimeout int64
isCompacting bool
isCommitCompacting bool
isCompactionInProgress atomic.Bool
volumeInfoRWLock sync.RWMutex
volumeInfo *volume_server_pb.VolumeInfo
@ -224,6 +224,16 @@ func (v *Volume) SyncToDisk() {
// Close cleanly shuts down this volume
func (v *Volume) Close() {
// Wait for any in-progress compaction to finish and claim the flag so no
// new compaction can start. This must happen BEFORE acquiring
// dataFileAccessLock to avoid deadlocking with CommitCompact which holds
// the flag while waiting for the lock.
for !v.isCompactionInProgress.CompareAndSwap(false, true) {
time.Sleep(521 * time.Millisecond)
glog.Warningf("Volume Close wait for compaction %d", v.Id)
}
defer v.isCompactionInProgress.Store(false)
v.dataFileAccessLock.Lock()
defer v.dataFileAccessLock.Unlock()
@ -231,11 +241,6 @@ func (v *Volume) Close() {
}
func (v *Volume) doClose() {
for v.isCommitCompacting {
time.Sleep(521 * time.Millisecond)
glog.Warningf("Volume Close wait for compaction %d", v.Id)
}
if v.nm != nil {
if err := v.nm.Sync(); err != nil {
glog.Warningf("Volume Close fail to sync volume idx %d", v.Id)

40
weed/storage/volume_vacuum.go

@ -64,22 +64,26 @@ func (v *Volume) CompactByVolumeData(opts *CompactOptions) error {
//v.accessLock.Lock()
//defer v.accessLock.Unlock()
//glog.V(3).Infof("Got Compaction lock...")
if v.isCompacting || v.isCommitCompacting {
if !v.isCompactionInProgress.CompareAndSwap(false, true) {
glog.V(0).Infof("Volume %d is already compacting...", v.Id)
return nil
}
v.isCompacting = true
defer func() {
v.isCompacting = false
}()
defer v.isCompactionInProgress.Store(false)
v.lastCompactIndexOffset = v.IndexFileSize()
v.lastCompactRevision = v.SuperBlock.CompactionRevision
glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset)
if v.DataBackend == nil {
return fmt.Errorf("volume %d backend is empty remote:%v", v.Id, v.HasRemoteFile())
}
nm := v.nm
if nm == nil {
return fmt.Errorf("volume %d needle map is nil", v.Id)
}
if err := v.DataBackend.Sync(); err != nil {
glog.V(0).Infof("compact failed to sync volume %d", v.Id)
}
if err := v.nm.Sync(); err != nil {
if err := nm.Sync(); err != nil {
glog.V(0).Infof("compact failed to sync volume idx %d", v.Id)
}
@ -102,14 +106,11 @@ func (v *Volume) CompactByIndex(opts *CompactOptions) error {
}
glog.V(3).Infof("Compact2 volume %d ...", v.Id)
if v.isCompacting || v.isCommitCompacting {
if !v.isCompactionInProgress.CompareAndSwap(false, true) {
glog.V(0).Infof("Volume %d is already compacting2 ...", v.Id)
return nil
}
v.isCompacting = true
defer func() {
v.isCompacting = false
}()
defer v.isCompactionInProgress.Store(false)
v.lastCompactIndexOffset = v.IndexFileSize()
v.lastCompactRevision = v.SuperBlock.CompactionRevision
@ -117,10 +118,14 @@ func (v *Volume) CompactByIndex(opts *CompactOptions) error {
if v.DataBackend == nil {
return fmt.Errorf("volume %d backend is empty remote:%v", v.Id, v.HasRemoteFile())
}
nm := v.nm
if nm == nil {
return fmt.Errorf("volume %d needle map is nil", v.Id)
}
if err := v.DataBackend.Sync(); err != nil {
glog.V(0).Infof("compact2 failed to sync volume dat %d: %v", v.Id, err)
}
if err := v.nm.Sync(); err != nil {
if err := nm.Sync(); err != nil {
glog.V(0).Infof("compact2 failed to sync volume idx %d: %v", v.Id, err)
}
@ -139,14 +144,11 @@ func (v *Volume) CommitCompact() error {
}
glog.V(0).Infof("Committing volume %d vacuuming...", v.Id)
if v.isCommitCompacting {
glog.V(0).Infof("Volume %d is already commit compacting ...", v.Id)
if !v.isCompactionInProgress.CompareAndSwap(false, true) {
glog.V(0).Infof("Volume %d is already compacting ...", v.Id)
return nil
}
v.isCommitCompacting = true
defer func() {
v.isCommitCompacting = false
}()
defer v.isCompactionInProgress.Store(false)
v.dataFileAccessLock.Lock()
defer v.dataFileAccessLock.Unlock()
@ -530,7 +532,7 @@ func (v *Volume) copyDataBasedOnIndexFile(opts *CompactOptions) (err error) {
if err != nil {
return err
}
if v.Ttl.String() == "" {
if v.Ttl.String() == "" && v.nm != nil {
dstDatSize, _, err := dstDatBackend.GetStat()
if err != nil {
return err

2
weed/storage/volume_write.go

@ -70,7 +70,7 @@ func (v *Volume) Destroy(onlyEmpty bool) (err error) {
return
}
}
if v.isCompacting || v.isCommitCompacting {
if !v.isCompactionInProgress.CompareAndSwap(false, true) {
err = fmt.Errorf("volume %d is compacting", v.Id)
return
}

51
weed/worker/tasks/vacuum/vacuum_task.go

@ -25,6 +25,7 @@ type VacuumTask struct {
garbageThreshold float64
progress float64
grpcDialOption grpc.DialOption
volumeSize uint64
}
// NewVacuumTask creates a new unified vacuum task instance
@ -51,6 +52,7 @@ func (t *VacuumTask) Execute(ctx context.Context, params *worker_pb.TaskParams)
}
t.garbageThreshold = vacuumParams.GarbageThreshold
t.volumeSize = params.VolumeSize
t.GetLogger().WithFields(map[string]interface{}{
"volume_id": t.volumeID,
@ -62,7 +64,7 @@ func (t *VacuumTask) Execute(ctx context.Context, params *worker_pb.TaskParams)
// Step 1: Check volume status and garbage ratio
t.ReportProgress(10.0)
t.GetLogger().Info("Checking volume status")
eligible, currentGarbageRatio, err := t.checkVacuumEligibility()
eligible, currentGarbageRatio, err := t.checkVacuumEligibility(ctx)
if err != nil {
return fmt.Errorf("failed to check vacuum eligibility: %v", err)
}
@ -83,14 +85,14 @@ func (t *VacuumTask) Execute(ctx context.Context, params *worker_pb.TaskParams)
"threshold": t.garbageThreshold,
}).Info("Performing vacuum operation")
if err := t.performVacuum(); err != nil {
if err := t.performVacuum(ctx); err != nil {
return fmt.Errorf("failed to perform vacuum: %v", err)
}
// Step 3: Verify vacuum results
t.ReportProgress(90.0)
t.GetLogger().Info("Verifying vacuum results")
if err := t.verifyVacuumResults(); err != nil {
if err := t.verifyVacuumResults(ctx); err != nil {
glog.Warningf("Vacuum verification failed: %v", err)
// Don't fail the task - vacuum operation itself succeeded
}
@ -146,15 +148,28 @@ func (t *VacuumTask) GetProgress() float64 {
return t.progress
}
// vacuumTimeout computes a timeout that grows with the volume's size, mirroring
// the topology vacuum behavior. The base duration is the per-GB multiplier
// (e.g. 1 minute for the check phase, 3 minutes for compaction).
func (t *VacuumTask) vacuumTimeout(base time.Duration) time.Duration {
	if t.volumeSize == 0 {
		// No size metric available; fall through so sizeGB becomes 1 and the
		// caller gets the minimum (one base unit) timeout.
		glog.V(1).Infof("volume %d has no size metric, using minimum timeout", t.volumeID)
	}
	// Round the size up to at least 1 GB so every volume gets a non-zero budget.
	sizeGB := int64(t.volumeSize/(1024*1024*1024)) + 1
	return time.Duration(sizeGB) * base
}
// Helper methods for real vacuum operations
// checkVacuumEligibility checks if the volume meets vacuum criteria
func (t *VacuumTask) checkVacuumEligibility() (bool, float64, error) {
func (t *VacuumTask) checkVacuumEligibility(ctx context.Context) (bool, float64, error) {
var garbageRatio float64
err := operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), t.grpcDialOption,
func(client volume_server_pb.VolumeServerClient) error {
resp, err := client.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{
checkCtx, cancel := context.WithTimeout(ctx, t.vacuumTimeout(time.Minute))
defer cancel()
resp, err := client.VacuumVolumeCheck(checkCtx, &volume_server_pb.VacuumVolumeCheckRequest{
VolumeId: t.volumeID,
})
if err != nil {
@ -178,12 +193,14 @@ func (t *VacuumTask) checkVacuumEligibility() (bool, float64, error) {
}
// performVacuum executes the actual vacuum operation
func (t *VacuumTask) performVacuum() error {
func (t *VacuumTask) performVacuum(ctx context.Context) error {
return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), t.grpcDialOption,
func(client volume_server_pb.VolumeServerClient) error {
// Step 1: Compact the volume
// Step 1: Compact the volume (3 min per GB, matching topology vacuum)
t.GetLogger().Info("Compacting volume")
stream, err := client.VacuumVolumeCompact(context.Background(), &volume_server_pb.VacuumVolumeCompactRequest{
compactCtx, compactCancel := context.WithTimeout(ctx, t.vacuumTimeout(3*time.Minute))
defer compactCancel()
stream, err := client.VacuumVolumeCompact(compactCtx, &volume_server_pb.VacuumVolumeCompactRequest{
VolumeId: t.volumeID,
})
if err != nil {
@ -202,18 +219,22 @@ func (t *VacuumTask) performVacuum() error {
glog.V(2).Infof("Volume %d compact progress: %d bytes processed", t.volumeID, resp.ProcessedBytes)
}
// Step 2: Commit the vacuum
// Step 2: Commit the vacuum (1 min per GB)
t.GetLogger().Info("Committing vacuum operation")
_, err = client.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{
commitCtx, commitCancel := context.WithTimeout(ctx, t.vacuumTimeout(time.Minute))
defer commitCancel()
_, err = client.VacuumVolumeCommit(commitCtx, &volume_server_pb.VacuumVolumeCommitRequest{
VolumeId: t.volumeID,
})
if err != nil {
return fmt.Errorf("vacuum commit failed: %v", err)
}
// Step 3: Cleanup old files
// Step 3: Cleanup old files (1 min per GB)
t.GetLogger().Info("Cleaning up vacuum files")
_, err = client.VacuumVolumeCleanup(context.Background(), &volume_server_pb.VacuumVolumeCleanupRequest{
cleanupCtx, cleanupCancel := context.WithTimeout(ctx, t.vacuumTimeout(time.Minute))
defer cleanupCancel()
_, err = client.VacuumVolumeCleanup(cleanupCtx, &volume_server_pb.VacuumVolumeCleanupRequest{
VolumeId: t.volumeID,
})
if err != nil {
@ -226,10 +247,12 @@ func (t *VacuumTask) performVacuum() error {
}
// verifyVacuumResults checks the volume status after vacuum
func (t *VacuumTask) verifyVacuumResults() error {
func (t *VacuumTask) verifyVacuumResults(ctx context.Context) error {
return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), t.grpcDialOption,
func(client volume_server_pb.VolumeServerClient) error {
resp, err := client.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{
verifyCtx, cancel := context.WithTimeout(ctx, t.vacuumTimeout(time.Minute))
defer cancel()
resp, err := client.VacuumVolumeCheck(verifyCtx, &volume_server_pb.VacuumVolumeCheckRequest{
VolumeId: t.volumeID,
})
if err != nil {

Loading…
Cancel
Save