diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index 65a37805b..914c432d2 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -23,14 +23,13 @@ jobs: runs-on: ubuntu-22.04 timeout-minutes: 30 steps: - - name: Set up Go 1.x - uses: actions/setup-go@a5f9b05d2d216f63e13859e0d847461041025775 # v2 - with: - go-version: ^1.13 - id: go - - name: Check out code into the Go module directory - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v2 + uses: actions/checkout@v6 + + - name: Set up Go + uses: actions/setup-go@v6 + with: + go-version-file: 'go.mod' - name: Set up Docker Buildx uses: docker/setup-buildx-action@v4 diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index b15556c1a..334e75ca0 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -19,13 +19,12 @@ jobs: name: Go Vet runs-on: ubuntu-latest steps: - - name: Set up Go 1.x - uses: actions/setup-go@a5f9b05d2d216f63e13859e0d847461041025775 # v2 - with: - go-version: ^1.13 - id: go - name: Check out code into the Go module directory - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v2 + uses: actions/checkout@v6 + - name: Set up Go + uses: actions/setup-go@v6 + with: + go-version-file: 'go.mod' - name: Get dependencies run: | cd weed; go get -v -t -d ./... @@ -42,13 +41,12 @@ jobs: name: Build runs-on: ubuntu-latest steps: - - name: Set up Go 1.x - uses: actions/setup-go@a5f9b05d2d216f63e13859e0d847461041025775 # v2 - with: - go-version: ^1.13 - id: go - name: Check out code into the Go module directory - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v2 + uses: actions/checkout@v6 + - name: Set up Go + uses: actions/setup-go@v6 + with: + go-version-file: 'go.mod' - name: Build run: cd weed; go build -tags "elastic gocdk sqlite ydb tarantool tikv rclone" -v . @@ -56,12 +54,11 @@ jobs: name: Test runs-on: ubuntu-latest steps: - - name: Set up Go 1.x - uses: actions/setup-go@a5f9b05d2d216f63e13859e0d847461041025775 # v2 - with: - go-version: ^1.13 - id: go - name: Check out code into the Go module directory - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v2 + uses: actions/checkout@v6 + - name: Set up Go + uses: actions/setup-go@v6 + with: + go-version-file: 'go.mod' - name: Test run: cd weed; go test -tags "elastic gocdk sqlite ydb tarantool tikv rclone" -v ./... diff --git a/.github/workflows/metadata-subscribe-tests.yml b/.github/workflows/metadata-subscribe-tests.yml index bfeac2042..569d0e43c 100644 --- a/.github/workflows/metadata-subscribe-tests.yml +++ b/.github/workflows/metadata-subscribe-tests.yml @@ -30,7 +30,6 @@ permissions: contents: read env: - GO_VERSION: '1.24' TEST_TIMEOUT: '10m' jobs: @@ -43,10 +42,10 @@ jobs: - name: Checkout code uses: actions/checkout@v6 - - name: Set up Go ${{ env.GO_VERSION }} + - name: Set up Go uses: actions/setup-go@v6 with: - go-version: ${{ env.GO_VERSION }} + go-version-file: 'go.mod' - name: Build SeaweedFS run: | diff --git a/.github/workflows/sftp-tests.yml b/.github/workflows/sftp-tests.yml index fe513835b..b5277ed50 100644 --- a/.github/workflows/sftp-tests.yml +++ b/.github/workflows/sftp-tests.yml @@ -24,7 +24,6 @@ permissions: contents: read env: - GO_VERSION: '1.24' TEST_TIMEOUT: '15m' jobs: @@ -37,10 +36,10 @@ jobs: - name: Checkout code uses: actions/checkout@v6 - - name: Set up Go ${{ env.GO_VERSION }} + - name: Set up Go uses: actions/setup-go@v6 with: - go-version: ${{ env.GO_VERSION }} + go-version-file: 'go.mod' - name: Install dependencies run: | diff --git a/.github/workflows/volume-server-integration-tests.yml b/.github/workflows/volume-server-integration-tests.yml index fcb266ea8..70c6e21c8 100644 --- a/.github/workflows/volume-server-integration-tests.yml +++ b/.github/workflows/volume-server-integration-tests.yml @@ -28,7 +28,6 @@ permissions: contents: read env: - GO_VERSION: '1.24' TEST_TIMEOUT: '30m' jobs: @@ -46,10 +45,10 @@ jobs: - name: Checkout code uses: actions/checkout@v6 - - name: Set up Go ${{ env.GO_VERSION }} + - name: Set up Go uses: actions/setup-go@v6 with: - go-version: ${{ env.GO_VERSION }} + go-version-file: 'go.mod' - name: Build SeaweedFS binary run: | diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index 9ff7d5e5b..b8cb85516 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -300,9 +300,13 @@ func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e er var wg sync.WaitGroup wg.Add(2) go func() { - for _, v := range delVolsMap { + for k, v := range delVolsMap { if err := v.Destroy(false); err != nil { errChain <- err + } else { + l.volumesLock.Lock() + delete(l.volumes, k) + l.volumesLock.Unlock() } } wg.Done() @@ -383,14 +387,10 @@ func (l *DiskLocation) UnloadVolume(vid needle.VolumeId) error { func (l *DiskLocation) unmountVolumeByCollection(collectionName string) map[needle.VolumeId]*Volume { deltaVols := make(map[needle.VolumeId]*Volume, 0) for k, v := range l.volumes { - if v.Collection == collectionName && !v.isCompacting && !v.isCommitCompacting { + if v.Collection == collectionName && !v.isCompactionInProgress.Load() { deltaVols[k] = v } } - - for k := range deltaVols { - delete(l.volumes, k) - } return deltaVols } diff --git a/weed/storage/volume.go b/weed/storage/volume.go index dd8ecbdce..48149f4d9 100644 --- a/weed/storage/volume.go +++ b/weed/storage/volume.go @@ -5,6 +5,7 @@ import ( "path" "strconv" "sync" + "sync/atomic" "time" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" @@ -45,8 +46,7 @@ type Volume struct { lastCompactRevision uint16 ldbTimeout int64 - isCompacting bool - isCommitCompacting bool + isCompactionInProgress atomic.Bool volumeInfoRWLock sync.RWMutex volumeInfo *volume_server_pb.VolumeInfo @@ -224,6 +224,16 @@ func (v *Volume) SyncToDisk() { // Close cleanly shuts down this volume func (v *Volume) Close() { + // Wait for any in-progress compaction to finish and claim the flag so no + // new compaction can start. This must happen BEFORE acquiring + // dataFileAccessLock to avoid deadlocking with CommitCompact which holds + // the flag while waiting for the lock. + for !v.isCompactionInProgress.CompareAndSwap(false, true) { + time.Sleep(521 * time.Millisecond) + glog.Warningf("Volume Close wait for compaction %d", v.Id) + } + defer v.isCompactionInProgress.Store(false) + v.dataFileAccessLock.Lock() defer v.dataFileAccessLock.Unlock() @@ -231,11 +241,6 @@ func (v *Volume) Close() { } func (v *Volume) doClose() { - for v.isCommitCompacting { - time.Sleep(521 * time.Millisecond) - glog.Warningf("Volume Close wait for compaction %d", v.Id) - } - if v.nm != nil { if err := v.nm.Sync(); err != nil { glog.Warningf("Volume Close fail to sync volume idx %d", v.Id) diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index 2c20ce634..e97342597 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -64,22 +64,26 @@ func (v *Volume) CompactByVolumeData(opts *CompactOptions) error { //v.accessLock.Lock() //defer v.accessLock.Unlock() //glog.V(3).Infof("Got Compaction lock...") - if v.isCompacting || v.isCommitCompacting { + if !v.isCompactionInProgress.CompareAndSwap(false, true) { glog.V(0).Infof("Volume %d is already compacting...", v.Id) return nil } - v.isCompacting = true - defer func() { - v.isCompacting = false - }() + defer v.isCompactionInProgress.Store(false) v.lastCompactIndexOffset = v.IndexFileSize() v.lastCompactRevision = v.SuperBlock.CompactionRevision glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset) + if v.DataBackend == nil { + return fmt.Errorf("volume %d backend is empty remote:%v", v.Id, v.HasRemoteFile()) + } + nm := v.nm + if nm == nil { + return fmt.Errorf("volume %d needle map is nil", v.Id) + } if err := v.DataBackend.Sync(); err != nil { glog.V(0).Infof("compact failed to sync volume %d", v.Id) } - if err := v.nm.Sync(); err != nil { + if err := nm.Sync(); err != nil { glog.V(0).Infof("compact failed to sync volume idx %d", v.Id) } @@ -102,14 +106,11 @@ func (v *Volume) CompactByIndex(opts *CompactOptions) error { } glog.V(3).Infof("Compact2 volume %d ...", v.Id) - if v.isCompacting || v.isCommitCompacting { + if !v.isCompactionInProgress.CompareAndSwap(false, true) { glog.V(0).Infof("Volume %d is already compacting2 ...", v.Id) return nil } - v.isCompacting = true - defer func() { - v.isCompacting = false - }() + defer v.isCompactionInProgress.Store(false) v.lastCompactIndexOffset = v.IndexFileSize() v.lastCompactRevision = v.SuperBlock.CompactionRevision @@ -117,10 +118,14 @@ func (v *Volume) CompactByIndex(opts *CompactOptions) error { if v.DataBackend == nil { return fmt.Errorf("volume %d backend is empty remote:%v", v.Id, v.HasRemoteFile()) } + nm := v.nm + if nm == nil { + return fmt.Errorf("volume %d needle map is nil", v.Id) + } if err := v.DataBackend.Sync(); err != nil { glog.V(0).Infof("compact2 failed to sync volume dat %d: %v", v.Id, err) } - if err := v.nm.Sync(); err != nil { + if err := nm.Sync(); err != nil { glog.V(0).Infof("compact2 failed to sync volume idx %d: %v", v.Id, err) } @@ -139,14 +144,11 @@ func (v *Volume) CommitCompact() error { } glog.V(0).Infof("Committing volume %d vacuuming...", v.Id) - if v.isCommitCompacting { - glog.V(0).Infof("Volume %d is already commit compacting ...", v.Id) + if !v.isCompactionInProgress.CompareAndSwap(false, true) { + glog.V(0).Infof("Volume %d is already compacting ...", v.Id) return nil } - v.isCommitCompacting = true - defer func() { - v.isCommitCompacting = false - }() + defer v.isCompactionInProgress.Store(false) v.dataFileAccessLock.Lock() defer v.dataFileAccessLock.Unlock() @@ -530,7 +532,7 @@ func (v *Volume) copyDataBasedOnIndexFile(opts *CompactOptions) (err error) { if err != nil { return err } - if v.Ttl.String() == "" { + if v.Ttl.String() == "" && v.nm != nil { dstDatSize, _, err := dstDatBackend.GetStat() if err != nil { return err diff --git a/weed/storage/volume_write.go b/weed/storage/volume_write.go index 8cb00bc15..2f832e1f7 100644 --- a/weed/storage/volume_write.go +++ b/weed/storage/volume_write.go @@ -70,7 +70,7 @@ func (v *Volume) Destroy(onlyEmpty bool) (err error) { return } } - if v.isCompacting || v.isCommitCompacting { + if !v.isCompactionInProgress.CompareAndSwap(false, true) { err = fmt.Errorf("volume %d is compacting", v.Id) return }