diff --git a/weed/storage/needle/needle_read_options.go b/weed/storage/needle/needle_read_options.go
index 6ddc2d88e..77f0f26b8 100644
--- a/weed/storage/needle/needle_read_options.go
+++ b/weed/storage/needle/needle_read_options.go
@@ -1,8 +1,12 @@
 package needle
 
 import (
+	"fmt"
+	"io"
+
 	"github.com/seaweedfs/seaweedfs/weed/storage/backend"
 	. "github.com/seaweedfs/seaweedfs/weed/storage/types"
+	"github.com/seaweedfs/seaweedfs/weed/util"
 )
 
 // NeedleReadOptions specifies which parts of the Needle to read.
@@ -13,11 +17,103 @@ type NeedleReadOptions struct {
 }
 
 // ReadFromFile reads the Needle from the backend file according to the specified options.
-// For now, this is equivalent to ReadData (reads everything).
+//   - If only ReadHeader is true, only the header is read and parsed.
+//   - If ReadHeader and ReadMeta are true but ReadData is false, the header, the
+//     non-data meta fields and the trailing checksum (plus AppendAtNs for v3) are
+//     read; the Data bytes themselves are skipped.
+//   - Otherwise GetActualSize(size, version) bytes are read from disk (size is the
+//     logical body size) and parsed with ReadBytes.
+//
+// A read that returns fewer bytes than requested is always reported as an error.
 func (n *Needle) ReadFromFile(r backend.BackendStorageFile, offset int64, size Size, version Version, opts NeedleReadOptions) error {
-	// Always read header and body for now (full read)
-	bytes, err := ReadNeedleBlob(r, offset, size, version)
-	if err != nil {
+	// readExact fills buf from r at pos and fails unless every byte was read.
+	readExact := func(buf []byte, pos int64) error {
+		count, err := r.ReadAt(buf, pos)
+		if err == io.EOF && count == len(buf) {
+			// A complete read that ends exactly at end of file is not an error.
+			err = nil
+		}
+		if err != nil {
+			return err
+		}
+		if count != len(buf) {
+			// Never report success for a short read, even when r returned no error.
+			return io.ErrUnexpectedEOF
+		}
+		return nil
+	}
+
+	if opts.ReadHeader && !opts.ReadData && !opts.ReadMeta {
+		// Only read the header
+		header := make([]byte, NeedleHeaderSize)
+		if err := readExact(header, offset); err != nil {
+			return err
+		}
+		n.ParseNeedleHeader(header)
+		return nil
+	}
+	if opts.ReadHeader && opts.ReadMeta && !opts.ReadData {
+		// Read header first
+		header := make([]byte, NeedleHeaderSize)
+		if err := readExact(header, offset); err != nil {
+			return err
+		}
+		n.ParseNeedleHeader(header)
+
+		if version != Version2 && version != Version3 {
+			// For v1, just skip Data
+			return nil
+		}
+
+		bodyOffset := offset + int64(NeedleHeaderSize)
+		bodyLen := int64(n.Size)
+		if bodyLen < 0 {
+			return fmt.Errorf("invalid needle size %d at offset %d", bodyLen, offset)
+		}
+		// A body too short to hold the 4-byte DataSize has nothing to parse; reading
+		// DataSize there would pick up the checksum bytes that follow the body.
+		if bodyLen >= 4 {
+			// Read DataSize to know how much to skip
+			dsBuf := make([]byte, 4)
+			if err := readExact(dsBuf, bodyOffset); err != nil {
+				return err
+			}
+			dataSize := int64(util.BytesToUint32(dsBuf))
+			// The meta fields (Flags, Name, Mime, etc.) fill the rest of the body
+			// after the 4-byte DataSize and the Data itself.
+			metaFieldsLen := bodyLen - 4 - dataSize
+			if metaFieldsLen < 0 {
+				return fmt.Errorf("needle data size %d does not fit in body size %d", dataSize, bodyLen)
+			}
+			if metaFieldsLen > 0 {
+				metaFieldsBuf := make([]byte, metaFieldsLen)
+				if err := readExact(metaFieldsBuf, bodyOffset+4+dataSize); err != nil {
+					return err
+				}
+				if _, err := n.readNeedleDataVersion2NonData(metaFieldsBuf); err != nil {
+					return err
+				}
+			}
+		}
+		// Now read checksum and (for v3) appendAtNs at the end
+		endMetaLen := NeedleChecksumSize
+		if version == Version3 {
+			endMetaLen += TimestampSize
+		}
+		endMetaBuf := make([]byte, endMetaLen)
+		if err := readExact(endMetaBuf, bodyOffset+bodyLen); err != nil {
+			return err
+		}
+		n.Checksum = CRC(util.BytesToUint32(endMetaBuf[:NeedleChecksumSize]))
+		if version == Version3 {
+			n.AppendAtNs = util.BytesToUint64(endMetaBuf[NeedleChecksumSize : NeedleChecksumSize+TimestampSize])
+		}
+		return nil
+	}
+	// Otherwise, read the full on-disk entry size
+	readLen := int(GetActualSize(size, version))
+	bytes := make([]byte, readLen)
+	if err := readExact(bytes, offset); err != nil {
 		return err
 	}
 	return n.ReadBytes(bytes, offset, size, version)
diff --git a/weed/storage/needle/needle_read_options_test.go b/weed/storage/needle/needle_read_options_test.go
index e50b77b59..f86bab57a 100644
--- a/weed/storage/needle/needle_read_options_test.go
+++ b/weed/storage/needle/needle_read_options_test.go
@@ -6,8 +6,6 @@ import (
 	"reflect"
 	"testing"
 	"time"
-
-	. "github.com/seaweedfs/seaweedfs/weed/storage/types"
 )
 
 type mockBackend struct {
@@ -62,16 +60,20 @@ func TestReadFromFile_EquivalenceWithReadData(t *testing.T) {
 		Ttl:          nil,
 		Pairs:        []byte("key=value"),
 		PairsSize:    9,
-		Checksum:     0xCAFEBABE,
+		Checksum:     0,
 		AppendAtNs:   0xDEADBEEF,
 	}
+	// remove the TTL bit in the flags
+	n.Flags = n.Flags &^ FlagHasTtl
+	// n.Checksum = NewCRC(n.Data)
+
 	buf := &bytes.Buffer{}
 	_, _, err := writeNeedleV2(n, 0, buf)
 	if err != nil {
 		t.Fatalf("writeNeedleV2 failed: %v", err)
 	}
 	backend := &mockBackend{data: buf.Bytes()}
-	size := Size(len(buf.Bytes()) - NeedleHeaderSize - NeedleChecksumSize - int(PaddingLength(Size(len(buf.Bytes())-NeedleHeaderSize-NeedleChecksumSize), Version2)))
+	size := n.Size
 
 	// Old method
 	nOld := &Needle{}
@@ -90,3 +92,80 @@ func TestReadFromFile_EquivalenceWithReadData(t *testing.T) {
 		t.Errorf("needle mismatch: old=%+v new=%+v", nOld, nNew)
 	}
 }
+
+func TestReadFromFile_OptionsMatrix(t *testing.T) {
+	n := &Needle{
+		Cookie:       0x12345678,
+		Id:           0x1122334455667788,
+		Data:         []byte("hello world"),
+		Flags:        0xFF,
+		Name:         []byte("filename.txt"),
+		Mime:         []byte("text/plain"),
+		LastModified: 0x1234567890,
+		Ttl:          nil,
+		Pairs:        []byte("key=value"),
+		PairsSize:    9,
+		Checksum:     0,
+		AppendAtNs:   0xDEADBEEF,
+	}
+	n.Flags = n.Flags &^ FlagHasTtl
+	n.Checksum = NewCRC(n.Data)
+
+	buf := &bytes.Buffer{}
+	_, _, err := writeNeedleV2(n, 0, buf)
+	if err != nil {
+		t.Fatalf("writeNeedleV2 failed: %v", err)
+	}
+	backend := &mockBackend{data: buf.Bytes()}
+	size := n.Size
+
+	t.Run("ReadHeader only", func(t *testing.T) {
+		nHeader := &Needle{}
+		opts := NeedleReadOptions{ReadHeader: true}
+		err := nHeader.ReadFromFile(backend, 0, size, Version2, opts)
+		if err != nil {
+			t.Fatalf("ReadFromFile header only failed: %v", err)
+		}
+		if nHeader.Cookie != n.Cookie || nHeader.Id != n.Id || nHeader.Size != n.Size {
+			t.Errorf("header fields mismatch: got %+v want %+v", nHeader, n)
+		}
+		if nHeader.Data != nil || nHeader.Name != nil || nHeader.Mime != nil || nHeader.Pairs != nil || nHeader.Checksum != 0 {
+			t.Errorf("non-header fields should be zero, got %+v", nHeader)
+		}
+	})
+
+	t.Run("ReadHeader+ReadMeta", func(t *testing.T) {
+		nMeta := &Needle{}
+		opts := NeedleReadOptions{ReadHeader: true, ReadMeta: true}
+		err := nMeta.ReadFromFile(backend, 0, size, Version2, opts)
+		if err != nil {
+			t.Fatalf("ReadFromFile header+meta failed: %v", err)
+		}
+		if nMeta.Data != nil {
+			t.Errorf("Data should not be set when only meta is read")
+		}
+		if nMeta.Name == nil || nMeta.Mime == nil || nMeta.Pairs == nil {
+			t.Errorf("meta fields should be set, got %+v", nMeta)
+		}
+		if nMeta.Cookie != n.Cookie || nMeta.Id != n.Id || nMeta.Size != n.Size {
+			t.Errorf("header fields mismatch: got %+v want %+v", nMeta, n)
+		}
+	})
+
+	t.Run("ReadHeader+ReadData", func(t *testing.T) {
+		// this is the same as ReadHeader+ReadData+ReadMeta
+	})
+
+	t.Run("ReadHeader+ReadData+ReadMeta", func(t *testing.T) {
+		nFull := &Needle{}
+		opts := NeedleReadOptions{ReadHeader: true, ReadData: true, ReadMeta: true}
+		err := nFull.ReadFromFile(backend, 0, size, Version2, opts)
+		if err != nil {
+			t.Fatalf("ReadFromFile header+data+meta failed: %v", err)
+		}
+		nFull.AppendAtNs = n.AppendAtNs
+		if !reflect.DeepEqual(nFull, n) {
+			t.Errorf("needle mismatch: got %+v want %+v", nFull, n)
+		}
+	})
+}