You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

102 lines
2.5 KiB

6 years ago
6 years ago
  1. package operation
  2. import (
  3. "context"
  4. "fmt"
  5. "strings"
  6. "time"
  7. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  8. "github.com/chrislusf/seaweedfs/weed/security"
  9. "github.com/chrislusf/seaweedfs/weed/util"
  10. )
  // VolumeAssignRequest carries the parameters for one assign attempt against
  // the master. Its fields correspond one-to-one to the master_pb.AssignRequest
  // fields that Assign populates (Count, Replication, Collection, Ttl,
  // DataCenter, Rack, DataNode).
  //
  // NOTE(review): the exact format the master expects for the string fields
  // (e.g. Replication, Ttl) is not visible in this file — confirm against the
  // master's documentation before relying on a particular encoding.
  type VolumeAssignRequest struct {
  	// Count is forwarded as AssignRequest.Count; presumably the number of
  	// file ids to reserve — verify against the master.
  	Count uint64
  	// Replication is forwarded as AssignRequest.Replication.
  	Replication string
  	// Collection is forwarded as AssignRequest.Collection.
  	Collection string
  	// Ttl is forwarded as AssignRequest.Ttl.
  	Ttl string
  	// DataCenter is forwarded as AssignRequest.DataCenter.
  	DataCenter string
  	// Rack is forwarded as AssignRequest.Rack.
  	Rack string
  	// DataNode is forwarded as AssignRequest.DataNode.
  	DataNode string
  }
  // AssignResult is the outcome of an assign call. Assign fills it from the
  // master's response, field by field. Every field is tagged omitempty, so
  // zero-valued fields are left out when the struct is JSON-encoded.
  type AssignResult struct {
  	// Fid is copied from the response's Fid.
  	Fid string `json:"fid,omitempty"`
  	// Url is copied from the response's Url.
  	Url string `json:"url,omitempty"`
  	// PublicUrl is copied from the response's PublicUrl.
  	PublicUrl string `json:"publicUrl,omitempty"`
  	// Count is copied from the response's Count. Assign treats a Count of
  	// zero as a failed attempt.
  	Count uint64 `json:"count,omitempty"`
  	// Error is copied from the response's Error; Assign includes it in the
  	// error it reports when Count is zero.
  	Error string `json:"error,omitempty"`
  	// Auth is the response's Auth converted to security.EncodedJwt.
  	Auth security.EncodedJwt `json:"auth,omitempty"`
  }
  28. func Assign(server string, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) {
  29. var requests []*VolumeAssignRequest
  30. requests = append(requests, primaryRequest)
  31. requests = append(requests, alternativeRequests...)
  32. var lastError error
  33. ret := &AssignResult{}
  34. for i, request := range requests {
  35. if request == nil {
  36. continue
  37. }
  38. lastError = withMasterServerClient(server, func(masterClient master_pb.SeaweedClient) error {
  39. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
  40. defer cancel()
  41. req := &master_pb.AssignRequest{
  42. Count: primaryRequest.Count,
  43. Replication: primaryRequest.Replication,
  44. Collection: primaryRequest.Collection,
  45. Ttl: primaryRequest.Ttl,
  46. DataCenter: primaryRequest.DataCenter,
  47. Rack: primaryRequest.Rack,
  48. DataNode: primaryRequest.DataNode,
  49. }
  50. resp, grpcErr := masterClient.Assign(ctx, req)
  51. if grpcErr != nil {
  52. return grpcErr
  53. }
  54. ret.Count = resp.Count
  55. ret.Fid = resp.Fid
  56. ret.Url = resp.Url
  57. ret.PublicUrl = resp.PublicUrl
  58. ret.Error = resp.Error
  59. ret.Auth = security.EncodedJwt(resp.Auth)
  60. return nil
  61. })
  62. if lastError != nil {
  63. continue
  64. }
  65. if ret.Count <= 0 {
  66. lastError = fmt.Errorf("assign failure %d: %v", i+1, ret.Error)
  67. continue
  68. }
  69. }
  70. return ret, lastError
  71. }
  72. func LookupJwt(master string, fileId string) security.EncodedJwt {
  73. tokenStr := ""
  74. if h, e := util.Head(fmt.Sprintf("http://%s/dir/lookup?fileId=%s", master, fileId)); e == nil {
  75. bearer := h.Get("Authorization")
  76. if len(bearer) > 7 && strings.ToUpper(bearer[0:6]) == "BEARER" {
  77. tokenStr = bearer[7:]
  78. }
  79. }
  80. return security.EncodedJwt(tokenStr)
  81. }