Browse Source

volume: checksum remove the hashing step in Value()

pull/3140/head
chrislu 3 years ago
parent
commit
056c480eb0
  1. 3
      weed/storage/needle/crc.go
  2. 3
      weed/storage/needle/needle_read.go
  3. 19
      weed/storage/needle/needle_read_page.go
  4. 2
      weed/storage/needle/needle_read_test.go
  5. 4
      weed/storage/needle/needle_write.go
  6. 9
      weed/storage/store.go
  7. 8
      weed/storage/volume_read.go

3
weed/storage/needle/crc.go

@ -21,6 +21,7 @@ func (c CRC) Update(b []byte) CRC {
return CRC(crc32.Update(uint32(c), table, b))
}
// Value Deprecated. Just use the raw uint32 value to compare.
func (c CRC) Value() uint32 {
return uint32(c>>15|c<<17) + 0xa282ead8
}
@ -51,4 +52,4 @@ func (c *CRCwriter) Write(p []byte) (n int, err error) {
return
}
func (c *CRCwriter) Sum() uint32 { return c.crc.Value() } // final hash
func (c *CRCwriter) Sum() uint32 { return uint32(c.crc) } // final hash

3
weed/storage/needle/needle_read.go

@ -72,7 +72,8 @@ func (n *Needle) ReadBytes(bytes []byte, offset int64, size Size, version Versio
if size > 0 {
checksum := util.BytesToUint32(bytes[NeedleHeaderSize+size : NeedleHeaderSize+size+NeedleChecksumSize])
newChecksum := NewCRC(n.Data)
if checksum != newChecksum.Value() {
if checksum != newChecksum.Value() && checksum != uint32(newChecksum) {
// the crc.Value() function is to be deprecated. this double checking is for backward compatible.
stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorCRC).Inc()
return errors.New("CRC error! Data On Disk Corrupted")
}

19
weed/storage/needle/needle_read_page.go

@ -10,7 +10,7 @@ import (
)
// ReadNeedleDataInto uses a needle without n.Data to read the content into an io.Writer
func (n *Needle) ReadNeedleDataInto(r backend.BackendStorageFile, volumeOffset int64, buf []byte, writer io.Writer, needleOffset int64, size int64, expectedChecksumValue uint32) (err error) {
func (n *Needle) ReadNeedleDataInto(r backend.BackendStorageFile, volumeOffset int64, buf []byte, writer io.Writer, needleOffset int64, size int64) (err error) {
crc := CRC(0)
for x := needleOffset; x < needleOffset+size; x += int64(len(buf)) {
count, err := n.ReadNeedleData(r, volumeOffset, buf, x)
@ -31,8 +31,9 @@ func (n *Needle) ReadNeedleDataInto(r backend.BackendStorageFile, volumeOffset i
break
}
}
if needleOffset == 0 && size == int64(n.DataSize) && expectedChecksumValue != crc.Value() {
return fmt.Errorf("ReadNeedleData checksum %v expected %v", crc.Value(), expectedChecksumValue)
if needleOffset == 0 && size == int64(n.DataSize) && (n.Checksum != crc && uint32(n.Checksum) != crc.Value()) {
// the crc.Value() function is to be deprecated. this double checking is for backward compatible.
return fmt.Errorf("ReadNeedleData checksum %v expected %v", crc, n.Checksum)
}
return nil
}
@ -58,18 +59,18 @@ func (n *Needle) ReadNeedleData(r backend.BackendStorageFile, volumeOffset int64
}
// ReadNeedleMeta fills all metadata except the n.Data
func (n *Needle) ReadNeedleMeta(r backend.BackendStorageFile, offset int64, size Size, version Version) (checksumValue uint32, err error) {
func (n *Needle) ReadNeedleMeta(r backend.BackendStorageFile, offset int64, size Size, version Version) (err error) {
bytes := make([]byte, NeedleHeaderSize+DataSizeSize)
count, err := r.ReadAt(bytes, offset)
if count != NeedleHeaderSize+DataSizeSize || err != nil {
return 0, err
return err
}
n.ParseNeedleHeader(bytes)
if n.Size != size {
if OffsetSize == 4 && offset < int64(MaxPossibleVolumeSize) {
return 0, ErrorSizeMismatch
return ErrorSizeMismatch
}
}
@ -86,18 +87,18 @@ func (n *Needle) ReadNeedleMeta(r backend.BackendStorageFile, offset int64, size
err = nil
}
if err != nil {
return 0, err
return err
}
var index int
index, err = n.readNeedleDataVersion2NonData(metaSlice)
checksumValue = util.BytesToUint32(metaSlice[index : index+NeedleChecksumSize])
n.Checksum = CRC(util.BytesToUint32(metaSlice[index : index+NeedleChecksumSize]))
if version == Version3 {
n.AppendAtNs = util.BytesToUint64(metaSlice[index+NeedleChecksumSize : index+NeedleChecksumSize+TimestampSize])
}
return checksumValue, err
return err
}

2
weed/storage/needle/needle_read_test.go

@ -57,7 +57,7 @@ func TestPageRead(t *testing.T) {
{
n := new(Needle)
checksumValue, err := n.ReadNeedleMeta(datBackend, offset, size, Version3)
err := n.ReadNeedleMeta(datBackend, offset, size, Version3)
if err != nil {
t.Fatalf("ReadNeedleHeader: %v", err)
}

4
weed/storage/needle/needle_write.go

@ -31,7 +31,7 @@ func (n *Needle) prepareWriteBuffer(version Version, writeBytes *bytes.Buffer) (
writeBytes.Write(header)
writeBytes.Write(n.Data)
padding := PaddingLength(n.Size, version)
util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value())
util.Uint32toBytes(header[0:NeedleChecksumSize], uint32(n.Checksum))
writeBytes.Write(header[0 : NeedleChecksumSize+padding])
return size, actualSize, nil
case Version2, Version3:
@ -97,7 +97,7 @@ func (n *Needle) prepareWriteBuffer(version Version, writeBytes *bytes.Buffer) (
}
}
padding := PaddingLength(n.Size, version)
util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value())
util.Uint32toBytes(header[0:NeedleChecksumSize], uint32(n.Checksum))
if version == Version2 {
writeBytes.Write(header[0 : NeedleChecksumSize+padding])
} else {

9
weed/storage/store.go

@ -27,13 +27,14 @@ const (
)
type ReadOption struct {
// request
ReadDeleted bool
AttemptMetaOnly bool
MustMetaOnly bool
IsMetaOnly bool // read status
ChecksumValue uint32 // read status
VolumeRevision uint16
IsOutOfRange bool // whether need to read over MaxPossibleVolumeSize
// response
IsMetaOnly bool // read status
VolumeRevision uint16
IsOutOfRange bool // whether read over MaxPossibleVolumeSize
}
/*

8
weed/storage/volume_read.go

@ -41,10 +41,10 @@ func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption, onReadSize
}
if readOption != nil && readOption.AttemptMetaOnly && readSize > PagedReadLimit {
readOption.VolumeRevision = v.SuperBlock.CompactionRevision
readOption.ChecksumValue, err = n.ReadNeedleMeta(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version())
err = n.ReadNeedleMeta(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version())
if err == needle.ErrorSizeMismatch && OffsetSize == 4 {
readOption.IsOutOfRange = true
readOption.ChecksumValue, err = n.ReadNeedleMeta(v.DataBackend, nv.Offset.ToActualOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version())
err = n.ReadNeedleMeta(v.DataBackend, nv.Offset.ToActualOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version())
}
if err != nil {
return 0, err
@ -105,7 +105,7 @@ func (v *Volume) readNeedleDataInto(n *needle.Needle, readOption *ReadOption, wr
if readOption.VolumeRevision != v.SuperBlock.CompactionRevision {
// the volume is compacted
readOption.IsOutOfRange = false
readOption.ChecksumValue, err = n.ReadNeedleMeta(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version())
err = n.ReadNeedleMeta(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version())
}
buf := mem.Allocate(1024 * 1024)
defer mem.Free(buf)
@ -114,7 +114,7 @@ func (v *Volume) readNeedleDataInto(n *needle.Needle, readOption *ReadOption, wr
actualOffset += int64(MaxPossibleVolumeSize)
}
return n.ReadNeedleDataInto(v.DataBackend, actualOffset, buf, writer, offset, size, readOption.ChecksumValue)
return n.ReadNeedleDataInto(v.DataBackend, actualOffset, buf, writer, offset, size)
}
// read fills in Needle content by looking up n.Id from NeedleMapper

Loading…
Cancel
Save