You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

106 lines
2.8 KiB

  1. package storage
  2. import (
  3. "bufio"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/util"
  6. "io"
  7. "time"
  8. "github.com/chrislusf/seaweedfs/weed/glog"
  9. "github.com/chrislusf/seaweedfs/weed/storage/backend"
  10. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  11. . "github.com/chrislusf/seaweedfs/weed/storage/types"
  12. )
  13. func (v *Volume) StreamWrite(n *needle.Needle, data io.Reader, dataSize uint32) (err error) {
  14. v.dataFileAccessLock.Lock()
  15. defer v.dataFileAccessLock.Unlock()
  16. df, ok := v.DataBackend.(*backend.DiskFile)
  17. if !ok {
  18. return fmt.Errorf("unexpected volume backend")
  19. }
  20. offset, _, _ := v.DataBackend.GetStat()
  21. header := make([]byte, NeedleHeaderSize+TimestampSize) // adding timestamp to reuse it and avoid extra allocation
  22. CookieToBytes(header[0:CookieSize], n.Cookie)
  23. NeedleIdToBytes(header[CookieSize:CookieSize+NeedleIdSize], n.Id)
  24. n.Size = 4 + Size(dataSize) + 1
  25. SizeToBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size)
  26. n.DataSize = dataSize
  27. // needle header
  28. df.Write(header[0:NeedleHeaderSize])
  29. // data size and data
  30. util.Uint32toBytes(header[0:4], n.DataSize)
  31. df.Write(header[0:4])
  32. // write and calculate CRC
  33. crcWriter := needle.NewCRCwriter(df)
  34. io.Copy(crcWriter, io.LimitReader(data, int64(dataSize)))
  35. // flags
  36. util.Uint8toBytes(header[0:1], n.Flags)
  37. df.Write(header[0:1])
  38. // data checksum
  39. util.Uint32toBytes(header[0:needle.NeedleChecksumSize], crcWriter.Sum())
  40. df.Write(header[0:needle.NeedleChecksumSize])
  41. // write timestamp, padding
  42. n.AppendAtNs = uint64(time.Now().UnixNano())
  43. util.Uint64toBytes(header[needle.NeedleChecksumSize:needle.NeedleChecksumSize+TimestampSize], n.AppendAtNs)
  44. padding := needle.PaddingLength(n.Size, needle.Version3)
  45. df.Write(header[0 : needle.NeedleChecksumSize+TimestampSize+padding])
  46. // add to needle map
  47. if err = v.nm.Put(n.Id, ToOffset(int64(offset)), n.Size); err != nil {
  48. glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
  49. }
  50. return
  51. }
  52. func (v *Volume) StreamRead(n *needle.Needle, writer io.Writer) (err error) {
  53. v.dataFileAccessLock.Lock()
  54. defer v.dataFileAccessLock.Unlock()
  55. nv, ok := v.nm.Get(n.Id)
  56. if !ok || nv.Offset.IsZero() {
  57. return ErrorNotFound
  58. }
  59. sr := &StreamReader{
  60. readerAt: v.DataBackend,
  61. offset: nv.Offset.ToActualOffset(),
  62. }
  63. bufReader := bufio.NewReader(sr)
  64. bufReader.Discard(NeedleHeaderSize)
  65. sizeBuf := make([]byte, 4)
  66. bufReader.Read(sizeBuf)
  67. if _, err = writer.Write(sizeBuf); err != nil {
  68. return err
  69. }
  70. dataSize := util.BytesToUint32(sizeBuf)
  71. _, err = io.Copy(writer, io.LimitReader(bufReader, int64(dataSize)))
  72. return
  73. }
  // StreamReader adapts an io.ReaderAt into a sequential io.Reader that starts
  // at a given offset and moves forward with every Read.
  type StreamReader struct {
  	offset   int64       // position in readerAt of the next byte to read
  	readerAt io.ReaderAt // random-access source the bytes come from
  }

  // Read fills p from the current offset and advances the offset by the number
  // of bytes actually read. It returns whatever count and error the underlying
  // ReadAt reports.
  func (sr *StreamReader) Read(p []byte) (n int, err error) {
  	n, err = sr.readerAt.ReadAt(p, sr.offset)
  	// Advance even when err is non-nil: ReadAt may return n > 0 together with
  	// an error (typically io.EOF on the final short read). Skipping the
  	// advance would make a later Read hand out those same bytes again.
  	sr.offset += int64(n)
  	return n, err
  }