You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

123 lines
3.1 KiB

  1. package filer2
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "io"
  7. "sync"
  8. "github.com/chrislusf/seaweedfs/weed/glog"
  9. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  10. "github.com/chrislusf/seaweedfs/weed/util"
  11. "github.com/chrislusf/seaweedfs/weed/wdclient"
  12. )
  13. type ChunkReadAt struct {
  14. masterClient *wdclient.MasterClient
  15. chunkViews []*ChunkView
  16. buffer []byte
  17. bufferOffset int64
  18. lookupFileId func(fileId string) (targetUrl string, err error)
  19. readerLock sync.Mutex
  20. }
  21. // var _ = io.ReaderAt(&ChunkReadAt{})
  22. func NewChunkReaderAtFromClient(filerClient filer_pb.FilerClient, chunkViews []*ChunkView) *ChunkReadAt {
  23. return &ChunkReadAt{
  24. chunkViews: chunkViews,
  25. lookupFileId: func(fileId string) (targetUrl string, err error) {
  26. err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  27. vid := VolumeId(fileId)
  28. resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
  29. VolumeIds: []string{vid},
  30. })
  31. if err != nil {
  32. return err
  33. }
  34. locations := resp.LocationsMap[vid]
  35. if locations == nil || len(locations.Locations) == 0 {
  36. glog.V(0).Infof("failed to locate %s", fileId)
  37. return fmt.Errorf("failed to locate %s", fileId)
  38. }
  39. volumeServerAddress := filerClient.AdjustedUrl(locations.Locations[0].Url)
  40. targetUrl = fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId)
  41. return nil
  42. })
  43. return
  44. },
  45. bufferOffset: -1,
  46. }
  47. }
  48. func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) {
  49. c.readerLock.Lock()
  50. defer c.readerLock.Unlock()
  51. for n < len(p) && err == nil {
  52. readCount, readErr := c.doReadAt(p[n:], offset+int64(n))
  53. n += readCount
  54. err = readErr
  55. if readCount == 0 {
  56. return n, nil
  57. }
  58. }
  59. return
  60. }
  61. func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
  62. var found bool
  63. for _, chunk := range c.chunkViews {
  64. if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) {
  65. found = true
  66. if c.bufferOffset != chunk.LogicOffset {
  67. c.fetchChunkToBuffer(chunk)
  68. }
  69. break
  70. }
  71. }
  72. if !found {
  73. return 0, io.EOF
  74. }
  75. n = copy(p, c.buffer[offset-c.bufferOffset:])
  76. // fmt.Printf("> doReadAt [%d,%d), buffer:[%d,%d)\n", offset, offset+int64(n), c.bufferOffset, c.bufferOffset+int64(len(c.buffer)))
  77. return
  78. }
  79. func (c *ChunkReadAt) fetchChunkToBuffer(chunkView *ChunkView) error {
  80. // fmt.Printf("fetching %s [%d,%d)\n", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))
  81. urlString, err := c.lookupFileId(chunkView.FileId)
  82. if err != nil {
  83. glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
  84. return err
  85. }
  86. var buffer bytes.Buffer
  87. err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.isGzipped, chunkView.IsFullChunk, chunkView.Offset, int(chunkView.Size), func(data []byte) {
  88. buffer.Write(data)
  89. })
  90. if err != nil {
  91. glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
  92. return err
  93. }
  94. c.buffer = buffer.Bytes()
  95. c.bufferOffset = chunkView.LogicOffset
  96. glog.V(3).Infof("read %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))
  97. return nil
  98. }