diff --git a/weed/storage/needle/needle_read.go b/weed/storage/needle/needle_read.go
index 905969e36..416c8f4cb 100644
--- a/weed/storage/needle/needle_read.go
+++ b/weed/storage/needle/needle_read.go
@@ -61,36 +61,17 @@ func (n *Needle) ReadBytes(bytes []byte, offset int64, size Size, version Versio
 		stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorSizeMismatch).Inc()
 		return fmt.Errorf("entry not found: offset %d found id %x size %d, expected size %d", offset, n.Id, n.Size, size)
 	}
-	switch version {
-	case Version1:
+	if version == Version1 {
 		n.Data = bytes[NeedleHeaderSize : NeedleHeaderSize+size]
-		// fallthrough to checksum logic below
-	case Version2:
-		err := n.readNeedleDataVersion2(bytes[NeedleHeaderSize : NeedleHeaderSize+int(size)])
-		if err != nil && err != io.EOF {
-			return err
-		}
-	case Version3:
+	} else {
 		err := n.readNeedleDataVersion2(bytes[NeedleHeaderSize : NeedleHeaderSize+int(size)])
 		if err != nil && err != io.EOF {
 			return err
 		}
-		tsOffset := NeedleHeaderSize + size + NeedleChecksumSize
-		n.AppendAtNs = util.BytesToUint64(bytes[tsOffset : tsOffset+TimestampSize])
-	default:
-		return fmt.Errorf("unsupported version %d", version)
 	}
-	if size > 0 {
-		checksum := util.BytesToUint32(bytes[NeedleHeaderSize+size : NeedleHeaderSize+size+NeedleChecksumSize])
-		newChecksum := NewCRC(n.Data)
-		if checksum != newChecksum.Value() && checksum != uint32(newChecksum) {
-			// the crc.Value() function is to be deprecated. this double checking is for backward compatibility
-			// with seaweed version using crc.Value() instead of uint32(crc), which appears in commit 056c480eb
-			// and switch appeared in version 3.09.
-			stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorCRC).Inc()
-			return errors.New("CRC error! Data On Disk Corrupted")
-		}
-		n.Checksum = newChecksum
+	err = n.readNeedleTail(bytes[NeedleHeaderSize+size:], version)
+	if err != nil {
+		return err
 	}
 	return nil
 }
@@ -260,14 +241,11 @@ func (n *Needle) ReadNeedleBodyBytes(needleBody []byte, version Version) (err er
 	switch version {
 	case Version1:
 		n.Data = needleBody[:n.Size]
-		n.Checksum = NewCRC(n.Data)
+		err = n.readNeedleTail(needleBody[n.Size:], version)
 	case Version2, Version3:
 		err = n.readNeedleDataVersion2(needleBody[0:n.Size])
-		n.Checksum = NewCRC(n.Data)
-
-		if version == Version3 {
-			tsOffset := n.Size + NeedleChecksumSize
-			n.AppendAtNs = util.BytesToUint64(needleBody[tsOffset : tsOffset+TimestampSize])
+		if err == nil {
+			err = n.readNeedleTail(needleBody[n.Size:], version)
 		}
 	default:
 		err = fmt.Errorf("unsupported version %d!", version)
diff --git a/weed/storage/needle/needle_read_page.go b/weed/storage/needle/needle_read_page.go
index 4e1032de8..82142de4a 100644
--- a/weed/storage/needle/needle_read_page.go
+++ b/weed/storage/needle/needle_read_page.go
@@ -2,11 +2,12 @@ package needle
 
 import (
 	"fmt"
+	"io"
+
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/storage/backend"
 	. "github.com/seaweedfs/seaweedfs/weed/storage/types"
 	"github.com/seaweedfs/seaweedfs/weed/util"
-	"io"
 )
 
 // ReadNeedleData uses a needle without n.Data to read the content
@@ -78,10 +79,10 @@ func (n *Needle) ReadNeedleMeta(r backend.BackendStorageFile, offset int64, size
 		index, err = n.readNeedleDataVersion2NonData(metaSlice)
 	}
 
-	n.Checksum = CRC(util.BytesToUint32(metaSlice[index : index+NeedleChecksumSize]))
-	if version == Version3 {
-		n.AppendAtNs = util.BytesToUint64(metaSlice[index+NeedleChecksumSize : index+NeedleChecksumSize+TimestampSize])
-	}
+	// keep an earlier parse error rather than overwriting it with the tail result
+	if tailErr := n.readNeedleTail(metaSlice[index:], version); err == nil {
+		err = tailErr
+	}
 	return err
 }
 
diff --git a/weed/storage/needle/needle_read_tail.go b/weed/storage/needle/needle_read_tail.go
new file mode 100644
index 000000000..f53a5654f
--- /dev/null
+++ b/weed/storage/needle/needle_read_tail.go
@@ -0,0 +1,49 @@
+package needle
+
+import (
+	"errors"
+
+	"github.com/seaweedfs/seaweedfs/weed/stats"
+	. "github.com/seaweedfs/seaweedfs/weed/storage/types"
+	"github.com/seaweedfs/seaweedfs/weed/util"
+)
+
+// readNeedleTail parses the bytes that follow the needle data. needleBody must
+// start at the stored checksum; for Version3 the append timestamp comes right
+// after it.
+//
+// When n.Data is non-empty the stored checksum is verified against the CRC of
+// n.Data and a mismatch is returned as an error. When n.Data is empty (the data
+// was not read) the stored checksum is copied into n.Checksum unverified.
+func (n *Needle) readNeedleTail(needleBody []byte, version Version) error {
+	// reject a truncated tail up front instead of slicing past its length
+	if len(needleBody) < NeedleChecksumSize {
+		return errors.New("needle tail too short for checksum")
+	}
+	if version == Version3 && len(needleBody) < NeedleChecksumSize+TimestampSize {
+		return errors.New("needle tail too short for append timestamp")
+	}
+
+	// for all versions, we need to read the checksum
+	storedChecksum := util.BytesToUint32(needleBody[0:NeedleChecksumSize])
+	if len(n.Data) > 0 {
+		dataChecksum := NewCRC(n.Data)
+		if storedChecksum != uint32(dataChecksum) && storedChecksum != dataChecksum.Value() {
+			// the crc.Value() function is to be deprecated. this double checking is for backward compatibility
+			// with seaweed version using crc.Value() instead of uint32(crc), which appears in commit 056c480eb
+			// and switch appeared in version 3.09.
+			stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorCRC).Inc()
+			return errors.New("CRC error! Data On Disk Corrupted")
+		}
+		n.Checksum = dataChecksum
+	} else {
+		// when data is skipped from reading, just read the checksum
+		n.Checksum = CRC(storedChecksum)
+	}
+
+	if version == Version3 {
+		tsOffset := NeedleChecksumSize
+		n.AppendAtNs = util.BytesToUint64(needleBody[tsOffset : tsOffset+TimestampSize])
+	}
+	return nil
+}