You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

140 lines
3.8 KiB

6 years ago
  1. package operation
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "math/rand/v2"
  7. "strings"
  8. "time"
  9. "github.com/seaweedfs/seaweedfs/weed/pb"
  10. "google.golang.org/grpc"
  11. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  12. )
  13. type Location struct {
  14. Url string `json:"url,omitempty"`
  15. PublicUrl string `json:"publicUrl,omitempty"`
  16. DataCenter string `json:"dataCenter,omitempty"`
  17. GrpcPort int `json:"grpcPort,omitempty"`
  18. DataInRemote bool `json:"dataInRemote,omitempty"`
  19. }
  20. func (l *Location) ServerAddress() pb.ServerAddress {
  21. return pb.NewServerAddressWithGrpcPort(l.Url, l.GrpcPort)
  22. }
  23. type LookupResult struct {
  24. VolumeOrFileId string `json:"volumeOrFileId,omitempty"`
  25. Locations []Location `json:"locations,omitempty"`
  26. Jwt string `json:"jwt,omitempty"`
  27. Error string `json:"error,omitempty"`
  28. }
  29. func (lr *LookupResult) String() string {
  30. return fmt.Sprintf("VolumeOrFileId:%s, Locations:%v, Error:%s", lr.VolumeOrFileId, lr.Locations, lr.Error)
  31. }
  32. var (
  33. vc VidCache // caching of volume locations, re-check if after 10 minutes
  34. )
  35. func LookupFileId(masterFn GetMasterFn, grpcDialOption grpc.DialOption, fileId string) (fullUrl string, jwt string, err error) {
  36. var location string
  37. var foundLocal bool
  38. parts := strings.Split(fileId, ",")
  39. if len(parts) != 2 {
  40. return "", jwt, errors.New("Invalid fileId " + fileId)
  41. }
  42. lookup, lookupError := LookupVolumeId(masterFn, grpcDialOption, parts[0])
  43. if lookupError != nil {
  44. return "", jwt, lookupError
  45. }
  46. if len(lookup.Locations) == 0 {
  47. return "", jwt, errors.New("File Not Found")
  48. }
  49. for _, loc := range lookup.Locations {
  50. if !loc.DataInRemote {
  51. location = "http://" + loc.Url + "/" + fileId
  52. foundLocal = true
  53. break
  54. }
  55. }
  56. if !foundLocal {
  57. location = "http://" + lookup.Locations[rand.IntN(len(lookup.Locations))].Url + "/" + fileId
  58. }
  59. return location, lookup.Jwt, nil
  60. }
  61. func LookupVolumeId(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vid string) (*LookupResult, error) {
  62. results, err := LookupVolumeIds(masterFn, grpcDialOption, []string{vid})
  63. return results[vid], err
  64. }
  65. // LookupVolumeIds find volume locations by cache and actual lookup
  66. func LookupVolumeIds(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vids []string) (map[string]*LookupResult, error) {
  67. ret := make(map[string]*LookupResult)
  68. var unknown_vids []string
  69. //check vid cache first
  70. for _, vid := range vids {
  71. locations, cacheErr := vc.Get(vid)
  72. if cacheErr == nil {
  73. ret[vid] = &LookupResult{VolumeOrFileId: vid, Locations: locations}
  74. } else {
  75. unknown_vids = append(unknown_vids, vid)
  76. }
  77. }
  78. //return success if all volume ids are known
  79. if len(unknown_vids) == 0 {
  80. return ret, nil
  81. }
  82. //only query unknown_vids
  83. err := WithMasterServerClient(false, masterFn(context.Background()), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  84. req := &master_pb.LookupVolumeRequest{
  85. VolumeOrFileIds: unknown_vids,
  86. }
  87. resp, grpcErr := masterClient.LookupVolume(context.Background(), req)
  88. if grpcErr != nil {
  89. return grpcErr
  90. }
  91. //set newly checked vids to cache
  92. for _, vidLocations := range resp.VolumeIdLocations {
  93. var locations []Location
  94. for _, loc := range vidLocations.Locations {
  95. locations = append(locations, Location{
  96. Url: loc.Url,
  97. PublicUrl: loc.PublicUrl,
  98. DataCenter: loc.DataCenter,
  99. GrpcPort: int(loc.GrpcPort),
  100. DataInRemote: loc.DataInRemote,
  101. })
  102. }
  103. if vidLocations.Error != "" {
  104. vc.Set(vidLocations.VolumeOrFileId, locations, 10*time.Minute)
  105. }
  106. ret[vidLocations.VolumeOrFileId] = &LookupResult{
  107. VolumeOrFileId: vidLocations.VolumeOrFileId,
  108. Locations: locations,
  109. Jwt: vidLocations.Auth,
  110. Error: vidLocations.Error,
  111. }
  112. }
  113. return nil
  114. })
  115. if err != nil {
  116. return nil, err
  117. }
  118. return ret, nil
  119. }