You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

170 lines
4.4 KiB

6 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package operation
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  6. "google.golang.org/grpc"
  7. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  8. "github.com/chrislusf/seaweedfs/weed/security"
  9. )
  10. type VolumeAssignRequest struct {
  11. Count uint64
  12. Replication string
  13. Collection string
  14. Ttl string
  15. DiskType string
  16. DataCenter string
  17. Rack string
  18. DataNode string
  19. WritableVolumeCount uint32
  20. }
  21. type AssignResultReplica struct {
  22. Url string `json:"url,omitempty"`
  23. PublicUrl string `json:"publicUrl,omitempty"`
  24. }
  25. type AssignResult struct {
  26. Fid string `json:"fid,omitempty"`
  27. Url string `json:"url,omitempty"`
  28. PublicUrl string `json:"publicUrl,omitempty"`
  29. Count uint64 `json:"count,omitempty"`
  30. Error string `json:"error,omitempty"`
  31. Auth security.EncodedJwt `json:"auth,omitempty"`
  32. Replicas []AssignResultReplica `json:"replicas,omitempty"`
  33. }
  34. func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) {
  35. var requests []*VolumeAssignRequest
  36. requests = append(requests, primaryRequest)
  37. requests = append(requests, alternativeRequests...)
  38. var lastError error
  39. ret := &AssignResult{}
  40. for i, request := range requests {
  41. if request == nil {
  42. continue
  43. }
  44. lastError = WithMasterServerClient(masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  45. req := &master_pb.AssignRequest{
  46. Count: request.Count,
  47. Replication: request.Replication,
  48. Collection: request.Collection,
  49. Ttl: request.Ttl,
  50. DiskType: request.DiskType,
  51. DataCenter: request.DataCenter,
  52. Rack: request.Rack,
  53. DataNode: request.DataNode,
  54. WritableVolumeCount: request.WritableVolumeCount,
  55. }
  56. resp, grpcErr := masterClient.Assign(context.Background(), req)
  57. if grpcErr != nil {
  58. return grpcErr
  59. }
  60. ret.Count = resp.Count
  61. ret.Fid = resp.Fid
  62. ret.Url = resp.Url
  63. ret.PublicUrl = resp.PublicUrl
  64. ret.Error = resp.Error
  65. ret.Auth = security.EncodedJwt(resp.Auth)
  66. for _, r := range ret.Replicas {
  67. ret.Replicas = append(ret.Replicas, AssignResultReplica{
  68. Url: r.Url,
  69. PublicUrl: r.PublicUrl,
  70. })
  71. }
  72. if resp.Error != "" {
  73. return fmt.Errorf("assignRequest: %v", resp.Error)
  74. }
  75. return nil
  76. })
  77. if lastError != nil {
  78. continue
  79. }
  80. if ret.Count <= 0 {
  81. lastError = fmt.Errorf("assign failure %d: %v", i+1, ret.Error)
  82. continue
  83. }
  84. break
  85. }
  86. return ret, lastError
  87. }
  88. func LookupJwt(master string, grpcDialOption grpc.DialOption, fileId string) (token security.EncodedJwt) {
  89. WithMasterServerClient(master, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  90. resp, grpcErr := masterClient.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{
  91. VolumeOrFileIds: []string{fileId},
  92. })
  93. if grpcErr != nil {
  94. return grpcErr
  95. }
  96. if len(resp.VolumeIdLocations) == 0 {
  97. return nil
  98. }
  99. token = security.EncodedJwt(resp.VolumeIdLocations[0].Auth)
  100. return nil
  101. })
  102. return
  103. }
  104. type StorageOption struct {
  105. Replication string
  106. DiskType string
  107. Collection string
  108. DataCenter string
  109. Rack string
  110. TtlSeconds int32
  111. Fsync bool
  112. VolumeGrowthCount uint32
  113. }
  114. func (so *StorageOption) TtlString() string {
  115. return needle.SecondsToTTL(so.TtlSeconds)
  116. }
  117. func (so *StorageOption) ToAssignRequests(count int) (ar *VolumeAssignRequest, altRequest *VolumeAssignRequest) {
  118. ar = &VolumeAssignRequest{
  119. Count: uint64(count),
  120. Replication: so.Replication,
  121. Collection: so.Collection,
  122. Ttl: so.TtlString(),
  123. DiskType: so.DiskType,
  124. DataCenter: so.DataCenter,
  125. Rack: so.Rack,
  126. WritableVolumeCount: so.VolumeGrowthCount,
  127. }
  128. if so.DataCenter != "" || so.Rack != "" {
  129. altRequest = &VolumeAssignRequest{
  130. Count: uint64(count),
  131. Replication: so.Replication,
  132. Collection: so.Collection,
  133. Ttl: so.TtlString(),
  134. DiskType: so.DiskType,
  135. DataCenter: "",
  136. Rack: "",
  137. WritableVolumeCount: so.VolumeGrowthCount,
  138. }
  139. }
  140. return
  141. }