You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

134 lines
3.6 KiB

  1. package filesys
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "io"
  7. "sync"
  8. "github.com/chrislusf/seaweedfs/weed/filer2"
  9. "github.com/chrislusf/seaweedfs/weed/glog"
  10. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  11. "github.com/chrislusf/seaweedfs/weed/pb/pb_cache"
  12. "github.com/chrislusf/seaweedfs/weed/util"
  13. "github.com/chrislusf/seaweedfs/weed/wdclient"
  14. )
  15. type ChunkReadAt struct {
  16. masterClient *wdclient.MasterClient
  17. chunkViews []*filer2.ChunkView
  18. buffer []byte
  19. bufferOffset int64
  20. lookupFileId func(fileId string) (targetUrl string, err error)
  21. readerLock sync.Mutex
  22. chunkCache *pb_cache.ChunkCache
  23. }
  24. // var _ = io.ReaderAt(&ChunkReadAt{})
  25. func NewChunkReaderAtFromClient(filerClient filer_pb.FilerClient, chunkViews []*filer2.ChunkView, chunkCache *pb_cache.ChunkCache) *ChunkReadAt {
  26. return &ChunkReadAt{
  27. chunkViews: chunkViews,
  28. lookupFileId: func(fileId string) (targetUrl string, err error) {
  29. err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  30. vid := filer2.VolumeId(fileId)
  31. resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
  32. VolumeIds: []string{vid},
  33. })
  34. if err != nil {
  35. return err
  36. }
  37. locations := resp.LocationsMap[vid]
  38. if locations == nil || len(locations.Locations) == 0 {
  39. glog.V(0).Infof("failed to locate %s", fileId)
  40. return fmt.Errorf("failed to locate %s", fileId)
  41. }
  42. volumeServerAddress := filerClient.AdjustedUrl(locations.Locations[0].Url)
  43. targetUrl = fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId)
  44. return nil
  45. })
  46. return
  47. },
  48. bufferOffset: -1,
  49. }
  50. }
  51. func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) {
  52. c.readerLock.Lock()
  53. defer c.readerLock.Unlock()
  54. for n < len(p) && err == nil {
  55. readCount, readErr := c.doReadAt(p[n:], offset+int64(n))
  56. n += readCount
  57. err = readErr
  58. if readCount == 0 {
  59. return n, nil
  60. }
  61. }
  62. return
  63. }
  64. func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
  65. var found bool
  66. for _, chunk := range c.chunkViews {
  67. if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) {
  68. found = true
  69. if c.bufferOffset != chunk.LogicOffset {
  70. c.buffer, err = c.fetchChunkData(chunk)
  71. c.bufferOffset = chunk.LogicOffset
  72. }
  73. break
  74. }
  75. }
  76. if !found {
  77. return 0, io.EOF
  78. }
  79. n = copy(p, c.buffer[offset-c.bufferOffset:])
  80. // fmt.Printf("> doReadAt [%d,%d), buffer:[%d,%d)\n", offset, offset+int64(n), c.bufferOffset, c.bufferOffset+int64(len(c.buffer)))
  81. return
  82. }
  83. func (c *ChunkReadAt) fetchChunkData(chunkView *filer2.ChunkView) ([]byte, error) {
  84. fmt.Printf("fetching %s [%d,%d)\n", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))
  85. chunkData := c.chunkCache.GetChunk(chunkView.FileId)
  86. if chunkData != nil {
  87. glog.V(3).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))
  88. return chunkData, nil
  89. }
  90. urlString, err := c.lookupFileId(chunkView.FileId)
  91. if err != nil {
  92. glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
  93. return nil, err
  94. }
  95. var buffer bytes.Buffer
  96. err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk, chunkView.Offset, int(chunkView.Size), func(data []byte) {
  97. buffer.Write(data)
  98. })
  99. if err != nil {
  100. glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
  101. return nil, err
  102. }
  103. glog.V(3).Infof("read %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))
  104. chunkData = buffer.Bytes()
  105. c.chunkCache.SetChunk(chunkView.FileId, chunkData)
  106. return chunkData, nil
  107. }