From 2c913dde047a7b86925535ed47a7e4d741e4351d Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 28 Nov 2020 00:09:29 -0800 Subject: [PATCH] volume: detect and drop volumes with disk IO error MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit from Jethro in slack: is it possible to make the assign request a bit smarter? Currently I’m in the state that a disk failed but all assign request are being send to this volume. It would be cool if the master sees this and stopped using this volume. e=HTTP(http://x:8089/913,045a782b63176edf) not 200 but 500 Internal Server Error Body={"size":740167,"error":"failed to write to local disk: write /mnt/v9/913.dat: input/output error","eTag":"ee4381e202212ff3aee647704c036689"} e=HTTP(http://x:8089/913,045a782c90240077) not 200 but 500 Internal Server Error Body={"size":792779,"error":"failed to write to local disk: write /mnt/v9/913.dat: input/output error","eTag":"c43463ccc11eb6eb2fc306f407a6a953"} e=HTTP(http://x:8089/913,045a782e6b7901ea) not 200 but 500 Internal Server Error Body={"size":3962392,"error":"failed to write to local disk: write /mnt/v9/913.dat: input/output error","eTag":"04c91198e9b276c81f11dbf189af5d28"} --- weed/storage/store.go | 7 ++++++- weed/storage/volume.go | 6 ++++-- weed/storage/volume_read_write.go | 23 +++++++++++++++++++++-- 3 files changed, 31 insertions(+), 5 deletions(-) diff --git a/weed/storage/store.go b/weed/storage/store.go index 7e5768417..0d4d6e916 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -221,7 +221,12 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { if v.expiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) { deleteVids = append(deleteVids, v.Id) } else { - glog.V(0).Infoln("volume", v.Id, "is expired.") + glog.V(0).Infoln("volume %d is expired", v.Id) + } + if v.lastIoError != nil { + deleteVids = append(deleteVids, v.Id) + } else { + glog.Warningf("volume %d has IO error", v.Id) } } collectionVolumeSize[v.Collection] += volumeMessage.Size diff --git a/weed/storage/volume.go b/weed/storage/volume.go index a03846d3d..c726e7f11 100644 --- a/weed/storage/volume.go +++ b/weed/storage/volume.go @@ -46,6 +46,8 @@ type Volume struct { volumeInfo *volume_server_pb.VolumeInfo location *DiskLocation + + lastIoError error } func NewVolume(dirname string, dirIdx string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) { @@ -86,10 +88,10 @@ func (v *Volume) IndexFileName() (fileName string) { func (v *Volume) FileName(ext string) (fileName string) { switch ext { case ".idx", ".cpx", ".ldb": - return VolumeFileName(v.dirIdx, v.Collection, int(v.Id))+ext + return VolumeFileName(v.dirIdx, v.Collection, int(v.Id)) + ext } // .dat, .cpd, .vif - return VolumeFileName(v.dir, v.Collection, int(v.Id))+ext + return VolumeFileName(v.dir, v.Collection, int(v.Id)) + ext } func (v *Volume) Version() needle.Version { diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go index 6dc4cb4a5..c30abf237 100644 --- a/weed/storage/volume_read_write.go +++ b/weed/storage/volume_read_write.go @@ -19,6 +19,18 @@ var ErrorNotFound = errors.New("not found") var ErrorDeleted = errors.New("already deleted") var ErrorSizeMismatch = errors.New("size mismatch") +func (v *Volume) checkReadWriteError(err error) { + if err == nil { + if v.lastIoError != nil { + v.lastIoError = nil + } + return + } + if err.Error() == "input/output error" { + v.lastIoError = err + } +} + // isFileUnchanged checks whether this needle to write is same as last one. // It requires serialized access in the same volume. func (v *Volume) isFileUnchanged(n *needle.Needle) bool { @@ -115,7 +127,9 @@ func (v *Volume) syncWrite(n *needle.Needle) (offset uint64, size Size, isUnchan // append to dat file n.AppendAtNs = uint64(time.Now().UnixNano()) - if offset, size, _, err = n.Append(v.DataBackend, v.Version()); err != nil { + offset, size, _, err = n.Append(v.DataBackend, v.Version()) + v.checkReadWriteError(err) + if err != nil { return } @@ -179,7 +193,9 @@ func (v *Volume) doWriteRequest(n *needle.Needle) (offset uint64, size Size, isU // append to dat file n.AppendAtNs = uint64(time.Now().UnixNano()) - if offset, size, _, err = n.Append(v.DataBackend, v.Version()); err != nil { + offset, size, _, err = n.Append(v.DataBackend, v.Version()) + v.checkReadWriteError(err) + if err != nil { return } v.lastAppendAtNs = n.AppendAtNs @@ -214,6 +230,7 @@ func (v *Volume) syncDelete(n *needle.Needle) (Size, error) { n.Data = nil n.AppendAtNs = uint64(time.Now().UnixNano()) offset, _, _, err := n.Append(v.DataBackend, v.Version()) + v.checkReadWriteError(err) if err != nil { return size, err } @@ -252,6 +269,7 @@ func (v *Volume) doDeleteRequest(n *needle.Needle) (Size, error) { n.Data = nil n.AppendAtNs = uint64(time.Now().UnixNano()) offset, _, _, err := n.Append(v.DataBackend, v.Version()) + v.checkReadWriteError(err) if err != nil { return size, err } @@ -289,6 +307,7 @@ func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption) (int, erro if err == needle.ErrorSizeMismatch && OffsetSize == 4 { err = n.ReadData(v.DataBackend, nv.Offset.ToAcutalOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version()) } + v.checkReadWriteError(err) if err != nil { return 0, err }