You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

156 lines
4.6 KiB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package filer2
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "sync"
  7. "github.com/chrislusf/seaweedfs/weed/glog"
  8. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  9. "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
  10. "github.com/chrislusf/seaweedfs/weed/wdclient"
  11. )
// ChunkReadAt provides random-access reads over a logical file that is
// stored as a list of chunks. It buffers one whole chunk at a time and
// serializes all reads through readerLock.
type ChunkReadAt struct {
	masterClient *wdclient.MasterClient // NOTE(review): not set by NewChunkReaderAtFromClient — confirm whether still used
	chunkViews   []*ChunkView           // chunk layout of the file, in logical-offset terms
	buffer       []byte                 // full data of the most recently fetched chunk
	bufferFileId string                 // file id of the chunk currently held in buffer
	lookupFileId func(fileId string) (targetUrl string, err error) // resolves a file id to a volume-server URL
	readerLock   sync.Mutex             // guards buffer/bufferFileId across concurrent ReadAt calls
	fileSize     int64                  // total logical file size, used for EOF detection
	chunkCache   *chunk_cache.ChunkCache // shared cache of whole-chunk data, may serve repeated reads
}
// Compile-time assertion that *ChunkReadAt implements io.ReaderAt
// (currently disabled).
// var _ = io.ReaderAt(&ChunkReadAt{})

// LookupFileIdFunctionType resolves a file id to the URL of a volume
// server that can serve it.
type LookupFileIdFunctionType func(fileId string) (targetUrl string, err error)
  24. func LookupFn(filerClient filer_pb.FilerClient) LookupFileIdFunctionType {
  25. return func(fileId string) (targetUrl string, err error) {
  26. err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  27. vid := VolumeId(fileId)
  28. resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
  29. VolumeIds: []string{vid},
  30. })
  31. if err != nil {
  32. return err
  33. }
  34. locations := resp.LocationsMap[vid]
  35. if locations == nil || len(locations.Locations) == 0 {
  36. glog.V(0).Infof("failed to locate %s", fileId)
  37. return fmt.Errorf("failed to locate %s", fileId)
  38. }
  39. volumeServerAddress := filerClient.AdjustedUrl(locations.Locations[0].Url)
  40. targetUrl = fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId)
  41. return nil
  42. })
  43. return
  44. }
  45. }
  46. func NewChunkReaderAtFromClient(filerClient filer_pb.FilerClient, chunkViews []*ChunkView, chunkCache *chunk_cache.ChunkCache, fileSize int64) *ChunkReadAt {
  47. return &ChunkReadAt{
  48. chunkViews: chunkViews,
  49. lookupFileId: LookupFn(filerClient),
  50. chunkCache: chunkCache,
  51. fileSize: fileSize,
  52. }
  53. }
  54. func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) {
  55. c.readerLock.Lock()
  56. defer c.readerLock.Unlock()
  57. for n < len(p) && err == nil {
  58. readCount, readErr := c.doReadAt(p[n:], offset+int64(n))
  59. n += readCount
  60. err = readErr
  61. }
  62. return
  63. }
// doReadAt performs a single read step: it scans chunkViews for the first
// chunk overlapping [offset, offset+len(p)), fetches that entire chunk
// (cache-backed) into c.buffer, and copies the overlapping bytes into p.
// If no chunk overlaps the range, the whole of p is reported as read
// (n = len(p)) and io.EOF is returned once offset+n reaches fileSize.
//
// NOTE(review): in the no-overlap case p is not zero-filled, and in the
// overlap case bytes of p before chunkStart are left untouched — confirm
// whether sparse holes are expected to read as zeros here.
func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {

	var found bool
	var chunkStart, chunkStop int64
	var chunkView *ChunkView
	for _, chunk := range c.chunkViews {
		// fmt.Printf(">>> doReadAt [%d,%d), chunk[%d,%d)\n", offset, offset+int64(len(p)), chunk.LogicOffset, chunk.LogicOffset+int64(chunk.Size))
		// Intersect the requested byte range with this chunk's logical range.
		chunkStart, chunkStop = max(chunk.LogicOffset, offset), min(chunk.LogicOffset+int64(chunk.Size), offset+int64(len(p)))
		chunkView = chunk
		if chunkStart < chunkStop {
			// fmt.Printf(">>> found [%d,%d), chunk %s [%d,%d)\n", chunkStart, chunkStop, chunk.FileId, chunk.LogicOffset, chunk.LogicOffset+int64(chunk.Size))
			found = true
			// Fetch the whole chunk into the reader's buffer; only the first
			// overlapping chunk is handled per call (ReadAt loops for more).
			c.buffer, err = c.fetchWholeChunkData(chunk)
			if err != nil {
				glog.Errorf("fetching chunk %+v: %v\n", chunk, err)
			}
			// bufferFileId is set even on fetch error; err is returned below.
			c.bufferFileId = chunk.FileId
			break
		}
	}

	// fmt.Printf("> doReadAt [%d,%d), buffer %s:%d, found:%v, err:%v\n", offset, offset+int64(len(p)), c.bufferFileId, int64(len(c.buffer)), found, err)

	if err != nil {
		return
	}

	if found {
		// Translate the logical overlap into an offset within the buffered chunk.
		bufferOffset := chunkStart - chunkView.LogicOffset + chunkView.Offset
		/*
			skipped, copied := chunkStart-offset, chunkStop-chunkStart
			fmt.Printf("+++ copy %d+%d=%d fill:[%d, %d) p[%d,%d) <- buffer:[%d,%d) buffer %s:%d\nchunkView:%+v\n\n",
				skipped, copied, skipped+copied,
				chunkStart, chunkStop,
				chunkStart-offset, chunkStop-offset, bufferOffset, bufferOffset+chunkStop-chunkStart,
				c.bufferFileId, len(c.buffer),
				chunkView)
		*/
		// n counts both the skipped gap before chunkStart and the copied bytes.
		n = int(chunkStart-offset) +
			copy(p[chunkStart-offset:chunkStop-offset], c.buffer[bufferOffset:bufferOffset+chunkStop-chunkStart])
		return
	}

	// No chunk covers the range: report p as fully consumed and signal EOF
	// when the read reaches or passes the end of the file.
	n = len(p)
	if offset+int64(n) >= c.fileSize {
		err = io.EOF
	}
	// fmt.Printf("~~~ filled %d, err: %v\n\n", n, err)

	return
}
  109. func (c *ChunkReadAt) fetchWholeChunkData(chunkView *ChunkView) (chunkData []byte, err error) {
  110. glog.V(4).Infof("fetchWholeChunkData %s offset %d [%d,%d)\n", chunkView.FileId, chunkView.Offset, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))
  111. chunkData = c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize)
  112. if chunkData != nil {
  113. glog.V(5).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset-chunkView.Offset, chunkView.LogicOffset-chunkView.Offset + int64(len(chunkData)))
  114. } else {
  115. chunkData, err = c.doFetchFullChunkData(chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped)
  116. if err != nil {
  117. return
  118. }
  119. c.chunkCache.SetChunk(chunkView.FileId, chunkData)
  120. }
  121. return
  122. }
  123. func (c *ChunkReadAt) doFetchFullChunkData(fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) {
  124. return fetchChunk(c.lookupFileId, fileId, cipherKey, isGzipped)
  125. }