|
@ -3,12 +3,13 @@ package needle |
|
|
import ( |
|
|
import ( |
|
|
"errors" |
|
|
"errors" |
|
|
"fmt" |
|
|
"fmt" |
|
|
|
|
|
"io" |
|
|
|
|
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog" |
|
|
"github.com/seaweedfs/seaweedfs/weed/glog" |
|
|
"github.com/seaweedfs/seaweedfs/weed/stats" |
|
|
"github.com/seaweedfs/seaweedfs/weed/stats" |
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/backend" |
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/backend" |
|
|
. "github.com/seaweedfs/seaweedfs/weed/storage/types" |
|
|
. "github.com/seaweedfs/seaweedfs/weed/storage/types" |
|
|
"github.com/seaweedfs/seaweedfs/weed/util" |
|
|
"github.com/seaweedfs/seaweedfs/weed/util" |
|
|
"io" |
|
|
|
|
|
) |
|
|
) |
|
|
|
|
|
|
|
|
const ( |
|
|
const ( |
|
@ -52,7 +53,6 @@ func ReadNeedleBlob(r backend.BackendStorageFile, offset int64, size Size, versi |
|
|
func (n *Needle) ReadBytes(bytes []byte, offset int64, size Size, version Version) (err error) { |
|
|
func (n *Needle) ReadBytes(bytes []byte, offset int64, size Size, version Version) (err error) { |
|
|
n.ParseNeedleHeader(bytes) |
|
|
n.ParseNeedleHeader(bytes) |
|
|
if n.Size != size { |
|
|
if n.Size != size { |
|
|
// cookie is not always passed in for this API. Use size to do preliminary checking.
|
|
|
|
|
|
if OffsetSize == 4 && offset < int64(MaxPossibleVolumeSize) { |
|
|
if OffsetSize == 4 && offset < int64(MaxPossibleVolumeSize) { |
|
|
stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorSizeMismatchOffsetSize).Inc() |
|
|
stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorSizeMismatchOffsetSize).Inc() |
|
|
glog.Errorf("entry not found1: offset %d found id %x size %d, expected size %d", offset, n.Id, n.Size, size) |
|
|
glog.Errorf("entry not found1: offset %d found id %x size %d, expected size %d", offset, n.Id, n.Size, size) |
|
@ -64,12 +64,22 @@ func (n *Needle) ReadBytes(bytes []byte, offset int64, size Size, version Versio |
|
|
switch version { |
|
|
switch version { |
|
|
case Version1: |
|
|
case Version1: |
|
|
n.Data = bytes[NeedleHeaderSize : NeedleHeaderSize+size] |
|
|
n.Data = bytes[NeedleHeaderSize : NeedleHeaderSize+size] |
|
|
case Version2, Version3: |
|
|
|
|
|
err = n.readNeedleDataVersion2(bytes[NeedleHeaderSize : NeedleHeaderSize+int(n.Size)]) |
|
|
|
|
|
|
|
|
// fallthrough to checksum logic below
|
|
|
|
|
|
case Version2: |
|
|
|
|
|
err := n.readNeedleDataVersion2(bytes[NeedleHeaderSize : NeedleHeaderSize+int(size)]) |
|
|
|
|
|
if err != nil && err != io.EOF { |
|
|
|
|
|
return err |
|
|
} |
|
|
} |
|
|
|
|
|
case Version3: |
|
|
|
|
|
err := n.readNeedleDataVersion2(bytes[NeedleHeaderSize : NeedleHeaderSize+int(size)]) |
|
|
if err != nil && err != io.EOF { |
|
|
if err != nil && err != io.EOF { |
|
|
return err |
|
|
return err |
|
|
} |
|
|
} |
|
|
|
|
|
tsOffset := NeedleHeaderSize + size + NeedleChecksumSize |
|
|
|
|
|
n.AppendAtNs = util.BytesToUint64(bytes[tsOffset : tsOffset+TimestampSize]) |
|
|
|
|
|
default: |
|
|
|
|
|
return fmt.Errorf("unsupported version %d", version) |
|
|
|
|
|
} |
|
|
if size > 0 { |
|
|
if size > 0 { |
|
|
checksum := util.BytesToUint32(bytes[NeedleHeaderSize+size : NeedleHeaderSize+size+NeedleChecksumSize]) |
|
|
checksum := util.BytesToUint32(bytes[NeedleHeaderSize+size : NeedleHeaderSize+size+NeedleChecksumSize]) |
|
|
newChecksum := NewCRC(n.Data) |
|
|
newChecksum := NewCRC(n.Data) |
|
@ -82,10 +92,6 @@ func (n *Needle) ReadBytes(bytes []byte, offset int64, size Size, version Versio |
|
|
} |
|
|
} |
|
|
n.Checksum = newChecksum |
|
|
n.Checksum = newChecksum |
|
|
} |
|
|
} |
|
|
if version == Version3 { |
|
|
|
|
|
tsOffset := NeedleHeaderSize + size + NeedleChecksumSize |
|
|
|
|
|
n.AppendAtNs = util.BytesToUint64(bytes[tsOffset : tsOffset+TimestampSize]) |
|
|
|
|
|
} |
|
|
|
|
|
return nil |
|
|
return nil |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -95,7 +101,6 @@ func (n *Needle) ReadData(r backend.BackendStorageFile, offset int64, size Size, |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
return err |
|
|
return err |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
err = n.ReadBytes(bytes, offset, size, version) |
|
|
err = n.ReadBytes(bytes, offset, size, version) |
|
|
if err == ErrorSizeMismatch && OffsetSize == 4 { |
|
|
if err == ErrorSizeMismatch && OffsetSize == 4 { |
|
|
offset = offset + int64(MaxPossibleVolumeSize) |
|
|
offset = offset + int64(MaxPossibleVolumeSize) |
|
|