diff --git a/weed/storage/needle/43.dat b/weed/storage/needle/43.dat new file mode 100644 index 000000000..7ef8c28ca Binary files /dev/null and b/weed/storage/needle/43.dat differ diff --git a/weed/storage/needle/needle_read.go b/weed/storage/needle/needle_read.go index 882ed0233..77c7e88dc 100644 --- a/weed/storage/needle/needle_read.go +++ b/weed/storage/needle/needle_read.go @@ -111,6 +111,13 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) (err error) { } n.Data = bytes[index : index+int(n.DataSize)] index = index + int(n.DataSize) + } + _, err = n.readNeedleDataVersion2NonData(bytes[index:]) + return +} +func (n *Needle) readNeedleDataVersion2NonData(bytes []byte) (index int, err error) { + lenBytes := len(bytes) + if index < lenBytes { n.Flags = bytes[index] index = index + 1 } @@ -119,7 +126,7 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) (err error) { index = index + 1 if int(n.NameSize)+index > lenBytes { stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc() - return fmt.Errorf("index out of range %d", 2) + return index, fmt.Errorf("index out of range %d", 2) } n.Name = bytes[index : index+int(n.NameSize)] index = index + int(n.NameSize) @@ -129,7 +136,7 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) (err error) { index = index + 1 if int(n.MimeSize)+index > lenBytes { stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc() - return fmt.Errorf("index out of range %d", 3) + return index, fmt.Errorf("index out of range %d", 3) } n.Mime = bytes[index : index+int(n.MimeSize)] index = index + int(n.MimeSize) @@ -137,7 +144,7 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) (err error) { if index < lenBytes && n.HasLastModifiedDate() { if LastModifiedBytesLength+index > lenBytes { stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc() - return fmt.Errorf("index out of range %d", 4) + return index, fmt.Errorf("index out of range %d", 4) } n.LastModified = util.BytesToUint64(bytes[index : index+LastModifiedBytesLength]) index = index + LastModifiedBytesLength @@ -145,7 +152,7 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) (err error) { if index < lenBytes && n.HasTtl() { if TtlBytesLength+index > lenBytes { stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc() - return fmt.Errorf("index out of range %d", 5) + return index, fmt.Errorf("index out of range %d", 5) } n.Ttl = LoadTTLFromBytes(bytes[index : index+TtlBytesLength]) index = index + TtlBytesLength @@ -153,19 +160,19 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) (err error) { if index < lenBytes && n.HasPairs() { if 2+index > lenBytes { stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc() - return fmt.Errorf("index out of range %d", 6) + return index, fmt.Errorf("index out of range %d", 6) } n.PairsSize = util.BytesToUint16(bytes[index : index+2]) index += 2 if int(n.PairsSize)+index > lenBytes { stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc() - return fmt.Errorf("index out of range %d", 7) + return index, fmt.Errorf("index out of range %d", 7) } end := index + int(n.PairsSize) n.Pairs = bytes[index:end] index = end } - return nil + return index, nil } func ReadNeedleHeader(r backend.BackendStorageFile, version Version, offset int64) (n *Needle, bytes []byte, bodyLength int64, err error) { diff --git a/weed/storage/needle/needle_read_page.go b/weed/storage/needle/needle_read_page.go new file mode 100644 index 000000000..4600f4314 --- /dev/null +++ b/weed/storage/needle/needle_read_page.go @@ -0,0 +1,77 @@ +package needle + +import ( + "fmt" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage/backend" + . "github.com/chrislusf/seaweedfs/weed/storage/types" + "github.com/chrislusf/seaweedfs/weed/util" + "io" +) + +// ReadNeedleData uses a needle without n.Data to read the content +// volumeOffset: the offset within the volume +// needleOffset: the offset within the needle Data +func (n *Needle) ReadNeedleData(r backend.BackendStorageFile, volumeOffset int64, data []byte, needleOffset int64) (count int, err error) { + + sizeToRead := min(int64(len(data)), int64(n.DataSize)-needleOffset) + if sizeToRead <= 0 { + return 0, io.EOF + } + startOffset := volumeOffset + NeedleHeaderSize + DataSizeSize + needleOffset + + count, err = r.ReadAt(data[:sizeToRead], startOffset) + if err != nil { + fileSize, _, _ := r.GetStat() + glog.Errorf("%s read %d %d size %d at offset %d fileSize %d: %v", r.Name(), n.Id, needleOffset, sizeToRead, volumeOffset, fileSize, err) + } + return + +} + +// ReadNeedleMeta fills all metadata except the n.Data +func (n *Needle) ReadNeedleMeta(r backend.BackendStorageFile, offset int64, size Size, version Version) (checksumValue uint32, err error) { + + bytes := make([]byte, NeedleHeaderSize+DataSizeSize) + + count, err := r.ReadAt(bytes, offset) + if count != NeedleHeaderSize+DataSizeSize || err != nil { + return 0, err + } + n.ParseNeedleHeader(bytes) + n.DataSize = util.BytesToUint32(bytes[NeedleHeaderSize : NeedleHeaderSize+DataSizeSize]) + + startOffset := offset + NeedleHeaderSize + DataSizeSize + int64(n.DataSize) + dataSize := GetActualSize(size, version) + stopOffset := offset + dataSize + metaSize := stopOffset - startOffset + fmt.Printf("offset %d dataSize %d\n", offset, dataSize) + fmt.Printf("read needle meta [%d,%d) size %d\n", startOffset, stopOffset, metaSize) + metaSlice := make([]byte, int(metaSize)) + + count, err = r.ReadAt(metaSlice, startOffset) + if err != nil && int64(count) == metaSize { + err = nil + } + if err != nil { + return 0, err + } + + var index int + index, err = n.readNeedleDataVersion2NonData(metaSlice) + + checksumValue = util.BytesToUint32(metaSlice[index : index+NeedleChecksumSize]) + if version == Version3 { + n.AppendAtNs = util.BytesToUint64(metaSlice[index+NeedleChecksumSize : index+NeedleChecksumSize+TimestampSize]) + } + + return checksumValue, err + +} + +func min(x, y int64) int64 { + if x < y { + return x + } + return y +} diff --git a/weed/storage/needle/needle_read_test.go b/weed/storage/needle/needle_read_test.go new file mode 100644 index 000000000..b519b0241 --- /dev/null +++ b/weed/storage/needle/needle_read_test.go @@ -0,0 +1,99 @@ +package needle + +import ( + "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/backend" + "github.com/stretchr/testify/assert" + "io" + "os" + "testing" + + "github.com/chrislusf/seaweedfs/weed/storage/types" +) + +func TestPageRead(t *testing.T) { + baseFileName := "43" + offset := int64(8) + size := types.Size(1153890) // actual file size 1153862 + + datFile, err := os.OpenFile(baseFileName+".dat", os.O_RDONLY, 0644) + if err != nil { + t.Fatalf("Open Volume Data File [ERROR]: %v", err) + } + datBackend := backend.NewDiskFile(datFile) + defer datBackend.Close() + { + n := new(Needle) + + bytes, err := ReadNeedleBlob(datBackend, offset, size, Version3) + if err != nil { + t.Fatalf("readNeedleBlob: %v", err) + } + if err = n.ReadBytes(bytes, offset, size, Version3); err != nil { + t.Fatalf("readNeedleBlob: %v", err) + } + + fmt.Printf("bytes len %d\n", len(bytes)) + fmt.Printf("name %s size %d\n", n.Name, n.Size) + + fmt.Printf("id %d\n", n.Id) + fmt.Printf("DataSize %d\n", n.DataSize) + fmt.Printf("Flags %v\n", n.Flags) + fmt.Printf("NameSize %d\n", n.NameSize) + fmt.Printf("MimeSize %d\n", n.MimeSize) + fmt.Printf("PairsSize %d\n", n.PairsSize) + fmt.Printf("LastModified %d\n", n.LastModified) + fmt.Printf("AppendAtNs %d\n", n.AppendAtNs) + fmt.Printf("Checksum %d\n", n.Checksum) + } + + { + n, bytes, bodyLength, err := ReadNeedleHeader(datBackend, Version3, offset) + if err != nil { + t.Fatalf("ReadNeedleHeader: %v", err) + } + fmt.Printf("bytes len %d\n", len(bytes)) + fmt.Printf("name %s size %d bodyLength:%d\n", n.Name, n.Size, bodyLength) + } + + { + n := new(Needle) + checksumValue, err := n.ReadNeedleMeta(datBackend, offset, size, Version3) + if err != nil { + t.Fatalf("ReadNeedleHeader: %v", err) + } + fmt.Printf("name %s size %d\n", n.Name, n.Size) + fmt.Printf("id %d\n", n.Id) + fmt.Printf("DataSize %d\n", n.DataSize) + fmt.Printf("Flags %v\n", n.Flags) + fmt.Printf("NameSize %d\n", n.NameSize) + fmt.Printf("MimeSize %d\n", n.MimeSize) + fmt.Printf("PairsSize %d\n", n.PairsSize) + fmt.Printf("LastModified %d\n", n.LastModified) + fmt.Printf("AppendAtNs %d\n", n.AppendAtNs) + fmt.Printf("Checksum %d\n", n.Checksum) + fmt.Printf("Checksum value %d\n", checksumValue) + + buf := make([]byte, 1024) + crc := CRC(0) + for x := int64(0); ; x += 1024 { + count, err := n.ReadNeedleData(datBackend, offset, buf, x) + if err != nil { + if err == io.EOF { + break + } + t.Fatalf("ReadNeedleData: %v", err) + } + if count > 0 { + crc = crc.Update(buf[0:count]) + } else { + break + } + } + fmt.Printf("read checksum value %d\n", crc.Value()) + + assert.Equal(t, checksumValue, crc.Value(), "validate checksum value") + + } + +} diff --git a/weed/storage/types/needle_types.go b/weed/storage/types/needle_types.go index 137b97d7f..98287fe6a 100644 --- a/weed/storage/types/needle_types.go +++ b/weed/storage/types/needle_types.go @@ -33,6 +33,7 @@ type Cookie uint32 const ( SizeSize = 4 // uint32 size NeedleHeaderSize = CookieSize + NeedleIdSize + SizeSize + DataSizeSize = 4 NeedleMapEntrySize = NeedleIdSize + OffsetSize + SizeSize TimestampSize = 8 // int64 size NeedlePaddingSize = 8