You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

172 lines
4.2 KiB

5 years ago
5 years ago
5 years ago
5 years ago
  1. package filer2
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "math"
  7. "strings"
  8. "sync"
  9. "github.com/chrislusf/seaweedfs/weed/glog"
  10. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  11. "github.com/chrislusf/seaweedfs/weed/util"
  12. )
  13. func VolumeId(fileId string) string {
  14. lastCommaIndex := strings.LastIndex(fileId, ",")
  15. if lastCommaIndex > 0 {
  16. return fileId[:lastCommaIndex]
  17. }
  18. return fileId
  19. }
  20. type FilerClient interface {
  21. WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error
  22. }
  23. func ReadIntoBuffer(ctx context.Context, filerClient FilerClient, fullFilePath string, buff []byte, chunkViews []*ChunkView, baseOffset int64) (totalRead int64, err error) {
  24. var vids []string
  25. for _, chunkView := range chunkViews {
  26. vids = append(vids, VolumeId(chunkView.FileId))
  27. }
  28. vid2Locations := make(map[string]*filer_pb.Locations)
  29. err = filerClient.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
  30. glog.V(4).Infof("read fh lookup volume id locations: %v", vids)
  31. resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{
  32. VolumeIds: vids,
  33. })
  34. if err != nil {
  35. return err
  36. }
  37. vid2Locations = resp.LocationsMap
  38. return nil
  39. })
  40. if err != nil {
  41. return 0, fmt.Errorf("failed to lookup volume ids %v: %v", vids, err)
  42. }
  43. var wg sync.WaitGroup
  44. for _, chunkView := range chunkViews {
  45. wg.Add(1)
  46. go func(chunkView *ChunkView) {
  47. defer wg.Done()
  48. glog.V(4).Infof("read fh reading chunk: %+v", chunkView)
  49. locations := vid2Locations[VolumeId(chunkView.FileId)]
  50. if locations == nil || len(locations.Locations) == 0 {
  51. glog.V(0).Infof("failed to locate %s", chunkView.FileId)
  52. err = fmt.Errorf("failed to locate %s", chunkView.FileId)
  53. return
  54. }
  55. var n int64
  56. n, err = util.ReadUrl(
  57. fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, chunkView.FileId),
  58. chunkView.Offset,
  59. int(chunkView.Size),
  60. buff[chunkView.LogicOffset-baseOffset:chunkView.LogicOffset-baseOffset+int64(chunkView.Size)],
  61. !chunkView.IsFullChunk)
  62. if err != nil {
  63. glog.V(0).Infof("%v read http://%s/%v %v bytes: %v", fullFilePath, locations.Locations[0].Url, chunkView.FileId, n, err)
  64. err = fmt.Errorf("failed to read http://%s/%s: %v",
  65. locations.Locations[0].Url, chunkView.FileId, err)
  66. return
  67. }
  68. glog.V(4).Infof("read fh read %d bytes: %+v", n, chunkView)
  69. totalRead += n
  70. }(chunkView)
  71. }
  72. wg.Wait()
  73. return
  74. }
  75. func GetEntry(ctx context.Context, filerClient FilerClient, fullFilePath string) (entry *filer_pb.Entry, err error) {
  76. dir, name := FullPath(fullFilePath).DirAndName()
  77. err = filerClient.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
  78. request := &filer_pb.LookupDirectoryEntryRequest{
  79. Directory: dir,
  80. Name: name,
  81. }
  82. glog.V(3).Infof("read %s request: %v", fullFilePath, request)
  83. resp, err := client.LookupDirectoryEntry(ctx, request)
  84. if err != nil {
  85. if err == ErrNotFound || strings.Contains(err.Error(), ErrNotFound.Error()) {
  86. return nil
  87. }
  88. glog.V(3).Infof("read %s attr %v: %v", fullFilePath, request, err)
  89. return err
  90. }
  91. if resp.Entry == nil {
  92. glog.V(3).Infof("read %s entry: %v", fullFilePath, entry)
  93. return nil
  94. }
  95. entry = resp.Entry
  96. return nil
  97. })
  98. return
  99. }
  100. func ReadDirAllEntries(ctx context.Context, filerClient FilerClient, fullDirPath, prefix string, fn func(entry *filer_pb.Entry, isLast bool)) (err error) {
  101. err = filerClient.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
  102. lastEntryName := ""
  103. request := &filer_pb.ListEntriesRequest{
  104. Directory: fullDirPath,
  105. Prefix: prefix,
  106. StartFromFileName: lastEntryName,
  107. Limit: math.MaxUint32,
  108. }
  109. glog.V(3).Infof("read directory: %v", request)
  110. stream, err := client.ListEntries(ctx, request)
  111. if err != nil {
  112. return fmt.Errorf("list %s: %v", fullDirPath, err)
  113. }
  114. var prevEntry *filer_pb.Entry
  115. for {
  116. resp, recvErr := stream.Recv()
  117. if recvErr != nil {
  118. if recvErr == io.EOF {
  119. if prevEntry != nil {
  120. fn(prevEntry, true)
  121. }
  122. break
  123. } else {
  124. return recvErr
  125. }
  126. }
  127. if prevEntry != nil {
  128. fn(prevEntry, false)
  129. }
  130. prevEntry = resp.Entry
  131. }
  132. return nil
  133. })
  134. return
  135. }