You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

162 lines
4.5 KiB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package filer2
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "io"
  7. "sync"
  8. "github.com/chrislusf/seaweedfs/weed/glog"
  9. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  10. "github.com/chrislusf/seaweedfs/weed/util"
  11. "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
  12. "github.com/chrislusf/seaweedfs/weed/wdclient"
  13. )
// ChunkReadAt provides positioned reads over a file stored as a sequence of
// chunks on volume servers. It buffers the data of one chunk at a time
// (buffer/bufferOffset) and serializes concurrent ReadAt calls with readerLock.
type ChunkReadAt struct {
	masterClient *wdclient.MasterClient // NOTE(review): not referenced by the code visible here — presumably used elsewhere; confirm
	chunkViews   []*ChunkView           // chunk layout of the file; doReadAt scans these by LogicOffset
	buffer       []byte                 // data of the most recently fetched chunk
	bufferOffset int64                  // LogicOffset of the buffered chunk; -1 means nothing buffered yet
	lookupFileId func(fileId string) (targetUrl string, err error) // resolves a file id to a volume-server URL
	readerLock   sync.Mutex             // guards buffer/bufferOffset across ReadAt calls
	chunkCache   *chunk_cache.ChunkCache // shared cache of full chunk bodies, keyed by file id
}
// var _ = io.ReaderAt(&ChunkReadAt{})

// LookupFileIdFunctionType resolves a chunk's file id to the HTTP URL of a
// volume server that can serve it.
type LookupFileIdFunctionType func(fileId string) (targetUrl string, err error)
  25. func LookupFn(filerClient filer_pb.FilerClient) LookupFileIdFunctionType {
  26. return func(fileId string) (targetUrl string, err error) {
  27. err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  28. vid := VolumeId(fileId)
  29. resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
  30. VolumeIds: []string{vid},
  31. })
  32. if err != nil {
  33. return err
  34. }
  35. locations := resp.LocationsMap[vid]
  36. if locations == nil || len(locations.Locations) == 0 {
  37. glog.V(0).Infof("failed to locate %s", fileId)
  38. return fmt.Errorf("failed to locate %s", fileId)
  39. }
  40. volumeServerAddress := filerClient.AdjustedUrl(locations.Locations[0].Url)
  41. targetUrl = fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId)
  42. return nil
  43. })
  44. return
  45. }
  46. }
  47. func NewChunkReaderAtFromClient(filerClient filer_pb.FilerClient, chunkViews []*ChunkView, chunkCache *chunk_cache.ChunkCache) *ChunkReadAt {
  48. return &ChunkReadAt{
  49. chunkViews: chunkViews,
  50. lookupFileId: LookupFn(filerClient),
  51. bufferOffset: -1,
  52. chunkCache: chunkCache,
  53. }
  54. }
  55. func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) {
  56. c.readerLock.Lock()
  57. defer c.readerLock.Unlock()
  58. for n < len(p) && err == nil {
  59. readCount, readErr := c.doReadAt(p[n:], offset+int64(n))
  60. n += readCount
  61. err = readErr
  62. if readCount == 0 {
  63. return n, io.EOF
  64. }
  65. }
  66. return
  67. }
  68. func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
  69. var found bool
  70. for _, chunk := range c.chunkViews {
  71. if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) {
  72. found = true
  73. if c.bufferOffset != chunk.LogicOffset {
  74. c.buffer, err = c.fetchChunkData(chunk)
  75. if err != nil {
  76. glog.Errorf("fetching chunk %+v: %v\n", chunk, err)
  77. }
  78. c.bufferOffset = chunk.LogicOffset
  79. }
  80. break
  81. }
  82. }
  83. if !found {
  84. return 0, io.EOF
  85. }
  86. if err == nil {
  87. n = copy(p, c.buffer[offset-c.bufferOffset:])
  88. }
  89. // fmt.Printf("> doReadAt [%d,%d), buffer:[%d,%d)\n", offset, offset+int64(n), c.bufferOffset, c.bufferOffset+int64(len(c.buffer)))
  90. return
  91. }
  92. func (c *ChunkReadAt) fetchChunkData(chunkView *ChunkView) (data []byte, err error) {
  93. glog.V(4).Infof("fetchChunkData %s [%d,%d)\n", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))
  94. hasDataInCache := false
  95. chunkData := c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize)
  96. if chunkData != nil {
  97. glog.V(3).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))
  98. hasDataInCache = true
  99. } else {
  100. chunkData, err = c.doFetchFullChunkData(chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped)
  101. if err != nil {
  102. return nil, err
  103. }
  104. }
  105. if int64(len(chunkData)) < chunkView.Offset+int64(chunkView.Size) {
  106. glog.Errorf("unexpected larger cached:%v chunk %s [%d,%d) than %d", hasDataInCache, chunkView.FileId, chunkView.Offset, chunkView.Offset+int64(chunkView.Size), len(chunkData))
  107. return nil, fmt.Errorf("unexpected larger cached:%v chunk %s [%d,%d) than %d", hasDataInCache, chunkView.FileId, chunkView.Offset, chunkView.Offset+int64(chunkView.Size), len(chunkData))
  108. }
  109. data = chunkData[chunkView.Offset : chunkView.Offset+int64(chunkView.Size)]
  110. if !hasDataInCache {
  111. c.chunkCache.SetChunk(chunkView.FileId, chunkData)
  112. }
  113. return data, nil
  114. }
  115. func (c *ChunkReadAt) doFetchFullChunkData(fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) {
  116. urlString, err := c.lookupFileId(fileId)
  117. if err != nil {
  118. glog.V(1).Infof("operation LookupFileId %s failed, err: %v", fileId, err)
  119. return nil, err
  120. }
  121. var buffer bytes.Buffer
  122. err = util.ReadUrlAsStream(urlString, cipherKey, isGzipped, true, 0, 0, func(data []byte) {
  123. buffer.Write(data)
  124. })
  125. if err != nil {
  126. glog.V(0).Infof("read %s failed, err: %v", fileId, err)
  127. return nil, err
  128. }
  129. return buffer.Bytes(), nil
  130. }