You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

174 lines
4.2 KiB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package filer2
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "math"
  7. "strings"
  8. "sync"
  9. "github.com/chrislusf/seaweedfs/weed/glog"
  10. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  11. "github.com/chrislusf/seaweedfs/weed/util"
  12. )
  // VolumeId returns the portion of fileId that precedes its last comma,
  // e.g. "3,01637037d6" yields "3". When fileId contains no comma, or its only
  // comma is the very first character, fileId is returned unchanged.
  func VolumeId(fileId string) string {
  	switch pos := strings.LastIndexByte(fileId, ','); {
  	case pos > 0:
  		// Keep everything before the separator.
  		return fileId[:pos]
  	default:
  		return fileId
  	}
  }
  20. type FilerClient interface {
  21. WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error
  22. AdjustedUrl(hostAndPort string) string
  23. }
  24. func ReadIntoBuffer(filerClient FilerClient, fullFilePath FullPath, buff []byte, chunkViews []*ChunkView, baseOffset int64) (totalRead int64, err error) {
  25. var vids []string
  26. for _, chunkView := range chunkViews {
  27. vids = append(vids, VolumeId(chunkView.FileId))
  28. }
  29. vid2Locations := make(map[string]*filer_pb.Locations)
  30. err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  31. glog.V(4).Infof("read fh lookup volume id locations: %v", vids)
  32. resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
  33. VolumeIds: vids,
  34. })
  35. if err != nil {
  36. return err
  37. }
  38. vid2Locations = resp.LocationsMap
  39. return nil
  40. })
  41. if err != nil {
  42. return 0, fmt.Errorf("failed to lookup volume ids %v: %v", vids, err)
  43. }
  44. var wg sync.WaitGroup
  45. for _, chunkView := range chunkViews {
  46. wg.Add(1)
  47. go func(chunkView *ChunkView) {
  48. defer wg.Done()
  49. glog.V(4).Infof("read fh reading chunk: %+v", chunkView)
  50. locations := vid2Locations[VolumeId(chunkView.FileId)]
  51. if locations == nil || len(locations.Locations) == 0 {
  52. glog.V(0).Infof("failed to locate %s", chunkView.FileId)
  53. err = fmt.Errorf("failed to locate %s", chunkView.FileId)
  54. return
  55. }
  56. volumeServerAddress := filerClient.AdjustedUrl(locations.Locations[0].Url)
  57. var n int64
  58. n, err = util.ReadUrl(
  59. fmt.Sprintf("http://%s/%s", volumeServerAddress, chunkView.FileId),
  60. chunkView.Offset,
  61. int(chunkView.Size),
  62. buff[chunkView.LogicOffset-baseOffset:chunkView.LogicOffset-baseOffset+int64(chunkView.Size)],
  63. !chunkView.IsFullChunk)
  64. if err != nil {
  65. glog.V(0).Infof("%v read http://%s/%v %v bytes: %v", fullFilePath, volumeServerAddress, chunkView.FileId, n, err)
  66. err = fmt.Errorf("failed to read http://%s/%s: %v",
  67. volumeServerAddress, chunkView.FileId, err)
  68. return
  69. }
  70. glog.V(4).Infof("read fh read %d bytes: %+v", n, chunkView)
  71. totalRead += n
  72. }(chunkView)
  73. }
  74. wg.Wait()
  75. return
  76. }
  77. func GetEntry(filerClient FilerClient, fullFilePath FullPath) (entry *filer_pb.Entry, err error) {
  78. dir, name := fullFilePath.DirAndName()
  79. err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  80. request := &filer_pb.LookupDirectoryEntryRequest{
  81. Directory: dir,
  82. Name: name,
  83. }
  84. // glog.V(3).Infof("read %s request: %v", fullFilePath, request)
  85. resp, err := client.LookupDirectoryEntry(context.Background(), request)
  86. if err != nil {
  87. if err == ErrNotFound || strings.Contains(err.Error(), ErrNotFound.Error()) {
  88. return nil
  89. }
  90. glog.V(3).Infof("read %s %v: %v", fullFilePath, resp, err)
  91. return err
  92. }
  93. if resp.Entry == nil {
  94. // glog.V(3).Infof("read %s entry: %v", fullFilePath, entry)
  95. return nil
  96. }
  97. entry = resp.Entry
  98. return nil
  99. })
  100. return
  101. }
  102. func ReadDirAllEntries(filerClient FilerClient, fullDirPath FullPath, prefix string, fn func(entry *filer_pb.Entry, isLast bool)) (err error) {
  103. err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  104. lastEntryName := ""
  105. request := &filer_pb.ListEntriesRequest{
  106. Directory: string(fullDirPath),
  107. Prefix: prefix,
  108. StartFromFileName: lastEntryName,
  109. Limit: math.MaxUint32,
  110. }
  111. glog.V(3).Infof("read directory: %v", request)
  112. stream, err := client.ListEntries(context.Background(), request)
  113. if err != nil {
  114. return fmt.Errorf("list %s: %v", fullDirPath, err)
  115. }
  116. var prevEntry *filer_pb.Entry
  117. for {
  118. resp, recvErr := stream.Recv()
  119. if recvErr != nil {
  120. if recvErr == io.EOF {
  121. if prevEntry != nil {
  122. fn(prevEntry, true)
  123. }
  124. break
  125. } else {
  126. return recvErr
  127. }
  128. }
  129. if prevEntry != nil {
  130. fn(prevEntry, false)
  131. }
  132. prevEntry = resp.Entry
  133. }
  134. return nil
  135. })
  136. return
  137. }