From 2bfc8970d2c5bb55f45432ca6bcc9f6cfc933274 Mon Sep 17 00:00:00 2001 From: chrislu Date: Thu, 8 Sep 2022 18:54:02 -0700 Subject: [PATCH] refactor: move ReadNeedleDataInto into volume_read.go --- weed/storage/needle/needle_read_page.go | 31 ---------- weed/storage/needle/needle_read_test.go | 82 ------------------------- weed/storage/volume_read.go | 32 +++++++++- 3 files changed, 31 insertions(+), 114 deletions(-) delete mode 100644 weed/storage/needle/needle_read_test.go diff --git a/weed/storage/needle/needle_read_page.go b/weed/storage/needle/needle_read_page.go index b06622607..47c3461bb 100644 --- a/weed/storage/needle/needle_read_page.go +++ b/weed/storage/needle/needle_read_page.go @@ -1,7 +1,6 @@ package needle import ( - "fmt" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/storage/backend" . "github.com/seaweedfs/seaweedfs/weed/storage/types" @@ -9,36 +8,6 @@ import ( "io" ) -// ReadNeedleDataInto uses a needle without n.Data to read the content into an io.Writer -func (n *Needle) ReadNeedleDataInto(r backend.BackendStorageFile, volumeOffset int64, buf []byte, writer io.Writer, needleOffset int64, size int64) (err error) { - crc := CRC(0) - for x := needleOffset; x < needleOffset+size; x += int64(len(buf)) { - count, err := n.ReadNeedleData(r, volumeOffset, buf, x) - toWrite := min(int64(count), needleOffset+size-x) - if toWrite > 0 { - crc = crc.Update(buf[0:toWrite]) - if _, err = writer.Write(buf[0:toWrite]); err != nil { - return fmt.Errorf("ReadNeedleData write: %v", err) - } - } - if err != nil { - if err == io.EOF { - err = nil - break - } - return fmt.Errorf("ReadNeedleData: %v", err) - } - if count <= 0 { - break - } - } - if needleOffset == 0 && size == int64(n.DataSize) && (n.Checksum != crc && uint32(n.Checksum) != crc.Value()) { - // the crc.Value() function is to be deprecated. this double checking is for backward compatible. - return fmt.Errorf("ReadNeedleData checksum %v expected %v", crc, n.Checksum) - } - return nil -} - // ReadNeedleData uses a needle without n.Data to read the content // volumeOffset: the offset within the volume // needleOffset: the offset within the needle Data diff --git a/weed/storage/needle/needle_read_test.go b/weed/storage/needle/needle_read_test.go deleted file mode 100644 index 7aa196060..000000000 --- a/weed/storage/needle/needle_read_test.go +++ /dev/null @@ -1,82 +0,0 @@ -package needle - -import ( - "fmt" - "github.com/seaweedfs/seaweedfs/weed/storage/backend" - "io" - "os" - "testing" - - "github.com/seaweedfs/seaweedfs/weed/storage/types" -) - -func TestPageRead(t *testing.T) { - baseFileName := "43" - offset := int64(8) - size := types.Size(1153890) // actual file size 1153862 - - datFile, err := os.OpenFile(baseFileName+".dat", os.O_RDONLY, 0644) - if err != nil { - t.Fatalf("Open Volume Data File [ERROR]: %v", err) - } - datBackend := backend.NewDiskFile(datFile) - defer datBackend.Close() - { - n := new(Needle) - - bytes, err := ReadNeedleBlob(datBackend, offset, size, Version3) - if err != nil { - t.Fatalf("readNeedleBlob: %v", err) - } - if err = n.ReadBytes(bytes, offset, size, Version3); err != nil { - t.Fatalf("readNeedleBlob: %v", err) - } - - fmt.Printf("bytes len %d\n", len(bytes)) - fmt.Printf("name %s size %d\n", n.Name, n.Size) - - fmt.Printf("id %d\n", n.Id) - fmt.Printf("DataSize %d\n", n.DataSize) - fmt.Printf("Flags %v\n", n.Flags) - fmt.Printf("NameSize %d\n", n.NameSize) - fmt.Printf("MimeSize %d\n", n.MimeSize) - fmt.Printf("PairsSize %d\n", n.PairsSize) - fmt.Printf("LastModified %d\n", n.LastModified) - fmt.Printf("AppendAtNs %d\n", n.AppendAtNs) - fmt.Printf("Checksum %d\n", n.Checksum) - } - - { - n, bytes, bodyLength, err := ReadNeedleHeader(datBackend, Version3, offset) - if err != nil { - t.Fatalf("ReadNeedleHeader: %v", err) - } - fmt.Printf("bytes len %d\n", len(bytes)) - fmt.Printf("name %s size %d bodyLength:%d\n", n.Name, n.Size, bodyLength) - } - - { - n := new(Needle) - err := n.ReadNeedleMeta(datBackend, offset, size, Version3) - if err != nil { - t.Fatalf("ReadNeedleHeader: %v", err) - } - fmt.Printf("name %s size %d\n", n.Name, n.Size) - fmt.Printf("id %d\n", n.Id) - fmt.Printf("DataSize %d\n", n.DataSize) - fmt.Printf("Flags %v\n", n.Flags) - fmt.Printf("NameSize %d\n", n.NameSize) - fmt.Printf("MimeSize %d\n", n.MimeSize) - fmt.Printf("PairsSize %d\n", n.PairsSize) - fmt.Printf("LastModified %d\n", n.LastModified) - fmt.Printf("AppendAtNs %d\n", n.AppendAtNs) - fmt.Printf("Checksum %d\n", n.Checksum) - - buf := make([]byte, 1024) - if err = n.ReadNeedleDataInto(datBackend, offset, buf, io.Discard, 0, int64(n.DataSize)); err != nil { - t.Fatalf("ReadNeedleDataInto: %v", err) - } - - } - -} diff --git a/weed/storage/volume_read.go b/weed/storage/volume_read.go index b51ab5c82..d4f6a55d6 100644 --- a/weed/storage/volume_read.go +++ b/weed/storage/volume_read.go @@ -132,7 +132,37 @@ func (v *Volume) readNeedleDataInto(n *needle.Needle, readOption *ReadOption, wr actualOffset += int64(MaxPossibleVolumeSize) } - return n.ReadNeedleDataInto(v.DataBackend, actualOffset, buf, writer, offset, size) + // read needle data + crc := needle.CRC(0) + r := v.DataBackend + volumeOffset := actualOffset + needleOffset := offset + for x := needleOffset; x < needleOffset+size; x += int64(len(buf)) { + count, err := n.ReadNeedleData(r, volumeOffset, buf, x) + toWrite := min(count, int(needleOffset+size-x)) + if toWrite > 0 { + crc = crc.Update(buf[0:toWrite]) + if _, err = writer.Write(buf[0:toWrite]); err != nil { + return fmt.Errorf("ReadNeedleData write: %v", err) + } + } + if err != nil { + if err == io.EOF { + err = nil + break + } + return fmt.Errorf("ReadNeedleData: %v", err) + } + if count <= 0 { + break + } + } + if needleOffset == 0 && size == int64(n.DataSize) && (n.Checksum != crc && uint32(n.Checksum) != crc.Value()) { + // the crc.Value() function is to be deprecated. this double checking is for backward compatible. + return fmt.Errorf("ReadNeedleData checksum %v expected %v", crc, n.Checksum) + } + return nil + } func min(x, y int) int {