123 lines
3.4 KiB

6 years ago
  1. package operation
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "google.golang.org/grpc"
  8. "math/rand"
  9. "strings"
  10. "time"
  11. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  12. )
  // Location describes one server holding a copy of a volume, as reported by a
  // volume lookup.
  type Location struct {
  	// Url is the server address; LookupFileId uses it as the host part of the
  	// returned HTTP URL.
  	Url string `json:"url,omitempty"`
  	// PublicUrl is the public address reported alongside Url.
  	PublicUrl string `json:"publicUrl,omitempty"`
  	// DataCenter is the data center name reported for this server.
  	DataCenter string `json:"dataCenter,omitempty"`
  	// GrpcPort is combined with Url by ServerAddress.
  	GrpcPort int `json:"grpcPort,omitempty"`
  }
  19. func (l *Location) ServerAddress() pb.ServerAddress {
  20. return pb.NewServerAddressWithGrpcPort(l.Url, l.GrpcPort)
  21. }
  22. type LookupResult struct {
  23. VolumeOrFileId string `json:"volumeOrFileId,omitempty"`
  24. Locations []Location `json:"locations,omitempty"`
  25. Jwt string `json:"jwt,omitempty"`
  26. Error string `json:"error,omitempty"`
  27. }
  28. func (lr *LookupResult) String() string {
  29. return fmt.Sprintf("VolumeOrFileId:%s, Locations:%v, Error:%s", lr.VolumeOrFileId, lr.Locations, lr.Error)
  30. }
  31. var (
  32. vc VidCache // caching of volume locations, re-check if after 10 minutes
  33. )
  34. func LookupFileId(masterFn GetMasterFn, grpcDialOption grpc.DialOption, fileId string) (fullUrl string, jwt string, err error) {
  35. parts := strings.Split(fileId, ",")
  36. if len(parts) != 2 {
  37. return "", jwt, errors.New("Invalid fileId " + fileId)
  38. }
  39. lookup, lookupError := LookupVolumeId(masterFn, grpcDialOption, parts[0])
  40. if lookupError != nil {
  41. return "", jwt, lookupError
  42. }
  43. if len(lookup.Locations) == 0 {
  44. return "", jwt, errors.New("File Not Found")
  45. }
  46. return "http://" + lookup.Locations[rand.Intn(len(lookup.Locations))].Url + "/" + fileId, lookup.Jwt, nil
  47. }
  48. func LookupVolumeId(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vid string) (*LookupResult, error) {
  49. results, err := LookupVolumeIds(masterFn, grpcDialOption, []string{vid})
  50. return results[vid], err
  51. }
  52. // LookupVolumeIds find volume locations by cache and actual lookup
  53. func LookupVolumeIds(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vids []string) (map[string]*LookupResult, error) {
  54. ret := make(map[string]*LookupResult)
  55. var unknown_vids []string
  56. //check vid cache first
  57. for _, vid := range vids {
  58. locations, cacheErr := vc.Get(vid)
  59. if cacheErr == nil {
  60. ret[vid] = &LookupResult{VolumeOrFileId: vid, Locations: locations}
  61. } else {
  62. unknown_vids = append(unknown_vids, vid)
  63. }
  64. }
  65. //return success if all volume ids are known
  66. if len(unknown_vids) == 0 {
  67. return ret, nil
  68. }
  69. //only query unknown_vids
  70. err := WithMasterServerClient(false, masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  71. req := &master_pb.LookupVolumeRequest{
  72. VolumeOrFileIds: unknown_vids,
  73. }
  74. resp, grpcErr := masterClient.LookupVolume(context.Background(), req)
  75. if grpcErr != nil {
  76. return grpcErr
  77. }
  78. //set newly checked vids to cache
  79. for _, vidLocations := range resp.VolumeIdLocations {
  80. var locations []Location
  81. for _, loc := range vidLocations.Locations {
  82. locations = append(locations, Location{
  83. Url: loc.Url,
  84. PublicUrl: loc.PublicUrl,
  85. DataCenter: loc.DataCenter,
  86. GrpcPort: int(loc.GrpcPort),
  87. })
  88. }
  89. if vidLocations.Error != "" {
  90. vc.Set(vidLocations.VolumeOrFileId, locations, 10*time.Minute)
  91. }
  92. ret[vidLocations.VolumeOrFileId] = &LookupResult{
  93. VolumeOrFileId: vidLocations.VolumeOrFileId,
  94. Locations: locations,
  95. Jwt: vidLocations.Auth,
  96. Error: vidLocations.Error,
  97. }
  98. }
  99. return nil
  100. })
  101. if err != nil {
  102. return nil, err
  103. }
  104. return ret, nil
  105. }