You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

110 lines
3.3 KiB

  1. package needle
  2. import (
  3. "fmt"
  4. "github.com/chrislusf/seaweedfs/weed/glog"
  5. "github.com/chrislusf/seaweedfs/weed/storage/backend"
  6. . "github.com/chrislusf/seaweedfs/weed/storage/types"
  7. "github.com/chrislusf/seaweedfs/weed/util"
  8. "io"
  9. )
  10. // ReadNeedleDataInto uses a needle without n.Data to read the content into an io.Writer
  11. func (n *Needle) ReadNeedleDataInto(r backend.BackendStorageFile, volumeOffset int64, buf []byte, writer io.Writer, needleOffset int64, size int64) (err error) {
  12. crc := CRC(0)
  13. for x := needleOffset; x < needleOffset+size; x += int64(len(buf)) {
  14. count, err := n.ReadNeedleData(r, volumeOffset, buf, x)
  15. if count > 0 {
  16. crc = crc.Update(buf[0:count])
  17. if _, err = writer.Write(buf[0:count]); err != nil {
  18. return fmt.Errorf("ReadNeedleData write: %v", err)
  19. }
  20. }
  21. if err != nil {
  22. if err == io.EOF {
  23. err = nil
  24. break
  25. }
  26. return fmt.Errorf("ReadNeedleData: %v", err)
  27. }
  28. if count <= 0 {
  29. break
  30. }
  31. }
  32. if needleOffset == 0 && size == int64(n.DataSize) && (n.Checksum != crc && uint32(n.Checksum) != crc.Value()) {
  33. // the crc.Value() function is to be deprecated. this double checking is for backward compatible.
  34. return fmt.Errorf("ReadNeedleData checksum %v expected %v", crc, n.Checksum)
  35. }
  36. return nil
  37. }
  38. // ReadNeedleData uses a needle without n.Data to read the content
  39. // volumeOffset: the offset within the volume
  40. // needleOffset: the offset within the needle Data
  41. func (n *Needle) ReadNeedleData(r backend.BackendStorageFile, volumeOffset int64, data []byte, needleOffset int64) (count int, err error) {
  42. sizeToRead := min(int64(len(data)), int64(n.DataSize)-needleOffset)
  43. if sizeToRead <= 0 {
  44. return 0, io.EOF
  45. }
  46. startOffset := volumeOffset + NeedleHeaderSize + DataSizeSize + needleOffset
  47. count, err = r.ReadAt(data[:sizeToRead], startOffset)
  48. if err != nil {
  49. fileSize, _, _ := r.GetStat()
  50. glog.Errorf("%s read %d %d size %d at offset %d fileSize %d: %v", r.Name(), n.Id, needleOffset, sizeToRead, volumeOffset, fileSize, err)
  51. }
  52. return
  53. }
  54. // ReadNeedleMeta fills all metadata except the n.Data
  55. func (n *Needle) ReadNeedleMeta(r backend.BackendStorageFile, offset int64, size Size, version Version) (err error) {
  56. bytes := make([]byte, NeedleHeaderSize+DataSizeSize)
  57. count, err := r.ReadAt(bytes, offset)
  58. if count != NeedleHeaderSize+DataSizeSize || err != nil {
  59. return err
  60. }
  61. n.ParseNeedleHeader(bytes)
  62. if n.Size != size {
  63. if OffsetSize == 4 && offset < int64(MaxPossibleVolumeSize) {
  64. return ErrorSizeMismatch
  65. }
  66. }
  67. n.DataSize = util.BytesToUint32(bytes[NeedleHeaderSize : NeedleHeaderSize+DataSizeSize])
  68. startOffset := offset + NeedleHeaderSize + DataSizeSize + int64(n.DataSize)
  69. dataSize := GetActualSize(size, version)
  70. stopOffset := offset + dataSize
  71. metaSize := stopOffset - startOffset
  72. metaSlice := make([]byte, int(metaSize))
  73. count, err = r.ReadAt(metaSlice, startOffset)
  74. if err != nil && int64(count) == metaSize {
  75. err = nil
  76. }
  77. if err != nil {
  78. return err
  79. }
  80. var index int
  81. index, err = n.readNeedleDataVersion2NonData(metaSlice)
  82. n.Checksum = CRC(util.BytesToUint32(metaSlice[index : index+NeedleChecksumSize]))
  83. if version == Version3 {
  84. n.AppendAtNs = util.BytesToUint64(metaSlice[index+NeedleChecksumSize : index+NeedleChecksumSize+TimestampSize])
  85. }
  86. return err
  87. }
  88. func min(x, y int64) int64 {
  89. if x < y {
  90. return x
  91. }
  92. return y
  93. }