You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

103 lines
3.1 KiB

  1. package needle
  2. import (
  3. "fmt"
  4. "github.com/chrislusf/seaweedfs/weed/glog"
  5. "github.com/chrislusf/seaweedfs/weed/storage/backend"
  6. . "github.com/chrislusf/seaweedfs/weed/storage/types"
  7. "github.com/chrislusf/seaweedfs/weed/util"
  8. "io"
  9. )
  10. // ReadNeedleDataInto uses a needle without n.Data to read the content into an io.Writer
  11. func (n *Needle) ReadNeedleDataInto(r backend.BackendStorageFile, offset int64, buf []byte, writer io.Writer, expectedChecksumValue uint32) (err error) {
  12. crc := CRC(0)
  13. for x := 0; ; x += len(buf) {
  14. count, err := n.ReadNeedleData(r, offset, buf, int64(x))
  15. if err != nil {
  16. if err == io.EOF {
  17. break
  18. }
  19. return fmt.Errorf("ReadNeedleData: %v", err)
  20. }
  21. if count > 0 {
  22. crc = crc.Update(buf[0:count])
  23. if _, err = writer.Write(buf[0:count]); err != nil {
  24. return fmt.Errorf("ReadNeedleData write: %v", err)
  25. }
  26. } else {
  27. break
  28. }
  29. }
  30. if expectedChecksumValue != crc.Value() {
  31. return fmt.Errorf("ReadNeedleData checksum %v expected %v", crc.Value(), expectedChecksumValue)
  32. }
  33. return nil
  34. }
  35. // ReadNeedleData uses a needle without n.Data to read the content
  36. // volumeOffset: the offset within the volume
  37. // needleOffset: the offset within the needle Data
  38. func (n *Needle) ReadNeedleData(r backend.BackendStorageFile, volumeOffset int64, data []byte, needleOffset int64) (count int, err error) {
  39. sizeToRead := min(int64(len(data)), int64(n.DataSize)-needleOffset)
  40. if sizeToRead <= 0 {
  41. return 0, io.EOF
  42. }
  43. startOffset := volumeOffset + NeedleHeaderSize + DataSizeSize + needleOffset
  44. count, err = r.ReadAt(data[:sizeToRead], startOffset)
  45. if err != nil {
  46. fileSize, _, _ := r.GetStat()
  47. glog.Errorf("%s read %d %d size %d at offset %d fileSize %d: %v", r.Name(), n.Id, needleOffset, sizeToRead, volumeOffset, fileSize, err)
  48. }
  49. return
  50. }
  51. // ReadNeedleMeta fills all metadata except the n.Data
  52. func (n *Needle) ReadNeedleMeta(r backend.BackendStorageFile, offset int64, size Size, version Version) (checksumValue uint32, err error) {
  53. bytes := make([]byte, NeedleHeaderSize+DataSizeSize)
  54. count, err := r.ReadAt(bytes, offset)
  55. if count != NeedleHeaderSize+DataSizeSize || err != nil {
  56. return 0, err
  57. }
  58. n.ParseNeedleHeader(bytes)
  59. n.DataSize = util.BytesToUint32(bytes[NeedleHeaderSize : NeedleHeaderSize+DataSizeSize])
  60. startOffset := offset + NeedleHeaderSize + DataSizeSize + int64(n.DataSize)
  61. dataSize := GetActualSize(size, version)
  62. stopOffset := offset + dataSize
  63. metaSize := stopOffset - startOffset
  64. fmt.Printf("offset %d dataSize %d\n", offset, dataSize)
  65. fmt.Printf("read needle meta [%d,%d) size %d\n", startOffset, stopOffset, metaSize)
  66. metaSlice := make([]byte, int(metaSize))
  67. count, err = r.ReadAt(metaSlice, startOffset)
  68. if err != nil && int64(count) == metaSize {
  69. err = nil
  70. }
  71. if err != nil {
  72. return 0, err
  73. }
  74. var index int
  75. index, err = n.readNeedleDataVersion2NonData(metaSlice)
  76. checksumValue = util.BytesToUint32(metaSlice[index : index+NeedleChecksumSize])
  77. if version == Version3 {
  78. n.AppendAtNs = util.BytesToUint64(metaSlice[index+NeedleChecksumSize : index+NeedleChecksumSize+TimestampSize])
  79. }
  80. return checksumValue, err
  81. }
  82. func min(x, y int64) int64 {
  83. if x < y {
  84. return x
  85. }
  86. return y
  87. }