You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

281 lines
7.9 KiB

6 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package operation
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/pb"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  7. "github.com/seaweedfs/seaweedfs/weed/security"
  8. "github.com/seaweedfs/seaweedfs/weed/stats"
  9. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  10. "google.golang.org/grpc"
  11. "sync"
  12. )
  // VolumeAssignRequest carries the parameters of a single file id assignment
  // request. Its fields are copied one-to-one into a master_pb.AssignRequest
  // before being sent to the master.
  type VolumeAssignRequest struct {
  	// Count is forwarded as the Count of the master request.
  	Count uint64

  	// Storage characteristics forwarded to the master.
  	Replication string
  	Collection  string
  	Ttl         string
  	DiskType    string

  	// Placement constraints forwarded to the master; ToAssignRequests
  	// clears all three in the fallback request it builds.
  	DataCenter string
  	Rack       string
  	DataNode   string

  	// WritableVolumeCount is forwarded as the WritableVolumeCount of the
  	// master request.
  	WritableVolumeCount uint32
  }
  24. type AssignResult struct {
  25. Fid string `json:"fid,omitempty"`
  26. Url string `json:"url,omitempty"`
  27. PublicUrl string `json:"publicUrl,omitempty"`
  28. GrpcPort int `json:"grpcPort,omitempty"`
  29. Count uint64 `json:"count,omitempty"`
  30. Error string `json:"error,omitempty"`
  31. Auth security.EncodedJwt `json:"auth,omitempty"`
  32. Replicas []Location `json:"replicas,omitempty"`
  33. }
  34. // This is a proxy to the master server, only for assigning volume ids.
  35. // It runs via grpc to the master server in streaming mode.
  36. // The connection to the master would only be re-established when the last connection has error.
  37. type AssignProxy struct {
  38. grpcConnection *grpc.ClientConn
  39. pool chan *singleThreadAssignProxy
  40. }
  41. func NewAssignProxy(masterFn GetMasterFn, grpcDialOption grpc.DialOption, concurrency int) (ap *AssignProxy, err error) {
  42. ap = &AssignProxy{
  43. pool: make(chan *singleThreadAssignProxy, concurrency),
  44. }
  45. ap.grpcConnection, err = pb.GrpcDial(context.Background(), masterFn(context.Background()).ToGrpcAddress(), true, grpcDialOption)
  46. if err != nil {
  47. return nil, fmt.Errorf("fail to dial %s: %v", masterFn(context.Background()).ToGrpcAddress(), err)
  48. }
  49. for i := 0; i < concurrency; i++ {
  50. ap.pool <- &singleThreadAssignProxy{}
  51. }
  52. return ap, nil
  53. }
  54. func (ap *AssignProxy) Assign(primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (ret *AssignResult, err error) {
  55. p := <-ap.pool
  56. defer func() {
  57. ap.pool <- p
  58. }()
  59. return p.doAssign(ap.grpcConnection, primaryRequest, alternativeRequests...)
  60. }
  61. type singleThreadAssignProxy struct {
  62. assignClient master_pb.Seaweed_StreamAssignClient
  63. sync.Mutex
  64. }
  65. func (ap *singleThreadAssignProxy) doAssign(grpcConnection *grpc.ClientConn, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (ret *AssignResult, err error) {
  66. ap.Lock()
  67. defer ap.Unlock()
  68. if ap.assignClient == nil {
  69. client := master_pb.NewSeaweedClient(grpcConnection)
  70. ap.assignClient, err = client.StreamAssign(context.Background())
  71. if err != nil {
  72. ap.assignClient = nil
  73. return nil, fmt.Errorf("fail to create stream assign client: %v", err)
  74. }
  75. }
  76. var requests []*VolumeAssignRequest
  77. requests = append(requests, primaryRequest)
  78. requests = append(requests, alternativeRequests...)
  79. ret = &AssignResult{}
  80. for _, request := range requests {
  81. if request == nil {
  82. continue
  83. }
  84. req := &master_pb.AssignRequest{
  85. Count: request.Count,
  86. Replication: request.Replication,
  87. Collection: request.Collection,
  88. Ttl: request.Ttl,
  89. DiskType: request.DiskType,
  90. DataCenter: request.DataCenter,
  91. Rack: request.Rack,
  92. DataNode: request.DataNode,
  93. WritableVolumeCount: request.WritableVolumeCount,
  94. }
  95. if err = ap.assignClient.Send(req); err != nil {
  96. return nil, fmt.Errorf("StreamAssignSend: %v", err)
  97. }
  98. resp, grpcErr := ap.assignClient.Recv()
  99. if grpcErr != nil {
  100. return nil, grpcErr
  101. }
  102. if resp.Error != "" {
  103. return nil, fmt.Errorf("StreamAssignRecv: %v", resp.Error)
  104. }
  105. ret.Count = resp.Count
  106. ret.Fid = resp.Fid
  107. ret.Url = resp.Location.Url
  108. ret.PublicUrl = resp.Location.PublicUrl
  109. ret.GrpcPort = int(resp.Location.GrpcPort)
  110. ret.Error = resp.Error
  111. ret.Auth = security.EncodedJwt(resp.Auth)
  112. for _, r := range resp.Replicas {
  113. ret.Replicas = append(ret.Replicas, Location{
  114. Url: r.Url,
  115. PublicUrl: r.PublicUrl,
  116. DataCenter: r.DataCenter,
  117. })
  118. }
  119. if ret.Count <= 0 {
  120. continue
  121. }
  122. break
  123. }
  124. return
  125. }
  126. func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) {
  127. var requests []*VolumeAssignRequest
  128. requests = append(requests, primaryRequest)
  129. requests = append(requests, alternativeRequests...)
  130. var lastError error
  131. ret := &AssignResult{}
  132. for i, request := range requests {
  133. if request == nil {
  134. continue
  135. }
  136. lastError = WithMasterServerClient(false, masterFn(context.Background()), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  137. req := &master_pb.AssignRequest{
  138. Count: request.Count,
  139. Replication: request.Replication,
  140. Collection: request.Collection,
  141. Ttl: request.Ttl,
  142. DiskType: request.DiskType,
  143. DataCenter: request.DataCenter,
  144. Rack: request.Rack,
  145. DataNode: request.DataNode,
  146. WritableVolumeCount: request.WritableVolumeCount,
  147. }
  148. resp, grpcErr := masterClient.Assign(context.Background(), req)
  149. if grpcErr != nil {
  150. return grpcErr
  151. }
  152. if resp.Error != "" {
  153. return fmt.Errorf("assignRequest: %v", resp.Error)
  154. }
  155. ret.Count = resp.Count
  156. ret.Fid = resp.Fid
  157. ret.Url = resp.Location.Url
  158. ret.PublicUrl = resp.Location.PublicUrl
  159. ret.GrpcPort = int(resp.Location.GrpcPort)
  160. ret.Error = resp.Error
  161. ret.Auth = security.EncodedJwt(resp.Auth)
  162. for _, r := range resp.Replicas {
  163. ret.Replicas = append(ret.Replicas, Location{
  164. Url: r.Url,
  165. PublicUrl: r.PublicUrl,
  166. DataCenter: r.DataCenter,
  167. })
  168. }
  169. return nil
  170. })
  171. if lastError != nil {
  172. stats.FilerHandlerCounter.WithLabelValues(stats.ErrorChunkAssign).Inc()
  173. continue
  174. }
  175. if ret.Count <= 0 {
  176. lastError = fmt.Errorf("assign failure %d: %v", i+1, ret.Error)
  177. continue
  178. }
  179. break
  180. }
  181. return ret, lastError
  182. }
  183. func LookupJwt(master pb.ServerAddress, grpcDialOption grpc.DialOption, fileId string) (token security.EncodedJwt) {
  184. WithMasterServerClient(false, master, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  185. resp, grpcErr := masterClient.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{
  186. VolumeOrFileIds: []string{fileId},
  187. })
  188. if grpcErr != nil {
  189. return grpcErr
  190. }
  191. if len(resp.VolumeIdLocations) == 0 {
  192. return nil
  193. }
  194. token = security.EncodedJwt(resp.VolumeIdLocations[0].Auth)
  195. return nil
  196. })
  197. return
  198. }
  // StorageOption bundles storage settings. ToAssignRequests turns the
  // placement-related fields into VolumeAssignRequest values; the remaining
  // fields are not read anywhere in this part of the file.
  type StorageOption struct {
  	// Copied verbatim into the assign requests.
  	Replication string
  	DiskType    string
  	Collection  string

  	// Placement constraints. When any of them is set, ToAssignRequests
  	// also produces a fallback request without them.
  	DataCenter string
  	Rack       string
  	DataNode   string

  	// TtlSeconds is rendered to a TTL string by TtlString.
  	TtlSeconds int32
  	// VolumeGrowthCount becomes the WritableVolumeCount of assign requests.
  	VolumeGrowthCount uint32

  	// NOTE(review): the fields below are not used in the visible code;
  	// their meaning should be confirmed against their callers.
  	MaxFileNameLength uint32
  	Fsync             bool
  	SaveInside        bool
  }
  212. func (so *StorageOption) TtlString() string {
  213. return needle.SecondsToTTL(so.TtlSeconds)
  214. }
  215. func (so *StorageOption) ToAssignRequests(count int) (ar *VolumeAssignRequest, altRequest *VolumeAssignRequest) {
  216. ar = &VolumeAssignRequest{
  217. Count: uint64(count),
  218. Replication: so.Replication,
  219. Collection: so.Collection,
  220. Ttl: so.TtlString(),
  221. DiskType: so.DiskType,
  222. DataCenter: so.DataCenter,
  223. Rack: so.Rack,
  224. DataNode: so.DataNode,
  225. WritableVolumeCount: so.VolumeGrowthCount,
  226. }
  227. if so.DataCenter != "" || so.Rack != "" || so.DataNode != "" {
  228. ar.WritableVolumeCount = uint32(count)
  229. altRequest = &VolumeAssignRequest{
  230. Count: uint64(count),
  231. Replication: so.Replication,
  232. Collection: so.Collection,
  233. Ttl: so.TtlString(),
  234. DiskType: so.DiskType,
  235. DataCenter: "",
  236. Rack: "",
  237. DataNode: "",
  238. WritableVolumeCount: so.VolumeGrowthCount,
  239. }
  240. }
  241. return
  242. }