Browse Source

correctly report volume with input/output error to master (#6790)

* correctly capture io error and report to master

* code fix

* check io error by error.Is

---------

Co-authored-by: dongxu_feng <dongxu_feng@intsig.net>
pull/6793/head
dongxufeng 3 days ago
committed by GitHub
parent
commit
ff878a542d
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 7
      weed/storage/needle/needle_write.go
  2. 26
      weed/storage/store.go
  3. 3
      weed/storage/volume_write.go

7
weed/storage/needle/needle_write.go

@ -3,12 +3,13 @@ package needle
import (
"bytes"
"fmt"
"math"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/storage/backend"
. "github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/buffer_pool"
"math"
)
func (n *Needle) prepareWriteBuffer(version Version, writeBytes *bytes.Buffer) (Size, int64, error) {
@ -118,7 +119,7 @@ func (n *Needle) Append(w backend.BackendStorageFile, version Version) (offset u
}(w, end)
offset = uint64(end)
} else {
err = fmt.Errorf("Cannot Read Current Volume Position: %v", e)
err = fmt.Errorf("Cannot Read Current Volume Position: %w", e)
return
}
if offset >= MaxPossibleVolumeSize && len(n.Data) != 0 {
@ -134,7 +135,7 @@ func (n *Needle) Append(w backend.BackendStorageFile, version Version) (offset u
if err == nil {
_, err = w.WriteAt(bytesBuffer.Bytes(), int64(offset))
if err != nil {
err = fmt.Errorf("failed to write %d bytes to %s at offset %d: %v", actualSize, w.Name(), offset, err)
err = fmt.Errorf("failed to write %d bytes to %s at offset %d: %w", actualSize, w.Name(), offset, err)
}
}

26
weed/storage/store.go

@ -269,19 +269,23 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
maxFileKey = curMaxFileKey
}
shouldDeleteVolume := false
if !v.expired(volumeMessage.Size, s.GetVolumeSizeLimit()) {
volumeMessages = append(volumeMessages, volumeMessage)
if v.lastIoError != nil {
deleteVids = append(deleteVids, v.Id)
shouldDeleteVolume = true
glog.Warningf("volume %d has IO error: %v", v.Id, v.lastIoError)
} else {
if v.expiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
deleteVids = append(deleteVids, v.Id)
shouldDeleteVolume = true
if !v.expired(volumeMessage.Size, s.GetVolumeSizeLimit()) {
volumeMessages = append(volumeMessages, volumeMessage)
} else {
glog.V(0).Infof("volume %d is expired", v.Id)
}
if v.lastIoError != nil {
deleteVids = append(deleteVids, v.Id)
shouldDeleteVolume = true
glog.Warningf("volume %d has IO error: %v", v.Id, v.lastIoError)
if v.expiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
if !shouldDeleteVolume {
deleteVids = append(deleteVids, v.Id)
shouldDeleteVolume = true
}
} else {
glog.V(0).Infof("volume %d is expired", v.Id)
}
}
}

3
weed/storage/volume_write.go

@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"os"
"syscall"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/storage/backend"
@ -23,7 +24,7 @@ func (v *Volume) checkReadWriteError(err error) {
}
return
}
if err.Error() == "input/output error" {
if errors.Is(err, syscall.EIO) {
v.lastIoError = err
}
}

Loading…
Cancel
Save