You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

176 lines
5.5 KiB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package filer
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "sync"
  7. "github.com/chrislusf/seaweedfs/weed/glog"
  8. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  9. "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
  10. "github.com/chrislusf/seaweedfs/weed/wdclient"
  11. )
// ChunkReadAt reads a logical file assembled from multiple chunks,
// presenting an io.ReaderAt-style interface over the chunk views.
type ChunkReadAt struct {
	masterClient *wdclient.MasterClient
	// chunkViews lists the chunk intervals that make up the file,
	// in logical-offset order (see doReadAt's sequential scan).
	chunkViews []*ChunkView
	// lookupFileId resolves a chunk file id to a volume server URL.
	lookupFileId func(fileId string) (targetUrl string, err error)
	// readerLock serializes ReadAt calls; all fields below are
	// mutated only while it is held (except isPrefetching's reset
	// in the prefetch goroutine — NOTE(review): that write is not
	// under the lock in this version, a potential data race).
	readerLock sync.Mutex
	fileSize   int64

	// lastChunkFileId/lastChunkData form a one-entry cache of the
	// most recently fetched whole chunk.
	lastChunkFileId string
	lastChunkData   []byte
	// isPrefetching is true while a background fetch of the next
	// chunk is in flight, so at most one prefetch runs at a time.
	isPrefetching bool

	chunkCache chunk_cache.ChunkCache
}
// var _ = io.ReaderAt(&ChunkReadAt{})

// LookupFileIdFunctionType maps a chunk file id to the HTTP URL of a
// volume server that can serve it.
type LookupFileIdFunctionType func(fileId string) (targetUrl string, err error)
  25. func LookupFn(filerClient filer_pb.FilerClient) LookupFileIdFunctionType {
  26. return func(fileId string) (targetUrl string, err error) {
  27. err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  28. vid := VolumeId(fileId)
  29. resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
  30. VolumeIds: []string{vid},
  31. })
  32. if err != nil {
  33. return err
  34. }
  35. locations := resp.LocationsMap[vid]
  36. if locations == nil || len(locations.Locations) == 0 {
  37. glog.V(0).Infof("failed to locate %s", fileId)
  38. return fmt.Errorf("failed to locate %s", fileId)
  39. }
  40. volumeServerAddress := filerClient.AdjustedUrl(locations.Locations[0].Url)
  41. targetUrl = fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId)
  42. return nil
  43. })
  44. return
  45. }
  46. }
  47. func NewChunkReaderAtFromClient(filerClient filer_pb.FilerClient, chunkViews []*ChunkView, chunkCache chunk_cache.ChunkCache, fileSize int64) *ChunkReadAt {
  48. return &ChunkReadAt{
  49. chunkViews: chunkViews,
  50. lookupFileId: LookupFn(filerClient),
  51. chunkCache: chunkCache,
  52. fileSize: fileSize,
  53. }
  54. }
  55. func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) {
  56. c.readerLock.Lock()
  57. defer c.readerLock.Unlock()
  58. glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews))
  59. return c.doReadAt(p[n:], offset+int64(n))
  60. }
  61. func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
  62. var buffer []byte
  63. startOffset, remaining := offset, int64(len(p))
  64. var nextChunkView *ChunkView
  65. for i, chunk := range c.chunkViews {
  66. if remaining <= 0 {
  67. break
  68. }
  69. if i+1 < len(c.chunkViews) {
  70. nextChunkView = c.chunkViews[i+1]
  71. } else {
  72. nextChunkView = nil
  73. }
  74. if startOffset < chunk.LogicOffset {
  75. gap := int(chunk.LogicOffset - startOffset)
  76. glog.V(4).Infof("zero [%d,%d)", startOffset, startOffset+int64(gap))
  77. n += int(min(int64(gap), remaining))
  78. startOffset, remaining = chunk.LogicOffset, remaining-int64(gap)
  79. if remaining <= 0 {
  80. break
  81. }
  82. }
  83. // fmt.Printf(">>> doReadAt [%d,%d), chunk[%d,%d)\n", offset, offset+int64(len(p)), chunk.LogicOffset, chunk.LogicOffset+int64(chunk.Size))
  84. chunkStart, chunkStop := max(chunk.LogicOffset, startOffset), min(chunk.LogicOffset+int64(chunk.Size), startOffset+remaining)
  85. if chunkStart >= chunkStop {
  86. continue
  87. }
  88. glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.LogicOffset-chunk.Offset, chunk.LogicOffset-chunk.Offset+int64(chunk.Size))
  89. buffer, err = c.readFromWholeChunkData(chunk, nextChunkView)
  90. if err != nil {
  91. glog.Errorf("fetching chunk %+v: %v\n", chunk, err)
  92. return
  93. }
  94. bufferOffset := chunkStart - chunk.LogicOffset + chunk.Offset
  95. copied := copy(p[startOffset-offset:chunkStop-chunkStart+startOffset-offset], buffer[bufferOffset:bufferOffset+chunkStop-chunkStart])
  96. n += copied
  97. startOffset, remaining = startOffset+int64(copied), remaining-int64(copied)
  98. }
  99. glog.V(4).Infof("doReadAt [%d,%d), n:%v, err:%v", offset, offset+int64(len(p)), n, err)
  100. if err == nil && remaining > 0 && c.fileSize > startOffset {
  101. delta := int(min(remaining, c.fileSize-startOffset))
  102. glog.V(4).Infof("zero2 [%d,%d) of file size %d bytes", startOffset, startOffset+int64(delta), c.fileSize)
  103. n += delta
  104. }
  105. if err == nil && offset+int64(len(p)) > c.fileSize {
  106. err = io.EOF
  107. }
  108. // fmt.Printf("~~~ filled %d, err: %v\n\n", n, err)
  109. return
  110. }
  111. func (c *ChunkReadAt) readFromWholeChunkData(chunkView *ChunkView, nextChunkView *ChunkView) (chunkData []byte, err error) {
  112. glog.V(4).Infof("readFromWholeChunkData %s offset %d [%d,%d) size at least %d", chunkView.FileId, chunkView.Offset, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.ChunkSize)
  113. if c.lastChunkFileId == chunkView.FileId {
  114. return c.lastChunkData, nil
  115. }
  116. chunkData = c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize)
  117. if chunkData != nil {
  118. glog.V(4).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset-chunkView.Offset, chunkView.LogicOffset-chunkView.Offset+int64(len(chunkData)))
  119. } else {
  120. glog.V(4).Infof("doFetchFullChunkData %s", chunkView.FileId)
  121. chunkData, err = c.doFetchFullChunkData(chunkView)
  122. if err != nil {
  123. return
  124. }
  125. c.chunkCache.SetChunk(chunkView.FileId, chunkData)
  126. }
  127. c.lastChunkData = chunkData
  128. c.lastChunkFileId = chunkView.FileId
  129. if nextChunkView != nil && !c.isPrefetching {
  130. c.isPrefetching = true
  131. go func() {
  132. if chunkData, err := c.doFetchFullChunkData(nextChunkView); err == nil {
  133. c.chunkCache.SetChunk(nextChunkView.FileId, chunkData)
  134. }
  135. c.isPrefetching = false
  136. }()
  137. }
  138. return
  139. }
  140. func (c *ChunkReadAt) doFetchFullChunkData(chunkView *ChunkView) ([]byte, error) {
  141. data, err := fetchChunk(c.lookupFileId, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped)
  142. return data, err
  143. }