Browse Source

volume loading: trim out unreachable idx file content

fix https://github.com/chrislusf/seaweedfs/issues/1583
pull/1584/head
Chris Lu 4 years ago
parent
commit
6560ac6466
  1. 21
      weed/storage/volume_checking.go

21
weed/storage/volume_checking.go

@ -2,6 +2,7 @@ package storage
import ( import (
"fmt" "fmt"
"io"
"os" "os"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
@ -20,13 +21,25 @@ func CheckAndFixVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAt
if indexSize == 0 { if indexSize == 0 {
return 0, nil return 0, nil
} }
healthyIndexSize := indexSize
for i := 1; i <= 10; i++ { for i := 1; i <= 10; i++ {
// check and fix last 10 entries // check and fix last 10 entries
lastAppendAtNs, err = doCheckAndFixVolumeData(v, indexFile, indexSize-int64(i)*NeedleMapEntrySize) lastAppendAtNs, err = doCheckAndFixVolumeData(v, indexFile, indexSize-int64(i)*NeedleMapEntrySize)
if err == io.EOF {
healthyIndexSize = indexSize - int64(i)*NeedleMapEntrySize
continue
}
if err != ErrorSizeMismatch { if err != ErrorSizeMismatch {
break break
} }
} }
if healthyIndexSize < indexSize {
glog.Warningf("CheckAndFixVolumeDataIntegrity truncate idx file %s from %d to %d", indexFile.Name(), indexSize, healthyIndexSize)
err = indexFile.Truncate(healthyIndexSize)
if err != nil {
glog.Warningf("CheckAndFixVolumeDataIntegrity truncate idx file %s from %d to %d: %v", indexFile.Name(), indexSize, healthyIndexSize, err)
}
}
return return
} }
@ -73,6 +86,9 @@ func readIndexEntryAtOffset(indexFile *os.File, offset int64) (bytes []byte, err
func verifyNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version, offset int64, key NeedleId, size Size) (lastAppendAtNs uint64, err error) { func verifyNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version, offset int64, key NeedleId, size Size) (lastAppendAtNs uint64, err error) {
n, _, _, err := needle.ReadNeedleHeader(datFile, v, offset) n, _, _, err := needle.ReadNeedleHeader(datFile, v, offset)
if err == io.EOF {
return 0, err
}
if err != nil { if err != nil {
return 0, fmt.Errorf("read %s at %d", datFile.Name(), offset) return 0, fmt.Errorf("read %s at %d", datFile.Name(), offset)
} }
@ -82,8 +98,11 @@ func verifyNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version,
if v == needle.Version3 { if v == needle.Version3 {
bytes := make([]byte, TimestampSize) bytes := make([]byte, TimestampSize)
_, err = datFile.ReadAt(bytes, offset+NeedleHeaderSize+int64(size)+needle.NeedleChecksumSize) _, err = datFile.ReadAt(bytes, offset+NeedleHeaderSize+int64(size)+needle.NeedleChecksumSize)
if err == io.EOF {
return 0, err
}
if err != nil { if err != nil {
return 0, fmt.Errorf("check %s entry offset %d size %d: %v", datFile.Name(), offset, size, err)
return 0, fmt.Errorf("verifyNeedleIntegrity check %s entry offset %d size %d: %v", datFile.Name(), offset, size, err)
} }
n.AppendAtNs = util.BytesToUint64(bytes) n.AppendAtNs = util.BytesToUint64(bytes)
fileTailOffset := offset + needle.GetActualSize(size, v) fileTailOffset := offset + needle.GetActualSize(size, v)

Loading…
Cancel
Save