diff --git a/weed/storage/needle/needle_read.go b/weed/storage/needle/needle_read.go index 1907efad3..905969e36 100644 --- a/weed/storage/needle/needle_read.go +++ b/weed/storage/needle/needle_read.go @@ -3,12 +3,13 @@ package needle import ( "errors" "fmt" + "io" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/stats" "github.com/seaweedfs/seaweedfs/weed/storage/backend" . "github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/util" - "io" ) const ( @@ -52,7 +53,6 @@ func ReadNeedleBlob(r backend.BackendStorageFile, offset int64, size Size, versi func (n *Needle) ReadBytes(bytes []byte, offset int64, size Size, version Version) (err error) { n.ParseNeedleHeader(bytes) if n.Size != size { - // cookie is not always passed in for this API. Use size to do preliminary checking. if OffsetSize == 4 && offset < int64(MaxPossibleVolumeSize) { stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorSizeMismatchOffsetSize).Inc() glog.Errorf("entry not found1: offset %d found id %x size %d, expected size %d", offset, n.Id, n.Size, size) @@ -64,11 +64,21 @@ func (n *Needle) ReadBytes(bytes []byte, offset int64, size Size, version Versio switch version { case Version1: n.Data = bytes[NeedleHeaderSize : NeedleHeaderSize+size] - case Version2, Version3: - err = n.readNeedleDataVersion2(bytes[NeedleHeaderSize : NeedleHeaderSize+int(n.Size)]) - } - if err != nil && err != io.EOF { - return err + // fallthrough to checksum logic below + case Version2: + err := n.readNeedleDataVersion2(bytes[NeedleHeaderSize : NeedleHeaderSize+int(size)]) + if err != nil && err != io.EOF { + return err + } + case Version3: + err := n.readNeedleDataVersion2(bytes[NeedleHeaderSize : NeedleHeaderSize+int(size)]) + if err != nil && err != io.EOF { + return err + } + tsOffset := NeedleHeaderSize + size + NeedleChecksumSize + n.AppendAtNs = util.BytesToUint64(bytes[tsOffset : tsOffset+TimestampSize]) + default: + return fmt.Errorf("unsupported version %d", version) } if size > 0 { checksum := util.BytesToUint32(bytes[NeedleHeaderSize+size : NeedleHeaderSize+size+NeedleChecksumSize]) @@ -82,10 +92,6 @@ func (n *Needle) ReadBytes(bytes []byte, offset int64, size Size, version Versio } n.Checksum = newChecksum } - if version == Version3 { - tsOffset := NeedleHeaderSize + size + NeedleChecksumSize - n.AppendAtNs = util.BytesToUint64(bytes[tsOffset : tsOffset+TimestampSize]) - } return nil } @@ -95,7 +101,6 @@ func (n *Needle) ReadData(r backend.BackendStorageFile, offset int64, size Size, if err != nil { return err } - err = n.ReadBytes(bytes, offset, size, version) if err == ErrorSizeMismatch && OffsetSize == 4 { offset = offset + int64(MaxPossibleVolumeSize)