You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

159 lines
4.1 KiB

6 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package operation
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  6. "google.golang.org/grpc"
  7. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  8. "github.com/chrislusf/seaweedfs/weed/security"
  9. )
  10. type VolumeAssignRequest struct {
  11. Count uint64
  12. Replication string
  13. Collection string
  14. Ttl string
  15. DiskType string
  16. DataCenter string
  17. Rack string
  18. DataNode string
  19. WritableVolumeCount uint32
  20. }
  21. type AssignResult struct {
  22. Fid string `json:"fid,omitempty"`
  23. Url string `json:"url,omitempty"`
  24. PublicUrl string `json:"publicUrl,omitempty"`
  25. Count uint64 `json:"count,omitempty"`
  26. Error string `json:"error,omitempty"`
  27. Auth security.EncodedJwt `json:"auth,omitempty"`
  28. }
  29. func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) {
  30. var requests []*VolumeAssignRequest
  31. requests = append(requests, primaryRequest)
  32. requests = append(requests, alternativeRequests...)
  33. var lastError error
  34. ret := &AssignResult{}
  35. for i, request := range requests {
  36. if request == nil {
  37. continue
  38. }
  39. lastError = WithMasterServerClient(masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  40. req := &master_pb.AssignRequest{
  41. Count: request.Count,
  42. Replication: request.Replication,
  43. Collection: request.Collection,
  44. Ttl: request.Ttl,
  45. DiskType: request.DiskType,
  46. DataCenter: request.DataCenter,
  47. Rack: request.Rack,
  48. DataNode: request.DataNode,
  49. WritableVolumeCount: request.WritableVolumeCount,
  50. }
  51. resp, grpcErr := masterClient.Assign(context.Background(), req)
  52. if grpcErr != nil {
  53. return grpcErr
  54. }
  55. ret.Count = resp.Count
  56. ret.Fid = resp.Fid
  57. ret.Url = resp.Url
  58. ret.PublicUrl = resp.PublicUrl
  59. ret.Error = resp.Error
  60. ret.Auth = security.EncodedJwt(resp.Auth)
  61. if resp.Error != "" {
  62. return fmt.Errorf("assignRequest: %v", resp.Error)
  63. }
  64. return nil
  65. })
  66. if lastError != nil {
  67. continue
  68. }
  69. if ret.Count <= 0 {
  70. lastError = fmt.Errorf("assign failure %d: %v", i+1, ret.Error)
  71. continue
  72. }
  73. break
  74. }
  75. return ret, lastError
  76. }
  77. func LookupJwt(master string, grpcDialOption grpc.DialOption, fileId string) (token security.EncodedJwt) {
  78. WithMasterServerClient(master, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  79. resp, grpcErr := masterClient.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{
  80. VolumeOrFileIds: []string{fileId},
  81. })
  82. if grpcErr != nil {
  83. return grpcErr
  84. }
  85. if len(resp.VolumeIdLocations) == 0 {
  86. return nil
  87. }
  88. token = security.EncodedJwt(resp.VolumeIdLocations[0].Auth)
  89. return nil
  90. })
  91. return
  92. }
  93. type StorageOption struct {
  94. Replication string
  95. DiskType string
  96. Collection string
  97. DataCenter string
  98. Rack string
  99. TtlSeconds int32
  100. Fsync bool
  101. VolumeGrowthCount uint32
  102. }
  103. func (so *StorageOption) TtlString() string {
  104. return needle.SecondsToTTL(so.TtlSeconds)
  105. }
  106. func (so *StorageOption) ToAssignRequests(count int) (ar *VolumeAssignRequest, altRequest *VolumeAssignRequest) {
  107. ar = &VolumeAssignRequest{
  108. Count: uint64(count),
  109. Replication: so.Replication,
  110. Collection: so.Collection,
  111. Ttl: so.TtlString(),
  112. DiskType: so.DiskType,
  113. DataCenter: so.DataCenter,
  114. Rack: so.Rack,
  115. WritableVolumeCount: so.VolumeGrowthCount,
  116. }
  117. if so.DataCenter != "" || so.Rack != "" {
  118. altRequest = &VolumeAssignRequest{
  119. Count: uint64(count),
  120. Replication: so.Replication,
  121. Collection: so.Collection,
  122. Ttl: so.TtlString(),
  123. DiskType: so.DiskType,
  124. DataCenter: "",
  125. Rack: "",
  126. WritableVolumeCount: so.VolumeGrowthCount,
  127. }
  128. }
  129. return
  130. }